ash_flare/supervisor/
handle.rs1use super::error::SupervisorError;
4use super::runtime::{SupervisorCommand, SupervisorRuntime};
5use super::spec::SupervisorSpec;
6use crate::restart::RestartPolicy;
7use crate::types::{ChildId, ChildInfo};
8use crate::worker::{Worker, WorkerSpec};
9use std::sync::Arc;
10use tokio::sync::{mpsc, oneshot};
11
12#[derive(Clone)]
14pub struct SupervisorHandle<W: Worker> {
15 pub(crate) name: Arc<String>,
16 pub(crate) control_tx: mpsc::UnboundedSender<SupervisorCommand<W>>,
17}
18
19impl<W: Worker> SupervisorHandle<W> {
20 pub fn start(spec: SupervisorSpec<W>) -> Self {
22 let (control_tx, control_rx) = mpsc::unbounded_channel();
23 let name_arc = Arc::new(spec.name.clone());
24 let runtime = SupervisorRuntime::new(spec, control_rx, control_tx.clone());
25
26 let runtime_name = Arc::clone(&name_arc);
27 tokio::spawn(async move {
28 runtime.run().await;
29 slog::debug!(slog_scope::logger(), "supervisor stopped";
30 "name" => &*runtime_name
31 );
32 });
33
34 Self {
35 name: name_arc,
36 control_tx,
37 }
38 }
39
40 pub async fn start_child(
42 &self,
43 id: impl Into<String>,
44 factory: impl Fn() -> W + Send + Sync + 'static,
45 restart_policy: RestartPolicy,
46 ) -> Result<ChildId, SupervisorError> {
47 let (result_tx, result_rx) = oneshot::channel();
48 let spec = WorkerSpec::new(id, factory, restart_policy);
49
50 self.control_tx
51 .send(SupervisorCommand::StartChild {
52 spec,
53 respond_to: result_tx,
54 })
55 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
56
57 result_rx
58 .await
59 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
60 }
61
62 pub async fn start_child_linked(
86 &self,
87 id: impl Into<String>,
88 factory: impl Fn() -> W + Send + Sync + 'static,
89 restart_policy: RestartPolicy,
90 timeout: std::time::Duration,
91 ) -> Result<ChildId, SupervisorError> {
92 let (result_tx, result_rx) = oneshot::channel();
93 let spec = WorkerSpec::new(id, factory, restart_policy);
94
95 self.control_tx
96 .send(SupervisorCommand::StartChildLinked {
97 spec,
98 timeout,
99 respond_to: result_tx,
100 })
101 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
102
103 result_rx
104 .await
105 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
106 }
107
108 pub async fn terminate_child(&self, id: &str) -> Result<(), SupervisorError> {
110 let (result_tx, result_rx) = oneshot::channel();
111
112 self.control_tx
113 .send(SupervisorCommand::TerminateChild {
114 id: id.to_owned(),
115 respond_to: result_tx,
116 })
117 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
118
119 result_rx
120 .await
121 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
122 }
123
124 pub async fn which_children(&self) -> Result<Vec<ChildInfo>, SupervisorError> {
126 let (result_tx, result_rx) = oneshot::channel();
127
128 self.control_tx
129 .send(SupervisorCommand::WhichChildren {
130 respond_to: result_tx,
131 })
132 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
133
134 result_rx
135 .await
136 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?
137 }
138
139 pub async fn shutdown(&self) -> Result<(), SupervisorError> {
141 self.control_tx
142 .send(SupervisorCommand::Shutdown)
143 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
144 Ok(())
145 }
146
147 pub fn name(&self) -> &str {
149 self.name.as_str()
150 }
151
152 pub async fn restart_strategy(
154 &self,
155 ) -> Result<crate::restart::RestartStrategy, SupervisorError> {
156 let (result_tx, result_rx) = oneshot::channel();
157
158 self.control_tx
159 .send(SupervisorCommand::GetRestartStrategy {
160 respond_to: result_tx,
161 })
162 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
163
164 result_rx
165 .await
166 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
167 }
168
169 pub async fn uptime(&self) -> Result<u64, SupervisorError> {
171 let (result_tx, result_rx) = oneshot::channel();
172
173 self.control_tx
174 .send(SupervisorCommand::GetUptime {
175 respond_to: result_tx,
176 })
177 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))?;
178
179 result_rx
180 .await
181 .map_err(|_| SupervisorError::ShuttingDown(self.name().to_owned()))
182 }
183}