1use ractor::concurrency::{sleep, Duration, Instant, JoinHandle};
2use ractor::{
3 call, Actor, ActorCell, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort, SpawnErr,
4 SupervisionEvent,
5};
6use std::collections::HashMap;
7
8use crate::core::{
9 ChildFailureState, ChildSpec, CoreSupervisorOptions, RestartLog, SupervisorCore,
10 SupervisorError,
11};
12use crate::ExitReason;
13
/// Configuration handed to a [`DynamicSupervisor`] when it is spawned.
///
/// The restart-related fields are surfaced to the shared supervisor logic through the
/// `CoreSupervisorOptions` implementation for this type.
#[derive(Clone, Debug)]
pub struct DynamicSupervisorOptions {
    /// Upper bound on the number of simultaneously active children.
    /// `None` disables the check performed before each spawn.
    pub max_children: Option<usize>,
    /// Restart budget reported through `CoreSupervisorOptions::max_restarts`.
    pub max_restarts: usize,
    /// Time window reported through `CoreSupervisorOptions::max_window`.
    pub max_window: Duration,
    /// Optional reset interval reported through `CoreSupervisorOptions::reset_after`.
    pub reset_after: Option<Duration>,
}
21
22#[derive(Clone, Debug)]
23pub struct DynamicSupervisorState {
24 pub child_failure_state: HashMap<String, ChildFailureState>,
25 pub restart_log: Vec<RestartLog>,
26 pub options: DynamicSupervisorOptions,
27 pub active_children: HashMap<String, ActiveChild>,
28}
29
30#[derive(Clone, Debug)]
31pub struct ActiveChild {
32 pub spec: ChildSpec,
33 pub cell: ActorCell,
34}
35
/// Messages handled by [`DynamicSupervisor`].
pub enum DynamicSupervisorMsg {
    /// Spawn the child described by `spec`.
    ///
    /// The handler treats a message that carries a reply port as a first start. A message
    /// without one (such as those built by `SupervisorCore::restart_msg`) is handled as a
    /// restart.
    SpawnChild {
        /// Specification of the child to spawn.
        spec: ChildSpec,
        /// Optional port that receives the result of the spawn attempt.
        reply: Option<RpcReplyPort<Result<(), ActorProcessingErr>>>,
    },
    /// Stop tracking the child with the given id, unlink it and kill it.
    TerminateChild {
        /// Id of the child to terminate.
        child_id: String,
        /// Optional port that is notified once the request has been handled.
        reply: Option<RpcReplyPort<()>>,
    },
    /// Reply with a clone of the supervisor's current state.
    InspectState(RpcReplyPort<DynamicSupervisorState>),
}
47
48impl CoreSupervisorOptions<()> for DynamicSupervisorOptions {
49 fn max_restarts(&self) -> usize {
50 self.max_restarts
51 }
52
53 fn max_window(&self) -> Duration {
54 self.max_window
55 }
56
57 fn reset_after(&self) -> Option<Duration> {
58 self.reset_after
59 }
60
61 fn strategy(&self) {}
62}
63
64impl SupervisorCore for DynamicSupervisorState {
65 type Message = DynamicSupervisorMsg;
66 type Options = DynamicSupervisorOptions;
67 type Strategy = (); fn child_failure_state(&mut self) -> &mut HashMap<String, ChildFailureState> {
70 &mut self.child_failure_state
71 }
72
73 fn restart_log(&mut self) -> &mut Vec<RestartLog> {
74 &mut self.restart_log
75 }
76
77 fn options(&self) -> &DynamicSupervisorOptions {
78 &self.options
79 }
80
81 fn restart_msg(
82 &self,
83 child_spec: &ChildSpec,
84 _strategy: (),
85 _myself: ActorRef<Self::Message>,
86 ) -> Self::Message {
87 DynamicSupervisorMsg::SpawnChild {
88 spec: child_spec.clone(),
89 reply: None,
90 }
91 }
92}
93
/// Startup arguments of [`DynamicSupervisor`]; an alias for [`DynamicSupervisorOptions`].
type DynamicSupervisorArguments = DynamicSupervisorOptions;
95
96#[cfg_attr(feature = "async-trait", ractor::async_trait)]
97impl Actor for DynamicSupervisor {
98 type Msg = DynamicSupervisorMsg;
99 type State = DynamicSupervisorState;
100 type Arguments = DynamicSupervisorArguments;
101
102 async fn pre_start(
103 &self,
104 _myself: ActorRef<Self::Msg>,
105 options: Self::Arguments,
106 ) -> Result<Self::State, ActorProcessingErr> {
107 Ok(DynamicSupervisorState {
108 child_failure_state: HashMap::new(),
109 restart_log: Vec::new(),
110 active_children: HashMap::new(),
111 options,
112 })
113 }
114
115 async fn handle(
116 &self,
117 myself: ActorRef<Self::Msg>,
118 msg: Self::Msg,
119 state: &mut Self::State,
120 ) -> Result<(), ActorProcessingErr> {
121 let res = match msg {
122 DynamicSupervisorMsg::SpawnChild { spec, reply } => {
123 let mut res = self
124 .handle_spawn_child(&spec, reply.is_some(), state, myself.clone())
125 .await;
126
127 if let Some(reply) = reply {
128 reply.send(res)?;
129 res = Ok(()); }
131 res
132 }
133 DynamicSupervisorMsg::TerminateChild { child_id, reply } => {
134 self.handle_terminate_child(&child_id, state, myself.clone())
135 .await;
136 if let Some(reply) = reply {
137 reply.send(())?;
138 }
139 Ok(())
140 }
141 DynamicSupervisorMsg::InspectState(reply) => {
142 reply.send(state.clone())?;
143 Ok(())
144 }
145 };
146
147 #[cfg(test)]
148 {
149 store_final_state(myself, state).await;
150 }
151
152 res
153 }
154
155 async fn handle_supervisor_evt(
156 &self,
157 myself: ActorRef<Self::Msg>,
158 evt: SupervisionEvent,
159 state: &mut Self::State,
160 ) -> Result<(), ActorProcessingErr> {
161 match evt {
162 SupervisionEvent::ActorStarted(cell) => {
163 let child_id = cell
164 .get_name()
165 .ok_or(SupervisorError::ChildNameNotSet { pid: cell.get_id() })?;
166 log::info!("Started child: {}", child_id);
167 if state.active_children.contains_key(&child_id) {
168 state
170 .child_failure_state
171 .entry(child_id.clone())
172 .or_insert_with(|| ChildFailureState {
173 restart_count: 0,
174 last_fail_instant: Instant::now(),
175 });
176 }
177 }
178 SupervisionEvent::ActorTerminated(cell, _final_state, reason) => {
179 self.handle_child_restart(cell, false, state, myself, &ExitReason::Reason(reason))?;
180 }
181 SupervisionEvent::ActorFailed(cell, err) => {
182 self.handle_child_restart(cell, true, state, myself, &ExitReason::Error(err))?;
183 }
184 SupervisionEvent::ProcessGroupChanged(_group) => {}
185 }
186 Ok(())
187 }
188
189 async fn post_stop(
192 &self,
193 _myself: ActorRef<Self::Msg>,
194 _state: &mut Self::State,
195 ) -> Result<(), ActorProcessingErr> {
196 #[cfg(test)]
197 {
198 store_final_state(_myself, _state).await;
199 }
200 Ok(())
201 }
202}
203
/// Actor handler for the dynamic supervisor.
///
/// The type carries no data of its own: all runtime state lives in
/// [`DynamicSupervisorState`]. The derives make the handle printable, copyable and
/// default-constructible, as is customary for public unit structs.
#[derive(Debug, Clone, Copy, Default)]
pub struct DynamicSupervisor;
205
206impl DynamicSupervisor {
207 pub async fn spawn(
208 name: ActorName,
209 startup_args: DynamicSupervisorArguments,
210 ) -> Result<(ActorRef<DynamicSupervisorMsg>, JoinHandle<()>), SpawnErr> {
211 Actor::spawn(Some(name), DynamicSupervisor, startup_args).await
212 }
213
214 pub async fn spawn_linked<T: Actor>(
215 name: ActorName,
216 handler: T,
217 startup_args: T::Arguments,
218 supervisor: ActorCell,
219 ) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
220 Actor::spawn_linked(Some(name), handler, startup_args, supervisor).await
221 }
222
223 pub async fn spawn_child(
224 sup_ref: ActorRef<DynamicSupervisorMsg>,
225 spec: ChildSpec,
226 ) -> Result<(), ActorProcessingErr> {
227 call!(sup_ref, |reply| {
228 DynamicSupervisorMsg::SpawnChild {
229 spec,
230 reply: Some(reply),
231 }
232 })?
233 }
234
235 pub async fn terminate_child(
236 sup_ref: ActorRef<DynamicSupervisorMsg>,
237 child_id: String,
238 ) -> Result<(), ActorProcessingErr> {
239 call!(sup_ref, |reply| {
240 DynamicSupervisorMsg::TerminateChild {
241 child_id,
242 reply: Some(reply),
243 }
244 })?;
245
246 Ok(())
247 }
248
249 async fn handle_spawn_child(
250 &self,
251 spec: &ChildSpec,
252 first_start: bool,
253 state: &mut DynamicSupervisorState,
254 myself: ActorRef<DynamicSupervisorMsg>,
255 ) -> Result<(), ActorProcessingErr> {
256 if !first_start {
257 state.track_global_restart(&spec.id)?;
258 sleep(Duration::from_millis(10)).await;
259 }
260
261 if let Some(max) = state.options.max_children {
263 if state.active_children.len() >= max {
264 return Err(SupervisorError::Meltdown {
265 reason: "max_children exceeded".to_string(),
266 }
267 .into());
268 }
269 }
270
271 let result = spec
273 .spawn_fn
274 .call(myself.get_cell().clone(), spec.id.clone())
275 .await
276 .map_err(|e| SupervisorError::ChildSpawnError {
277 child_id: spec.id.clone(),
278 reason: e.to_string(),
279 });
280
281 match result {
282 Ok(child_cell) => {
283 state.active_children.insert(
285 spec.id.clone(),
286 ActiveChild {
287 cell: child_cell.clone(),
288 spec: spec.clone(),
289 },
290 );
291
292 state
294 .child_failure_state
295 .entry(spec.id.clone())
296 .or_insert_with(|| ChildFailureState {
297 restart_count: 0,
298 last_fail_instant: Instant::now(),
299 });
300 }
301 Err(err) => {
302 state
303 .handle_child_restart(spec, true, myself, &ExitReason::Error(err.into()))
304 .map_err(|e| SupervisorError::ChildSpawnError {
305 child_id: spec.id.clone(),
306 reason: e.to_string(),
307 })?;
308 }
309 }
310
311 Ok(())
312 }
313
314 async fn handle_terminate_child(
315 &self,
316 child_id: &str,
317 state: &mut DynamicSupervisorState,
318 myself: ActorRef<DynamicSupervisorMsg>,
319 ) {
320 if let Some(child) = state.active_children.remove(child_id) {
321 child.cell.unlink(myself.get_cell());
322 child.cell.kill();
323 }
324 }
325
326 fn handle_child_restart(
327 &self,
328 cell: ActorCell,
329 abnormal: bool,
330 state: &mut DynamicSupervisorState,
331 myself: ActorRef<DynamicSupervisorMsg>,
332 reason: &ExitReason,
333 ) -> Result<(), ActorProcessingErr> {
334 let child_id = cell
335 .get_name()
336 .ok_or(SupervisorError::ChildNameNotSet { pid: cell.get_id() })?;
337
338 let child =
339 state
340 .active_children
341 .remove(&child_id)
342 .ok_or(SupervisorError::ChildNotFound {
343 child_id: child_id.clone(),
344 })?;
345
346 state.handle_child_restart(&child.spec, abnormal, myself, reason)?;
347
348 Ok(())
349 }
350}
351
/// Test-only registry of supervisor state snapshots, keyed by supervisor name.
/// Lazily initialized and written by `store_final_state`.
#[cfg(test)]
static SUPERVISOR_FINAL: std::sync::OnceLock<
    tokio::sync::Mutex<HashMap<String, DynamicSupervisorState>>,
> = std::sync::OnceLock::new();
357
/// Test-only: records a clone of `state` in `SUPERVISOR_FINAL` under the supervisor's name.
/// Supervisors without a name are not recorded.
#[cfg(test)]
async fn store_final_state(myself: ActorRef<DynamicSupervisorMsg>, state: &DynamicSupervisorState) {
    let registry = SUPERVISOR_FINAL.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()));
    let mut snapshots = registry.lock().await;
    if let Some(sup_name) = myself.get_name() {
        let snapshot = state.clone();
        snapshots.insert(sup_name, snapshot);
    }
}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use crate::core::{ChildBackoffFn, ChildSpec, Restart};
373 use crate::SpawnFn;
374 use ractor::concurrency::{sleep, Duration, Instant};
375 use ractor::{call_t, Actor, ActorCell, ActorRef, ActorStatus};
376 use serial_test::serial;
377 use std::collections::HashMap;
378 use std::sync::atomic::{AtomicU64, Ordering};
379 use std::sync::Arc;
380
/// Number of times each test child has run `pre_start`, keyed by child name.
/// Lazily initialized by `increment_actor_count`.
#[cfg(test)]
static ACTOR_CALL_COUNT: std::sync::OnceLock<
    tokio::sync::Mutex<std::collections::HashMap<String, u64>>,
> = std::sync::OnceLock::new();
385
386 async fn before_each() {
387 if let Some(map) = SUPERVISOR_FINAL.get() {
389 let mut map = map.lock().await;
390 map.clear();
391 }
392 if let Some(map) = ACTOR_CALL_COUNT.get() {
394 let mut map = map.lock().await;
395 map.clear();
396 }
397 sleep(Duration::from_millis(10)).await;
398 }
399
400 async fn increment_actor_count(child_id: &str) {
401 let mut map = ACTOR_CALL_COUNT
402 .get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
403 .lock()
404 .await;
405 *map.entry(child_id.to_string()).or_default() += 1;
406 }
407
408 async fn read_final_supervisor_state(sup_name: &str) -> DynamicSupervisorState {
410 let map = SUPERVISOR_FINAL
411 .get()
412 .expect("SUPERVISOR_FINAL not initialized!")
413 .lock()
414 .await;
415 map.get(sup_name)
416 .cloned()
417 .unwrap_or_else(|| panic!("No final state for supervisor '{sup_name}'"))
418 }
419
420 async fn read_actor_call_count(child_id: &str) -> u64 {
421 let map = ACTOR_CALL_COUNT
422 .get()
423 .expect("ACTOR_CALL_COUNT not initialized!")
424 .lock()
425 .await;
426 *map.get(child_id)
427 .unwrap_or_else(|| panic!("No actor call count for child '{child_id}'"))
428 }
429
/// Scripted behavior of a `TestChild`; it doubles as the child's arguments and state.
#[derive(Clone)]
pub enum ChildBehavior {
    /// Receives a message `ms` milliseconds after start and panics while handling it.
    DelayedFail {
        /// Delay before the triggering message, in milliseconds.
        ms: u64,
    },
    /// Receives a message `ms` milliseconds after start and stops itself while handling it.
    DelayedNormal {
        /// Delay before the triggering message, in milliseconds.
        ms: u64,
    },
    /// Panics during `pre_start`.
    ImmediateFail,
    /// Requests its own stop during `pre_start`.
    ImmediateNormal,
    /// Panics on handled messages until the shared counter exceeds `fail_count`.
    CountedFails {
        /// Delay before the triggering message, in milliseconds.
        delay_ms: u64,
        /// Number of handled messages that end in a panic.
        fail_count: u64,
        /// Handled-message counter shared between clones of this behavior.
        current: Arc<AtomicU64>,
    },
    /// Panics `initial_fails` times, then waits `wait_ms` milliseconds, then panics up to
    /// `final_fails` more times.
    FailWaitFail {
        /// Number of panics before the waiting phase.
        initial_fails: u64,
        /// Length of the waiting phase, in milliseconds.
        wait_ms: u64,
        /// Number of panics after the waiting phase.
        final_fails: u64,
        /// Handled-message counter shared between clones of this behavior.
        current: Arc<AtomicU64>,
    },
}
453
/// Test actor whose lifecycle is scripted by a `ChildBehavior`.
pub struct TestChild;
455
456 #[cfg_attr(feature = "async-trait", ractor::async_trait)]
457 impl Actor for TestChild {
458 type Msg = ();
459 type State = ChildBehavior;
460 type Arguments = ChildBehavior;
461
462 async fn pre_start(
463 &self,
464 myself: ActorRef<Self::Msg>,
465 arg: Self::Arguments,
466 ) -> Result<Self::State, ractor::ActorProcessingErr> {
467 increment_actor_count(myself.get_name().unwrap().as_str()).await;
469 match arg {
470 ChildBehavior::DelayedFail { ms } => {
471 myself.send_after(Duration::from_millis(ms), || ());
472 }
473 ChildBehavior::DelayedNormal { ms } => {
474 myself.send_after(Duration::from_millis(ms), || ());
475 }
476 ChildBehavior::ImmediateFail => panic!("Immediate fail => ActorFailed"),
477 ChildBehavior::ImmediateNormal => myself.stop(None),
478 ChildBehavior::CountedFails { delay_ms, .. } => {
479 myself.send_after(Duration::from_millis(delay_ms), || ());
480 }
481 ChildBehavior::FailWaitFail { .. } => {
482 myself.cast(())?;
484 }
485 }
486 Ok(arg)
487 }
488
489 async fn handle(
490 &self,
491 myself: ActorRef<Self::Msg>,
492 _msg: Self::Msg,
493 state: &mut Self::State,
494 ) -> Result<(), ractor::ActorProcessingErr> {
495 match state {
496 ChildBehavior::DelayedFail { .. } => panic!("Delayed fail => ActorFailed"),
497 ChildBehavior::DelayedNormal { .. } => myself.stop(None),
498 ChildBehavior::ImmediateFail => panic!("ImmediateFail => ActorFailed"),
499 ChildBehavior::ImmediateNormal => myself.stop(None),
500 ChildBehavior::CountedFails {
501 fail_count,
502 current,
503 ..
504 } => {
505 let old = current.fetch_add(1, Ordering::SeqCst);
506 let newv = old + 1;
507 if newv <= *fail_count {
508 panic!("CountedFails => fail #{newv}");
509 }
510 }
511 ChildBehavior::FailWaitFail {
512 initial_fails,
513 wait_ms,
514 final_fails,
515 current,
516 } => {
517 let so_far = current.fetch_add(1, Ordering::SeqCst) + 1;
518 if so_far <= *initial_fails {
519 panic!("FailWaitFail => initial fail #{so_far}");
520 } else if so_far == *initial_fails + 1 {
521 myself.send_after(Duration::from_millis(*wait_ms), || ());
523 } else {
524 let n = so_far - (*initial_fails + 1);
525 if n <= *final_fails {
526 panic!("FailWaitFail => final fail #{n}");
527 }
528 }
529 }
530 }
531 Ok(())
532 }
533 }
534
535 fn get_running_children(
536 sup_ref: &ActorRef<DynamicSupervisorMsg>,
537 ) -> HashMap<String, ActorCell> {
538 sup_ref
539 .get_children()
540 .into_iter()
541 .filter_map(|c| {
542 if c.get_status() == ActorStatus::Running {
543 c.get_name().map(|n| (n, c))
544 } else {
545 None
546 }
547 })
548 .collect()
549 }
550
551 async fn spawn_test_child(
553 sup_cell: ActorCell,
554 id: String,
555 behavior: ChildBehavior,
556 ) -> Result<ActorCell, SpawnErr> {
557 let (ch_ref, _join) =
558 DynamicSupervisor::spawn_linked(id, TestChild, behavior, sup_cell).await?;
559 Ok(ch_ref.get_cell())
560 }
561
562 fn make_child_spec(id: &str, restart: Restart, behavior: ChildBehavior) -> ChildSpec {
564 ChildSpec {
565 id: id.to_string(),
566 restart,
567 spawn_fn: SpawnFn::new(move |sup_cell, child_id| {
568 spawn_test_child(sup_cell, child_id, behavior.clone())
569 }),
570 backoff_fn: None, reset_after: None,
572 }
573 }
574
575 async fn inspect_supervisor(
577 sup_ref: &ActorRef<DynamicSupervisorMsg>,
578 ) -> DynamicSupervisorState {
579 call_t!(sup_ref, DynamicSupervisorMsg::InspectState, 500).unwrap()
580 }
581
582 #[ractor::concurrency::test]
585 #[serial]
586 async fn test_transient_child_normal_exit() -> Result<(), ActorProcessingErr> {
587 before_each().await;
588 let options = DynamicSupervisorOptions {
589 max_children: None,
590 max_restarts: 5,
591 max_window: Duration::from_secs(5),
592 reset_after: None,
593 };
594 let (sup_ref, sup_handle) =
595 DynamicSupervisor::spawn("test_dynamic_normal_exit".into(), options).await?;
596 let child_spec = make_child_spec(
598 "normal-dynamic",
599 Restart::Transient,
600 ChildBehavior::DelayedNormal { ms: 200 },
601 );
602 DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
603 sleep(Duration::from_millis(300)).await;
605 let sup_state = inspect_supervisor(&sup_ref).await;
607 assert_eq!(
608 sup_ref.get_status(),
609 ActorStatus::Running,
610 "Supervisor still running"
611 );
612 assert!(
613 sup_state.active_children.is_empty(),
614 "Child should have exited normally"
615 );
616 assert!(
617 get_running_children(&sup_ref).is_empty(),
618 "No children running"
619 );
620 assert!(sup_state.restart_log.is_empty(), "No restarts expected");
621 sup_ref.stop(None);
623 let _ = sup_handle.await;
624 let final_st = read_final_supervisor_state("test_dynamic_normal_exit").await;
626 assert!(final_st.restart_log.is_empty());
627 assert_eq!(
628 read_actor_call_count("normal-dynamic").await,
629 1,
630 "Spawned exactly once"
631 );
632 Ok(())
633 }
634
635 #[ractor::concurrency::test]
638 #[serial]
639 async fn test_permanent_child_meltdown() -> Result<(), ActorProcessingErr> {
640 before_each().await;
641 let options = DynamicSupervisorOptions {
642 max_children: None,
644 max_restarts: 1,
645 max_window: Duration::from_secs(2),
646 reset_after: None,
647 };
648 let (sup_ref, sup_handle) =
649 DynamicSupervisor::spawn("test_permanent_child_meltdown".into(), options).await?;
650 let fail_child_spec = make_child_spec(
651 "fail-child",
652 Restart::Permanent,
653 ChildBehavior::ImmediateFail,
654 );
655 DynamicSupervisor::spawn_child(sup_ref.clone(), fail_child_spec).await?;
656 let _ = sup_handle.await;
657 assert_eq!(
658 sup_ref.get_status(),
659 ActorStatus::Stopped,
660 "Supervisor meltdown expected"
661 );
662 let final_state = read_final_supervisor_state("test_permanent_child_meltdown").await;
664 assert!(
666 final_state.restart_log.len() == 2,
667 "Expected at least 2 restarts leading to meltdown"
668 );
669 assert_eq!(read_actor_call_count("fail-child").await, 2);
671 Ok(())
672 }
673
674 #[ractor::concurrency::test]
677 #[serial]
678 async fn test_temporary_child_fail_no_restart() -> Result<(), ActorProcessingErr> {
679 before_each().await;
680 let options = DynamicSupervisorOptions {
681 max_children: None,
682 max_restarts: 10,
683 max_window: Duration::from_secs(10),
684 reset_after: None,
685 };
686 let (sup_ref, sup_handle) =
687 DynamicSupervisor::spawn("test_temporary_child_fail_no_restart".into(), options)
688 .await?;
689 let temp_child = make_child_spec(
690 "temp-fail",
691 Restart::Temporary,
692 ChildBehavior::DelayedFail { ms: 200 },
693 );
694 DynamicSupervisor::spawn_child(sup_ref.clone(), temp_child).await?;
695 sleep(Duration::from_millis(400)).await;
697 assert_eq!(sup_ref.get_status(), ActorStatus::Running);
699 let sup_state = inspect_supervisor(&sup_ref).await;
700 assert!(
701 sup_state.active_children.is_empty(),
702 "Temporary child not restarted"
703 );
704 assert!(
705 get_running_children(&sup_ref).is_empty(),
706 "No children running"
707 );
708 sup_ref.stop(None);
709 let _ = sup_handle.await;
710 let final_st = read_final_supervisor_state("test_temporary_child_fail_no_restart").await;
711 assert_eq!(final_st.restart_log.len(), 0, "No restarts occurred");
712 assert_eq!(read_actor_call_count("temp-fail").await, 1);
713 Ok(())
714 }
715
716 #[ractor::concurrency::test]
719 #[serial]
720 async fn test_max_restarts_in_time_window() -> Result<(), ActorProcessingErr> {
721 before_each().await;
722 let options = DynamicSupervisorOptions {
724 max_children: None,
725 max_restarts: 2,
726 max_window: Duration::from_secs(1),
727 reset_after: None,
728 };
729 let (sup_ref, sup_handle) =
730 DynamicSupervisor::spawn("test_dynamic_max_restarts_in_time_window".into(), options)
731 .await?;
732 let child_spec =
733 make_child_spec("fastfail", Restart::Permanent, ChildBehavior::ImmediateFail);
734 DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
736 let _ = sup_handle.await;
738 assert_eq!(sup_ref.get_status(), ActorStatus::Stopped);
739 let final_state =
740 read_final_supervisor_state("test_dynamic_max_restarts_in_time_window").await;
741 assert_eq!(
742 final_state.restart_log.len(),
743 3,
744 "Should see 3 fails in meltdown window"
745 );
746 assert_eq!(read_actor_call_count("fastfail").await, 3);
747 Ok(())
748 }
749
750 #[ractor::concurrency::test]
753 #[serial]
754 async fn test_supervisor_restart_counter_reset_after() -> Result<(), ActorProcessingErr> {
755 before_each().await;
756 let options = DynamicSupervisorOptions {
759 max_children: None,
760 max_restarts: 2,
761 max_window: Duration::from_secs(10),
762 reset_after: Some(Duration::from_secs(2)),
763 };
764 let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
765 "test_supervisor_restart_counter_reset_after".into(),
766 options,
767 )
768 .await?;
769 let behavior = ChildBehavior::FailWaitFail {
772 initial_fails: 2,
773 wait_ms: 3000,
774 final_fails: 1,
775 current: Arc::new(AtomicU64::new(0)),
776 };
777 let child_spec = make_child_spec("reset-test", Restart::Permanent, behavior);
778 DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
779 sleep(Duration::from_secs(4)).await;
781 assert_eq!(sup_ref.get_status(), ActorStatus::Running);
783 sup_ref.stop(None);
784 let _ = sup_handle.await;
785 let final_st =
787 read_final_supervisor_state("test_supervisor_restart_counter_reset_after").await;
788 assert_eq!(
789 final_st.restart_log.len(),
790 1,
791 "Only the final fail is in meltdown log"
792 );
793 assert_eq!(read_actor_call_count("reset-test").await, 4);
795 Ok(())
796 }
797
798 #[ractor::concurrency::test]
802 #[serial]
803 async fn test_child_level_restart_counter_reset_after() -> Result<(), ActorProcessingErr> {
804 before_each().await;
805 let options = DynamicSupervisorOptions {
807 max_children: None,
808 max_restarts: 5,
809 max_window: Duration::from_secs(30),
810 reset_after: None,
811 };
812 let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
813 "test_dynamic_child_level_restart_counter_reset_after".into(),
814 options,
815 )
816 .await?;
817 let behavior = ChildBehavior::FailWaitFail {
819 initial_fails: 2,
820 wait_ms: 3000,
821 final_fails: 1,
822 current: Arc::new(AtomicU64::new(0)),
823 };
824 let mut child_spec = make_child_spec("child-reset", Restart::Permanent, behavior);
825 child_spec.reset_after = Some(Duration::from_secs(2));
827 DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
828 sleep(Duration::from_secs(5)).await;
830 assert_eq!(sup_ref.get_status(), ActorStatus::Running);
832 sup_ref.stop(None);
833 let _ = sup_handle.await;
834 let final_st =
835 read_final_supervisor_state("test_dynamic_child_level_restart_counter_reset_after")
836 .await;
837 let cfs = final_st.child_failure_state.get("child-reset").unwrap();
838 assert_eq!(cfs.restart_count, 1);
840 assert_eq!(read_actor_call_count("child-reset").await, 4);
842 Ok(())
843 }
844
845 #[ractor::concurrency::test]
847 #[serial]
848 async fn test_child_level_backoff_fn_delays_restart() -> Result<(), ActorProcessingErr> {
849 before_each().await;
850 let options = DynamicSupervisorOptions {
853 max_children: None,
854 max_restarts: 1,
855 max_window: Duration::from_secs(10),
856 reset_after: None,
857 };
858 let (sup_ref, sup_handle) =
859 DynamicSupervisor::spawn("test_dynamic_child_backoff".to_string(), options).await?;
860 let backoff_fn: ChildBackoffFn = ChildBackoffFn::new(|_id, count, _last, _child_reset| {
862 if count <= 1 {
863 None
864 } else {
865 Some(Duration::from_secs(1))
866 }
867 });
868 let mut child_spec = make_child_spec(
869 "backoff-child",
870 Restart::Permanent,
871 ChildBehavior::ImmediateFail,
872 );
873 child_spec.backoff_fn = Some(backoff_fn);
874 let start_instant = Instant::now();
875 DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
876 let _ = sup_handle.await;
878 let elapsed = start_instant.elapsed();
879 assert!(
881 elapsed >= Duration::from_secs(1),
882 "Expected at least 1s of backoff before meltdown"
883 );
884 let final_st = read_final_supervisor_state("test_dynamic_child_backoff").await;
885 assert_eq!(final_st.restart_log.len(), 2, "two fails => meltdown on #2");
886 assert_eq!(read_actor_call_count("backoff-child").await, 2);
887 Ok(())
888 }
889
890 #[ractor::concurrency::test]
893 #[serial]
894 async fn test_exceed_max_children() -> Result<(), ActorProcessingErr> {
895 before_each().await;
896 let options = DynamicSupervisorOptions {
897 max_children: Some(1), max_restarts: 999,
899 max_window: Duration::from_secs(999),
900 reset_after: None,
901 };
902 let (sup_ref, sup_handle) =
903 DynamicSupervisor::spawn("test_exceed_max_children".to_string(), options).await?;
904 let spec_1 = make_child_spec(
905 "allowed-child",
906 Restart::Permanent,
907 ChildBehavior::DelayedNormal { ms: 500 },
908 );
909 let spec_2 = make_child_spec(
910 "unallowed-child",
911 Restart::Permanent,
912 ChildBehavior::DelayedNormal { ms: 500 },
913 );
914 DynamicSupervisor::spawn_child(sup_ref.clone(), spec_1).await?;
916 let result = DynamicSupervisor::spawn_child(sup_ref.clone(), spec_2).await;
918 assert!(
919 result.is_err(),
920 "Spawning a second child should fail due to max_children=1"
921 );
922 let final_st = read_final_supervisor_state("test_exceed_max_children").await;
923 assert_eq!(
924 final_st.active_children.len(),
925 1,
926 "Second child should not have been spawned"
927 );
928 assert!(
929 get_running_children(&sup_ref).len() == 1,
930 "Only one child running"
931 );
932 sup_ref.stop(None);
933 let _ = sup_handle.await;
934 Ok(())
935 }
936
937 #[ractor::concurrency::test]
939 #[serial]
940 async fn test_terminate_child() -> Result<(), ActorProcessingErr> {
941 before_each().await;
942 let options = DynamicSupervisorOptions {
943 max_children: None,
944 max_restarts: 10,
945 max_window: Duration::from_secs(10),
946 reset_after: None,
947 };
948 let (sup_ref, sup_handle) =
949 DynamicSupervisor::spawn("test_terminate_child".into(), options).await?;
950 let child_spec = make_child_spec(
951 "kill-me",
952 Restart::Permanent,
953 ChildBehavior::DelayedNormal { ms: 9999 },
954 );
955 DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
956 let st_before = inspect_supervisor(&sup_ref).await;
958 assert!(st_before.active_children.contains_key("kill-me"));
959 assert!(
960 get_running_children(&sup_ref).len() == 1,
961 "Child is running"
962 );
963 DynamicSupervisor::terminate_child(sup_ref.clone(), "kill-me".to_string()).await?;
965 let st_after = inspect_supervisor(&sup_ref).await;
967 assert!(!st_after.active_children.contains_key("kill-me"));
968 assert!(
969 get_running_children(&sup_ref).is_empty(),
970 "No child is running"
971 );
972 assert_eq!(sup_ref.get_status(), ActorStatus::Running);
973 sup_ref.stop(None);
975 let _ = sup_handle.await;
976 let final_st = read_final_supervisor_state("test_terminate_child").await;
977 assert!(final_st.restart_log.is_empty());
979 assert_eq!(read_actor_call_count("kill-me").await, 1);
981 Ok(())
982 }
983}