use std::sync::{Arc, Mutex};
use reifydb_runtime::actor::system::dst::StepResult;
use super::helpers::*;
#[test]
fn immediate_stop_on_first_message() {
let system = test_system();
let handle = system.spawn("counter", CounterActor);
handle.actor_ref.send(CounterMessage::Stop).unwrap();
assert!(matches!(
system.step(),
StepResult::Stopped {
actor_id: 0
}
));
assert_eq!(system.alive_count(), 0);
assert!(handle.actor_ref.send(CounterMessage::Inc).is_err());
}
#[test]
fn post_stop_called_on_directive_stop() {
let system = test_system();
let stopped = Arc::new(Mutex::new(false));
let handle = system.spawn(
"ps",
PostStopActor {
stopped: stopped.clone(),
},
);
handle.actor_ref.send(PostStopMessage::Stop).unwrap();
system.run_until_idle();
assert!(*stopped.lock().unwrap());
}
#[test]
fn post_stop_called_on_panic() {
let system = test_system();
let stopped = Arc::new(Mutex::new(false));
let handle = system.spawn(
"ps",
PostStopActor {
stopped: stopped.clone(),
},
);
handle.actor_ref.send(PostStopMessage::Boom).unwrap();
system.run_until_idle();
assert!(*stopped.lock().unwrap());
}
#[test]
fn post_stop_called_on_shutdown() {
let system = test_system();
let stopped = Arc::new(Mutex::new(false));
let _handle = system.spawn(
"ps",
PostStopActor {
stopped: stopped.clone(),
},
);
system.shutdown();
assert!(*stopped.lock().unwrap());
}
#[test]
fn send_to_dead_actor_returns_closed() {
let system = test_system();
let handle = system.spawn("counter", CounterActor);
handle.actor_ref.send(CounterMessage::Stop).unwrap();
system.run_until_idle();
let result = handle.actor_ref.send(CounterMessage::Inc);
assert!(result.is_err());
}
#[test]
fn panic_does_not_kill_system() {
let system = test_system();
let log = new_log();
let panicker = system.spawn("panicker", PanicActor);
let logger = system.spawn(
"logger",
LogActor {
log: log.clone(),
},
);
panicker.actor_ref.send(PanicMessage::Boom).unwrap();
logger.actor_ref.send("still_alive".into()).unwrap();
system.run_until_idle();
assert_eq!(log_contents(&log), vec!["still_alive"]);
assert_eq!(system.alive_count(), 1); }
#[test]
fn panic_payload_is_captured() {
let system = test_system();
let handle = system.spawn("panicker", PanicActor);
handle.actor_ref.send(PanicMessage::Boom).unwrap();
match system.step() {
StepResult::Panicked {
actor_id: 0,
payload,
} => {
let msg = payload.downcast_ref::<&str>().unwrap();
assert_eq!(*msg, "actor boom");
}
_ => panic!("expected StepResult::Panicked"),
}
}
#[test]
fn panic_during_init_must_be_captured() {
let system = test_system();
let _handle = system.spawn("init_panicker", InitPanicActor);
match system.step() {
StepResult::Panicked {
actor_id: 0,
payload,
} => {
let msg = payload.downcast_ref::<&str>().unwrap();
assert_eq!(*msg, "init boom");
}
_ => panic!("Expected StepResult::Panicked when init panics"),
}
assert_eq!(system.alive_count(), 0);
}
#[test]
fn panic_during_post_stop_must_be_captured() {
let system = test_system();
let handle = system.spawn("post_stop_panicker", PostStopPanicActor);
handle.actor_ref.send(()).unwrap();
let res1 = system.step();
assert!(matches!(
res1,
StepResult::Stopped {
actor_id: 0
}
));
match system.step() {
StepResult::Panicked {
actor_id: 0,
payload,
} => {
let msg = payload.downcast_ref::<&str>().unwrap();
assert_eq!(*msg, "post_stop boom");
}
_ => panic!("Expected StepResult::Panicked when post_stop panics"),
}
}
#[test]
fn shutdown_cancels_all_actors() {
let system = test_system();
let h1 = system.spawn("a", CounterActor);
let h2 = system.spawn("b", CounterActor);
let h3 = system.spawn("c", CounterActor);
assert_eq!(system.alive_count(), 3);
system.shutdown();
assert_eq!(system.alive_count(), 0);
assert!(h1.actor_ref.send(CounterMessage::Inc).is_err());
assert!(h2.actor_ref.send(CounterMessage::Inc).is_err());
assert!(h3.actor_ref.send(CounterMessage::Inc).is_err());
}
#[test]
fn cancellation_token_propagated_on_shutdown() {
let system = test_system();
let cancel = system.cancellation_token();
let _h = system.spawn("a", CounterActor);
assert!(!cancel.is_cancelled());
system.shutdown();
assert!(cancel.is_cancelled());
}
#[test]
fn spawn_during_handling() {
let system = test_system();
let log = new_log();
let handle = system.spawn(
"parent",
SpawnChildActor {
log: log.clone(),
},
);
handle.actor_ref.send(SpawnChildMessage::SpawnAndSend("child_msg".into())).unwrap();
system.run_until_idle();
assert_eq!(log_contents(&log), vec!["child_msg"]);
}
#[test]
fn spawn_during_shutdown_must_fail() {
let system = test_system();
system.shutdown();
assert!(system.is_cancelled());
let handle = system.spawn("late_comer", CounterActor);
assert!(
handle.actor_ref.send(CounterMessage::Inc).is_err(),
"Should not be able to send to actor spawned after shutdown"
);
match system.step() {
StepResult::Idle => {}
StepResult::Processed {
actor_id,
} if actor_id == 1 => {
panic!("Actor spawned after shutdown was processed!");
}
_ => {}
}
}