use ractor::concurrency::{Instant, JoinHandle};
use ractor::{
    call, Actor, ActorCell, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort, SpawnErr,
    SupervisionEvent,
};
use std::collections::HashMap;
use crate::core::{
    ChildFailureState, ChildSpec, CoreSupervisorOptions, RestartLog, SupervisorCore,
    SupervisorError,
};
/// Configuration handed to a [`DynamicSupervisor`] when it is spawned.
///
/// The three restart-related fields are surfaced to the shared supervisor core through
/// this type's [`CoreSupervisorOptions`] implementation.
#[derive(Debug, Clone)]
pub struct DynamicSupervisorOptions {
    /// Upper bound on the number of tracked children; a spawn request is rejected once
    /// `active_children.len()` has reached it. `None` disables the check.
    pub max_children: Option<usize>,
    /// Value returned by `CoreSupervisorOptions::max_restarts`.
    /// NOTE(review): presumably the restart budget inside the `max_seconds` window —
    /// confirm against `crate::core`.
    pub max_restarts: usize,
    /// Value returned by `CoreSupervisorOptions::max_seconds`.
    /// NOTE(review): presumably the length (in seconds) of the meltdown window — confirm.
    pub max_seconds: usize,
    /// Value returned by `CoreSupervisorOptions::restart_counter_reset_after`.
    /// NOTE(review): unit and exact reset semantics live in `crate::core` — confirm.
    pub restart_counter_reset_after: Option<u64>,
}
/// Mutable state owned by a running [`DynamicSupervisor`].
///
/// It is `Clone` so that a snapshot can be returned for
/// [`DynamicSupervisorMsg::InspectState`].
#[derive(Clone, Debug)]
pub struct DynamicSupervisorState {
    /// Per-child failure bookkeeping, keyed by child id.
    pub child_failure_state: HashMap<String, ChildFailureState>,
    /// Restart log exposed to the supervisor core via `SupervisorCore::restart_log`.
    pub restart_log: Vec<RestartLog>,
    /// The options this supervisor was started with.
    pub options: DynamicSupervisorOptions,
    /// Children currently tracked by this supervisor, keyed by child id.
    pub active_children: HashMap<String, ActiveChild>,
}
/// A child that has been spawned and is currently tracked by the supervisor.
#[derive(Clone, Debug)]
pub struct ActiveChild {
    /// The specification the child was spawned from; reused when the child exits and
    /// restart handling is invoked.
    pub spec: ChildSpec,
    /// Handle to the child actor, used to unlink and kill it on termination requests.
    pub cell: ActorCell,
}
/// Messages understood by a [`DynamicSupervisor`].
pub enum DynamicSupervisorMsg {
    /// Spawn the child described by `spec`.
    ///
    /// When `reply` is `Some`, the spawn result is sent back on it and the request is
    /// treated as a first start. `reply: None` is what `restart_msg` produces, and such
    /// a request is first registered as a restart.
    SpawnChild {
        spec: ChildSpec,
        reply: Option<RpcReplyPort<Result<(), ActorProcessingErr>>>,
    },
    /// Stop tracking the child with the given id, unlink it and kill it.
    /// `reply`, if present, is signalled once the request has been handled.
    TerminateChild {
        child_id: String,
        reply: Option<RpcReplyPort<()>>,
    },
    /// Reply with a clone of the supervisor's current state.
    InspectState(RpcReplyPort<DynamicSupervisorState>),
}
/// Adapter exposing the restart limits of [`DynamicSupervisorOptions`] to the shared
/// supervisor core. The strategy type is `()`, i.e. no strategy value is carried.
impl CoreSupervisorOptions<()> for DynamicSupervisorOptions {
    /// There is no strategy to report; yields the unit value.
    fn strategy(&self) {}

    /// Returns the configured `restart_counter_reset_after` value unchanged.
    fn restart_counter_reset_after(&self) -> Option<u64> {
        self.restart_counter_reset_after
    }

    /// Returns the configured `max_seconds` value unchanged.
    fn max_seconds(&self) -> usize {
        self.max_seconds
    }

    /// Returns the configured `max_restarts` value unchanged.
    fn max_restarts(&self) -> usize {
        self.max_restarts
    }
}
/// Gives the shared supervisor core access to the bookkeeping held in
/// [`DynamicSupervisorState`] and tells it how to request a child restart.
impl SupervisorCore for DynamicSupervisorState {
    type Message = DynamicSupervisorMsg;
    type Options = DynamicSupervisorOptions;
    type Strategy = ();

    /// Mutable access to the per-child failure records.
    fn child_failure_state(&mut self) -> &mut HashMap<String, ChildFailureState> {
        &mut self.child_failure_state
    }

    /// Mutable access to the restart log.
    fn restart_log(&mut self) -> &mut Vec<RestartLog> {
        &mut self.restart_log
    }

    /// The options this supervisor was configured with.
    fn options(&self) -> &Self::Options {
        &self.options
    }

    /// Builds the message that asks the supervisor to spawn `child_spec` again.
    ///
    /// The message carries no reply port, which is how the message handler tells a
    /// restart apart from a caller-initiated first start.
    fn restart_msg(
        &self,
        child_spec: &ChildSpec,
        _strategy: (),
        _myself: ActorRef<Self::Message>,
    ) -> Self::Message {
        let spec = child_spec.clone();
        DynamicSupervisorMsg::SpawnChild { spec, reply: None }
    }
}
/// Startup arguments of the supervisor actor: simply its options.
type DynamicSupervisorArguments = DynamicSupervisorOptions;
#[ractor::async_trait]
/// Actor behaviour of the dynamic supervisor: handles spawn / terminate / inspect
/// requests and reacts to supervision events coming from linked actors.
impl Actor for DynamicSupervisor {
    type Msg = DynamicSupervisorMsg;
    type State = DynamicSupervisorState;
    type Arguments = DynamicSupervisorArguments;

    /// Creates the initial state: the given options and empty bookkeeping collections.
    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        options: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        let initial_state = DynamicSupervisorState {
            options,
            active_children: HashMap::new(),
            child_failure_state: HashMap::new(),
            restart_log: Vec::new(),
        };
        Ok(initial_state)
    }

    /// Dispatches one supervisor message.
    ///
    /// For `SpawnChild` with a reply port the spawn result goes to the caller and the
    /// handler itself succeeds; without a reply port the spawn result becomes the
    /// handler's result.
    ///
    /// NOTE(review): a reply that cannot be delivered is propagated with `?` as a
    /// handler error (and skips the test-only snapshot) — confirm this is intended.
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        msg: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        let outcome = match msg {
            DynamicSupervisorMsg::SpawnChild { spec, reply } => {
                // A request that carries a reply port counts as a first start.
                let first_start = reply.is_some();
                let spawn_result = self
                    .handle_spawn_child(&spec, first_start, state, myself.clone())
                    .await;
                match reply {
                    Some(port) => {
                        port.send(spawn_result)?;
                        Ok(())
                    }
                    None => spawn_result,
                }
            }
            DynamicSupervisorMsg::TerminateChild { child_id, reply } => {
                self.handle_terminate_child(&child_id, state, myself.clone())
                    .await;
                if let Some(port) = reply {
                    port.send(())?;
                }
                Ok(())
            }
            DynamicSupervisorMsg::InspectState(reply) => {
                let snapshot = state.clone();
                reply.send(snapshot)?;
                Ok(())
            }
        };
        // Test builds record a snapshot after every handled message.
        #[cfg(test)]
        {
            store_final_state(myself, state).await;
        }
        outcome
    }

    /// Reacts to supervision events of linked actors.
    ///
    /// * `ActorStarted` — ensures a failure record exists for an already tracked child.
    /// * `ActorTerminated` / `ActorFailed` — forwards to the restart handling as a
    ///   normal / abnormal exit respectively.
    /// * `ProcessGroupChanged` — ignored.
    async fn handle_supervisor_evt(
        &self,
        myself: ActorRef<Self::Msg>,
        evt: SupervisionEvent,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match evt {
            SupervisionEvent::ActorStarted(cell) => {
                let Some(child_id) = cell.get_name() else {
                    return Err(SupervisorError::ChildNameNotSet { pid: cell.get_id() }.into());
                };
                if state.active_children.contains_key(&child_id) {
                    state
                        .child_failure_state
                        .entry(child_id)
                        .or_insert_with(|| ChildFailureState {
                            restart_count: 0,
                            last_fail_instant: Instant::now(),
                        });
                }
            }
            SupervisionEvent::ActorTerminated(cell, ..) => {
                self.handle_child_restart(cell, false, state, myself)?
            }
            SupervisionEvent::ActorFailed(cell, ..) => {
                self.handle_child_restart(cell, true, state, myself)?
            }
            SupervisionEvent::ProcessGroupChanged(_group) => {}
        }
        Ok(())
    }

    /// Nothing to clean up; test builds record one last state snapshot.
    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        _state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        #[cfg(test)]
        {
            store_final_state(_myself, _state).await;
        }
        Ok(())
    }
}
/// Supervisor actor whose children are added and removed at runtime through
/// [`DynamicSupervisorMsg`] messages. The type itself is stateless; all state lives in
/// [`DynamicSupervisorState`].
pub struct DynamicSupervisor;
impl DynamicSupervisor {
    pub async fn spawn(
        name: ActorName,
        startup_args: DynamicSupervisorArguments,
    ) -> Result<(ActorRef<DynamicSupervisorMsg>, JoinHandle<()>), SpawnErr> {
        Actor::spawn(Some(name), DynamicSupervisor, startup_args).await
    }
    /// Spawns an arbitrary named actor `handler` linked to `supervisor`.
    ///
    /// Thin convenience wrapper around `Actor::spawn_linked`; its result is returned
    /// unchanged.
    pub async fn spawn_linked<T: Actor>(
        name: ActorName,
        handler: T,
        startup_args: T::Arguments,
        supervisor: ActorCell,
    ) -> Result<(ActorRef<T::Msg>, JoinHandle<()>), SpawnErr> {
        let actor_name = Some(name);
        T::spawn_linked(actor_name, handler, startup_args, supervisor).await
    }
    /// Asks the supervisor behind `sup_ref` to spawn the child described by `spec` and
    /// waits for its answer.
    ///
    /// # Errors
    /// Fails if the call itself cannot be completed, otherwise returns whatever result
    /// the supervisor replied with.
    pub async fn spawn_child(
        sup_ref: ActorRef<DynamicSupervisorMsg>,
        spec: ChildSpec,
    ) -> Result<(), ActorProcessingErr> {
        // The outer `?` covers the RPC round trip; the inner result is the supervisor's.
        let spawn_outcome = call!(sup_ref, |reply_port| {
            DynamicSupervisorMsg::SpawnChild {
                spec,
                reply: Some(reply_port),
            }
        })?;
        spawn_outcome
    }
    /// Asks the supervisor behind `sup_ref` to terminate the child with id `child_id`
    /// and waits until the request has been acknowledged.
    ///
    /// # Errors
    /// Fails only if the call itself cannot be completed.
    pub async fn terminate_child(
        sup_ref: ActorRef<DynamicSupervisorMsg>,
        child_id: String,
    ) -> Result<(), ActorProcessingErr> {
        call!(sup_ref, |ack_port| {
            DynamicSupervisorMsg::TerminateChild {
                child_id,
                reply: Some(ack_port),
            }
        })?;
        Ok(())
    }
    /// Spawns (or re-spawns) the child described by `spec` and starts tracking it.
    ///
    /// * `first_start` — `false` for restart requests; the restart is then registered
    ///   through `track_global_restart` before anything else happens.
    /// * `state` — supervisor state; updated with the new child on success.
    /// * `myself` — this supervisor, passed to the spawn function as the parent cell and
    ///   to the restart handling.
    ///
    /// A spawn failure is not returned directly: it is routed through the core restart
    /// handling as an abnormal exit, and only a failure of that handling is reported.
    ///
    /// # Errors
    /// * whatever `track_global_restart` reports for a restart request,
    /// * `SupervisorError::Meltdown` when `max_children` is already reached,
    /// * `SupervisorError::ChildSpawnError` when the spawn failed and the restart
    ///   handling failed as well.
    async fn handle_spawn_child(
        &self,
        spec: &ChildSpec,
        first_start: bool,
        state: &mut DynamicSupervisorState,
        myself: ActorRef<DynamicSupervisorMsg>,
    ) -> Result<(), ActorProcessingErr> {
        if !first_start {
            state.track_global_restart(&spec.id)?;
        }
        if let Some(max) = state.options.max_children {
            if state.active_children.len() >= max {
                return Err(SupervisorError::Meltdown {
                    reason: "max_children exceeded".to_string(),
                }
                .into());
            }
        }
        // `get_cell()` already hands out an owned cell, so no additional clone is needed.
        // The spawn error is matched directly instead of being mapped into an error
        // value that would be thrown away immediately.
        match (spec.spawn_fn)(myself.get_cell(), spec.id.clone()).await {
            Ok(child_cell) => {
                state.active_children.insert(
                    spec.id.clone(),
                    ActiveChild {
                        cell: child_cell,
                        spec: spec.clone(),
                    },
                );
                // Keep an existing failure record (restart case); create one otherwise.
                state
                    .child_failure_state
                    .entry(spec.id.clone())
                    .or_insert_with(|| ChildFailureState {
                        restart_count: 0,
                        last_fail_instant: Instant::now(),
                    });
            }
            Err(_) => {
                // Treat a failed spawn like an abnormal child exit.
                state
                    .handle_child_restart(spec, true, myself)
                    .map_err(|e| SupervisorError::ChildSpawnError {
                        child_id: spec.id.clone(),
                        reason: e.to_string(),
                    })?;
            }
        }
        Ok(())
    }
    /// Stops tracking the child `child_id`, unlinks it from this supervisor and kills it.
    ///
    /// Unknown ids are ignored.
    async fn handle_terminate_child(
        &self,
        child_id: &str,
        state: &mut DynamicSupervisorState,
        myself: ActorRef<DynamicSupervisorMsg>,
    ) {
        let Some(child) = state.active_children.remove(child_id) else {
            return;
        };
        // Unlink before killing the child.
        // NOTE(review): presumably so that the kill produces no supervision event for
        // this supervisor — confirm against ractor's unlink semantics.
        child.cell.unlink(myself.get_cell());
        child.cell.kill();
    }
    /// Handles the exit of the child actor behind `cell`.
    ///
    /// The child is removed from `active_children` and its spec is handed to the core
    /// restart handling together with the `abnormal` flag.
    ///
    /// # Errors
    /// * `SupervisorError::ChildNameNotSet` if the actor has no name,
    /// * `SupervisorError::ChildNotFound` if no tracked child has that name,
    /// * any error reported by the core restart handling.
    fn handle_child_restart(
        &self,
        cell: ActorCell,
        abnormal: bool,
        state: &mut DynamicSupervisorState,
        myself: ActorRef<DynamicSupervisorMsg>,
    ) -> Result<(), ActorProcessingErr> {
        let Some(child_id) = cell.get_name() else {
            return Err(SupervisorError::ChildNameNotSet { pid: cell.get_id() }.into());
        };
        let Some(exited) = state.active_children.remove(&child_id) else {
            return Err(SupervisorError::ChildNotFound { child_id }.into());
        };
        state.handle_child_restart(&exited.spec, abnormal, myself)?;
        Ok(())
    }
}
/// Test-only registry holding the most recently stored state snapshot per supervisor
/// name. It is lazily initialised and written by `store_final_state`.
#[cfg(test)]
static SUPERVISOR_FINAL: std::sync::OnceLock<
    tokio::sync::Mutex<HashMap<String, DynamicSupervisorState>>,
> = std::sync::OnceLock::new();
/// Test-only helper: records a clone of `state` under the supervisor's name.
///
/// The registry is created on first use. Supervisors without a name are not recorded,
/// but the registry is still initialised and locked.
#[cfg(test)]
async fn store_final_state(myself: ActorRef<DynamicSupervisorMsg>, state: &DynamicSupervisorState) {
    let registry = SUPERVISOR_FINAL.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()));
    let mut snapshots = registry.lock().await;
    if let Some(sup_name) = myself.get_name() {
        snapshots.insert(sup_name, state.clone());
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{ChildBackoffFn, ChildSpec, Restart};
    use futures_util::FutureExt;
    use ractor::concurrency::{sleep, Duration, Instant};
    use ractor::{call_t, Actor, ActorCell, ActorRef, ActorStatus};
    use serial_test::serial;
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    // Registry counting how often each test child (keyed by its actor name) has run
    // `pre_start`. Lazily initialised by `increment_actor_count`.
    #[cfg(test)]
    static ACTOR_CALL_COUNT: std::sync::OnceLock<
        tokio::sync::Mutex<std::collections::HashMap<String, u64>>,
    > = std::sync::OnceLock::new();
    /// Per-test setup: empties both shared registries (if they exist yet) and then
    /// pauses for 10 ms.
    async fn before_each() {
        // Registries that were never initialised are left untouched.
        if let Some(final_states) = super::SUPERVISOR_FINAL.get() {
            final_states.lock().await.clear();
        }
        if let Some(call_counts) = ACTOR_CALL_COUNT.get() {
            call_counts.lock().await.clear();
        }
        sleep(Duration::from_millis(10)).await;
    }
    /// Adds one to the start counter of `child_id`, creating the registry and the
    /// counter on first use.
    async fn increment_actor_count(child_id: &str) {
        let registry = ACTOR_CALL_COUNT
            .get_or_init(|| tokio::sync::Mutex::new(std::collections::HashMap::new()));
        let mut counts = registry.lock().await;
        let counter = counts.entry(child_id.to_string()).or_default();
        *counter += 1;
    }
    /// Returns a clone of the last snapshot stored for the supervisor `sup_name`.
    ///
    /// # Panics
    /// If the registry was never initialised or holds no entry for `sup_name`.
    async fn read_final_supervisor_state(sup_name: &str) -> DynamicSupervisorState {
        let registry = super::SUPERVISOR_FINAL
            .get()
            .expect("SUPERVISOR_FINAL not initialized!");
        let snapshots = registry.lock().await;
        match snapshots.get(sup_name) {
            Some(snapshot) => snapshot.clone(),
            None => panic!("No final state for supervisor '{sup_name}'"),
        }
    }
    /// Returns how many times the test child `child_id` has been started.
    ///
    /// # Panics
    /// If the registry was never initialised or holds no counter for `child_id`.
    async fn read_actor_call_count(child_id: &str) -> u64 {
        let registry = ACTOR_CALL_COUNT
            .get()
            .expect("ACTOR_CALL_COUNT not initialized!");
        let counts = registry.lock().await;
        match counts.get(child_id) {
            Some(count) => *count,
            None => panic!("No actor call count for child '{child_id}'"),
        }
    }
    /// Scripted behaviour of a [`TestChild`]; it is both the actor's startup argument
    /// and its state.
    #[derive(Clone)]
    pub enum ChildBehavior {
        /// Panics when handling a message that is scheduled `ms` milliseconds after start.
        DelayedFail {
            ms: u64,
        },
        /// Stops itself when handling a message that is scheduled `ms` milliseconds
        /// after start.
        DelayedNormal {
            ms: u64,
        },
        /// Panics inside `pre_start`.
        ImmediateFail,
        /// Requests its own stop inside `pre_start`.
        ImmediateNormal,
        /// Handles one message `delay_ms` after each start and panics while the shared
        /// counter `current` (incremented per handled message) is at most `fail_count`.
        CountedFails {
            delay_ms: u64,
            fail_count: u64,
            current: Arc<AtomicU64>,
        },
        /// Panics for the first `initial_fails` handled messages, then schedules one more
        /// message after `wait_ms` milliseconds, then panics for up to `final_fails`
        /// further handled messages. `current` is the shared message counter.
        FailWaitFail {
            initial_fails: u64,
            wait_ms: u64,
            final_fails: u64,
            current: Arc<AtomicU64>,
        },
    }
    /// Test actor whose lifecycle is scripted by a [`ChildBehavior`].
    pub struct TestChild;
    #[ractor::async_trait]
    /// Plays back the scripted [`ChildBehavior`]; every start is counted per actor name.
    impl Actor for TestChild {
        type Msg = ();
        type State = ChildBehavior;
        type Arguments = ChildBehavior;

        /// Counts the start, then arms the behaviour: schedules the first message,
        /// panics, or requests a stop, depending on the variant.
        async fn pre_start(
            &self,
            myself: ActorRef<Self::Msg>,
            arg: Self::Arguments,
        ) -> Result<Self::State, ractor::ActorProcessingErr> {
            let own_name = myself.get_name().unwrap();
            increment_actor_count(own_name.as_str()).await;
            match arg {
                // All three variants simply schedule one message after their delay.
                ChildBehavior::DelayedFail { ms }
                | ChildBehavior::DelayedNormal { ms }
                | ChildBehavior::CountedFails { delay_ms: ms, .. } => {
                    myself.send_after(Duration::from_millis(ms), || ());
                }
                ChildBehavior::ImmediateFail => panic!("Immediate fail => ActorFailed"),
                ChildBehavior::ImmediateNormal => myself.stop(None),
                ChildBehavior::FailWaitFail { .. } => myself.cast(())?,
            }
            Ok(arg)
        }

        /// Executes the scripted reaction to the (payload-free) message.
        async fn handle(
            &self,
            myself: ActorRef<Self::Msg>,
            _msg: Self::Msg,
            state: &mut Self::State,
        ) -> Result<(), ractor::ActorProcessingErr> {
            match state {
                ChildBehavior::DelayedFail { .. } => panic!("Delayed fail => ActorFailed"),
                ChildBehavior::ImmediateFail => panic!("ImmediateFail => ActorFailed"),
                ChildBehavior::DelayedNormal { .. } | ChildBehavior::ImmediateNormal => {
                    myself.stop(None)
                }
                ChildBehavior::CountedFails {
                    fail_count,
                    current,
                    ..
                } => {
                    // 1-based number of messages handled so far across all restarts.
                    let newv = current.fetch_add(1, Ordering::SeqCst) + 1;
                    if newv <= *fail_count {
                        panic!("CountedFails => fail #{newv}");
                    }
                }
                ChildBehavior::FailWaitFail {
                    initial_fails,
                    wait_ms,
                    final_fails,
                    current,
                } => {
                    let so_far = current.fetch_add(1, Ordering::SeqCst) + 1;
                    if so_far <= *initial_fails {
                        panic!("FailWaitFail => initial fail #{so_far}");
                    }
                    // First message after the initial failures: wait, then continue.
                    let wait_step = *initial_fails + 1;
                    if so_far == wait_step {
                        myself.send_after(Duration::from_millis(*wait_ms), || ());
                    } else {
                        let n = so_far - wait_step;
                        if n <= *final_fails {
                            panic!("FailWaitFail => final fail #{n}");
                        }
                    }
                }
            }
            Ok(())
        }
    }
    /// Collects the named children of `sup_ref` whose status is `Running`, keyed by name.
    fn get_running_children(
        sup_ref: &ActorRef<DynamicSupervisorMsg>,
    ) -> HashMap<String, ActorCell> {
        let mut running = HashMap::new();
        for child in sup_ref.get_children() {
            if child.get_status() != ActorStatus::Running {
                continue;
            }
            // Children without a name cannot be keyed and are skipped.
            if let Some(name) = child.get_name() {
                running.insert(name, child);
            }
        }
        running
    }
    /// Spawns a [`TestChild`] named `id` with the given behaviour, linked to `sup_cell`,
    /// and returns its cell. The join handle is dropped.
    async fn spawn_test_child(
        sup_cell: ActorCell,
        id: String,
        behavior: ChildBehavior,
    ) -> Result<ActorCell, SpawnErr> {
        let spawned = DynamicSupervisor::spawn_linked(id, TestChild, behavior, sup_cell).await;
        spawned.map(|(child_ref, _join)| child_ref.get_cell())
    }
    /// Builds a [`ChildSpec`] that spawns a [`TestChild`] with `behavior`.
    ///
    /// No backoff function and no child-level counter reset are configured; tests that
    /// need them set the fields afterwards.
    fn make_child_spec(id: &str, restart: Restart, behavior: ChildBehavior) -> ChildSpec {
        let id = id.to_string();
        ChildSpec {
            id,
            restart,
            backoff_fn: None,
            restart_counter_reset_after: None,
            // Every (re)start receives its own clone of the scripted behaviour.
            spawn_fn: Arc::new(move |supervisor, name| {
                let scripted = behavior.clone();
                spawn_test_child(supervisor, name, scripted).boxed()
            }),
        }
    }
    /// Fetches a snapshot of the supervisor's state via `InspectState`
    /// (timeout argument: 500).
    ///
    /// # Panics
    /// If the call does not succeed.
    async fn inspect_supervisor(
        sup_ref: &ActorRef<DynamicSupervisorMsg>,
    ) -> DynamicSupervisorState {
        let reply = call_t!(sup_ref, DynamicSupervisorMsg::InspectState, 500);
        reply.unwrap()
    }
    /// A transient child that stops normally is dropped from tracking and not restarted.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_transient_child_normal_exit() -> Result<(), ActorProcessingErr> {
        before_each().await;

        let sup_name = "test_dynamic_normal_exit";
        let child_id = "normal-dynamic";
        let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
            sup_name.into(),
            DynamicSupervisorOptions {
                max_children: None,
                max_restarts: 5,
                max_seconds: 5,
                restart_counter_reset_after: None,
            },
        )
        .await?;

        let spec = make_child_spec(
            child_id,
            Restart::Transient,
            ChildBehavior::DelayedNormal { ms: 200 },
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), spec).await?;

        // The child stops itself about 200 ms after start; wait past that point.
        sleep(Duration::from_millis(300)).await;

        let sup_state = inspect_supervisor(&sup_ref).await;
        assert_eq!(
            sup_ref.get_status(),
            ActorStatus::Running,
            "Supervisor still running"
        );
        assert!(
            sup_state.active_children.is_empty(),
            "Child should have exited normally"
        );
        assert!(
            get_running_children(&sup_ref).is_empty(),
            "No children running"
        );
        assert!(sup_state.restart_log.is_empty(), "No restarts expected");

        sup_ref.stop(None);
        let _ = sup_handle.await;

        let final_st = read_final_supervisor_state(sup_name).await;
        assert!(final_st.restart_log.is_empty());
        assert_eq!(
            read_actor_call_count(child_id).await,
            1,
            "Spawned exactly once"
        );
        Ok(())
    }
    /// A permanent child that panics on every start exhausts `max_restarts: 1` and
    /// brings the supervisor down.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_permanent_child_meltdown() -> Result<(), ActorProcessingErr> {
        before_each().await;
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 1,
            max_seconds: 2,
            restart_counter_reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_permanent_child_meltdown".into(), options).await?;
        let fail_child_spec = make_child_spec(
            "fail-child",
            Restart::Permanent,
            ChildBehavior::ImmediateFail,
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), fail_child_spec).await?;
        // The supervisor is expected to stop on its own once the restart budget is spent.
        let _ = sup_handle.await;
        assert_eq!(
            sup_ref.get_status(),
            ActorStatus::Stopped,
            "Supervisor meltdown expected"
        );
        let final_state = read_final_supervisor_state("test_permanent_child_meltdown").await;
        // Exact check: the message now matches what is asserted, and a failure prints
        // the actual length.
        assert_eq!(
            final_state.restart_log.len(),
            2,
            "Expected exactly 2 logged restarts leading to meltdown"
        );
        assert_eq!(read_actor_call_count("fail-child").await, 2);
        Ok(())
    }
    /// A temporary child that fails is neither restarted nor logged as a restart.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_temporary_child_fail_no_restart() -> Result<(), ActorProcessingErr> {
        before_each().await;

        let sup_name = "test_temporary_child_fail_no_restart";
        let child_id = "temp-fail";
        let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
            sup_name.into(),
            DynamicSupervisorOptions {
                max_children: None,
                max_restarts: 10,
                max_seconds: 10,
                restart_counter_reset_after: None,
            },
        )
        .await?;

        let spec = make_child_spec(
            child_id,
            Restart::Temporary,
            ChildBehavior::DelayedFail { ms: 200 },
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), spec).await?;

        // The child panics about 200 ms after start; wait well past that point.
        sleep(Duration::from_millis(400)).await;

        assert_eq!(sup_ref.get_status(), ActorStatus::Running);
        let sup_state = inspect_supervisor(&sup_ref).await;
        assert!(
            sup_state.active_children.is_empty(),
            "Temporary child not restarted"
        );
        assert!(
            get_running_children(&sup_ref).is_empty(),
            "No children running"
        );

        sup_ref.stop(None);
        let _ = sup_handle.await;

        let final_st = read_final_supervisor_state(sup_name).await;
        assert_eq!(final_st.restart_log.len(), 0, "No restarts occurred");
        assert_eq!(read_actor_call_count(child_id).await, 1);
        Ok(())
    }
    /// With `max_restarts: 2` and `max_seconds: 1`, a child that always fails at start
    /// stops the supervisor after three logged restarts.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_max_restarts_in_time_window() -> Result<(), ActorProcessingErr> {
        before_each().await;

        let sup_name = "test_dynamic_max_restarts_in_time_window";
        let child_id = "fastfail";
        let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
            sup_name.into(),
            DynamicSupervisorOptions {
                max_children: None,
                max_restarts: 2,
                max_seconds: 1,
                restart_counter_reset_after: None,
            },
        )
        .await?;

        let spec = make_child_spec(child_id, Restart::Permanent, ChildBehavior::ImmediateFail);
        DynamicSupervisor::spawn_child(sup_ref.clone(), spec).await?;

        // The supervisor terminates by itself once the limit is exceeded.
        let _ = sup_handle.await;
        assert_eq!(sup_ref.get_status(), ActorStatus::Stopped);

        let final_state = read_final_supervisor_state(sup_name).await;
        assert_eq!(
            final_state.restart_log.len(),
            3,
            "Should see 3 fails in meltdown window"
        );
        assert_eq!(read_actor_call_count(child_id).await, 3);
        Ok(())
    }
    /// With a supervisor-level `restart_counter_reset_after`, two early failures followed
    /// by a quiet period and one more failure leave the supervisor running with a single
    /// restart-log entry.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_supervisor_restart_counter_reset_after() -> Result<(), ActorProcessingErr> {
        before_each().await;

        let sup_name = "test_supervisor_restart_counter_reset_after";
        let child_id = "reset-test";
        let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
            sup_name.into(),
            DynamicSupervisorOptions {
                max_children: None,
                max_restarts: 2,
                max_seconds: 10,
                restart_counter_reset_after: Some(2),
            },
        )
        .await?;

        // Two immediate failures, a 3 s pause, then one final failure.
        let fail_counter = Arc::new(AtomicU64::new(0));
        let behavior = ChildBehavior::FailWaitFail {
            initial_fails: 2,
            wait_ms: 3000,
            final_fails: 1,
            current: fail_counter,
        };
        let spec = make_child_spec(child_id, Restart::Permanent, behavior);
        DynamicSupervisor::spawn_child(sup_ref.clone(), spec).await?;

        sleep(Duration::from_secs(4)).await;
        assert_eq!(sup_ref.get_status(), ActorStatus::Running);

        sup_ref.stop(None);
        let _ = sup_handle.await;

        let final_st = read_final_supervisor_state(sup_name).await;
        assert_eq!(
            final_st.restart_log.len(),
            1,
            "Only the final fail is in meltdown log"
        );
        assert_eq!(read_actor_call_count(child_id).await, 4);
        Ok(())
    }
    /// With a child-level `restart_counter_reset_after`, the child's restart count is 1
    /// after two early failures, a quiet period and one more failure.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_child_level_restart_counter_reset_after() -> Result<(), ActorProcessingErr> {
        before_each().await;

        let sup_name = "test_dynamic_child_level_restart_counter_reset_after";
        let child_id = "child-reset";
        let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
            sup_name.into(),
            DynamicSupervisorOptions {
                max_children: None,
                max_restarts: 5,
                max_seconds: 30,
                restart_counter_reset_after: None,
            },
        )
        .await?;

        // Two immediate failures, a 3 s pause, then one final failure.
        let fail_counter = Arc::new(AtomicU64::new(0));
        let behavior = ChildBehavior::FailWaitFail {
            initial_fails: 2,
            wait_ms: 3000,
            final_fails: 1,
            current: fail_counter,
        };
        let mut spec = make_child_spec(child_id, Restart::Permanent, behavior);
        spec.restart_counter_reset_after = Some(2);
        DynamicSupervisor::spawn_child(sup_ref.clone(), spec).await?;

        sleep(Duration::from_secs(5)).await;
        assert_eq!(sup_ref.get_status(), ActorStatus::Running);

        sup_ref.stop(None);
        let _ = sup_handle.await;

        let final_st = read_final_supervisor_state(sup_name).await;
        let cfs = final_st.child_failure_state.get(child_id).unwrap();
        assert_eq!(cfs.restart_count, 1);
        assert_eq!(read_actor_call_count(child_id).await, 4);
        Ok(())
    }
    /// A child-level backoff function that returns one second (for counts above 1)
    /// delays the run by at least that long before the supervisor stops.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_child_level_backoff_fn_delays_restart() -> Result<(), ActorProcessingErr> {
        before_each().await;

        let sup_name = "test_dynamic_child_backoff";
        let child_id = "backoff-child";
        let (sup_ref, sup_handle) = DynamicSupervisor::spawn(
            sup_name.to_string(),
            DynamicSupervisorOptions {
                max_children: None,
                max_restarts: 1,
                max_seconds: 10,
                restart_counter_reset_after: None,
            },
        )
        .await?;

        // No delay while the count is at most 1, one second afterwards.
        let backoff_fn: ChildBackoffFn = Arc::new(|_id, count, _last, _child_reset| {
            (count > 1).then(|| Duration::from_secs(1))
        });
        let mut spec = make_child_spec(child_id, Restart::Permanent, ChildBehavior::ImmediateFail);
        spec.backoff_fn = Some(backoff_fn);

        let start_instant = Instant::now();
        DynamicSupervisor::spawn_child(sup_ref.clone(), spec).await?;
        let _ = sup_handle.await;
        let elapsed = start_instant.elapsed();

        assert!(
            elapsed >= Duration::from_secs(1),
            "Expected at least 1s of backoff before meltdown"
        );
        let final_st = read_final_supervisor_state(sup_name).await;
        assert_eq!(final_st.restart_log.len(), 2, "two fails => meltdown on #2");
        assert_eq!(read_actor_call_count(child_id).await, 2);
        Ok(())
    }
    /// With `max_children: Some(1)` a second spawn request is rejected and only the
    /// first child keeps running.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_exceed_max_children() -> Result<(), ActorProcessingErr> {
        before_each().await;
        let options = DynamicSupervisorOptions {
            max_children: Some(1),
            max_restarts: 999,
            max_seconds: 999,
            restart_counter_reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_exceed_max_children".to_string(), options).await?;
        let spec_1 = make_child_spec(
            "allowed-child",
            Restart::Permanent,
            ChildBehavior::DelayedNormal { ms: 500 },
        );
        let spec_2 = make_child_spec(
            "unallowed-child",
            Restart::Permanent,
            ChildBehavior::DelayedNormal { ms: 500 },
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), spec_1).await?;
        let result = DynamicSupervisor::spawn_child(sup_ref.clone(), spec_2).await;
        assert!(
            result.is_err(),
            "Spawning a second child should fail due to max_children=1"
        );
        // The supervisor is still running here; the snapshot read is the one recorded
        // after the most recently handled message.
        let final_st = read_final_supervisor_state("test_exceed_max_children").await;
        assert_eq!(
            final_st.active_children.len(),
            1,
            "Second child should not have been spawned"
        );
        // `assert_eq!` reports the actual count if this ever fails.
        assert_eq!(
            get_running_children(&sup_ref).len(),
            1,
            "Only one child running"
        );
        sup_ref.stop(None);
        let _ = sup_handle.await;
        Ok(())
    }
    /// Terminating a child removes it from tracking, leaves no child running, keeps the
    /// supervisor alive and records no restart.
    #[ractor::concurrency::test]
    #[serial]
    async fn test_terminate_child() -> Result<(), ActorProcessingErr> {
        before_each().await;
        let options = DynamicSupervisorOptions {
            max_children: None,
            max_restarts: 10,
            max_seconds: 10,
            restart_counter_reset_after: None,
        };
        let (sup_ref, sup_handle) =
            DynamicSupervisor::spawn("test_terminate_child".into(), options).await?;
        // The long delay keeps the child alive until it is terminated explicitly.
        let child_spec = make_child_spec(
            "kill-me",
            Restart::Permanent,
            ChildBehavior::DelayedNormal { ms: 9999 },
        );
        DynamicSupervisor::spawn_child(sup_ref.clone(), child_spec).await?;
        let st_before = inspect_supervisor(&sup_ref).await;
        assert!(st_before.active_children.contains_key("kill-me"));
        // `assert_eq!` reports the actual count if this ever fails.
        assert_eq!(
            get_running_children(&sup_ref).len(),
            1,
            "Child is running"
        );
        DynamicSupervisor::terminate_child(sup_ref.clone(), "kill-me".to_string()).await?;
        let st_after = inspect_supervisor(&sup_ref).await;
        assert!(!st_after.active_children.contains_key("kill-me"));
        assert!(
            get_running_children(&sup_ref).is_empty(),
            "No child is running"
        );
        assert_eq!(sup_ref.get_status(), ActorStatus::Running);
        sup_ref.stop(None);
        let _ = sup_handle.await;
        let final_st = read_final_supervisor_state("test_terminate_child").await;
        assert!(final_st.restart_log.is_empty());
        assert_eq!(read_actor_call_count("kill-me").await, 1);
        Ok(())
    }
}