// ash_flare/supervisor/handle.rs
use super::error::SupervisorError;
use super::runtime::{SupervisorCommand, SupervisorRuntime};
use super::spec::SupervisorSpec;
use crate::restart::RestartPolicy;
use crate::types::{ChildId, ChildInfo};
use crate::worker::{Worker, WorkerSpec};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};

12#[derive(Clone)]
14pub struct SupervisorHandle<W: Worker> {
15 pub(crate) name: Arc<String>,
16 pub(crate) control_tx: mpsc::UnboundedSender<SupervisorCommand<W>>,
17}
18
19impl<W: Worker> SupervisorHandle<W> {
20 #[must_use]
22 pub fn start(spec: SupervisorSpec<W>) -> Self {
23 let (control_tx, control_rx) = mpsc::unbounded_channel();
24 let name_arc = Arc::new(spec.name.clone());
25 let runtime = SupervisorRuntime::new(spec, control_rx, control_tx.clone());
26
27 let runtime_name = Arc::clone(&name_arc);
28 tokio::spawn(async move {
29 runtime.run().await;
30 tracing::debug!(name = %*runtime_name, "supervisor stopped");
31 });
32
33 Self {
34 name: name_arc,
35 control_tx,
36 }
37 }
38
39 pub async fn start_child(
45 &self,
46 id: impl Into<String>,
47 factory: impl Fn() -> W + Send + Sync + 'static,
48 restart_policy: RestartPolicy,
49 ) -> Result<ChildId, SupervisorError> {
50 let (result_tx, result_rx) = oneshot::channel();
51 let spec = WorkerSpec::new(id, factory, restart_policy);
52
53 self.control_tx
54 .send(SupervisorCommand::StartChild {
55 spec,
56 respond_to: result_tx,
57 })
58 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
59
60 result_rx
61 .await
62 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
63 }
64
65 pub async fn start_child_linked(
89 &self,
90 id: impl Into<String>,
91 factory: impl Fn() -> W + Send + Sync + 'static,
92 restart_policy: RestartPolicy,
93 timeout: std::time::Duration,
94 ) -> Result<ChildId, SupervisorError> {
95 let (result_tx, result_rx) = oneshot::channel();
96 let spec = WorkerSpec::new(id, factory, restart_policy);
97
98 self.control_tx
99 .send(SupervisorCommand::StartChildLinked {
100 spec,
101 timeout,
102 respond_to: result_tx,
103 })
104 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
105
106 result_rx
107 .await
108 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
109 }
110
111 pub async fn terminate_child(&self, id: &str) -> Result<(), SupervisorError> {
117 let (result_tx, result_rx) = oneshot::channel();
118
119 self.control_tx
120 .send(SupervisorCommand::TerminateChild {
121 id: id.to_owned(),
122 respond_to: result_tx,
123 })
124 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
125
126 result_rx
127 .await
128 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
129 }
130
131 pub async fn which_children(&self) -> Result<Vec<ChildInfo>, SupervisorError> {
137 let (result_tx, result_rx) = oneshot::channel();
138
139 self.control_tx
140 .send(SupervisorCommand::WhichChildren {
141 respond_to: result_tx,
142 })
143 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
144
145 result_rx
146 .await
147 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
148 }
149
150 #[allow(clippy::unused_async)]
156 pub async fn shutdown(&self) -> Result<(), SupervisorError> {
157 self.control_tx
158 .send(SupervisorCommand::Shutdown)
159 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
160 Ok(())
161 }
162
163 #[must_use]
165 pub fn name(&self) -> &str {
166 self.name.as_str()
167 }
168
169 pub async fn restart_strategy(
175 &self,
176 ) -> Result<crate::restart::RestartStrategy, SupervisorError> {
177 let (result_tx, result_rx) = oneshot::channel();
178
179 self.control_tx
180 .send(SupervisorCommand::GetRestartStrategy {
181 respond_to: result_tx,
182 })
183 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
184
185 result_rx
186 .await
187 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
188 }
189
190 pub async fn uptime(&self) -> Result<u64, SupervisorError> {
196 let (result_tx, result_rx) = oneshot::channel();
197
198 self.control_tx
199 .send(SupervisorCommand::GetUptime {
200 respond_to: result_tx,
201 })
202 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
203
204 result_rx
205 .await
206 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
207 }
208}