atomr_core/
supervision.rs1use std::sync::Arc;
4use std::time::Duration;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8#[non_exhaustive]
9pub enum Directive {
10 Resume,
11 Restart,
12 Stop,
13 Escalate,
14}
15
16pub type Decider = Arc<dyn Fn(&str) -> Directive + Send + Sync>;
17
18#[derive(Clone)]
22pub struct SupervisorStrategy {
23 pub kind: StrategyKind,
24 pub max_retries: Option<u32>,
25 pub within: Option<Duration>,
26 pub decider: Decider,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30#[non_exhaustive]
31pub enum StrategyKind {
32 OneForOne,
33 AllForOne,
34}
35
36impl std::fmt::Debug for SupervisorStrategy {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("SupervisorStrategy")
39 .field("kind", &self.kind)
40 .field("max_retries", &self.max_retries)
41 .field("within", &self.within)
42 .finish_non_exhaustive()
43 }
44}
45
46impl Default for SupervisorStrategy {
47 fn default() -> Self {
48 OneForOneStrategy::default().into()
49 }
50}
51
52impl SupervisorStrategy {
53 pub fn decide(&self, err: &str) -> Directive {
54 (self.decider)(err)
55 }
56}
57
58pub struct OneForOneStrategy {
60 pub max_retries: Option<u32>,
61 pub within: Option<Duration>,
62 pub decider: Decider,
63}
64
65impl Default for OneForOneStrategy {
66 fn default() -> Self {
67 Self {
68 max_retries: Some(10),
69 within: Some(Duration::from_secs(60)),
70 decider: Arc::new(|_| Directive::Restart),
71 }
72 }
73}
74
75impl OneForOneStrategy {
76 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn with_max_retries(mut self, n: u32) -> Self {
81 self.max_retries = Some(n);
82 self
83 }
84
85 pub fn with_within(mut self, d: Duration) -> Self {
86 self.within = Some(d);
87 self
88 }
89
90 pub fn with_decider(mut self, f: impl Fn(&str) -> Directive + Send + Sync + 'static) -> Self {
91 self.decider = Arc::new(f);
92 self
93 }
94}
95
96impl From<OneForOneStrategy> for SupervisorStrategy {
97 fn from(o: OneForOneStrategy) -> Self {
98 Self {
99 kind: StrategyKind::OneForOne,
100 max_retries: o.max_retries,
101 within: o.within,
102 decider: o.decider,
103 }
104 }
105}
106
107pub struct AllForOneStrategy {
109 pub max_retries: Option<u32>,
110 pub within: Option<Duration>,
111 pub decider: Decider,
112}
113
114impl Default for AllForOneStrategy {
115 fn default() -> Self {
116 Self {
117 max_retries: Some(10),
118 within: Some(Duration::from_secs(60)),
119 decider: Arc::new(|_| Directive::Restart),
120 }
121 }
122}
123
124impl From<AllForOneStrategy> for SupervisorStrategy {
125 fn from(o: AllForOneStrategy) -> Self {
126 Self {
127 kind: StrategyKind::AllForOne,
128 max_retries: o.max_retries,
129 within: o.within,
130 decider: o.decider,
131 }
132 }
133}
134
135use crate::actor::Actor;
164
165pub trait SupervisorOf<C: Actor> {
168 type ChildError: std::error::Error + Send + 'static;
172
173 fn decide(&self, _err: &Self::ChildError) -> Directive {
176 Directive::Restart
177 }
178}
179
180#[derive(Debug, thiserror::Error)]
185#[error("{message}")]
186pub struct SupervisionError {
187 pub message: String,
188}
189
190impl SupervisionError {
191 pub fn new(message: impl Into<String>) -> Self {
192 Self { message: message.into() }
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use crate::actor::{Actor, Context};
200
201 #[test]
202 fn default_is_one_for_one_restart() {
203 let s = SupervisorStrategy::default();
204 assert_eq!(s.kind, StrategyKind::OneForOne);
205 assert_eq!(s.decide("boom"), Directive::Restart);
206 }
207
208 #[test]
209 fn custom_decider_runs() {
210 let s: SupervisorStrategy = OneForOneStrategy::new()
211 .with_decider(|e| if e == "stop" { Directive::Stop } else { Directive::Resume })
212 .into();
213 assert_eq!(s.decide("stop"), Directive::Stop);
214 assert_eq!(s.decide("keep"), Directive::Resume);
215 }
216
217 #[derive(Default)]
220 struct Boss;
221 #[derive(Default)]
222 struct Worker;
223
224 #[async_trait::async_trait]
225 impl Actor for Boss {
226 type Msg = ();
227 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
228 }
229
230 #[async_trait::async_trait]
231 impl Actor for Worker {
232 type Msg = ();
233 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
234 }
235
236 #[derive(Debug, thiserror::Error)]
237 #[error("worker died: {0}")]
238 struct WorkerError(String);
239
240 impl SupervisorOf<Worker> for Boss {
241 type ChildError = WorkerError;
242 fn decide(&self, _err: &WorkerError) -> Directive {
243 Directive::Stop
244 }
245 }
246
247 #[test]
248 fn explicit_impl_is_resolvable_with_typed_error() {
249 fn assert_typed_decider<P: SupervisorOf<C, ChildError = WorkerError>, C: Actor>() {}
250 assert_typed_decider::<Boss, Worker>();
251 }
252
253 #[test]
254 fn typed_decider_runs() {
255 let boss = Boss;
256 let err = WorkerError("oops".into());
257 let d = SupervisorOf::<Worker>::decide(&boss, &err);
258 assert_eq!(d, Directive::Stop);
259 }
260
261 #[test]
266 fn supervision_error_works_as_default_child_error() {
267 struct Default42;
268 #[async_trait::async_trait]
269 impl Actor for Default42 {
270 type Msg = ();
271 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
272 }
273 struct AnyParent;
274 #[async_trait::async_trait]
275 impl Actor for AnyParent {
276 type Msg = ();
277 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
278 }
279 impl SupervisorOf<Default42> for AnyParent {
280 type ChildError = SupervisionError;
281 }
282 let p = AnyParent;
283 let err = SupervisionError::new("crash");
284 assert_eq!(SupervisorOf::<Default42>::decide(&p, &err), Directive::Restart);
285 }
286
287 #[test]
288 fn supervision_error_displays_message() {
289 let e = SupervisionError::new("halt");
290 assert_eq!(e.to_string(), "halt");
291 }
292}