Skip to main content

ash_flare/supervisor/
handle.rs

1//! Supervisor handle - public API for interacting with supervisors
2
3use super::error::SupervisorError;
4use super::runtime::{SupervisorCommand, SupervisorRuntime};
5use super::spec::SupervisorSpec;
6use crate::restart::RestartPolicy;
7use crate::types::{ChildId, ChildInfo};
8use crate::worker::{Worker, WorkerSpec};
9use std::sync::Arc;
10use tokio::sync::{mpsc, oneshot};
11
12/// Handle used to interact with a running supervisor tree.
13#[derive(Clone)]
14pub struct SupervisorHandle<W: Worker> {
15    pub(crate) name: Arc<String>,
16    pub(crate) control_tx: mpsc::UnboundedSender<SupervisorCommand<W>>,
17}
18
19impl<W: Worker> SupervisorHandle<W> {
20    /// Spawns a supervisor tree based on the provided specification.
21    #[must_use]
22    pub fn start(spec: SupervisorSpec<W>) -> Self {
23        let (control_tx, control_rx) = mpsc::unbounded_channel();
24        let name_arc = Arc::new(spec.name.clone());
25        let runtime = SupervisorRuntime::new(spec, control_rx, control_tx.clone());
26
27        let runtime_name = Arc::clone(&name_arc);
28        tokio::spawn(async move {
29            runtime.run().await;
30            tracing::debug!(name = %*runtime_name, "supervisor stopped");
31        });
32
33        Self {
34            name: name_arc,
35            control_tx,
36        }
37    }
38
39    /// Dynamically starts a new child worker
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the supervisor is shutting down or a child with this ID already exists.
44    pub async fn start_child(
45        &self,
46        id: impl Into<String>,
47        factory: impl Fn() -> W + Send + Sync + 'static,
48        restart_policy: RestartPolicy,
49    ) -> Result<ChildId, SupervisorError> {
50        let (result_tx, result_rx) = oneshot::channel();
51        let spec = WorkerSpec::new(id, factory, restart_policy);
52
53        self.control_tx
54            .send(SupervisorCommand::StartChild {
55                spec,
56                respond_to: result_tx,
57            })
58            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
59
60        result_rx
61            .await
62            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
63    }
64
65    /// Dynamically starts a new child worker with linked initialization.
66    ///
67    /// This method waits for the worker's initialization to complete before returning.
68    /// If initialization fails or times out, an error is returned and the worker is not added.
69    ///
70    /// # Arguments
71    ///
72    /// * `id` - Unique identifier for the child
73    /// * `factory` - Factory function to create the worker
74    /// * `restart_policy` - How to handle worker termination after it starts running
75    /// * `timeout` - Maximum time to wait for initialization
76    ///
77    /// # Errors
78    ///
79    /// * `SupervisorError::InitializationFailed` - Worker initialization returned an error
80    /// * `SupervisorError::InitializationTimeout` - Worker didn't initialize within timeout
81    /// * `SupervisorError::ChildAlreadyExists` - A child with this ID already exists
82    /// * `SupervisorError::ShuttingDown` - Supervisor is shutting down
83    ///
84    /// # Note
85    ///
86    /// Initialization failures do NOT trigger restart policies. The worker must successfully
87    /// initialize before restart policies take effect.
88    pub async fn start_child_linked(
89        &self,
90        id: impl Into<String>,
91        factory: impl Fn() -> W + Send + Sync + 'static,
92        restart_policy: RestartPolicy,
93        timeout: std::time::Duration,
94    ) -> Result<ChildId, SupervisorError> {
95        let (result_tx, result_rx) = oneshot::channel();
96        let spec = WorkerSpec::new(id, factory, restart_policy);
97
98        self.control_tx
99            .send(SupervisorCommand::StartChildLinked {
100                spec,
101                timeout,
102                respond_to: result_tx,
103            })
104            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
105
106        result_rx
107            .await
108            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
109    }
110
111    /// Dynamically terminates a child
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the child is not found or the supervisor is shutting down.
116    pub async fn terminate_child(&self, id: &str) -> Result<(), SupervisorError> {
117        let (result_tx, result_rx) = oneshot::channel();
118
119        self.control_tx
120            .send(SupervisorCommand::TerminateChild {
121                id: id.to_owned(),
122                respond_to: result_tx,
123            })
124            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
125
126        result_rx
127            .await
128            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
129    }
130
131    /// Returns information about all children
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the supervisor is shutting down.
136    pub async fn which_children(&self) -> Result<Vec<ChildInfo>, SupervisorError> {
137        let (result_tx, result_rx) = oneshot::channel();
138
139        self.control_tx
140            .send(SupervisorCommand::WhichChildren {
141                respond_to: result_tx,
142            })
143            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
144
145        result_rx
146            .await
147            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
148    }
149
150    /// Requests a graceful shutdown of the supervisor tree.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the supervisor channel is already closed.
155    #[allow(clippy::unused_async)]
156    pub async fn shutdown(&self) -> Result<(), SupervisorError> {
157        self.control_tx
158            .send(SupervisorCommand::Shutdown)
159            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
160        Ok(())
161    }
162
163    /// Returns the supervisor's name.
164    #[must_use]
165    pub fn name(&self) -> &str {
166        self.name.as_str()
167    }
168
169    /// Returns the supervisor's restart strategy.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the supervisor is shutting down.
174    pub async fn restart_strategy(
175        &self,
176    ) -> Result<crate::restart::RestartStrategy, SupervisorError> {
177        let (result_tx, result_rx) = oneshot::channel();
178
179        self.control_tx
180            .send(SupervisorCommand::GetRestartStrategy {
181                respond_to: result_tx,
182            })
183            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
184
185        result_rx
186            .await
187            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
188    }
189
190    /// Returns the supervisor's uptime in seconds.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the supervisor is shutting down.
195    pub async fn uptime(&self) -> Result<u64, SupervisorError> {
196        let (result_tx, result_rx) = oneshot::channel();
197
198        self.control_tx
199            .send(SupervisorCommand::GetUptime {
200                respond_to: result_tx,
201            })
202            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
203
204        result_rx
205            .await
206            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
207    }
208}