ractor_supervisor/dynamic.rs

use ractor::concurrency::{sleep, Duration, Instant, JoinHandle};
use ractor::{
    call, Actor, ActorCell, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort, SpawnErr,
    SupervisionEvent,
};
use std::collections::HashMap;

use crate::core::{
    ChildFailureState, ChildSpec, CoreSupervisorOptions, RestartLog, SupervisorCore,
    SupervisorError,
};
use crate::ExitReason;

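/// Configuration for a [`DynamicSupervisor`].
///
/// * `max_children` - optional cap on concurrently running children; spawn requests
///   beyond the cap fail with a `Meltdown` error.
/// * `max_restarts` / `max_window` - meltdown threshold: once the number of restarts
///   within `max_window` exceeds `max_restarts`, the supervisor melts down
///   (e.g. `max_restarts = 1` melts down on the second failure).
/// * `reset_after` - optional quiet period after which the supervisor-level restart
///   log is cleared.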
#[derive(Debug, Clone)]
pub struct DynamicSupervisorOptions {
    pub max_children: Option<usize>,
    pub max_restarts: usize,
    pub max_window: Duration,
    pub reset_after: Option<Duration>,
}

#[derive(Clone, Debug)]
pub struct DynamicSupervisorState {
    pub child_failure_state: HashMap<String, ChildFailureState>,
    pub restart_log: Vec<RestartLog>,
    pub options: DynamicSupervisorOptions,
    pub active_children: HashMap<String, ActiveChild>,
}

#[derive(Clone, Debug)]
pub struct ActiveChild {
    pub spec: ChildSpec,
    pub cell: ActorCell,
}

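/// Messages accepted by a [`DynamicSupervisor`].
///
/// `SpawnChild` is used both by clients (with `reply: Some(..)`) and internally for
/// restarts (with `reply: None`). `TerminateChild` unlinks and kills a child without
/// restarting it, and `InspectState` returns a clone of the supervisor state.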
pub enum DynamicSupervisorMsg {
    SpawnChild {
        spec: ChildSpec,
        reply: Option<RpcReplyPort<Result<(), ActorProcessingErr>>>,
    },
    TerminateChild {
        child_id: String,
        reply: Option<RpcReplyPort<()>>,
    },
    InspectState(RpcReplyPort<DynamicSupervisorState>),
}

impl CoreSupervisorOptions<()> for DynamicSupervisorOptions {
    fn max_restarts(&self) -> usize {
        self.max_restarts
    }

    fn max_window(&self) -> Duration {
        self.max_window
    }

    fn reset_after(&self) -> Option<Duration> {
        self.reset_after
    }

    fn strategy(&self) {}
}

impl SupervisorCore for DynamicSupervisorState {
    type Message = DynamicSupervisorMsg;
    type Options = DynamicSupervisorOptions;
    type Strategy = (); // Uses implicit OneForOne

    fn child_failure_state(&mut self) -> &mut HashMap<String, ChildFailureState> {
        &mut self.child_failure_state
    }

    fn restart_log(&mut self) -> &mut Vec<RestartLog> {
        &mut self.restart_log
    }

    fn options(&self) -> &DynamicSupervisorOptions {
        &self.options
    }

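    /// A restart is expressed as a regular `SpawnChild` message sent back to the
    /// supervisor; `reply: None` marks it as an internal restart rather than a
    /// client request (see the `first_start` flag in `handle_spawn_child`).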
    fn restart_msg(
        &self,
        child_spec: &ChildSpec,
        _strategy: (),
        _myself: ActorRef<Self::Message>,
    ) -> Self::Message {
        DynamicSupervisorMsg::SpawnChild {
            spec: child_spec.clone(),
            reply: None,
        }
    }
}

type DynamicSupervisorArguments = DynamicSupervisorOptions;

#[cfg_attr(feature = "async-trait", ractor::async_trait)]
impl Actor for DynamicSupervisor {
    type Msg = DynamicSupervisorMsg;
    type State = DynamicSupervisorState;
    type Arguments = DynamicSupervisorArguments;

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        options: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(DynamicSupervisorState {
            child_failure_state: HashMap::new(),
            restart_log: Vec::new(),
            active_children: HashMap::new(),
            options,
        })
    }

    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        msg: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        let res = match msg {
            DynamicSupervisorMsg::SpawnChild { spec, reply } => {
                let mut res = self
                    .handle_spawn_child(&spec, reply.is_some(), state, myself.clone())
                    .await;

                if let Some(reply) = reply {
                    reply.send(res)?;
                    res = Ok(()); // Clear the error for the main handler
                }
                res
            }
            DynamicSupervisorMsg::TerminateChild { child_id, reply } => {
                self.handle_terminate_child(&child_id, state, myself.clone())
                    .await;
                if let Some(reply) = reply {
                    reply.send(())?;
                }
                Ok(())
            }
            DynamicSupervisorMsg::InspectState(reply) => {
                reply.send(state.clone())?;
                Ok(())
            }
        };

        #[cfg(test)]
        {
            store_final_state(myself, state).await;
        }

        res
    }

    async fn handle_supervisor_evt(
        &self,
        myself: ActorRef<Self::Msg>,
        evt: SupervisionEvent,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match evt {
            SupervisionEvent::ActorStarted(cell) => {
                let child_id = cell
                    .get_name()
                    .ok_or(SupervisorError::ChildNameNotSet { pid: cell.get_id() })?;
                log::info!("Started child: {}", child_id);
                if state.active_children.contains_key(&child_id) {
                    // This is a child we know about, so we track it
                    state
                        .child_failure_state
                        .entry(child_id.clone())
                        .or_insert_with(|| ChildFailureState {
                            restart_count: 0,
                            last_fail_instant: Instant::now(),
                        });
                }
            }
            SupervisionEvent::ActorTerminated(cell, _final_state, reason) => {
                self.handle_child_restart(cell, false, state, myself, &ExitReason::Reason(reason))?;
            }
            SupervisionEvent::ActorFailed(cell, err) => {
                self.handle_child_restart(cell, true, state, myself, &ExitReason::Error(err))?;
            }
            SupervisionEvent::ProcessGroupChanged(_group) => {}
        }
        Ok(())
    }

    /// Called if the supervisor stops normally (e.g. `.stop(None)`).
    /// For meltdown stops, we skip this, but we still store final state for testing.
    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        _state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        #[cfg(test)]
        {
            store_final_state(_myself, _state).await;
        }
        Ok(())
    }
}

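/// A supervisor whose children are added and removed at runtime: clients call
/// [`DynamicSupervisor::spawn_child`] / [`DynamicSupervisor::terminate_child`], and each
/// failed child is restarted on its own (an implicit one-for-one strategy).
///
/// A minimal usage sketch (kept out of doctests via `ignore`); `MyWorker` is a
/// hypothetical child actor, and the `SpawnFn` closure mirrors the shape used in the
/// tests below:
///
/// ```ignore
/// let options = DynamicSupervisorOptions {
///     max_children: Some(8),
///     max_restarts: 3,
///     max_window: Duration::from_secs(5),
///     reset_after: None,
/// };
/// let (sup_ref, _handle) = DynamicSupervisor::spawn("my-dynamic-sup".into(), options).await?;
///
/// let spec = ChildSpec {
///     id: "worker-1".to_string(),
///     restart: Restart::Permanent,
///     spawn_fn: SpawnFn::new(|sup_cell, child_id| async move {
///         // Link the child to the supervisor so its exits and failures are observed.
///         let (worker, _join) =
///             DynamicSupervisor::spawn_linked(child_id, MyWorker, (), sup_cell).await?;
///         Ok(worker.get_cell())
///     }),
///     backoff_fn: None,
///     reset_after: None,
/// };
/// DynamicSupervisor::spawn_child(sup_ref.clone(), spec).await?;
///
/// // Later, remove the child without triggering a restart:
/// DynamicSupervisor::terminate_child(sup_ref.clone(), "worker-1".to_string()).await?;
/// ```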
pub struct DynamicSupervisor;

impl DynamicSupervisor {
    pub async fn spawn(
        name: ActorName,
        startup_args: DynamicSupervisorArguments,
    ) -> Result<(ActorRef<DynamicSupervisorMsg>, JoinHandle<()>), SpawnErr> {
        Actor::spawn(Some(name), DynamicSupervisor, startup_args).await
    }

    pub async fn spawn_linked<T: Actor>(
        name: ActorName,
        handler: T,
        startup_args: T::Arguments,
        supervisor: ActorCell,
    ) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
        Actor::spawn_linked(Some(name), handler, startup_args, supervisor).await
    }

    pub async fn spawn_child(
        sup_ref: ActorRef<DynamicSupervisorMsg>,
        spec: ChildSpec,
    ) -> Result<(), ActorProcessingErr> {
        call!(sup_ref, |reply| {
            DynamicSupervisorMsg::SpawnChild {
                spec,
                reply: Some(reply),
            }
        })?
    }

    pub async fn terminate_child(
        sup_ref: ActorRef<DynamicSupervisorMsg>,
        child_id: String,
    ) -> Result<(), ActorProcessingErr> {
        call!(sup_ref, |reply| {
            DynamicSupervisorMsg::TerminateChild {
                child_id,
                reply: Some(reply),
            }
        })?;

        Ok(())
    }

    async fn handle_spawn_child(
        &self,
        spec: &ChildSpec,
        first_start: bool,
        state: &mut DynamicSupervisorState,
        myself: ActorRef<DynamicSupervisorMsg>,
    ) -> Result<(), ActorProcessingErr> {
        if !first_start {
            state.track_global_restart(&spec.id)?;
            sleep(Duration::from_millis(10)).await;
        }

        // Check max children
        if let Some(max) = state.options.max_children {
            if state.active_children.len() >= max {
                return Err(SupervisorError::Meltdown {
                    reason: "max_children exceeded".to_string(),
                }
                .into());
            }
        }

        // Spawn child
        let result = spec
            .spawn_fn
            .call(myself.get_cell().clone(), spec.id.clone())
            .await
            .map_err(|e| SupervisorError::ChildSpawnError {
                child_id: spec.id.clone(),
                reason: e.to_string(),
            });

        match result {
            Ok(child_cell) => {
                // (1) Track the child in `active_children`
                state.active_children.insert(
                    spec.id.clone(),
                    ActiveChild {
                        cell: child_cell.clone(),
                        spec: spec.clone(),
                    },
                );

                // (2) ALSO populate child_failure_state, so meltdown logic works
                state
                    .child_failure_state
                    .entry(spec.id.clone())
                    .or_insert_with(|| ChildFailureState {
                        restart_count: 0,
                        last_fail_instant: Instant::now(),
                    });
            }
            Err(err) => {
                state
                    .handle_child_restart(spec, true, myself, &ExitReason::Error(err.into()))
                    .map_err(|e| SupervisorError::ChildSpawnError {
                        child_id: spec.id.clone(),
                        reason: e.to_string(),
                    })?;
            }
        }

        Ok(())
    }

    async fn handle_terminate_child(
        &self,
        child_id: &str,
        state: &mut DynamicSupervisorState,
        myself: ActorRef<DynamicSupervisorMsg>,
    ) {
        if let Some(child) = state.active_children.remove(child_id) {
            child.cell.unlink(myself.get_cell());
            child.cell.kill();
        }
    }

    fn handle_child_restart(
        &self,
        cell: ActorCell,
        abnormal: bool,
        state: &mut DynamicSupervisorState,
        myself: ActorRef<DynamicSupervisorMsg>,
        reason: &ExitReason,
    ) -> Result<(), ActorProcessingErr> {
        let child_id = cell
            .get_name()
            .ok_or(SupervisorError::ChildNameNotSet { pid: cell.get_id() })?;

        let child =
            state
                .active_children
                .remove(&child_id)
                .ok_or(SupervisorError::ChildNotFound {
                    child_id: child_id.clone(),
                })?;

        state.handle_child_restart(&child.spec, abnormal, myself, reason)?;

        Ok(())
    }
}

/// A global map for test usage, storing final states after each handle call and in `post_stop`.
#[cfg(test)]
static SUPERVISOR_FINAL: std::sync::OnceLock<
    tokio::sync::Mutex<HashMap<String, DynamicSupervisorState>>,
> = std::sync::OnceLock::new();

#[cfg(test)]
async fn store_final_state(myself: ActorRef<DynamicSupervisorMsg>, state: &DynamicSupervisorState) {
    let mut map = SUPERVISOR_FINAL
        .get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
        .lock()
        .await;
    if let Some(name) = myself.get_name() {
        map.insert(name, state.clone());
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{ChildBackoffFn, ChildSpec, Restart};
    use crate::SpawnFn;
    use ractor::concurrency::{sleep, Duration, Instant};
    use ractor::{call_t, Actor, ActorCell, ActorRef, ActorStatus};
    use serial_test::serial;
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    #[cfg(test)]
    static ACTOR_CALL_COUNT: std::sync::OnceLock<
        tokio::sync::Mutex<std::collections::HashMap<String, u64>>,
    > = std::sync::OnceLock::new();

    async fn before_each() {
        // Clear the final supervisor state map (test usage)
        if let Some(map) = SUPERVISOR_FINAL.get() {
            let mut map = map.lock().await;
            map.clear();
        }
        // Clear our actor call counts
        if let Some(map) = ACTOR_CALL_COUNT.get() {
            let mut map = map.lock().await;
            map.clear();
        }
        sleep(Duration::from_millis(10)).await;
    }

    async fn increment_actor_count(child_id: &str) {
        let mut map = ACTOR_CALL_COUNT
            .get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
            .lock()
            .await;
        *map.entry(child_id.to_string()).or_default() += 1;
    }

    /// Utility to read the final state of a named supervisor after it stops.
    async fn read_final_supervisor_state(sup_name: &str) -> DynamicSupervisorState {
        let map = SUPERVISOR_FINAL
            .get()
            .expect("SUPERVISOR_FINAL not initialized!")
            .lock()
            .await;
        map.get(sup_name)
            .cloned()
            .unwrap_or_else(|| panic!("No final state for supervisor '{sup_name}'"))
    }

    async fn read_actor_call_count(child_id: &str) -> u64 {
        let map = ACTOR_CALL_COUNT
            .get()
            .expect("ACTOR_CALL_COUNT not initialized!")
            .lock()
            .await;
        *map.get(child_id)
            .unwrap_or_else(|| panic!("No actor call count for child '{child_id}'"))
    }

    // Child behaviors for tests
    #[derive(Clone)]
    pub enum ChildBehavior {
        DelayedFail {
            ms: u64,
        },
        DelayedNormal {
            ms: u64,
        },
        ImmediateFail,
        ImmediateNormal,
        CountedFails {
            delay_ms: u64,
            fail_count: u64,
            current: Arc<AtomicU64>,
        },
        FailWaitFail {
            initial_fails: u64,
            wait_ms: u64,
            final_fails: u64,
            current: Arc<AtomicU64>,
        },
    }

    pub struct TestChild;

    #[cfg_attr(feature = "async-trait", ractor::async_trait)]
    impl Actor for TestChild {
        type Msg = ();
        type State = ChildBehavior;
        type Arguments = ChildBehavior;

        async fn pre_start(
            &self,
            myself: ActorRef<Self::Msg>,
            arg: Self::Arguments,
        ) -> Result<Self::State, ractor::ActorProcessingErr> {
            // Track how many times this particular child-id was started
            increment_actor_count(myself.get_name().unwrap().as_str()).await;
            match arg {
                ChildBehavior::DelayedFail { ms } => {
                    myself.send_after(Duration::from_millis(ms), || ());
                }
                ChildBehavior::DelayedNormal { ms } => {
                    myself.send_after(Duration::from_millis(ms), || ());
                }
                ChildBehavior::ImmediateFail => panic!("Immediate fail => ActorFailed"),
                ChildBehavior::ImmediateNormal => myself.stop(None),
                ChildBehavior::CountedFails { delay_ms, .. } => {
                    myself.send_after(Duration::from_millis(delay_ms), || ());
                }
                ChildBehavior::FailWaitFail { .. } => {
                    // Kick off our chain of fails by sending a first message
                    myself.cast(())?;
                }
            }
            Ok(arg)
        }

        async fn handle(
            &self,
            myself: ActorRef<Self::Msg>,
            _msg: Self::Msg,
            state: &mut Self::State,
        ) -> Result<(), ractor::ActorProcessingErr> {
            match state {
                ChildBehavior::DelayedFail { .. } => panic!("Delayed fail => ActorFailed"),
                ChildBehavior::DelayedNormal { .. } => myself.stop(None),
                ChildBehavior::ImmediateFail => panic!("ImmediateFail => ActorFailed"),
                ChildBehavior::ImmediateNormal => myself.stop(None),
                ChildBehavior::CountedFails {
                    fail_count,
                    current,
                    ..
                } => {
                    let old = current.fetch_add(1, Ordering::SeqCst);
                    let newv = old + 1;
                    if newv <= *fail_count {
                        panic!("CountedFails => fail #{newv}");
                    }
                }
                ChildBehavior::FailWaitFail {
                    initial_fails,
                    wait_ms,
                    final_fails,
                    current,
                } => {
                    let so_far = current.fetch_add(1, Ordering::SeqCst) + 1;
                    if so_far <= *initial_fails {
                        panic!("FailWaitFail => initial fail #{so_far}");
                    } else if so_far == *initial_fails + 1 {
                        // wait some ms => schedule next message => final fails
                        myself.send_after(Duration::from_millis(*wait_ms), || ());
                    } else {
                        let n = so_far - (*initial_fails + 1);
                        if n <= *final_fails {
                            panic!("FailWaitFail => final fail #{n}");
                        }
                    }
                }
            }
            Ok(())
        }
    }

    fn get_running_children(
        sup_ref: &ActorRef<DynamicSupervisorMsg>,
    ) -> HashMap<String, ActorCell> {
        sup_ref
            .get_children()
            .into_iter()
            .filter_map(|c| {
                if c.get_status() == ActorStatus::Running {
                    c.get_name().map(|n| (n, c))
                } else {
                    None
                }
            })
            .collect()
    }

    // Helper for spawning our test child
    async fn spawn_test_child(
        sup_cell: ActorCell,
        id: String,
        behavior: ChildBehavior,
    ) -> Result<ActorCell, SpawnErr> {
        let (ch_ref, _join) =
            DynamicSupervisor::spawn_linked(id, TestChild, behavior, sup_cell).await?;
        Ok(ch_ref.get_cell())
    }

    // Reusable child spec
    fn make_child_spec(id: &str, restart: Restart, behavior: ChildBehavior) -> ChildSpec {
        ChildSpec {
            id: id.to_string(),
            restart,
            spawn_fn: SpawnFn::new(move |sup_cell, child_id| {
                spawn_test_child(sup_cell, child_id, behavior.clone())
            }),
            backoff_fn: None, // by default no per-child backoff
            reset_after: None,
        }
    }

    /// Helper to read the dynamic supervisor's state via `InspectState`.
    async fn inspect_supervisor(
        sup_ref: &ActorRef<DynamicSupervisorMsg>,
    ) -> DynamicSupervisorState {
        call_t!(sup_ref, DynamicSupervisorMsg::InspectState, 500).unwrap()
    }

    /// Basic test: spawn a child that exits normally (`DelayedNormal`).
    /// No meltdown, no restarts.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_transient_child_normal_exit() -> Result<(), ActorProcessingErr> {
        before_each().await;
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 5,
            max_window: Duration::from_secs(5),
            reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_dynamic_normal_exit".into(), options).await?;
        // Spawn a child that will exit after 200ms
        let child_spec = make_child_spec(
            "normal-dynamic",
            Restart::Transient,
            ChildBehavior::DelayedNormal { ms: 200 },
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
        // Let the child exit
        sleep(Duration::from_millis(300)).await;
        // Confirm child is gone; no meltdown triggered
        let sup_state = inspect_supervisor(&sup_ref).await;
        assert_eq!(
            sup_ref.get_status(),
            ActorStatus::Running,
            "Supervisor still running"
        );
        assert!(
            sup_state.active_children.is_empty(),
            "Child should have exited normally"
        );
        assert!(
            get_running_children(&sup_ref).is_empty(),
            "No children running"
        );
        assert!(sup_state.restart_log.is_empty(), "No restarts expected");
        // Stop supervisor
        sup_ref.stop(None);
        let _ = sup_handle.await;
        // Validate final state
        let final_st = read_final_supervisor_state("test_dynamic_normal_exit").await;
        assert!(final_st.restart_log.is_empty());
        assert_eq!(
            read_actor_call_count("normal-dynamic").await,
            1,
            "Spawned exactly once"
        );
        Ok(())
    }

    /// If a permanent child fails, it should restart until `max_restarts` is exceeded,
    /// at which point meltdown stops the supervisor.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_permanent_child_meltdown() -> Result<(), ActorProcessingErr> {
        before_each().await;
        let options = DynamicSupervisorOptions {
            // max_restarts = 1 => meltdown on the second failure within 2s
            max_children: None,
            max_restarts: 1,
            max_window: Duration::from_secs(2),
            reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_permanent_child_meltdown".into(), options).await?;
        let fail_child_spec = make_child_spec(
            "fail-child",
            Restart::Permanent,
            ChildBehavior::ImmediateFail,
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), fail_child_spec).await?;
        let _ = sup_handle.await;
        assert_eq!(
            sup_ref.get_status(),
            ActorStatus::Stopped,
            "Supervisor meltdown expected"
        );
        // Final checks
        let final_state = read_final_supervisor_state("test_permanent_child_meltdown").await;
        // meltdown => 2 fails in the log
        assert_eq!(
            final_state.restart_log.len(),
            2,
            "Expected exactly 2 restarts leading to meltdown"
        );
        // The child was started twice
        assert_eq!(read_actor_call_count("fail-child").await, 2);
        Ok(())
    }

    /// If a `Temporary` child fails, it should never restart.
    /// The supervisor remains running (no meltdown) unless the number of restarts is already at meltdown threshold.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_temporary_child_fail_no_restart() -> Result<(), ActorProcessingErr> {
        before_each().await;
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 10,
            max_window: Duration::from_secs(10),
            reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_temporary_child_fail_no_restart".into(), options)
                .await?;
        let temp_child = make_child_spec(
            "temp-fail",
            Restart::Temporary,
            ChildBehavior::DelayedFail { ms: 200 },
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), temp_child).await?;
        // Let the child fail
        sleep(Duration::from_millis(400)).await;
        // Supervisor still running, child gone
        assert_eq!(sup_ref.get_status(), ActorStatus::Running);
        let sup_state = inspect_supervisor(&sup_ref).await;
        assert!(
            sup_state.active_children.is_empty(),
            "Temporary child not restarted"
        );
        assert!(
            get_running_children(&sup_ref).is_empty(),
            "No children running"
        );
        sup_ref.stop(None);
        let _ = sup_handle.await;
        let final_st = read_final_supervisor_state("test_temporary_child_fail_no_restart").await;
        assert_eq!(final_st.restart_log.len(), 0, "No restarts occurred");
        assert_eq!(read_actor_call_count("temp-fail").await, 1);
        Ok(())
    }

    /// Ensure that if a meltdown threshold is set (max_restarts + max_window),
    /// multiple quick fails in that window cause meltdown.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_max_restarts_in_time_window() -> Result<(), ActorProcessingErr> {
        before_each().await;
        // meltdown on 3 fails in <=1s => max_restarts=2 => meltdown on the 3rd
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 2,
            max_window: Duration::from_secs(1),
            reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_dynamic_max_restarts_in_time_window".into(), options)
                .await?;
        let child_spec =
            make_child_spec("fastfail", Restart::Permanent, ChildBehavior::ImmediateFail);
        // This child fails immediately on start => triggers multiple restarts quickly
        DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
        // Wait for meltdown
        let _ = sup_handle.await;
        assert_eq!(sup_ref.get_status(), ActorStatus::Stopped);
        let final_state =
            read_final_supervisor_state("test_dynamic_max_restarts_in_time_window").await;
        assert_eq!(
            final_state.restart_log.len(),
            3,
            "Should see 3 fails in meltdown window"
        );
        assert_eq!(read_actor_call_count("fastfail").await, 3);
        Ok(())
    }

    /// Tests the optional `reset_after` at the **supervisor** level,
    /// ensuring that if there's a quiet period, the meltdown log is cleared.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_supervisor_restart_counter_reset_after() -> Result<(), ActorProcessingErr> {
        before_each().await;
        // meltdown on 3 fails in <= 10s (max_restarts=2 => meltdown on #3)
        // but if quiet >=2s => meltdown log is cleared
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 2,
            max_window: Duration::from_secs(10),
            reset_after: Some(Duration::from_secs(2)),
        };
        let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
            "test_supervisor_restart_counter_reset_after".into(),
            options,
        )
        .await?;
        // We'll use a child that fails 2 times quickly, stays quiet for 3s, then fails again.
        // The 3s quiet period exceeds `reset_after` (2s), so the meltdown log is cleared and
        // the final fail only leaves a single entry => no meltdown.
        let behavior = ChildBehavior::FailWaitFail {
            initial_fails: 2,
            wait_ms: 3000,
            final_fails: 1,
            current: Arc::new(AtomicU64::new(0)),
        };
        let child_spec = make_child_spec("reset-test", Restart::Permanent, behavior);
        DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
        // Let the child do its thing: 2 fails quickly => then quiet => final fail
        sleep(Duration::from_secs(4)).await;
        // The supervisor should still be alive
        assert_eq!(sup_ref.get_status(), ActorStatus::Running);
        sup_ref.stop(None);
        let _ = sup_handle.await;
        // meltdown log is reset after the quiet period => only the final fail is logged
        let final_st =
            read_final_supervisor_state("test_supervisor_restart_counter_reset_after").await;
        assert_eq!(
            final_st.restart_log.len(),
            1,
            "Only the final fail is in meltdown log"
        );
        // The child started 4 times total
        assert_eq!(read_actor_call_count("reset-test").await, 4);
        Ok(())
    }

    /// Tests the child-level `reset_after`, ensuring that a quiet period
    /// resets that **child's** fail count. The result is no meltdown if a subsequent fail
    /// happens after the quiet window.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_child_level_restart_counter_reset_after() -> Result<(), ActorProcessingErr> {
        before_each().await;
        // We'll set a large meltdown threshold so we don't meltdown unless the child's restarts remain consecutive.
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 5,
            max_window: Duration::from_secs(30),
            reset_after: None,
        };
        let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
            "test_dynamic_child_level_restart_counter_reset_after".into(),
            options,
        )
        .await?;
        // The child fails 2 times quickly => quiet 3s => final fail => from that child's perspective, it starts from 0 again
        let behavior = ChildBehavior::FailWaitFail {
            initial_fails: 2,
            wait_ms: 3000,
            final_fails: 1,
            current: Arc::new(AtomicU64::new(0)),
        };
        let mut child_spec = make_child_spec("child-reset", Restart::Permanent, behavior);
        // **Per-child** reset
        child_spec.reset_after = Some(Duration::from_secs(2));
        DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
        // Wait for child to do the fails & quiet period
        sleep(Duration::from_secs(5)).await;
        // No meltdown => supervisor still up
        assert_eq!(sup_ref.get_status(), ActorStatus::Running);
        sup_ref.stop(None);
        let _ = sup_handle.await;
        let final_st =
            read_final_supervisor_state("test_dynamic_child_level_restart_counter_reset_after")
                .await;
        let cfs = final_st.child_failure_state.get("child-reset").unwrap();
        // The final fail saw a reset => the child's restart_count ended up at 1
        assert_eq!(cfs.restart_count, 1);
        // total spawns = 4
        assert_eq!(read_actor_call_count("child-reset").await, 4);
        Ok(())
    }

    /// Tests that a child-level backoff function can delay restarts.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_child_level_backoff_fn_delays_restart() -> Result<(), ActorProcessingErr> {
        before_each().await;
        // meltdown on 2nd fail => max_restarts=1 => meltdown on fail #2
        // but we do a 1-second child-level backoff for the second spawn
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 1,
            max_window: Duration::from_secs(10),
            reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_dynamic_child_backoff".to_string(), options).await?;
        // Our backoff function that returns Some(1s) after the first restart
        let backoff_fn: ChildBackoffFn = ChildBackoffFn::new(|_id, count, _last, _child_reset| {
            if count <= 1 {
                None
            } else {
                Some(Duration::from_secs(1))
            }
        });
        let mut child_spec = make_child_spec(
            "backoff-child",
            Restart::Permanent,
            ChildBehavior::ImmediateFail,
        );
        child_spec.backoff_fn = Some(backoff_fn);
        let start_instant = Instant::now();
        DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
        // Wait for meltdown
        let _ = sup_handle.await;
        let elapsed = start_instant.elapsed();
        // Because meltdown occurs on the second fail => we must have waited 1s for that second attempt
        assert!(
            elapsed >= Duration::from_secs(1),
            "Expected at least 1s of backoff before meltdown"
        );
        let final_st = read_final_supervisor_state("test_dynamic_child_backoff").await;
        assert_eq!(final_st.restart_log.len(), 2, "two fails => meltdown on #2");
        assert_eq!(read_actor_call_count("backoff-child").await, 2);
        Ok(())
    }

    /// Tests the dynamic supervisor's `max_children`: attempting to spawn more children
    /// than allowed returns an immediate `Meltdown` error to the caller (the supervisor
    /// itself keeps running).
    #[ractor::concurrency::test]
    #[serial]
    async fn test_exceed_max_children() -> Result<(), ActorProcessingErr> {
        before_each().await;
        let options = DynamicSupervisorOptions {
            max_children: Some(1), // only 1 child allowed
            max_restarts: 999,
            max_window: Duration::from_secs(999),
            reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_exceed_max_children".to_string(), options).await?;
        let spec_1 = make_child_spec(
            "allowed-child",
            Restart::Permanent,
            ChildBehavior::DelayedNormal { ms: 500 },
        );
        let spec_2 = make_child_spec(
            "unallowed-child",
            Restart::Permanent,
            ChildBehavior::DelayedNormal { ms: 500 },
        );
        // first child is fine
        DynamicSupervisor::spawn_child(sup_ref.clone(), spec_1).await?;
        // second child => should fail
        let result = DynamicSupervisor::spawn_child(sup_ref.clone(), spec_2).await;
        assert!(
            result.is_err(),
            "Spawning a second child should fail due to max_children=1"
        );
        let final_st = read_final_supervisor_state("test_exceed_max_children").await;
        assert_eq!(
            final_st.active_children.len(),
            1,
            "Second child should not have been spawned"
        );
        assert!(
            get_running_children(&sup_ref).len() == 1,
            "Only one child running"
        );
        sup_ref.stop(None);
        let _ = sup_handle.await;
        Ok(())
    }

    /// Tests that `TerminateChild` kills a running child; no meltdown or restarts.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_terminate_child() -> Result<(), ActorProcessingErr> {
        before_each().await;
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 10,
            max_window: Duration::from_secs(10),
            reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_terminate_child".into(), options).await?;
        let child_spec = make_child_spec(
            "kill-me",
            Restart::Permanent,
            ChildBehavior::DelayedNormal { ms: 9999 },
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
        // Confirm child is present
        let st_before = inspect_supervisor(&sup_ref).await;
        assert!(st_before.active_children.contains_key("kill-me"));
        assert!(
            get_running_children(&sup_ref).len() == 1,
            "Child is running"
        );
        // Terminate
        DynamicSupervisor::terminate_child(sup_ref.clone(), "kill-me".to_string()).await?;
        // Check that child is gone, no meltdown
        let st_after = inspect_supervisor(&sup_ref).await;
        assert!(!st_after.active_children.contains_key("kill-me"));
        assert!(
            get_running_children(&sup_ref).is_empty(),
            "No child is running"
        );
        assert_eq!(sup_ref.get_status(), ActorStatus::Running);
        // Stop the supervisor
        sup_ref.stop(None);
        let _ = sup_handle.await;
        let final_st = read_final_supervisor_state("test_terminate_child").await;
        // No restarts
        assert!(final_st.restart_log.is_empty());
        // Only started once
        assert_eq!(read_actor_call_count("kill-me").await, 1);
        Ok(())
    }
}