Skip to main content

ash_flare/
supervisor_stateful.rs

//! Stateful supervisor with shared in-memory key-value store
//!
//! This module provides a parallel implementation of the supervisor tree with built-in
//! shared state via `WorkerContext`. Workers receive the context as a parameter in their
//! factory functions, allowing them to share data through a concurrent in-memory store.
//!
//! # Choosing Between Stateful and Regular Supervisors
//!
//! Use [`StatefulSupervisorSpec`] when:
//! - Workers need to share state (counters, caches, configuration)
//! - You need coordination between workers (flags, semaphores)
//! - State should survive worker restarts
//! - You want built-in concurrency-safe storage without external dependencies
//!
//! Use [`SupervisorSpec`](crate::SupervisorSpec) when:
//! - Workers are stateless or manage their own state independently
//! - No data sharing is required between workers
//! - You want minimal overhead (no shared context)
//! - Workers communicate through channels or external systems
//!
//! # Key Differences
//!
//! | Feature | `StatefulSupervisorSpec` | `SupervisorSpec` |
//! |---------|--------------------------|------------------|
//! | Worker Factory | `Fn(Arc<WorkerContext>) -> W` | `Fn() -> W` |
//! | Shared State | ✅ Built-in `WorkerContext` | ❌ None |
//! | Use Case | Coordinated workers | Independent workers |
//! | Overhead | Slightly higher (context management) | Minimal |
//!
//! # Example: When to Use Stateful
//!
//! ```rust,no_run
//! use ash_flare::{StatefulSupervisorSpec, StatefulSupervisorHandle, Worker, WorkerContext};
//! use async_trait::async_trait;
//! use std::sync::Arc;
//!
//! #[derive(Debug)]
//! struct CounterWorker {
//!     id: String,
//!     context: Arc<WorkerContext>,
//! }
//!
//! #[async_trait]
//! impl Worker for CounterWorker {
//!     type Error = std::io::Error;
//!
//!     async fn run(&mut self) -> Result<(), Self::Error> {
//!         // Workers can share and update counters
//!         self.context.update("global_count", |v| {
//!             let count = v.and_then(|v| v.as_u64()).unwrap_or(0);
//!             Some(serde_json::json!(count + 1))
//!         });
//!         Ok(())
//!     }
//! }
//!
//! # #[tokio::main]
//! # async fn main() {
//! let spec = StatefulSupervisorSpec::new("counter-supervisor")
//!     .with_worker("counter-1", |ctx| CounterWorker {
//!         id: "counter-1".to_owned(),
//!         context: ctx
//!     }, ash_flare::RestartPolicy::Permanent);
//!
//! let handle = StatefulSupervisorHandle::start(spec);
//! // Workers share state through the context
//! # handle.shutdown().await.ok();
//! # }
//! ```
71use crate::restart::{RestartIntensity, RestartPolicy, RestartStrategy, RestartTracker};
72use crate::supervisor_common::{WorkerTermination, run_worker};
73use crate::types::{ChildExitReason, ChildId, ChildInfo, ChildType, WorkerContext};
74use crate::worker::Worker;
75use std::fmt;
76use std::sync::Arc;
77use tokio::sync::{mpsc, oneshot};
78use tokio::task::JoinHandle;
79
80// ============================================================================
81// Worker Specification (Stateful)
82// ============================================================================
83
/// Specification for creating and restarting a stateful worker
///
/// Holds everything required to (re)build the worker on every (re)start:
/// the factory closure, the restart policy, and the shared `WorkerContext`.
pub(crate) struct StatefulWorkerSpec<W: Worker> {
    /// Unique identifier of this child within its supervisor.
    pub id: ChildId,
    /// Factory invoked on every spawn; receives the shared context.
    pub worker_factory: Arc<dyn Fn(Arc<WorkerContext>) -> W + Send + Sync>,
    /// Policy deciding whether the worker is restarted after termination.
    pub restart_policy: RestartPolicy,
    /// Shared key-value store handed to the factory on each spawn.
    pub context: Arc<WorkerContext>,
}
91
// Manual `Clone`: a derive would impose an unnecessary `W: Clone` bound,
// but only the `Arc` handles and the `Copy` policy need duplicating.
impl<W: Worker> Clone for StatefulWorkerSpec<W> {
    fn clone(&self) -> Self {
        Self {
            id: self.id.clone(),
            worker_factory: Arc::clone(&self.worker_factory),
            restart_policy: self.restart_policy,
            context: Arc::clone(&self.context),
        }
    }
}
102
impl<W: Worker> StatefulWorkerSpec<W> {
    /// Builds a new worker specification.
    ///
    /// The factory is stored behind an `Arc` so the spec stays cheaply
    /// cloneable across restarts.
    pub(crate) fn new(
        id: impl Into<String>,
        factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
        restart_policy: RestartPolicy,
        context: Arc<WorkerContext>,
    ) -> Self {
        Self {
            id: id.into(),
            worker_factory: Arc::new(factory),
            restart_policy,
            context,
        }
    }

    /// Instantiates a fresh worker by calling the factory with the shared
    /// context; the same context is reused on every restart.
    pub(crate) fn create_worker(&self) -> W {
        (self.worker_factory)(Arc::clone(&self.context))
    }
}
122
impl<W: Worker> fmt::Debug for StatefulWorkerSpec<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `finish_non_exhaustive` because the factory closure and context
        // have no meaningful `Debug` representation.
        f.debug_struct("StatefulWorkerSpec")
            .field("id", &self.id)
            .field("restart_policy", &self.restart_policy)
            .finish_non_exhaustive()
    }
}
131
132// ============================================================================
133// Worker Process (Stateful)
134// ============================================================================
135
/// Running stateful worker process with its specification and task handle
pub(crate) struct StatefulWorkerProcess<W: Worker> {
    /// Spec kept around so the worker can be respawned after termination.
    pub spec: StatefulWorkerSpec<W>,
    /// Tokio task driving the worker; `None` once the task has been stopped.
    pub handle: Option<JoinHandle<()>>,
}
141
142impl<W: Worker> StatefulWorkerProcess<W> {
143    pub(crate) fn spawn<Cmd>(
144        spec: StatefulWorkerSpec<W>,
145        supervisor_name: String,
146        control_tx: mpsc::UnboundedSender<Cmd>,
147    ) -> Self
148    where
149        Cmd: From<WorkerTermination> + Send + 'static,
150    {
151        let worker = spec.create_worker();
152        let worker_id = spec.id.clone();
153        let handle = tokio::spawn(async move {
154            run_worker(supervisor_name, worker_id, worker, control_tx, None).await;
155        });
156
157        Self {
158            spec,
159            handle: Some(handle),
160        }
161    }
162
163    /// Spawns a worker with linked initialization handshake
164    pub(crate) fn spawn_with_link<Cmd>(
165        spec: StatefulWorkerSpec<W>,
166        supervisor_name: String,
167        control_tx: mpsc::UnboundedSender<Cmd>,
168        init_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
169    ) -> Self
170    where
171        Cmd: From<WorkerTermination> + Send + 'static,
172    {
173        let worker = spec.create_worker();
174        let worker_id = spec.id.clone();
175        let handle = tokio::spawn(async move {
176            run_worker(
177                supervisor_name,
178                worker_id,
179                worker,
180                control_tx,
181                Some(init_tx),
182            )
183            .await;
184        });
185
186        Self {
187            spec,
188            handle: Some(handle),
189        }
190    }
191
192    pub(crate) async fn stop(&mut self) {
193        if let Some(handle) = self.handle.take() {
194            handle.abort();
195            // Ignoring result as handle may have already completed
196            drop(handle.await);
197        }
198    }
199}
200
impl<W: Worker> Drop for StatefulWorkerProcess<W> {
    fn drop(&mut self) {
        // Drop cannot await, so the task is aborted without being joined.
        // This guarantees the worker task does not leak when the process is
        // dropped without an explicit `stop()` (e.g. a failed linked start).
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
    }
}
208
209// ============================================================================
210// Supervisor Specification (Stateful)
211// ============================================================================
212
/// Specification for a child (either worker or supervisor)
pub(crate) enum StatefulChildSpec<W: Worker> {
    /// A leaf worker child.
    Worker(StatefulWorkerSpec<W>),
    /// A nested supervisor child; `Arc` keeps it cheap to clone for restarts.
    Supervisor(Arc<StatefulSupervisorSpec<W>>),
}
218
219impl<W: Worker> Clone for StatefulChildSpec<W> {
220    fn clone(&self) -> Self {
221        match self {
222            StatefulChildSpec::Worker(w) => StatefulChildSpec::Worker(w.clone()),
223            StatefulChildSpec::Supervisor(s) => StatefulChildSpec::Supervisor(Arc::clone(s)),
224        }
225    }
226}
227
/// Describes a stateful supervisor and its children in a tree structure.
pub struct StatefulSupervisorSpec<W: Worker> {
    /// Supervisor name; used for logging and as the child id of nested supervisors.
    pub(crate) name: String,
    /// Ordered child specifications; declaration order drives `RestForOne` restarts.
    pub(crate) children: Vec<StatefulChildSpec<W>>,
    /// Strategy applied when a restartable child terminates.
    pub(crate) restart_strategy: RestartStrategy,
    /// Limit on how many restarts are tolerated before giving up.
    pub(crate) restart_intensity: RestartIntensity,
    /// Shared store handed to every worker factory added via `with_worker`.
    pub(crate) context: Arc<WorkerContext>,
}
236
// Manual `Clone`: a derive would require `W: Clone`; only the name, the
// child specs, the `Copy` settings, and the context `Arc` are duplicated —
// clones continue to share the same `WorkerContext`.
impl<W: Worker> Clone for StatefulSupervisorSpec<W> {
    fn clone(&self) -> Self {
        Self {
            name: self.name.clone(),
            children: self.children.clone(),
            restart_strategy: self.restart_strategy,
            restart_intensity: self.restart_intensity,
            context: Arc::clone(&self.context),
        }
    }
}
248
impl<W: Worker> StatefulSupervisorSpec<W> {
    /// Creates a new stateful supervisor specification with the provided name.
    /// Automatically initializes an empty `WorkerContext` for sharing state between workers.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            children: Vec::new(),
            restart_strategy: RestartStrategy::default(),
            restart_intensity: RestartIntensity::default(),
            context: Arc::new(WorkerContext::new()),
        }
    }

    /// Sets the restart strategy for this supervisor.
    #[must_use]
    pub fn with_restart_strategy(mut self, strategy: RestartStrategy) -> Self {
        self.restart_strategy = strategy;
        self
    }

    /// Sets the restart intensity for this supervisor.
    #[must_use]
    pub fn with_restart_intensity(mut self, intensity: RestartIntensity) -> Self {
        self.restart_intensity = intensity;
        self
    }

    /// Adds a stateful worker child to this supervisor specification.
    /// The factory function receives a `WorkerContext` parameter for accessing shared state.
    ///
    /// The worker is bound to this supervisor's own context; since the
    /// factory is re-invoked with the same context on every restart, state
    /// stored there survives worker restarts.
    #[must_use]
    pub fn with_worker(
        mut self,
        id: impl Into<String>,
        factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
        restart_policy: RestartPolicy,
    ) -> Self {
        self.children
            .push(StatefulChildSpec::Worker(StatefulWorkerSpec::new(
                id,
                factory,
                restart_policy,
                Arc::clone(&self.context),
            )));
        self
    }

    /// Adds a nested stateful supervisor child to this supervisor specification.
    ///
    /// NOTE(review): the nested supervisor keeps the `WorkerContext` it was
    /// created with — it is not re-parented onto this supervisor's context,
    /// so its workers share state only among themselves. Confirm this
    /// scoping is intended.
    #[must_use]
    pub fn with_supervisor(mut self, supervisor: StatefulSupervisorSpec<W>) -> Self {
        self.children
            .push(StatefulChildSpec::Supervisor(Arc::new(supervisor)));
        self
    }

    /// Returns a reference to the `WorkerContext` for this supervisor tree.
    #[must_use]
    pub fn context(&self) -> &Arc<WorkerContext> {
        &self.context
    }
}
309
310// ============================================================================
311// Child Management (Stateful)
312// ============================================================================
313
/// Represents either a worker or a nested supervisor in the supervision tree
pub(crate) enum StatefulChild<W: Worker> {
    /// A running worker task plus the spec needed to restart it.
    Worker(StatefulWorkerProcess<W>),
    /// A running nested supervisor plus the spec needed to restart it.
    Supervisor {
        handle: StatefulSupervisorHandle<W>,
        spec: Arc<StatefulSupervisorSpec<W>>,
    },
}
322
323impl<W: Worker> StatefulChild<W> {
324    #[inline]
325    pub fn id(&self) -> &str {
326        match self {
327            StatefulChild::Worker(w) => &w.spec.id,
328            StatefulChild::Supervisor { spec, .. } => &spec.name,
329        }
330    }
331
332    #[inline]
333    pub fn child_type(&self) -> ChildType {
334        match self {
335            StatefulChild::Worker(_) => ChildType::Worker,
336            StatefulChild::Supervisor { .. } => ChildType::Supervisor,
337        }
338    }
339
340    #[inline]
341    #[allow(clippy::unnecessary_wraps)]
342    pub fn restart_policy(&self) -> Option<RestartPolicy> {
343        match self {
344            StatefulChild::Worker(w) => Some(w.spec.restart_policy),
345            StatefulChild::Supervisor { .. } => Some(RestartPolicy::Permanent),
346        }
347    }
348
349    pub async fn shutdown(&mut self) {
350        match self {
351            StatefulChild::Worker(w) => w.stop().await,
352            StatefulChild::Supervisor { handle, .. } => {
353                let _shutdown_result = handle.shutdown().await;
354            }
355        }
356    }
357}
358
/// Holds information needed to restart a child after termination
pub(crate) enum StatefulRestartInfo<W: Worker> {
    /// Worker spec captured before the old process is shut down.
    Worker(StatefulWorkerSpec<W>),
    /// Supervisor spec captured before the old subtree is shut down.
    Supervisor(Arc<StatefulSupervisorSpec<W>>),
}
364
365// ============================================================================
366// Supervisor Error (Stateful)
367// ============================================================================
368
/// Errors returned by stateful supervisor operations.
///
/// Covers the failure modes surfaced over the supervisor control channel:
/// duplicate or missing children, shutdown races, and linked-start
/// handshake failures.
#[derive(Debug)]
pub enum StatefulSupervisorError {
    /// Supervisor has no children
    NoChildren(String),
    /// All children have failed
    AllChildrenFailed(String),
    /// Supervisor is shutting down
    ShuttingDown(String),
    /// Child with this ID already exists
    ChildAlreadyExists(String),
    /// Child with this ID not found
    ChildNotFound(String),
    /// Child initialization failed
    InitializationFailed {
        /// ID of the child that failed to initialize
        child_id: String,
        /// Reason for initialization failure
        reason: String,
    },
    /// Child initialization timed out
    InitializationTimeout {
        /// ID of the child that timed out
        child_id: String,
        /// Duration after which timeout occurred
        timeout: std::time::Duration,
    },
}
397
398impl fmt::Display for StatefulSupervisorError {
399    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400        match self {
401            StatefulSupervisorError::NoChildren(name) => {
402                write!(f, "stateful supervisor '{name}' has no children")
403            }
404            StatefulSupervisorError::AllChildrenFailed(name) => {
405                write!(
406                    f,
407                    "all children failed for stateful supervisor '{name}' - restart intensity limit exceeded"
408                )
409            }
410            StatefulSupervisorError::ShuttingDown(name) => {
411                write!(
412                    f,
413                    "stateful supervisor '{name}' is shutting down - operation not permitted"
414                )
415            }
416            StatefulSupervisorError::ChildAlreadyExists(id) => {
417                write!(
418                    f,
419                    "child with id '{id}' already exists - use a unique identifier"
420                )
421            }
422            StatefulSupervisorError::ChildNotFound(id) => {
423                write!(
424                    f,
425                    "child with id '{id}' not found - it may have already terminated"
426                )
427            }
428            StatefulSupervisorError::InitializationFailed { child_id, reason } => {
429                write!(f, "child '{child_id}' initialization failed: {reason}")
430            }
431            StatefulSupervisorError::InitializationTimeout { child_id, timeout } => {
432                write!(
433                    f,
434                    "child '{child_id}' initialization timed out after {timeout:?}"
435                )
436            }
437        }
438    }
439}
440
441impl std::error::Error for StatefulSupervisorError {}
442
443// ============================================================================
444// Supervisor Runtime (Stateful)
445// ============================================================================
446
/// Internal commands sent to stateful supervisor runtime
pub(crate) enum StatefulSupervisorCommand<W: Worker> {
    /// Dynamically start a child without an init handshake.
    StartChild {
        spec: StatefulWorkerSpec<W>,
        respond_to: oneshot::Sender<Result<ChildId, StatefulSupervisorError>>,
    },
    /// Start a child and wait (up to `timeout`) for its init handshake.
    StartChildLinked {
        spec: StatefulWorkerSpec<W>,
        timeout: std::time::Duration,
        respond_to: oneshot::Sender<Result<ChildId, StatefulSupervisorError>>,
    },
    /// Stop and remove the child with the given id.
    TerminateChild {
        id: ChildId,
        respond_to: oneshot::Sender<Result<(), StatefulSupervisorError>>,
    },
    /// List the currently supervised children.
    WhichChildren {
        respond_to: oneshot::Sender<Result<Vec<ChildInfo>, StatefulSupervisorError>>,
    },
    /// Report the configured restart strategy.
    GetRestartStrategy {
        respond_to: oneshot::Sender<RestartStrategy>,
    },
    /// Report supervisor uptime in whole seconds.
    GetUptime {
        respond_to: oneshot::Sender<u64>,
    },
    /// Internal notification that a child task terminated.
    ChildTerminated {
        id: ChildId,
        reason: ChildExitReason,
    },
    /// Shut down all children and stop the runtime loop.
    Shutdown,
}
477
478impl<W: Worker> From<WorkerTermination> for StatefulSupervisorCommand<W> {
479    fn from(term: WorkerTermination) -> Self {
480        StatefulSupervisorCommand::ChildTerminated {
481            id: term.id,
482            reason: term.reason,
483        }
484    }
485}
486
/// Internal state machine that manages stateful supervisor lifecycle and child processes
pub(crate) struct StatefulSupervisorRuntime<W: Worker> {
    /// Supervisor name, used in log records.
    name: String,
    /// Live children in spec order; order drives `RestForOne` restarts.
    children: Vec<StatefulChild<W>>,
    /// Command receiver; the runtime loop drains this until shutdown.
    control_rx: mpsc::UnboundedReceiver<StatefulSupervisorCommand<W>>,
    /// Sender cloned into each spawned worker so it can report termination.
    control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
    /// Strategy applied when a restartable child terminates.
    restart_strategy: RestartStrategy,
    /// Tracks restarts against the configured intensity limit.
    restart_tracker: RestartTracker,
    /// Creation instant, used to answer `GetUptime`.
    created_at: std::time::Instant,
}
497
498impl<W: Worker> StatefulSupervisorRuntime<W> {
499    pub(crate) fn new(
500        spec: StatefulSupervisorSpec<W>,
501        control_rx: mpsc::UnboundedReceiver<StatefulSupervisorCommand<W>>,
502        control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
503    ) -> Self {
504        let mut children = Vec::with_capacity(spec.children.len());
505
506        for child_spec in spec.children {
507            match child_spec {
508                StatefulChildSpec::Worker(worker_spec) => {
509                    let worker = StatefulWorkerProcess::spawn(
510                        worker_spec,
511                        spec.name.clone(),
512                        control_tx.clone(),
513                    );
514                    children.push(StatefulChild::Worker(worker));
515                }
516                StatefulChildSpec::Supervisor(supervisor_spec) => {
517                    let supervisor = StatefulSupervisorHandle::start((*supervisor_spec).clone());
518                    children.push(StatefulChild::Supervisor {
519                        handle: supervisor,
520                        spec: Arc::clone(&supervisor_spec),
521                    });
522                }
523            }
524        }
525
526        Self {
527            name: spec.name,
528            children,
529            control_rx,
530            control_tx,
531            restart_strategy: spec.restart_strategy,
532            restart_tracker: RestartTracker::new(spec.restart_intensity),
533            created_at: std::time::Instant::now(),
534        }
535    }
536
537    pub(crate) async fn run(mut self) {
538        while let Some(command) = self.control_rx.recv().await {
539            match command {
540                StatefulSupervisorCommand::StartChild { spec, respond_to } => {
541                    let result = self.handle_start_child(spec);
542                    let _send = respond_to.send(result);
543                }
544                StatefulSupervisorCommand::StartChildLinked {
545                    spec,
546                    timeout,
547                    respond_to,
548                } => {
549                    let result = self.handle_start_child_linked(spec, timeout).await;
550                    let _send = respond_to.send(result);
551                }
552                StatefulSupervisorCommand::TerminateChild { id, respond_to } => {
553                    let result = self.handle_terminate_child(&id).await;
554                    let _send = respond_to.send(result);
555                }
556                StatefulSupervisorCommand::WhichChildren { respond_to } => {
557                    let result = self.handle_which_children();
558                    let _send = respond_to.send(result);
559                }
560                StatefulSupervisorCommand::GetRestartStrategy { respond_to } => {
561                    let _send = respond_to.send(self.restart_strategy);
562                }
563                StatefulSupervisorCommand::GetUptime { respond_to } => {
564                    let uptime = self.created_at.elapsed().as_secs();
565                    let _send = respond_to.send(uptime);
566                }
567                StatefulSupervisorCommand::ChildTerminated { id, reason } => {
568                    self.handle_child_terminated(id, reason).await;
569                }
570                StatefulSupervisorCommand::Shutdown => {
571                    self.shutdown_children().await;
572                    return;
573                }
574            }
575        }
576
577        self.shutdown_children().await;
578    }
579
580    fn handle_start_child(
581        &mut self,
582        spec: StatefulWorkerSpec<W>,
583    ) -> Result<ChildId, StatefulSupervisorError> {
584        // Check if child with same ID already exists
585        if self.children.iter().any(|c| c.id() == spec.id) {
586            return Err(StatefulSupervisorError::ChildAlreadyExists(spec.id.clone()));
587        }
588
589        let id = spec.id.clone();
590        let worker = StatefulWorkerProcess::spawn(spec, self.name.clone(), self.control_tx.clone());
591
592        self.children.push(StatefulChild::Worker(worker));
593        tracing::debug!(
594            supervisor = %self.name,
595            child = %id,
596            "dynamically started child"
597        );
598
599        Ok(id)
600    }
601
602    async fn handle_start_child_linked(
603        &mut self,
604        spec: StatefulWorkerSpec<W>,
605        timeout: std::time::Duration,
606    ) -> Result<ChildId, StatefulSupervisorError> {
607        // Check if child with same ID already exists
608        if self.children.iter().any(|c| c.id() == spec.id) {
609            return Err(StatefulSupervisorError::ChildAlreadyExists(spec.id.clone()));
610        }
611
612        let id = spec.id.clone();
613        let (init_tx, init_rx) = oneshot::channel();
614
615        let worker = StatefulWorkerProcess::spawn_with_link(
616            spec,
617            self.name.clone(),
618            self.control_tx.clone(),
619            init_tx,
620        );
621
622        // Wait for initialization with timeout
623        let init_result = tokio::time::timeout(timeout, init_rx).await;
624
625        match init_result {
626            Ok(Ok(Ok(()))) => {
627                // Initialization succeeded
628                self.children.push(StatefulChild::Worker(worker));
629                tracing::debug!(
630                    supervisor = %self.name,
631                    child = %id,
632                    "linked child started successfully"
633                );
634                Ok(id)
635            }
636            Ok(Ok(Err(reason))) => {
637                // Initialization failed - worker sent error
638                tracing::error!(
639                    supervisor = %self.name,
640                    child = %id,
641                    reason = %reason,
642                    "linked child initialization failed"
643                );
644                // Note: init failures do NOT trigger restart policies
645                Err(StatefulSupervisorError::InitializationFailed {
646                    child_id: id,
647                    reason,
648                })
649            }
650            Ok(Err(_)) => {
651                // Channel closed - worker panicked before sending result
652                tracing::error!(
653                    supervisor = %self.name,
654                    child = %id,
655                    "linked child panicked during initialization"
656                );
657                Err(StatefulSupervisorError::InitializationFailed {
658                    child_id: id,
659                    reason: "worker panicked during initialization".to_owned(),
660                })
661            }
662            Err(_) => {
663                // Timeout
664                tracing::error!(
665                    supervisor = %self.name,
666                    child = %id,
667                    timeout_secs = ?timeout.as_secs(),
668                    "linked child initialization timed out"
669                );
670                Err(StatefulSupervisorError::InitializationTimeout {
671                    child_id: id,
672                    timeout,
673                })
674            }
675        }
676    }
677
678    async fn handle_terminate_child(&mut self, id: &str) -> Result<(), StatefulSupervisorError> {
679        let position = self
680            .children
681            .iter()
682            .position(|c| c.id() == id)
683            .ok_or_else(|| StatefulSupervisorError::ChildNotFound(id.to_owned()))?;
684
685        let mut child = self.children.remove(position);
686        child.shutdown().await;
687
688        tracing::debug!(
689            supervisor = %self.name,
690            child = %id,
691            "terminated child"
692        );
693        Ok(())
694    }
695
696    #[allow(clippy::unnecessary_wraps)]
697    fn handle_which_children(&self) -> Result<Vec<ChildInfo>, StatefulSupervisorError> {
698        let info = self
699            .children
700            .iter()
701            .map(|child| ChildInfo {
702                id: child.id().to_owned(),
703                child_type: child.child_type(),
704                restart_policy: child.restart_policy(),
705            })
706            .collect();
707
708        Ok(info)
709    }
710
711    #[allow(clippy::indexing_slicing)]
712    async fn handle_child_terminated(&mut self, id: ChildId, reason: ChildExitReason) {
713        tracing::debug!(
714            supervisor = %self.name,
715            child = %id,
716            reason = ?reason,
717            "child terminated"
718        );
719
720        let Some(position) = self.children.iter().position(|c| c.id() == id) else {
721            tracing::warn!(
722                supervisor = %self.name,
723                child = %id,
724                "terminated child not found in list"
725            );
726            return;
727        };
728
729        // Determine if we should restart based on policy and reason
730        let should_restart = match &self.children[position] {
731            StatefulChild::Worker(w) => match w.spec.restart_policy {
732                RestartPolicy::Permanent => true,
733                RestartPolicy::Temporary => false,
734                RestartPolicy::Transient => reason == ChildExitReason::Abnormal,
735            },
736            StatefulChild::Supervisor { .. } => true, // Supervisors are always permanent
737        };
738
739        if !should_restart {
740            tracing::debug!(
741                supervisor = %self.name,
742                child = %id,
743                policy = ?self.children[position].restart_policy(),
744                reason = ?reason,
745                "not restarting child"
746            );
747            self.children.remove(position);
748            return;
749        }
750
751        // Check restart intensity
752        if self.restart_tracker.record_restart() {
753            tracing::error!(
754                supervisor = %self.name,
755                "restart intensity exceeded, shutting down"
756            );
757            self.shutdown_children().await;
758            return;
759        }
760
761        // Apply restart strategy
762        match self.restart_strategy {
763            RestartStrategy::OneForOne => {
764                self.restart_child(position).await;
765            }
766            RestartStrategy::OneForAll => {
767                self.restart_all_children().await;
768            }
769            RestartStrategy::RestForOne => {
770                self.restart_from(position).await;
771            }
772        }
773    }
774
775    #[allow(clippy::indexing_slicing)]
776    async fn restart_child(&mut self, position: usize) {
777        // Extract spec info before shutdown
778        let restart_info = match &self.children[position] {
779            StatefulChild::Worker(worker) => StatefulRestartInfo::Worker(worker.spec.clone()),
780            StatefulChild::Supervisor { spec, .. } => {
781                StatefulRestartInfo::Supervisor(Arc::clone(spec))
782            }
783        };
784
785        // Shutdown old child
786        self.children[position].shutdown().await;
787
788        // Restart based on type
789        match restart_info {
790            StatefulRestartInfo::Worker(spec) => {
791                tracing::debug!(
792                    supervisor = %self.name,
793                    worker = %spec.id,
794                    "restarting worker"
795                );
796                let new_worker = StatefulWorkerProcess::spawn(
797                    spec.clone(),
798                    self.name.clone(),
799                    self.control_tx.clone(),
800                );
801                self.children[position] = StatefulChild::Worker(new_worker);
802                tracing::debug!(
803                    supervisor = %self.name,
804                    worker = %spec.id,
805                    "worker restarted"
806                );
807            }
808            StatefulRestartInfo::Supervisor(spec) => {
809                let name = spec.name.clone();
810                tracing::debug!(
811                    supervisor = %self.name,
812                    child_supervisor = %name,
813                    "restarting supervisor"
814                );
815                let new_handle = StatefulSupervisorHandle::start((*spec).clone());
816                self.children[position] = StatefulChild::Supervisor {
817                    handle: new_handle,
818                    spec,
819                };
820                tracing::debug!(
821                    supervisor = %self.name,
822                    child_supervisor = %name,
823                    "supervisor restarted"
824                );
825            }
826        }
827    }
828
829    async fn restart_all_children(&mut self) {
830        tracing::debug!(
831            supervisor = %self.name,
832            "restarting all children (one_for_all)"
833        );
834
835        // Shutdown all children
836        for child in &mut self.children {
837            child.shutdown().await;
838        }
839
840        // Restart all worker children
841        for child in &mut self.children {
842            if let StatefulChild::Worker(worker) = child {
843                let spec = worker.spec.clone();
844                let new_worker = StatefulWorkerProcess::spawn(
845                    spec.clone(),
846                    self.name.clone(),
847                    self.control_tx.clone(),
848                );
849                *child = StatefulChild::Worker(new_worker);
850                tracing::debug!(
851                    supervisor = %self.name,
852                    child = %spec.id,
853                    "child restarted"
854                );
855            }
856        }
857    }
858
859    #[allow(clippy::indexing_slicing)]
860    async fn restart_from(&mut self, position: usize) {
861        tracing::debug!(
862            supervisor = %self.name,
863            position = %position,
864            "restarting from position (rest_for_one)"
865        );
866
867        for i in position..self.children.len() {
868            self.children[i].shutdown().await;
869
870            if let StatefulChild::Worker(worker) = &self.children[i] {
871                let spec = worker.spec.clone();
872                let new_worker = StatefulWorkerProcess::spawn(
873                    spec.clone(),
874                    self.name.clone(),
875                    self.control_tx.clone(),
876                );
877                self.children[i] = StatefulChild::Worker(new_worker);
878                tracing::debug!(
879                    supervisor = %self.name,
880                    child = %spec.id,
881                    "child restarted"
882                );
883            }
884        }
885    }
886
887    async fn shutdown_children(&mut self) {
888        for mut child in self.children.drain(..) {
889            let id = child.id().to_owned();
890            child.shutdown().await;
891            tracing::debug!(
892                supervisor = %self.name,
893                child = %id,
894                "shut down child"
895            );
896        }
897    }
898}
899
900// ============================================================================
901// Supervisor Handle (Stateful)
902// ============================================================================
903
/// Handle used to interact with a running stateful supervisor tree.
///
/// Cloning the handle is cheap (an `Arc` and a sender clone); every clone
/// talks to the same supervisor runtime through the shared control channel.
#[derive(Clone)]
pub struct StatefulSupervisorHandle<W: Worker> {
    /// Supervisor name, shared with the spawned runtime task for logging.
    pub(crate) name: Arc<String>,
    /// Command channel into the supervisor runtime; a failed send is treated
    /// as the supervisor shutting down (see the method error mappings).
    pub(crate) control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
}
910
911impl<W: Worker> StatefulSupervisorHandle<W> {
912    /// Spawns a stateful supervisor tree based on the provided specification.
913    #[must_use]
914    pub fn start(spec: StatefulSupervisorSpec<W>) -> Self {
915        let (control_tx, control_rx) = mpsc::unbounded_channel();
916        let name_arc = Arc::new(spec.name.clone());
917        let runtime = StatefulSupervisorRuntime::new(spec, control_rx, control_tx.clone());
918
919        let runtime_name = Arc::clone(&name_arc);
920        tokio::spawn(async move {
921            runtime.run().await;
922            tracing::debug!(name = %*runtime_name, "supervisor stopped");
923        });
924
925        Self {
926            name: name_arc,
927            control_tx,
928        }
929    }
930
931    /// Dynamically starts a new child worker
932    ///
933    /// # Errors
934    ///
935    /// Returns an error if the supervisor is shutting down or a child with this ID already exists.
936    pub async fn start_child(
937        &self,
938        id: impl Into<String>,
939        factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
940        restart_policy: RestartPolicy,
941        context: Arc<WorkerContext>,
942    ) -> Result<ChildId, StatefulSupervisorError> {
943        let (result_tx, result_rx) = oneshot::channel();
944        let spec = StatefulWorkerSpec::new(id, factory, restart_policy, context);
945
946        self.control_tx
947            .send(StatefulSupervisorCommand::StartChild {
948                spec,
949                respond_to: result_tx,
950            })
951            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
952
953        result_rx
954            .await
955            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
956    }
957
958    /// Dynamically starts a new child worker with linked initialization.
959    ///
960    /// This method waits for the worker's initialization to complete before returning.
961    /// If initialization fails or times out, an error is returned and the worker is not added.
962    ///
963    /// # Arguments
964    ///
965    /// * `id` - Unique identifier for the child
966    /// * `factory` - Factory function to create the worker
967    /// * `restart_policy` - How to handle worker termination after it starts running
968    /// * `context` - Shared context for stateful workers
969    /// * `timeout` - Maximum time to wait for initialization
970    ///
971    /// # Errors
972    ///
973    /// * `StatefulSupervisorError::InitializationFailed` - Worker initialization returned an error
974    /// * `StatefulSupervisorError::InitializationTimeout` - Worker didn't initialize within timeout
975    /// * `StatefulSupervisorError::ChildAlreadyExists` - A child with this ID already exists
976    /// * `StatefulSupervisorError::ShuttingDown` - Supervisor is shutting down
977    ///
978    /// # Note
979    ///
980    /// Initialization failures do NOT trigger restart policies. The worker must successfully
981    /// initialize before restart policies take effect.
982    pub async fn start_child_linked(
983        &self,
984        id: impl Into<String>,
985        factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
986        restart_policy: RestartPolicy,
987        context: Arc<WorkerContext>,
988        timeout: std::time::Duration,
989    ) -> Result<ChildId, StatefulSupervisorError> {
990        let (result_tx, result_rx) = oneshot::channel();
991        let spec = StatefulWorkerSpec::new(id, factory, restart_policy, context);
992
993        self.control_tx
994            .send(StatefulSupervisorCommand::StartChildLinked {
995                spec,
996                timeout,
997                respond_to: result_tx,
998            })
999            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1000
1001        result_rx
1002            .await
1003            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1004    }
1005
1006    /// Dynamically terminates a child
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns an error if the child is not found or the supervisor is shutting down.
1011    pub async fn terminate_child(&self, id: &str) -> Result<(), StatefulSupervisorError> {
1012        let (result_tx, result_rx) = oneshot::channel();
1013
1014        self.control_tx
1015            .send(StatefulSupervisorCommand::TerminateChild {
1016                id: id.to_owned(),
1017                respond_to: result_tx,
1018            })
1019            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1020
1021        result_rx
1022            .await
1023            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1024    }
1025
1026    /// Returns information about all children
1027    ///
1028    /// # Errors
1029    ///
1030    /// Returns an error if the supervisor is shutting down.
1031    pub async fn which_children(&self) -> Result<Vec<ChildInfo>, StatefulSupervisorError> {
1032        let (result_tx, result_rx) = oneshot::channel();
1033
1034        self.control_tx
1035            .send(StatefulSupervisorCommand::WhichChildren {
1036                respond_to: result_tx,
1037            })
1038            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1039
1040        result_rx
1041            .await
1042            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1043    }
1044
1045    /// Requests a graceful shutdown of the supervisor tree.
1046    ///
1047    /// # Errors
1048    ///
1049    /// Returns an error if the supervisor channel is already closed.
1050    #[allow(clippy::unused_async)]
1051    pub async fn shutdown(&self) -> Result<(), StatefulSupervisorError> {
1052        self.control_tx
1053            .send(StatefulSupervisorCommand::Shutdown)
1054            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1055        Ok(())
1056    }
1057
1058    /// Returns the supervisor's name.
1059    #[must_use]
1060    pub fn name(&self) -> &str {
1061        self.name.as_str()
1062    }
1063
1064    /// Returns the supervisor's restart strategy.
1065    ///
1066    /// # Errors
1067    ///
1068    /// Returns an error if the supervisor is shutting down.
1069    pub async fn restart_strategy(&self) -> Result<RestartStrategy, StatefulSupervisorError> {
1070        let (result_tx, result_rx) = oneshot::channel();
1071
1072        self.control_tx
1073            .send(StatefulSupervisorCommand::GetRestartStrategy {
1074                respond_to: result_tx,
1075            })
1076            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1077
1078        result_rx
1079            .await
1080            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))
1081    }
1082
1083    /// Returns the supervisor's uptime in seconds.
1084    ///
1085    /// # Errors
1086    ///
1087    /// Returns an error if the supervisor is shutting down.
1088    pub async fn uptime(&self) -> Result<u64, StatefulSupervisorError> {
1089        let (result_tx, result_rx) = oneshot::channel();
1090
1091        self.control_tx
1092            .send(StatefulSupervisorCommand::GetUptime {
1093                respond_to: result_tx,
1094            })
1095            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1096
1097        result_rx
1098            .await
1099            .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))
1100    }
1101}