Skip to main content

ash_flare/supervisor/
handle.rs

1//! Supervisor handle - public API for interacting with supervisors
2
3use super::error::SupervisorError;
4use super::runtime::{SupervisorCommand, SupervisorRuntime};
5use super::spec::SupervisorSpec;
6use crate::restart::RestartPolicy;
7use crate::types::{ChildId, ChildInfo};
8use crate::worker::{Worker, WorkerSpec};
9use std::sync::Arc;
10use tokio::sync::{mpsc, oneshot};
11
12/// Handle used to interact with a running supervisor tree.
13#[derive(Clone)]
14pub struct SupervisorHandle<W: Worker> {
15    pub(crate) name: Arc<String>,
16    pub(crate) control_tx: mpsc::UnboundedSender<SupervisorCommand<W>>,
17}
18
19impl<W: Worker> SupervisorHandle<W> {
20    /// Spawns a supervisor tree based on the provided specification.
21    pub fn start(spec: SupervisorSpec<W>) -> Self {
22        let (control_tx, control_rx) = mpsc::unbounded_channel();
23        let name_arc = Arc::new(spec.name.clone());
24        let runtime = SupervisorRuntime::new(spec, control_rx, control_tx.clone());
25
26        let runtime_name = Arc::clone(&name_arc);
27        tokio::spawn(async move {
28            runtime.run().await;
29            slog::debug!(slog_scope::logger(), "supervisor stopped";
30                "name" => &*runtime_name
31            );
32        });
33
34        Self {
35            name: name_arc,
36            control_tx,
37        }
38    }
39
40    /// Dynamically starts a new child worker
41    pub async fn start_child(
42        &self,
43        id: impl Into<String>,
44        factory: impl Fn() -> W + Send + Sync + 'static,
45        restart_policy: RestartPolicy,
46    ) -> Result<ChildId, SupervisorError> {
47        let (result_tx, result_rx) = oneshot::channel();
48        let spec = WorkerSpec::new(id, factory, restart_policy);
49
50        self.control_tx
51            .send(SupervisorCommand::StartChild {
52                spec,
53                respond_to: result_tx,
54            })
55            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
56
57        result_rx
58            .await
59            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
60    }
61
62    /// Dynamically starts a new child worker with linked initialization.
63    ///
64    /// This method waits for the worker's initialization to complete before returning.
65    /// If initialization fails or times out, an error is returned and the worker is not added.
66    ///
67    /// # Arguments
68    ///
69    /// * `id` - Unique identifier for the child
70    /// * `factory` - Factory function to create the worker
71    /// * `restart_policy` - How to handle worker termination after it starts running
72    /// * `timeout` - Maximum time to wait for initialization
73    ///
74    /// # Errors
75    ///
76    /// * `SupervisorError::InitializationFailed` - Worker initialization returned an error
77    /// * `SupervisorError::InitializationTimeout` - Worker didn't initialize within timeout
78    /// * `SupervisorError::ChildAlreadyExists` - A child with this ID already exists
79    /// * `SupervisorError::ShuttingDown` - Supervisor is shutting down
80    ///
81    /// # Note
82    ///
83    /// Initialization failures do NOT trigger restart policies. The worker must successfully
84    /// initialize before restart policies take effect.
85    pub async fn start_child_linked(
86        &self,
87        id: impl Into<String>,
88        factory: impl Fn() -> W + Send + Sync + 'static,
89        restart_policy: RestartPolicy,
90        timeout: std::time::Duration,
91    ) -> Result<ChildId, SupervisorError> {
92        let (result_tx, result_rx) = oneshot::channel();
93        let spec = WorkerSpec::new(id, factory, restart_policy);
94
95        self.control_tx
96            .send(SupervisorCommand::StartChildLinked {
97                spec,
98                timeout,
99                respond_to: result_tx,
100            })
101            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
102
103        result_rx
104            .await
105            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
106    }
107
108    /// Dynamically terminates a child
109    pub async fn terminate_child(&self, id: &str) -> Result<(), SupervisorError> {
110        let (result_tx, result_rx) = oneshot::channel();
111
112        self.control_tx
113            .send(SupervisorCommand::TerminateChild {
114                id: id.to_owned(),
115                respond_to: result_tx,
116            })
117            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
118
119        result_rx
120            .await
121            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
122    }
123
124    /// Returns information about all children
125    pub async fn which_children(&self) -> Result<Vec<ChildInfo>, SupervisorError> {
126        let (result_tx, result_rx) = oneshot::channel();
127
128        self.control_tx
129            .send(SupervisorCommand::WhichChildren {
130                respond_to: result_tx,
131            })
132            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
133
134        result_rx
135            .await
136            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
137    }
138
139    /// Requests a graceful shutdown of the supervisor tree.
140    pub async fn shutdown(&self) -> Result<(), SupervisorError> {
141        self.control_tx
142            .send(SupervisorCommand::Shutdown)
143            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
144        Ok(())
145    }
146
147    /// Returns the supervisor's name.
148    pub fn name(&self) -> &str {
149        self.name.as_str()
150    }
151
152    /// Returns the supervisor's restart strategy.
153    pub async fn restart_strategy(
154        &self,
155    ) -> Result<crate::restart::RestartStrategy, SupervisorError> {
156        let (result_tx, result_rx) = oneshot::channel();
157
158        self.control_tx
159            .send(SupervisorCommand::GetRestartStrategy {
160                respond_to: result_tx,
161            })
162            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
163
164        result_rx
165            .await
166            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
167    }
168
169    /// Returns the supervisor's uptime in seconds.
170    pub async fn uptime(&self) -> Result<u64, SupervisorError> {
171        let (result_tx, result_rx) = oneshot::channel();
172
173        self.control_tx
174            .send(SupervisorCommand::GetUptime {
175                respond_to: result_tx,
176            })
177            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
178
179        result_rx
180            .await
181            .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
182    }
183}