// atomr_core/supervision.rs
use std::sync::Arc;
use std::time::Duration;
5
/// Verdict produced by a decider after a child has failed.
///
/// This module only defines and transports the value; the code that acts on a
/// directive lives elsewhere. The per-variant notes therefore describe the
/// conventional actor-supervision meaning — NOTE(review): confirm against the
/// runtime that consumes these values.
///
/// `Hash` is derived alongside `Eq` so directives can be used as map/set keys
/// (for example when tallying decisions in metrics or tests).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Directive {
    /// Presumably: keep the failed child running with its current state.
    Resume,
    /// Presumably: discard the child's state and start it afresh.
    Restart,
    /// Presumably: terminate the child permanently.
    Stop,
    /// Presumably: hand the failure to the supervisor's own parent.
    Escalate,
}
15
16pub type Decider = Arc<dyn Fn(&str) -> Directive + Send + Sync>;
17
18#[derive(Clone)]
21pub struct SupervisorStrategy {
22 pub kind: StrategyKind,
23 pub max_retries: Option<u32>,
24 pub within: Option<Duration>,
25 pub decider: Decider,
26}
27
/// Flavour of a [`SupervisorStrategy`].
///
/// The names follow the classic supervision vocabulary. NOTE(review): the
/// code that interprets the kind is outside this file, so the per-variant
/// descriptions are the conventional meaning and should be confirmed.
///
/// `Hash` is derived alongside `Eq` so the kind can be used as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum StrategyKind {
    /// Presumably: a directive applies only to the child that failed.
    OneForOne,
    /// Presumably: a directive applies to all children of the supervisor.
    AllForOne,
}
34
35impl std::fmt::Debug for SupervisorStrategy {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("SupervisorStrategy")
38 .field("kind", &self.kind)
39 .field("max_retries", &self.max_retries)
40 .field("within", &self.within)
41 .finish_non_exhaustive()
42 }
43}
44
45impl Default for SupervisorStrategy {
46 fn default() -> Self {
47 OneForOneStrategy::default().into()
48 }
49}
50
51impl SupervisorStrategy {
52 pub fn decide(&self, err: &str) -> Directive {
53 (self.decider)(err)
54 }
55}
56
57pub struct OneForOneStrategy {
59 pub max_retries: Option<u32>,
60 pub within: Option<Duration>,
61 pub decider: Decider,
62}
63
64impl Default for OneForOneStrategy {
65 fn default() -> Self {
66 Self {
67 max_retries: Some(10),
68 within: Some(Duration::from_secs(60)),
69 decider: Arc::new(|_| Directive::Restart),
70 }
71 }
72}
73
74impl OneForOneStrategy {
75 pub fn new() -> Self {
76 Self::default()
77 }
78
79 pub fn with_max_retries(mut self, n: u32) -> Self {
80 self.max_retries = Some(n);
81 self
82 }
83
84 pub fn with_within(mut self, d: Duration) -> Self {
85 self.within = Some(d);
86 self
87 }
88
89 pub fn with_decider(mut self, f: impl Fn(&str) -> Directive + Send + Sync + 'static) -> Self {
90 self.decider = Arc::new(f);
91 self
92 }
93}
94
95impl From<OneForOneStrategy> for SupervisorStrategy {
96 fn from(o: OneForOneStrategy) -> Self {
97 Self {
98 kind: StrategyKind::OneForOne,
99 max_retries: o.max_retries,
100 within: o.within,
101 decider: o.decider,
102 }
103 }
104}
105
106pub struct AllForOneStrategy {
108 pub max_retries: Option<u32>,
109 pub within: Option<Duration>,
110 pub decider: Decider,
111}
112
113impl Default for AllForOneStrategy {
114 fn default() -> Self {
115 Self {
116 max_retries: Some(10),
117 within: Some(Duration::from_secs(60)),
118 decider: Arc::new(|_| Directive::Restart),
119 }
120 }
121}
122
123impl From<AllForOneStrategy> for SupervisorStrategy {
124 fn from(o: AllForOneStrategy) -> Self {
125 Self {
126 kind: StrategyKind::AllForOne,
127 max_retries: o.max_retries,
128 within: o.within,
129 decider: o.decider,
130 }
131 }
132}
133
134use crate::actor::Actor;
163
164pub trait SupervisorOf<C: Actor> {
167 type ChildError: std::error::Error + Send + 'static;
171
172 fn decide(&self, _err: &Self::ChildError) -> Directive {
176 Directive::Restart
177 }
178}
179
180#[derive(Debug, thiserror::Error)]
185#[error("{message}")]
186pub struct SupervisionError {
187 pub message: String,
188}
189
190impl SupervisionError {
191 pub fn new(message: impl Into<String>) -> Self {
192 Self { message: message.into() }
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{Actor, Context};

    /// The default strategy is one-for-one and restarts on any failure.
    #[test]
    fn default_is_one_for_one_restart() {
        let strategy = SupervisorStrategy::default();
        assert_eq!(strategy.kind, StrategyKind::OneForOne);
        assert_eq!(strategy.decide("boom"), Directive::Restart);
    }

    /// A decider installed through the builder is the one `decide` consults.
    #[test]
    fn custom_decider_runs() {
        let builder = OneForOneStrategy::new().with_decider(|reason| match reason {
            "stop" => Directive::Stop,
            _ => Directive::Resume,
        });
        let strategy = SupervisorStrategy::from(builder);
        assert_eq!(strategy.decide("stop"), Directive::Stop);
        assert_eq!(strategy.decide("keep"), Directive::Resume);
    }

    // Minimal parent/child actor pair for the typed-supervision tests below.
    #[derive(Default)]
    struct Boss;
    #[derive(Default)]
    struct Worker;

    #[async_trait::async_trait]
    impl Actor for Boss {
        type Msg = ();
        async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
    }

    #[async_trait::async_trait]
    impl Actor for Worker {
        type Msg = ();
        async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
    }

    // Concrete child error used as `ChildError` for the Boss/Worker pairing.
    #[derive(Debug, thiserror::Error)]
    #[error("worker died: {0}")]
    struct WorkerError(String);

    impl SupervisorOf<Worker> for Boss {
        type ChildError = WorkerError;
        fn decide(&self, _err: &WorkerError) -> Directive {
            Directive::Stop
        }
    }

    /// Compile-time check: `Boss: SupervisorOf<Worker>` resolves with
    /// `ChildError = WorkerError`.
    #[test]
    fn explicit_impl_is_resolvable_with_typed_error() {
        fn assert_typed_decider<P, C>()
        where
            P: SupervisorOf<C, ChildError = WorkerError>,
            C: Actor,
        {
        }
        assert_typed_decider::<Boss, Worker>();
    }

    /// The overridden `decide` on `Boss` is the one that runs.
    #[test]
    fn typed_decider_runs() {
        let supervisor = Boss;
        let failure = WorkerError(String::from("oops"));
        let verdict = <Boss as SupervisorOf<Worker>>::decide(&supervisor, &failure);
        assert_eq!(verdict, Directive::Stop);
    }

    /// `SupervisionError` is usable as `ChildError`, and an impl that does not
    /// override `decide` falls back to `Directive::Restart`.
    #[test]
    fn supervision_error_works_as_default_child_error() {
        struct Default42;
        #[async_trait::async_trait]
        impl Actor for Default42 {
            type Msg = ();
            async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
        }

        struct AnyParent;
        #[async_trait::async_trait]
        impl Actor for AnyParent {
            type Msg = ();
            async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
        }

        impl SupervisorOf<Default42> for AnyParent {
            type ChildError = SupervisionError;
        }

        let parent = AnyParent;
        let failure = SupervisionError::new("crash");
        let verdict = <AnyParent as SupervisorOf<Default42>>::decide(&parent, &failure);
        assert_eq!(verdict, Directive::Restart);
    }

    /// `Display` for `SupervisionError` is exactly the stored message.
    #[test]
    fn supervision_error_displays_message() {
        let failure = SupervisionError::new("halt");
        assert_eq!(failure.to_string(), "halt");
    }
}