use async_trait::async_trait;
use joerl::{
Actor, ActorContext, ActorSystem, ExitReason, Message, Signal,
supervisor::{ChildSpec, RestartIntensity, RestartStrategy, SupervisorSpec, spawn_supervisor},
};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::time::{Duration, sleep};
struct TestWorker {
id: String,
started_count: Arc<AtomicUsize>,
stopped_count: Arc<AtomicUsize>,
should_fail: Arc<AtomicBool>,
}
impl TestWorker {
fn new(
id: impl Into<String>,
started_count: Arc<AtomicUsize>,
stopped_count: Arc<AtomicUsize>,
should_fail: Arc<AtomicBool>,
) -> Self {
Self {
id: id.into(),
started_count,
stopped_count,
should_fail,
}
}
}
#[async_trait]
impl Actor for TestWorker {
async fn started(&mut self, _ctx: &mut ActorContext) {
self.started_count.fetch_add(1, Ordering::SeqCst);
if self.should_fail.load(Ordering::SeqCst) {
_ctx.stop(ExitReason::Panic(format!("worker {} failed", self.id)));
}
}
async fn stopped(&mut self, _reason: &ExitReason, _ctx: &mut ActorContext) {
self.stopped_count.fetch_add(1, Ordering::SeqCst);
}
async fn handle_message(&mut self, msg: Message, ctx: &mut ActorContext) {
if let Some(cmd) = msg.downcast_ref::<&str>() {
match *cmd {
"fail" => {
ctx.stop(ExitReason::Panic("commanded to fail".to_string()));
}
"exit_normal" => {
ctx.stop(ExitReason::Normal);
}
_ => {}
}
}
}
}
struct SignalCounter {
exit_count: Arc<AtomicUsize>,
down_count: Arc<AtomicUsize>,
}
#[async_trait]
impl Actor for SignalCounter {
async fn started(&mut self, ctx: &mut ActorContext) {
ctx.trap_exit(true);
}
async fn handle_signal(&mut self, signal: Signal, _ctx: &mut ActorContext) {
match signal {
Signal::Exit { .. } => {
self.exit_count.fetch_add(1, Ordering::SeqCst);
}
Signal::Down { .. } => {
self.down_count.fetch_add(1, Ordering::SeqCst);
}
_ => {}
}
}
async fn handle_message(&mut self, _msg: Message, _ctx: &mut ActorContext) {}
}
#[tokio::test]
async fn test_supervisor_starts_children() {
let system = Arc::new(ActorSystem::new());
let worker_a_started = Arc::new(AtomicUsize::new(0));
let worker_b_started = Arc::new(AtomicUsize::new(0));
let worker_a_stopped = Arc::new(AtomicUsize::new(0));
let worker_b_stopped = Arc::new(AtomicUsize::new(0));
let spec = SupervisorSpec::new(RestartStrategy::OneForOne)
.child(ChildSpec::new("worker_a", {
let started = Arc::clone(&worker_a_started);
let stopped = Arc::clone(&worker_a_stopped);
move || {
Box::new(TestWorker::new(
"A",
Arc::clone(&started),
Arc::clone(&stopped),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("worker_b", {
let started = Arc::clone(&worker_b_started);
let stopped = Arc::clone(&worker_b_stopped);
move || {
Box::new(TestWorker::new(
"B",
Arc::clone(&started),
Arc::clone(&stopped),
Arc::new(AtomicBool::new(false)),
))
}
}));
let supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(100)).await;
assert_eq!(worker_a_started.load(Ordering::SeqCst), 1);
assert_eq!(worker_b_started.load(Ordering::SeqCst), 1);
assert_eq!(worker_a_stopped.load(Ordering::SeqCst), 0);
assert_eq!(worker_b_stopped.load(Ordering::SeqCst), 0);
assert!(system.is_actor_alive(supervisor.pid()));
}
#[tokio::test]
async fn test_supervisor_oneforone_restart() {
let system = Arc::new(ActorSystem::new());
let worker_a_started = Arc::new(AtomicUsize::new(0));
let worker_b_started = Arc::new(AtomicUsize::new(0));
let worker_c_started = Arc::new(AtomicUsize::new(0));
let worker_a_stopped = Arc::new(AtomicUsize::new(0));
let worker_b_stopped = Arc::new(AtomicUsize::new(0));
let worker_c_stopped = Arc::new(AtomicUsize::new(0));
let should_fail_b = Arc::new(AtomicBool::new(false));
let spec = SupervisorSpec::new(RestartStrategy::OneForOne)
.child(ChildSpec::new("worker_a", {
let started = Arc::clone(&worker_a_started);
let stopped = Arc::clone(&worker_a_stopped);
move || {
Box::new(TestWorker::new(
"A",
Arc::clone(&started),
Arc::clone(&stopped),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("worker_b", {
let started = Arc::clone(&worker_b_started);
let stopped = Arc::clone(&worker_b_stopped);
let fail = Arc::clone(&should_fail_b);
move || {
Box::new(TestWorker::new(
"B",
Arc::clone(&started),
Arc::clone(&stopped),
Arc::clone(&fail),
))
}
}))
.child(ChildSpec::new("worker_c", {
let started = Arc::clone(&worker_c_started);
let stopped = Arc::clone(&worker_c_stopped);
move || {
Box::new(TestWorker::new(
"C",
Arc::clone(&started),
Arc::clone(&stopped),
Arc::new(AtomicBool::new(false)),
))
}
}));
let _supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(100)).await;
assert_eq!(worker_a_started.load(Ordering::SeqCst), 1);
assert_eq!(worker_b_started.load(Ordering::SeqCst), 1);
assert_eq!(worker_c_started.load(Ordering::SeqCst), 1);
should_fail_b.store(true, Ordering::SeqCst);
sleep(Duration::from_millis(50)).await;
sleep(Duration::from_millis(200)).await;
}
#[tokio::test]
async fn test_supervisor_oneforall_restart() {
let system = Arc::new(ActorSystem::new());
let worker_a_started = Arc::new(AtomicUsize::new(0));
let worker_b_started = Arc::new(AtomicUsize::new(0));
let worker_c_started = Arc::new(AtomicUsize::new(0));
let spec = SupervisorSpec::new(RestartStrategy::OneForAll)
.child(ChildSpec::new("worker_a", {
let started = Arc::clone(&worker_a_started);
move || {
Box::new(TestWorker::new(
"A",
Arc::clone(&started),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("worker_b", {
let started = Arc::clone(&worker_b_started);
move || {
Box::new(TestWorker::new(
"B",
Arc::clone(&started),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("worker_c", {
let started = Arc::clone(&worker_c_started);
move || {
Box::new(TestWorker::new(
"C",
Arc::clone(&started),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}));
let _supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(100)).await;
assert_eq!(worker_a_started.load(Ordering::SeqCst), 1);
assert_eq!(worker_b_started.load(Ordering::SeqCst), 1);
assert_eq!(worker_c_started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_supervisor_restforone_restart() {
let system = Arc::new(ActorSystem::new());
let worker_a_started = Arc::new(AtomicUsize::new(0));
let worker_b_started = Arc::new(AtomicUsize::new(0));
let worker_c_started = Arc::new(AtomicUsize::new(0));
let spec = SupervisorSpec::new(RestartStrategy::RestForOne)
.child(ChildSpec::new("worker_a", {
let started = Arc::clone(&worker_a_started);
move || {
Box::new(TestWorker::new(
"A",
Arc::clone(&started),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("worker_b", {
let started = Arc::clone(&worker_b_started);
move || {
Box::new(TestWorker::new(
"B",
Arc::clone(&started),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("worker_c", {
let started = Arc::clone(&worker_c_started);
move || {
Box::new(TestWorker::new(
"C",
Arc::clone(&started),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}));
let _supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(100)).await;
assert_eq!(worker_a_started.load(Ordering::SeqCst), 1);
assert_eq!(worker_b_started.load(Ordering::SeqCst), 1);
assert_eq!(worker_c_started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_supervisor_restart_intensity() {
let system = Arc::new(ActorSystem::new());
let worker_started = Arc::new(AtomicUsize::new(0));
let should_fail = Arc::new(AtomicBool::new(true));
let spec = SupervisorSpec::new(RestartStrategy::OneForOne)
.intensity(RestartIntensity {
max_restarts: 2,
within_seconds: 5,
})
.child(ChildSpec::new("flaky_worker", {
let started = Arc::clone(&worker_started);
let fail = Arc::clone(&should_fail);
move || {
Box::new(TestWorker::new(
"Flaky",
Arc::clone(&started),
Arc::new(AtomicUsize::new(0)),
Arc::clone(&fail),
))
}
}));
let supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(100)).await;
let initial_starts = worker_started.load(Ordering::SeqCst);
assert!(
initial_starts >= 1,
"Worker should have started at least once"
);
sleep(Duration::from_millis(300)).await;
let final_starts = worker_started.load(Ordering::SeqCst);
let supervisor_alive = system.is_actor_alive(supervisor.pid());
if supervisor_alive {
assert!(
final_starts >= initial_starts,
"Worker restarts should be tracked"
);
} else {
assert!(
final_starts >= 1,
"Worker should have started before supervisor stopped"
);
}
}
#[tokio::test]
async fn test_supervisor_normal_exit_no_restart() {
let system = Arc::new(ActorSystem::new());
let worker_started = Arc::new(AtomicUsize::new(0));
let worker_stopped = Arc::new(AtomicUsize::new(0));
let spec = SupervisorSpec::new(RestartStrategy::OneForOne).child(ChildSpec::new("worker", {
let started = Arc::clone(&worker_started);
let stopped = Arc::clone(&worker_stopped);
move || {
Box::new(TestWorker::new(
"W",
Arc::clone(&started),
Arc::clone(&stopped),
Arc::new(AtomicBool::new(false)),
))
}
}));
let _supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(100)).await;
assert_eq!(worker_started.load(Ordering::SeqCst), 1);
sleep(Duration::from_millis(200)).await;
assert_eq!(worker_started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_supervisor_traps_exits() {
let system = Arc::new(ActorSystem::new());
let worker_started = Arc::new(AtomicUsize::new(0));
let spec = SupervisorSpec::new(RestartStrategy::OneForOne).child(ChildSpec::new("worker", {
let started = Arc::clone(&worker_started);
move || {
Box::new(TestWorker::new(
"W",
Arc::clone(&started),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}));
let supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(100)).await;
assert!(system.is_actor_alive(supervisor.pid()));
assert_eq!(worker_started.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_supervisor_spec_builder() {
let spec = SupervisorSpec::new(RestartStrategy::OneForOne)
.intensity(RestartIntensity {
max_restarts: 5,
within_seconds: 10,
})
.child(ChildSpec::new("test", || {
Box::new(TestWorker::new(
"T",
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}));
assert_eq!(spec.strategy, RestartStrategy::OneForOne);
assert_eq!(spec.intensity.max_restarts, 5);
assert_eq!(spec.intensity.within_seconds, 10);
assert_eq!(spec.children.len(), 1);
}
#[tokio::test]
async fn test_supervisor_multiple_children() {
let system = Arc::new(ActorSystem::new());
let count1 = Arc::new(AtomicUsize::new(0));
let count2 = Arc::new(AtomicUsize::new(0));
let count3 = Arc::new(AtomicUsize::new(0));
let count4 = Arc::new(AtomicUsize::new(0));
let count5 = Arc::new(AtomicUsize::new(0));
let spec = SupervisorSpec::new(RestartStrategy::OneForOne)
.child(ChildSpec::new("w1", {
let c = Arc::clone(&count1);
move || {
Box::new(TestWorker::new(
"1",
Arc::clone(&c),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("w2", {
let c = Arc::clone(&count2);
move || {
Box::new(TestWorker::new(
"2",
Arc::clone(&c),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("w3", {
let c = Arc::clone(&count3);
move || {
Box::new(TestWorker::new(
"3",
Arc::clone(&c),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("w4", {
let c = Arc::clone(&count4);
move || {
Box::new(TestWorker::new(
"4",
Arc::clone(&c),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}))
.child(ChildSpec::new("w5", {
let c = Arc::clone(&count5);
move || {
Box::new(TestWorker::new(
"5",
Arc::clone(&c),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}
}));
let supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(150)).await;
assert_eq!(count1.load(Ordering::SeqCst), 1);
assert_eq!(count2.load(Ordering::SeqCst), 1);
assert_eq!(count3.load(Ordering::SeqCst), 1);
assert_eq!(count4.load(Ordering::SeqCst), 1);
assert_eq!(count5.load(Ordering::SeqCst), 1);
assert!(system.is_actor_alive(supervisor.pid()));
}
#[tokio::test]
async fn test_supervisor_restart_strategy_equality() {
assert_eq!(RestartStrategy::OneForOne, RestartStrategy::OneForOne);
assert_ne!(RestartStrategy::OneForOne, RestartStrategy::OneForAll);
assert_ne!(RestartStrategy::OneForOne, RestartStrategy::RestForOne);
}
#[tokio::test]
async fn test_restart_intensity_defaults() {
let intensity = RestartIntensity::default();
assert_eq!(intensity.max_restarts, 3);
assert_eq!(intensity.within_seconds, 5);
}
#[tokio::test]
async fn test_child_spec_creation() {
let _spec = ChildSpec::new("test_child", || {
Box::new(TestWorker::new(
"T",
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
});
}
#[tokio::test]
async fn test_empty_supervisor() {
let system = Arc::new(ActorSystem::new());
let spec = SupervisorSpec::new(RestartStrategy::OneForOne);
let supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(50)).await;
assert!(system.is_actor_alive(supervisor.pid()));
}
#[tokio::test]
async fn test_supervisor_with_signal_counter() {
let system = Arc::new(ActorSystem::new());
let exit_count = Arc::new(AtomicUsize::new(0));
let down_count = Arc::new(AtomicUsize::new(0));
let _observer = system.spawn(SignalCounter {
exit_count: Arc::clone(&exit_count),
down_count: Arc::clone(&down_count),
});
let spec =
SupervisorSpec::new(RestartStrategy::OneForOne).child(ChildSpec::new("worker", || {
Box::new(TestWorker::new(
"W",
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicUsize::new(0)),
Arc::new(AtomicBool::new(false)),
))
}));
let _supervisor = spawn_supervisor(&system, spec);
sleep(Duration::from_millis(100)).await;
}