ash_flare/supervisor/
handle.rs1use super::error::SupervisorError;
4use super::runtime::{SupervisorCommand, SupervisorRuntime};
5use super::spec::SupervisorSpec;
6use crate::restart::RestartPolicy;
7use crate::types::{ChildId, ChildInfo};
8use crate::worker::{Worker, WorkerSpec};
9use std::sync::Arc;
10use tokio::sync::{mpsc, oneshot};
11
12#[derive(Clone)]
14pub struct SupervisorHandle<W: Worker> {
15 pub(crate) name: Arc<String>,
16 pub(crate) control_tx: mpsc::UnboundedSender<SupervisorCommand<W>>,
17}
18
19impl<W: Worker> SupervisorHandle<W> {
20 pub fn start(spec: SupervisorSpec<W>) -> Self {
22 let (control_tx, control_rx) = mpsc::unbounded_channel();
23 let name_arc = Arc::new(spec.name.clone());
24 let runtime = SupervisorRuntime::new(spec, control_rx, control_tx.clone());
25
26 let runtime_name = Arc::clone(&name_arc);
27 tokio::spawn(async move {
28 runtime.run().await;
29 tracing::debug!(name = %*runtime_name, "supervisor stopped");
30 });
31
32 Self {
33 name: name_arc,
34 control_tx,
35 }
36 }
37
38 pub async fn start_child(
40 &self,
41 id: impl Into<String>,
42 factory: impl Fn() -> W + Send + Sync + 'static,
43 restart_policy: RestartPolicy,
44 ) -> Result<ChildId, SupervisorError> {
45 let (result_tx, result_rx) = oneshot::channel();
46 let spec = WorkerSpec::new(id, factory, restart_policy);
47
48 self.control_tx
49 .send(SupervisorCommand::StartChild {
50 spec,
51 respond_to: result_tx,
52 })
53 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
54
55 result_rx
56 .await
57 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
58 }
59
60 pub async fn start_child_linked(
84 &self,
85 id: impl Into<String>,
86 factory: impl Fn() -> W + Send + Sync + 'static,
87 restart_policy: RestartPolicy,
88 timeout: std::time::Duration,
89 ) -> Result<ChildId, SupervisorError> {
90 let (result_tx, result_rx) = oneshot::channel();
91 let spec = WorkerSpec::new(id, factory, restart_policy);
92
93 self.control_tx
94 .send(SupervisorCommand::StartChildLinked {
95 spec,
96 timeout,
97 respond_to: result_tx,
98 })
99 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
100
101 result_rx
102 .await
103 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
104 }
105
106 pub async fn terminate_child(&self, id: &str) -> Result<(), SupervisorError> {
108 let (result_tx, result_rx) = oneshot::channel();
109
110 self.control_tx
111 .send(SupervisorCommand::TerminateChild {
112 id: id.to_owned(),
113 respond_to: result_tx,
114 })
115 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
116
117 result_rx
118 .await
119 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
120 }
121
122 pub async fn which_children(&self) -> Result<Vec<ChildInfo>, SupervisorError> {
124 let (result_tx, result_rx) = oneshot::channel();
125
126 self.control_tx
127 .send(SupervisorCommand::WhichChildren {
128 respond_to: result_tx,
129 })
130 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
131
132 result_rx
133 .await
134 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
135 }
136
137 pub async fn shutdown(&self) -> Result<(), SupervisorError> {
139 self.control_tx
140 .send(SupervisorCommand::Shutdown)
141 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
142 Ok(())
143 }
144
145 pub fn name(&self) -> &str {
147 self.name.as_str()
148 }
149
150 pub async fn restart_strategy(
152 &self,
153 ) -> Result<crate::restart::RestartStrategy, SupervisorError> {
154 let (result_tx, result_rx) = oneshot::channel();
155
156 self.control_tx
157 .send(SupervisorCommand::GetRestartStrategy {
158 respond_to: result_tx,
159 })
160 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
161
162 result_rx
163 .await
164 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
165 }
166
167 pub async fn uptime(&self) -> Result<u64, SupervisorError> {
169 let (result_tx, result_rx) = oneshot::channel();
170
171 self.control_tx
172 .send(SupervisorCommand::GetUptime {
173 respond_to: result_tx,
174 })
175 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
176
177 result_rx
178 .await
179 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
180 }
181}