Skip to main content

ash_flare/supervisor/
handle.rs

1//! Supervisor handle - public API for interacting with supervisors
2
3use super::error::SupervisorError;
4use super::runtime::{SupervisorCommand, SupervisorRuntime};
5use super::spec::SupervisorSpec;
6use crate::restart::RestartPolicy;
7use crate::types::{ChildId, ChildInfo};
8use crate::worker::{Worker, WorkerSpec};
9use std::sync::Arc;
10use tokio::sync::{mpsc, oneshot};
11
12/// Handle used to interact with a running supervisor tree.
13#[derive(Clone)]
14pub struct SupervisorHandle<W: Worker> {
15    pub(crate) name: Arc<String>,
16    pub(crate) control_tx: mpsc::UnboundedSender<SupervisorCommand<W>>,
17}
18
19impl<W: Worker> SupervisorHandle<W> {
20    /// Spawns a supervisor tree based on the provided specification.
21    pub fn start(spec: SupervisorSpec<W>) -> Self {
22        let (control_tx, control_rx) = mpsc::unbounded_channel();
23        let name_arc = Arc::new(spec.name.clone());
24        let runtime = SupervisorRuntime::new(spec, control_rx, control_tx.clone());
25
26        let runtime_name = Arc::clone(&name_arc);
27        tokio::spawn(async move {
28            runtime.run().await;
29            tracing::debug!(name = %*runtime_name, "supervisor stopped");
30        });
31
32        Self {
33            name: name_arc,
34            control_tx,
35        }
36    }
37
38    /// Dynamically starts a new child worker
39    pub async fn start_child(
40        &self,
41        id: impl Into<String>,
42        factory: impl Fn() -> W + Send + Sync + 'static,
43        restart_policy: RestartPolicy,
44    ) -> Result<ChildId, SupervisorError> {
45        let (result_tx, result_rx) = oneshot::channel();
46        let spec = WorkerSpec::new(id, factory, restart_policy);
47
48        self.control_tx
49            .send(SupervisorCommand::StartChild {
50                spec,
51                respond_to: result_tx,
52            })
53            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
54
55        result_rx
56            .await
57            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
58    }
59
60    /// Dynamically starts a new child worker with linked initialization.
61    ///
62    /// This method waits for the worker's initialization to complete before returning.
63    /// If initialization fails or times out, an error is returned and the worker is not added.
64    ///
65    /// # Arguments
66    ///
67    /// * `id` - Unique identifier for the child
68    /// * `factory` - Factory function to create the worker
69    /// * `restart_policy` - How to handle worker termination after it starts running
70    /// * `timeout` - Maximum time to wait for initialization
71    ///
72    /// # Errors
73    ///
74    /// * `SupervisorError::InitializationFailed` - Worker initialization returned an error
75    /// * `SupervisorError::InitializationTimeout` - Worker didn't initialize within timeout
76    /// * `SupervisorError::ChildAlreadyExists` - A child with this ID already exists
77    /// * `SupervisorError::ShuttingDown` - Supervisor is shutting down
78    ///
79    /// # Note
80    ///
81    /// Initialization failures do NOT trigger restart policies. The worker must successfully
82    /// initialize before restart policies take effect.
83    pub async fn start_child_linked(
84        &self,
85        id: impl Into<String>,
86        factory: impl Fn() -> W + Send + Sync + 'static,
87        restart_policy: RestartPolicy,
88        timeout: std::time::Duration,
89    ) -> Result<ChildId, SupervisorError> {
90        let (result_tx, result_rx) = oneshot::channel();
91        let spec = WorkerSpec::new(id, factory, restart_policy);
92
93        self.control_tx
94            .send(SupervisorCommand::StartChildLinked {
95                spec,
96                timeout,
97                respond_to: result_tx,
98            })
99            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
100
101        result_rx
102            .await
103            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
104    }
105
106    /// Dynamically terminates a child
107    pub async fn terminate_child(&self, id: &str) -> Result<(), SupervisorError> {
108        let (result_tx, result_rx) = oneshot::channel();
109
110        self.control_tx
111            .send(SupervisorCommand::TerminateChild {
112                id: id.to_owned(),
113                respond_to: result_tx,
114            })
115            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
116
117        result_rx
118            .await
119            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
120    }
121
122    /// Returns information about all children
123    pub async fn which_children(&self) -> Result<Vec<ChildInfo>, SupervisorError> {
124        let (result_tx, result_rx) = oneshot::channel();
125
126        self.control_tx
127            .send(SupervisorCommand::WhichChildren {
128                respond_to: result_tx,
129            })
130            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
131
132        result_rx
133            .await
134            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
135    }
136
137    /// Requests a graceful shutdown of the supervisor tree.
138    pub async fn shutdown(&self) -> Result<(), SupervisorError> {
139        self.control_tx
140            .send(SupervisorCommand::Shutdown)
141            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
142        Ok(())
143    }
144
145    /// Returns the supervisor's name.
146    pub fn name(&self) -> &str {
147        self.name.as_str()
148    }
149
150    /// Returns the supervisor's restart strategy.
151    pub async fn restart_strategy(
152        &self,
153    ) -> Result<crate::restart::RestartStrategy, SupervisorError> {
154        let (result_tx, result_rx) = oneshot::channel();
155
156        self.control_tx
157            .send(SupervisorCommand::GetRestartStrategy {
158                respond_to: result_tx,
159            })
160            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
161
162        result_rx
163            .await
164            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
165    }
166
167    /// Returns the supervisor's uptime in seconds.
168    pub async fn uptime(&self) -> Result<u64, SupervisorError> {
169        let (result_tx, result_rx) = oneshot::channel();
170
171        self.control_tx
172            .send(SupervisorCommand::GetUptime {
173                respond_to: result_tx,
174            })
175            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
176
177        result_rx
178            .await
179            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
180    }
181}