Skip to main content

ash_flare/
supervisor_stateful.rs

1//! Stateful supervisor with shared in-memory key-value store
2//!
3//! This module provides a parallel implementation of the supervisor tree with built-in
4//! shared state via `WorkerContext`. Workers receive the context as a parameter in their
5//! factory functions, allowing them to share data through a concurrent in-memory store.
6//!
7//! # Choosing Between Stateful and Regular Supervisors
8//!
9//! Use [`StatefulSupervisorSpec`] when:
10//! - Workers need to share state (counters, caches, configuration)
11//! - You need coordination between workers (flags, semaphores)
12//! - State should survive worker restarts
13//! - You want built-in concurrency-safe storage without external dependencies
14//!
15//! Use [`SupervisorSpec`](crate::SupervisorSpec) when:
16//! - Workers are stateless or manage their own state independently
17//! - No data sharing is required between workers
18//! - You want minimal overhead (no shared context)
19//! - Workers communicate through channels or external systems
20//!
21//! # Key Differences
22//!
23//! | Feature | `StatefulSupervisorSpec` | `SupervisorSpec` |
24//! |---------|--------------------------|------------------|
25//! | Worker Factory | `Fn(Arc<WorkerContext>) -> W` | `Fn() -> W` |
26//! | Shared State | ✅ Built-in `WorkerContext` | ❌ None |
27//! | Use Case | Coordinated workers | Independent workers |
28//! | Overhead | Slightly higher (context management) | Minimal |
29//!
30//! # Example: When to Use Stateful
31//!
32//! ```rust,no_run
33//! use ash_flare::{StatefulSupervisorSpec, StatefulSupervisorHandle, Worker, WorkerContext};
34//! use async_trait::async_trait;
35//! use std::sync::Arc;
36//!
37//! #[derive(Debug)]
38//! struct CounterWorker {
39//!     id: String,
40//!     context: Arc<WorkerContext>,
41//! }
42//!
43//! #[async_trait]
44//! impl Worker for CounterWorker {
45//!     type Error = std::io::Error;
46//!     
47//!     async fn run(&mut self) -> Result<(), Self::Error> {
48//!         // Workers can share and update counters
49//!         self.context.update("global_count", |v| {
50//!             let count = v.and_then(|v| v.as_u64()).unwrap_or(0);
51//!             Some(serde_json::json!(count + 1))
52//!         });
53//!         Ok(())
54//!     }
55//! }
56//!
57//! # #[tokio::main]
58//! # async fn main() {
59//! let spec = StatefulSupervisorSpec::new("counter-supervisor")
60//!     .with_worker("counter-1", |ctx| CounterWorker {
61//!         id: "counter-1".to_owned(),
62//!         context: ctx
63//!     }, ash_flare::RestartPolicy::Permanent);
64//!
65//! let handle = StatefulSupervisorHandle::start(spec);
66//! // Workers share state through the context
67//! # handle.shutdown().await.ok();
68//! # }
69//! ```
70
71use crate::restart::{RestartIntensity, RestartPolicy, RestartStrategy, RestartTracker};
72use crate::supervisor_common::{WorkerTermination, run_worker};
73use crate::types::{ChildExitReason, ChildId, ChildInfo, ChildType, WorkerContext};
74use crate::worker::Worker;
75use std::fmt;
76use std::sync::Arc;
77use tokio::sync::{mpsc, oneshot};
78use tokio::task::JoinHandle;
79
80// ============================================================================
81// Worker Specification (Stateful)
82// ============================================================================
83
84/// Specification for creating and restarting a stateful worker
85pub(crate) struct StatefulWorkerSpec<W: Worker> {
86    pub id: ChildId,
87    pub worker_factory: Arc<dyn Fn(Arc<WorkerContext>) -> W + Send + Sync>,
88    pub restart_policy: RestartPolicy,
89    pub context: Arc<WorkerContext>,
90}
91
92impl<W: Worker> Clone for StatefulWorkerSpec<W> {
93    fn clone(&self) -> Self {
94        Self {
95            id: self.id.clone(),
96            worker_factory: Arc::clone(&self.worker_factory),
97            restart_policy: self.restart_policy,
98            context: Arc::clone(&self.context),
99        }
100    }
101}
102
103impl<W: Worker> StatefulWorkerSpec<W> {
104    pub(crate) fn new(
105        id: impl Into<String>,
106        factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
107        restart_policy: RestartPolicy,
108        context: Arc<WorkerContext>,
109    ) -> Self {
110        Self {
111            id: id.into(),
112            worker_factory: Arc::new(factory),
113            restart_policy,
114            context,
115        }
116    }
117
118    pub(crate) fn create_worker(&self) -> W {
119        (self.worker_factory)(Arc::clone(&self.context))
120    }
121}
122
123impl<W: Worker> fmt::Debug for StatefulWorkerSpec<W> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("StatefulWorkerSpec")
126            .field("id", &self.id)
127            .field("restart_policy", &self.restart_policy)
128            .finish_non_exhaustive()
129    }
130}
131
132// ============================================================================
133// Worker Process (Stateful)
134// ============================================================================
135
136/// Running stateful worker process with its specification and task handle
137pub(crate) struct StatefulWorkerProcess<W: Worker> {
138    pub spec: StatefulWorkerSpec<W>,
139    pub handle: Option<JoinHandle<()>>,
140}
141
142impl<W: Worker> StatefulWorkerProcess<W> {
143    pub(crate) fn spawn<Cmd>(
144        spec: StatefulWorkerSpec<W>,
145        supervisor_name: String,
146        control_tx: mpsc::UnboundedSender<Cmd>,
147    ) -> Self
148    where
149        Cmd: From<WorkerTermination> + Send + 'static,
150    {
151        let worker = spec.create_worker();
152        let worker_id = spec.id.clone();
153        let handle = tokio::spawn(async move {
154            run_worker(supervisor_name, worker_id, worker, control_tx, None).await;
155        });
156
157        Self {
158            spec,
159            handle: Some(handle),
160        }
161    }
162
163    /// Spawns a worker with linked initialization handshake
164    pub(crate) fn spawn_with_link<Cmd>(
165        spec: StatefulWorkerSpec<W>,
166        supervisor_name: String,
167        control_tx: mpsc::UnboundedSender<Cmd>,
168        init_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
169    ) -> Self
170    where
171        Cmd: From<WorkerTermination> + Send + 'static,
172    {
173        let worker = spec.create_worker();
174        let worker_id = spec.id.clone();
175        let handle = tokio::spawn(async move {
176            run_worker(
177                supervisor_name,
178                worker_id,
179                worker,
180                control_tx,
181                Some(init_tx),
182            )
183            .await;
184        });
185
186        Self {
187            spec,
188            handle: Some(handle),
189        }
190    }
191
192    pub(crate) async fn stop(&mut self) {
193        if let Some(handle) = self.handle.take() {
194            handle.abort();
195            // Ignoring result as handle may have already completed
196            drop(handle.await);
197        }
198    }
199}
200
201impl<W: Worker> Drop for StatefulWorkerProcess<W> {
202    fn drop(&mut self) {
203        if let Some(handle) = self.handle.take() {
204            handle.abort();
205        }
206    }
207}
208
209// ============================================================================
210// Supervisor Specification (Stateful)
211// ============================================================================
212
213/// Specification for a child (either worker or supervisor)
214pub(crate) enum StatefulChildSpec<W: Worker> {
215    Worker(StatefulWorkerSpec<W>),
216    Supervisor(Arc<StatefulSupervisorSpec<W>>),
217}
218
219impl<W: Worker> Clone for StatefulChildSpec<W> {
220    fn clone(&self) -> Self {
221        match self {
222            StatefulChildSpec::Worker(w) => StatefulChildSpec::Worker(w.clone()),
223            StatefulChildSpec::Supervisor(s) => StatefulChildSpec::Supervisor(Arc::clone(s)),
224        }
225    }
226}
227
228/// Describes a stateful supervisor and its children in a tree structure.
229pub struct StatefulSupervisorSpec<W: Worker> {
230    pub(crate) name: String,
231    pub(crate) children: Vec<StatefulChildSpec<W>>,
232    pub(crate) restart_strategy: RestartStrategy,
233    pub(crate) restart_intensity: RestartIntensity,
234    pub(crate) context: Arc<WorkerContext>,
235}
236
237impl<W: Worker> Clone for StatefulSupervisorSpec<W> {
238    fn clone(&self) -> Self {
239        Self {
240            name: self.name.clone(),
241            children: self.children.clone(),
242            restart_strategy: self.restart_strategy,
243            restart_intensity: self.restart_intensity,
244            context: Arc::clone(&self.context),
245        }
246    }
247}
248
249impl<W: Worker> StatefulSupervisorSpec<W> {
250    /// Creates a new stateful supervisor specification with the provided name.
251    /// Automatically initializes an empty WorkerContext for sharing state between workers.
252    pub fn new(name: impl Into<String>) -> Self {
253        Self {
254            name: name.into(),
255            children: Vec::new(),
256            restart_strategy: RestartStrategy::default(),
257            restart_intensity: RestartIntensity::default(),
258            context: Arc::new(WorkerContext::new()),
259        }
260    }
261
262    /// Sets the restart strategy for this supervisor.
263    pub fn with_restart_strategy(mut self, strategy: RestartStrategy) -> Self {
264        self.restart_strategy = strategy;
265        self
266    }
267
268    /// Sets the restart intensity for this supervisor.
269    pub fn with_restart_intensity(mut self, intensity: RestartIntensity) -> Self {
270        self.restart_intensity = intensity;
271        self
272    }
273
274    /// Adds a stateful worker child to this supervisor specification.
275    /// The factory function receives a `WorkerContext` parameter for accessing shared state.
276    pub fn with_worker(
277        mut self,
278        id: impl Into<String>,
279        factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
280        restart_policy: RestartPolicy,
281    ) -> Self {
282        self.children
283            .push(StatefulChildSpec::Worker(StatefulWorkerSpec::new(
284                id,
285                factory,
286                restart_policy,
287                Arc::clone(&self.context),
288            )));
289        self
290    }
291
292    /// Adds a nested stateful supervisor child to this supervisor specification.
293    pub fn with_supervisor(mut self, supervisor: StatefulSupervisorSpec<W>) -> Self {
294        self.children
295            .push(StatefulChildSpec::Supervisor(Arc::new(supervisor)));
296        self
297    }
298
299    /// Returns a reference to the WorkerContext for this supervisor tree.
300    pub fn context(&self) -> &Arc<WorkerContext> {
301        &self.context
302    }
303}
304
305// ============================================================================
306// Child Management (Stateful)
307// ============================================================================
308
309/// Represents either a worker or a nested supervisor in the supervision tree
310pub(crate) enum StatefulChild<W: Worker> {
311    Worker(StatefulWorkerProcess<W>),
312    Supervisor {
313        handle: StatefulSupervisorHandle<W>,
314        spec: Arc<StatefulSupervisorSpec<W>>,
315    },
316}
317
318impl<W: Worker> StatefulChild<W> {
319    #[inline]
320    pub fn id(&self) -> &str {
321        match self {
322            StatefulChild::Worker(w) => &w.spec.id,
323            StatefulChild::Supervisor { spec, .. } => &spec.name,
324        }
325    }
326
327    #[inline]
328    pub fn child_type(&self) -> ChildType {
329        match self {
330            StatefulChild::Worker(_) => ChildType::Worker,
331            StatefulChild::Supervisor { .. } => ChildType::Supervisor,
332        }
333    }
334
335    #[inline]
336    pub fn restart_policy(&self) -> Option<RestartPolicy> {
337        match self {
338            StatefulChild::Worker(w) => Some(w.spec.restart_policy),
339            StatefulChild::Supervisor { .. } => Some(RestartPolicy::Permanent),
340        }
341    }
342
343    pub async fn shutdown(&mut self) {
344        match self {
345            StatefulChild::Worker(w) => w.stop().await,
346            StatefulChild::Supervisor { handle, .. } => {
347                let _ = handle.shutdown().await;
348            }
349        }
350    }
351}
352
353/// Holds information needed to restart a child after termination
354pub(crate) enum StatefulRestartInfo<W: Worker> {
355    Worker(StatefulWorkerSpec<W>),
356    Supervisor(Arc<StatefulSupervisorSpec<W>>),
357}
358
359// ============================================================================
360// Supervisor Error (Stateful)
361// ============================================================================
362
/// Failure modes reported by stateful supervisor operations.
#[derive(Debug)]
pub enum StatefulSupervisorError {
    /// The supervisor (identified by name) has no children.
    NoChildren(String),
    /// Every child of the named supervisor has failed.
    AllChildrenFailed(String),
    /// The named supervisor is shutting down.
    ShuttingDown(String),
    /// A child with the given ID is already registered.
    ChildAlreadyExists(String),
    /// No child with the given ID is known.
    ChildNotFound(String),
    /// A child reported an error during initialization.
    InitializationFailed {
        /// ID of the child whose initialization failed
        child_id: String,
        /// Why initialization failed
        reason: String,
    },
    /// A child did not finish initializing within the allowed time.
    InitializationTimeout {
        /// ID of the child whose initialization timed out
        child_id: String,
        /// How long was waited before giving up
        timeout: std::time::Duration,
    },
}

impl fmt::Display for StatefulSupervisorError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NoChildren(name) => {
                write!(f, "stateful supervisor '{}' has no children", name)
            }
            Self::AllChildrenFailed(name) => write!(
                f,
                "all children failed for stateful supervisor '{}' - restart intensity limit exceeded",
                name
            ),
            Self::ShuttingDown(name) => write!(
                f,
                "stateful supervisor '{}' is shutting down - operation not permitted",
                name
            ),
            Self::ChildAlreadyExists(id) => write!(
                f,
                "child with id '{}' already exists - use a unique identifier",
                id
            ),
            Self::ChildNotFound(id) => write!(
                f,
                "child with id '{}' not found - it may have already terminated",
                id
            ),
            Self::InitializationFailed { child_id, reason } => {
                write!(f, "child '{child_id}' initialization failed: {reason}")
            }
            Self::InitializationTimeout { child_id, timeout } => write!(
                f,
                "child '{}' initialization timed out after {:?}",
                child_id, timeout
            ),
        }
    }
}

impl std::error::Error for StatefulSupervisorError {}
441
442// ============================================================================
443// Supervisor Runtime (Stateful)
444// ============================================================================
445
446/// Internal commands sent to stateful supervisor runtime
447pub(crate) enum StatefulSupervisorCommand<W: Worker> {
448    StartChild {
449        spec: StatefulWorkerSpec<W>,
450        respond_to: oneshot::Sender<Result<ChildId, StatefulSupervisorError>>,
451    },
452    StartChildLinked {
453        spec: StatefulWorkerSpec<W>,
454        timeout: std::time::Duration,
455        respond_to: oneshot::Sender<Result<ChildId, StatefulSupervisorError>>,
456    },
457    TerminateChild {
458        id: ChildId,
459        respond_to: oneshot::Sender<Result<(), StatefulSupervisorError>>,
460    },
461    WhichChildren {
462        respond_to: oneshot::Sender<Result<Vec<ChildInfo>, StatefulSupervisorError>>,
463    },
464    GetRestartStrategy {
465        respond_to: oneshot::Sender<RestartStrategy>,
466    },
467    GetUptime {
468        respond_to: oneshot::Sender<u64>,
469    },
470    ChildTerminated {
471        id: ChildId,
472        reason: ChildExitReason,
473    },
474    Shutdown,
475}
476
477impl<W: Worker> From<WorkerTermination> for StatefulSupervisorCommand<W> {
478    fn from(term: WorkerTermination) -> Self {
479        StatefulSupervisorCommand::ChildTerminated {
480            id: term.id,
481            reason: term.reason,
482        }
483    }
484}
485
486/// Internal state machine that manages stateful supervisor lifecycle and child processes
487pub(crate) struct StatefulSupervisorRuntime<W: Worker> {
488    name: String,
489    children: Vec<StatefulChild<W>>,
490    control_rx: mpsc::UnboundedReceiver<StatefulSupervisorCommand<W>>,
491    control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
492    restart_strategy: RestartStrategy,
493    restart_tracker: RestartTracker,
494    created_at: std::time::Instant,
495}
496
497impl<W: Worker> StatefulSupervisorRuntime<W> {
498    pub(crate) fn new(
499        spec: StatefulSupervisorSpec<W>,
500        control_rx: mpsc::UnboundedReceiver<StatefulSupervisorCommand<W>>,
501        control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
502    ) -> Self {
503        let mut children = Vec::with_capacity(spec.children.len());
504
505        for child_spec in spec.children {
506            match child_spec {
507                StatefulChildSpec::Worker(worker_spec) => {
508                    let worker = StatefulWorkerProcess::spawn(
509                        worker_spec,
510                        spec.name.clone(),
511                        control_tx.clone(),
512                    );
513                    children.push(StatefulChild::Worker(worker));
514                }
515                StatefulChildSpec::Supervisor(supervisor_spec) => {
516                    let supervisor = StatefulSupervisorHandle::start((*supervisor_spec).clone());
517                    children.push(StatefulChild::Supervisor {
518                        handle: supervisor,
519                        spec: Arc::clone(&supervisor_spec),
520                    });
521                }
522            }
523        }
524
525        Self {
526            name: spec.name,
527            children,
528            control_rx,
529            control_tx,
530            restart_strategy: spec.restart_strategy,
531            restart_tracker: RestartTracker::new(spec.restart_intensity),
532            created_at: std::time::Instant::now(),
533        }
534    }
535
536    pub(crate) async fn run(mut self) {
537        while let Some(command) = self.control_rx.recv().await {
538            match command {
539                StatefulSupervisorCommand::StartChild { spec, respond_to } => {
540                    let result = self.handle_start_child(spec).await;
541                    let _ = respond_to.send(result);
542                }
543                StatefulSupervisorCommand::StartChildLinked {
544                    spec,
545                    timeout,
546                    respond_to,
547                } => {
548                    let result = self.handle_start_child_linked(spec, timeout).await;
549                    let _ = respond_to.send(result);
550                }
551                StatefulSupervisorCommand::TerminateChild { id, respond_to } => {
552                    let result = self.handle_terminate_child(&id).await;
553                    let _ = respond_to.send(result);
554                }
555                StatefulSupervisorCommand::WhichChildren { respond_to } => {
556                    let result = self.handle_which_children();
557                    let _ = respond_to.send(result);
558                }
559                StatefulSupervisorCommand::GetRestartStrategy { respond_to } => {
560                    let _ = respond_to.send(self.restart_strategy);
561                }
562                StatefulSupervisorCommand::GetUptime { respond_to } => {
563                    let uptime = self.created_at.elapsed().as_secs();
564                    let _ = respond_to.send(uptime);
565                }
566                StatefulSupervisorCommand::ChildTerminated { id, reason } => {
567                    self.handle_child_terminated(id, reason).await;
568                }
569                StatefulSupervisorCommand::Shutdown => {
570                    self.shutdown_children().await;
571                    return;
572                }
573            }
574        }
575
576        self.shutdown_children().await;
577    }
578
579    async fn handle_start_child(
580        &mut self,
581        spec: StatefulWorkerSpec<W>,
582    ) -> Result<ChildId, StatefulSupervisorError> {
583        // Check if child with same ID already exists
584        if self.children.iter().any(|c| c.id() == spec.id) {
585            return Err(StatefulSupervisorError::ChildAlreadyExists(spec.id.clone()));
586        }
587
588        let id = spec.id.clone();
589        let worker = StatefulWorkerProcess::spawn(spec, self.name.clone(), self.control_tx.clone());
590
591        self.children.push(StatefulChild::Worker(worker));
592        tracing::debug!(
593            supervisor = %self.name,
594            child = %id,
595            "dynamically started child"
596        );
597
598        Ok(id)
599    }
600
601    async fn handle_start_child_linked(
602        &mut self,
603        spec: StatefulWorkerSpec<W>,
604        timeout: std::time::Duration,
605    ) -> Result<ChildId, StatefulSupervisorError> {
606        // Check if child with same ID already exists
607        if self.children.iter().any(|c| c.id() == spec.id) {
608            return Err(StatefulSupervisorError::ChildAlreadyExists(spec.id.clone()));
609        }
610
611        let id = spec.id.clone();
612        let (init_tx, init_rx) = oneshot::channel();
613
614        let worker = StatefulWorkerProcess::spawn_with_link(
615            spec,
616            self.name.clone(),
617            self.control_tx.clone(),
618            init_tx,
619        );
620
621        // Wait for initialization with timeout
622        let init_result = tokio::time::timeout(timeout, init_rx).await;
623
624        match init_result {
625            Ok(Ok(Ok(()))) => {
626                // Initialization succeeded
627                self.children.push(StatefulChild::Worker(worker));
628                tracing::debug!(
629                    supervisor = %self.name,
630                    child = %id,
631                    "linked child started successfully"
632                );
633                Ok(id)
634            }
635            Ok(Ok(Err(reason))) => {
636                // Initialization failed - worker sent error
637                tracing::error!(
638                    supervisor = %self.name,
639                    child = %id,
640                    reason = %reason,
641                    "linked child initialization failed"
642                );
643                // Note: init failures do NOT trigger restart policies
644                Err(StatefulSupervisorError::InitializationFailed {
645                    child_id: id,
646                    reason,
647                })
648            }
649            Ok(Err(_)) => {
650                // Channel closed - worker panicked before sending result
651                tracing::error!(
652                    supervisor = %self.name,
653                    child = %id,
654                    "linked child panicked during initialization"
655                );
656                Err(StatefulSupervisorError::InitializationFailed {
657                    child_id: id,
658                    reason: "worker panicked during initialization".to_owned(),
659                })
660            }
661            Err(_) => {
662                // Timeout
663                tracing::error!(
664                    supervisor = %self.name,
665                    child = %id,
666                    timeout_secs = ?timeout.as_secs(),
667                    "linked child initialization timed out"
668                );
669                Err(StatefulSupervisorError::InitializationTimeout {
670                    child_id: id,
671                    timeout,
672                })
673            }
674        }
675    }
676
677    async fn handle_terminate_child(&mut self, id: &str) -> Result<(), StatefulSupervisorError> {
678        let position = self
679            .children
680            .iter()
681            .position(|c| c.id() == id)
682            .ok_or_else(|| StatefulSupervisorError::ChildNotFound(id.to_owned()))?;
683
684        let mut child = self.children.remove(position);
685        child.shutdown().await;
686
687        tracing::debug!(
688            supervisor = %self.name,
689            child = %id,
690            "terminated child"
691        );
692        Ok(())
693    }
694
695    fn handle_which_children(&self) -> Result<Vec<ChildInfo>, StatefulSupervisorError> {
696        let info = self
697            .children
698            .iter()
699            .map(|child| ChildInfo {
700                id: child.id().to_owned(),
701                child_type: child.child_type(),
702                restart_policy: child.restart_policy(),
703            })
704            .collect();
705
706        Ok(info)
707    }
708
709    async fn handle_child_terminated(&mut self, id: ChildId, reason: ChildExitReason) {
710        tracing::debug!(
711            supervisor = %self.name,
712            child = %id,
713            reason = ?reason,
714            "child terminated"
715        );
716
717        let position = match self.children.iter().position(|c| c.id() == id) {
718            Some(pos) => pos,
719            None => {
720                tracing::warn!(
721                    supervisor = %self.name,
722                    child = %id,
723                    "terminated child not found in list"
724                );
725                return;
726            }
727        };
728
729        // Determine if we should restart based on policy and reason
730        let should_restart = match &self.children[position] {
731            StatefulChild::Worker(w) => match w.spec.restart_policy {
732                RestartPolicy::Permanent => true,
733                RestartPolicy::Temporary => false,
734                RestartPolicy::Transient => reason == ChildExitReason::Abnormal,
735            },
736            StatefulChild::Supervisor { .. } => true, // Supervisors are always permanent
737        };
738
739        if !should_restart {
740            tracing::debug!(
741                supervisor = %self.name,
742                child = %id,
743                policy = ?self.children[position].restart_policy(),
744                reason = ?reason,
745                "not restarting child"
746            );
747            self.children.remove(position);
748            return;
749        }
750
751        // Check restart intensity
752        if self.restart_tracker.record_restart() {
753            tracing::error!(
754                supervisor = %self.name,
755                "restart intensity exceeded, shutting down"
756            );
757            self.shutdown_children().await;
758            return;
759        }
760
761        // Apply restart strategy
762        match self.restart_strategy {
763            RestartStrategy::OneForOne => {
764                self.restart_child(position).await;
765            }
766            RestartStrategy::OneForAll => {
767                self.restart_all_children().await;
768            }
769            RestartStrategy::RestForOne => {
770                self.restart_from(position).await;
771            }
772        }
773    }
774
775    async fn restart_child(&mut self, position: usize) {
776        // Extract spec info before shutdown
777        let restart_info = match &self.children[position] {
778            StatefulChild::Worker(worker) => StatefulRestartInfo::Worker(worker.spec.clone()),
779            StatefulChild::Supervisor { spec, .. } => {
780                StatefulRestartInfo::Supervisor(Arc::clone(spec))
781            }
782        };
783
784        // Shutdown old child
785        self.children[position].shutdown().await;
786
787        // Restart based on type
788        match restart_info {
789            StatefulRestartInfo::Worker(spec) => {
790                tracing::debug!(
791                    supervisor = %self.name,
792                    worker = %spec.id,
793                    "restarting worker"
794                );
795                let new_worker = StatefulWorkerProcess::spawn(
796                    spec.clone(),
797                    self.name.clone(),
798                    self.control_tx.clone(),
799                );
800                self.children[position] = StatefulChild::Worker(new_worker);
801                tracing::debug!(
802                    supervisor = %self.name,
803                    worker = %spec.id,
804                    "worker restarted"
805                );
806            }
807            StatefulRestartInfo::Supervisor(spec) => {
808                let name = spec.name.clone();
809                tracing::debug!(
810                    supervisor = %self.name,
811                    child_supervisor = %name,
812                    "restarting supervisor"
813                );
814                let new_handle = StatefulSupervisorHandle::start((*spec).clone());
815                self.children[position] = StatefulChild::Supervisor {
816                    handle: new_handle,
817                    spec,
818                };
819                tracing::debug!(
820                    supervisor = %self.name,
821                    child_supervisor = %name,
822                    "supervisor restarted"
823                );
824            }
825        }
826    }
827
828    async fn restart_all_children(&mut self) {
829        tracing::debug!(
830            supervisor = %self.name,
831            "restarting all children (one_for_all)"
832        );
833
834        // Shutdown all children
835        for child in &mut self.children {
836            child.shutdown().await;
837        }
838
839        // Restart all worker children
840        for child in &mut self.children {
841            if let StatefulChild::Worker(worker) = child {
842                let spec = worker.spec.clone();
843                let new_worker = StatefulWorkerProcess::spawn(
844                    spec.clone(),
845                    self.name.clone(),
846                    self.control_tx.clone(),
847                );
848                *child = StatefulChild::Worker(new_worker);
849                tracing::debug!(
850                    supervisor = %self.name,
851                    child = %spec.id,
852                    "child restarted"
853                );
854            }
855        }
856    }
857
858    async fn restart_from(&mut self, position: usize) {
859        tracing::debug!(
860            supervisor = %self.name,
861            position = %position,
862            "restarting from position (rest_for_one)"
863        );
864
865        for i in position..self.children.len() {
866            self.children[i].shutdown().await;
867
868            if let StatefulChild::Worker(worker) = &self.children[i] {
869                let spec = worker.spec.clone();
870                let new_worker = StatefulWorkerProcess::spawn(
871                    spec.clone(),
872                    self.name.clone(),
873                    self.control_tx.clone(),
874                );
875                self.children[i] = StatefulChild::Worker(new_worker);
876                tracing::debug!(
877                    supervisor = %self.name,
878                    child = %spec.id,
879                    "child restarted"
880                );
881            }
882        }
883    }
884
885    async fn shutdown_children(&mut self) {
886        for child in self.children.drain(..) {
887            let id = child.id().to_owned();
888            let mut child = child;
889            child.shutdown().await;
890            tracing::debug!(
891                supervisor = %self.name,
892                child = %id,
893                "shut down child"
894            );
895        }
896    }
897}
898
899// ============================================================================
900// Supervisor Handle (Stateful)
901// ============================================================================
902
903/// Handle used to interact with a running stateful supervisor tree.
904#[derive(Clone)]
905pub struct StatefulSupervisorHandle<W: Worker> {
906    pub(crate) name: Arc<String>,
907    pub(crate) control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
908}
909
910impl<W: Worker> StatefulSupervisorHandle<W> {
911    /// Spawns a stateful supervisor tree based on the provided specification.
912    pub fn start(spec: StatefulSupervisorSpec<W>) -> Self {
913        let (control_tx, control_rx) = mpsc::unbounded_channel();
914        let name_arc = Arc::new(spec.name.clone());
915        let runtime = StatefulSupervisorRuntime::new(spec, control_rx, control_tx.clone());
916
917        let runtime_name = Arc::clone(&name_arc);
918        tokio::spawn(async move {
919            runtime.run().await;
920            tracing::debug!(name = %*runtime_name, "supervisor stopped");
921        });
922
923        Self {
924            name: name_arc,
925            control_tx,
926        }
927    }
928
929    /// Dynamically starts a new child worker
930    pub async fn start_child(
931        &self,
932        id: impl Into<String>,
933        factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
934        restart_policy: RestartPolicy,
935        context: Arc<WorkerContext>,
936    ) -> Result<ChildId, StatefulSupervisorError> {
937        let (result_tx, result_rx) = oneshot::channel();
938        let spec = StatefulWorkerSpec::new(id, factory, restart_policy, context);
939
940        self.control_tx
941            .send(StatefulSupervisorCommand::StartChild {
942                spec,
943                respond_to: result_tx,
944            })
945            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
946
947        result_rx
948            .await
949            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
950    }
951
952    /// Dynamically starts a new child worker with linked initialization.
953    ///
954    /// This method waits for the worker's initialization to complete before returning.
955    /// If initialization fails or times out, an error is returned and the worker is not added.
956    ///
957    /// # Arguments
958    ///
959    /// * `id` - Unique identifier for the child
960    /// * `factory` - Factory function to create the worker
961    /// * `restart_policy` - How to handle worker termination after it starts running
962    /// * `context` - Shared context for stateful workers
963    /// * `timeout` - Maximum time to wait for initialization
964    ///
965    /// # Errors
966    ///
967    /// * `StatefulSupervisorError::InitializationFailed` - Worker initialization returned an error
968    /// * `StatefulSupervisorError::InitializationTimeout` - Worker didn't initialize within timeout
969    /// * `StatefulSupervisorError::ChildAlreadyExists` - A child with this ID already exists
970    /// * `StatefulSupervisorError::ShuttingDown` - Supervisor is shutting down
971    ///
972    /// # Note
973    ///
974    /// Initialization failures do NOT trigger restart policies. The worker must successfully
975    /// initialize before restart policies take effect.
976    pub async fn start_child_linked(
977        &self,
978        id: impl Into<String>,
979        factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
980        restart_policy: RestartPolicy,
981        context: Arc<WorkerContext>,
982        timeout: std::time::Duration,
983    ) -> Result<ChildId, StatefulSupervisorError> {
984        let (result_tx, result_rx) = oneshot::channel();
985        let spec = StatefulWorkerSpec::new(id, factory, restart_policy, context);
986
987        self.control_tx
988            .send(StatefulSupervisorCommand::StartChildLinked {
989                spec,
990                timeout,
991                respond_to: result_tx,
992            })
993            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
994
995        result_rx
996            .await
997            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
998    }
999
1000    /// Dynamically terminates a child
1001    pub async fn terminate_child(&self, id: &str) -> Result<(), StatefulSupervisorError> {
1002        let (result_tx, result_rx) = oneshot::channel();
1003
1004        self.control_tx
1005            .send(StatefulSupervisorCommand::TerminateChild {
1006                id: id.to_owned(),
1007                respond_to: result_tx,
1008            })
1009            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1010
1011        result_rx
1012            .await
1013            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1014    }
1015
1016    /// Returns information about all children
1017    pub async fn which_children(&self) -> Result<Vec<ChildInfo>, StatefulSupervisorError> {
1018        let (result_tx, result_rx) = oneshot::channel();
1019
1020        self.control_tx
1021            .send(StatefulSupervisorCommand::WhichChildren {
1022                respond_to: result_tx,
1023            })
1024            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1025
1026        result_rx
1027            .await
1028            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1029    }
1030
1031    /// Requests a graceful shutdown of the supervisor tree.
1032    pub async fn shutdown(&self) -> Result<(), StatefulSupervisorError> {
1033        self.control_tx
1034            .send(StatefulSupervisorCommand::Shutdown)
1035            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1036        Ok(())
1037    }
1038
1039    /// Returns the supervisor's name.
1040    pub fn name(&self) -> &str {
1041        self.name.as_str()
1042    }
1043
1044    /// Returns the supervisor's restart strategy.
1045    pub async fn restart_strategy(&self) -> Result<RestartStrategy, StatefulSupervisorError> {
1046        let (result_tx, result_rx) = oneshot::channel();
1047
1048        self.control_tx
1049            .send(StatefulSupervisorCommand::GetRestartStrategy {
1050                respond_to: result_tx,
1051            })
1052            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1053
1054        result_rx
1055            .await
1056            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))
1057    }
1058
1059    /// Returns the supervisor's uptime in seconds.
1060    pub async fn uptime(&self) -> Result<u64, StatefulSupervisorError> {
1061        let (result_tx, result_rx) = oneshot::channel();
1062
1063        self.control_tx
1064            .send(StatefulSupervisorCommand::GetUptime {
1065                respond_to: result_tx,
1066            })
1067            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1068
1069        result_rx
1070            .await
1071            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))
1072    }
1073}