atomr_core/
supervision.rs1use std::sync::Arc;
4use std::time::Duration;
5
/// Description of a failure, carried as three owned strings.
///
/// NOTE(review): the field names (`module`, `qualname`, `repr`) look like they
/// mirror a Python exception's type metadata — confirm against the producer.
#[derive(Clone, Debug)]
pub struct PanicPayload {
    /// Module part of the failing type's path; may be empty.
    pub module: String,
    /// Qualified type name; joined after `module` by [`PanicPayload::class_path`].
    pub qualname: String,
    /// Textual rendering of the failure; emitted after the class path by
    /// [`PanicPayload::to_wire`].
    pub repr: String,
}

impl PanicPayload {
    /// Builds a payload, converting each argument into an owned `String`.
    pub fn new(
        module: impl Into<String>,
        qualname: impl Into<String>,
        repr: impl Into<String>,
    ) -> Self {
        let module = module.into();
        let qualname = qualname.into();
        let repr = repr.into();
        Self { module, qualname, repr }
    }

    /// Returns `"<module>.<qualname>"`, or just `qualname` when `module` is empty.
    pub fn class_path(&self) -> String {
        match self.module.as_str() {
            // No module: avoid producing a leading dot.
            "" => self.qualname.clone(),
            module => format!("{}.{}", module, self.qualname),
        }
    }

    /// Renders the payload as `"<class_path>: <repr>"`.
    pub fn to_wire(&self) -> String {
        let mut wire = self.class_path();
        wire.push_str(": ");
        wire.push_str(&self.repr);
        wire
    }
}
41
/// Outcome a supervisor can select in response to a failure.
///
/// The runtime effect of each variant is implemented outside this file; the
/// names follow the usual actor-supervision vocabulary — confirm the exact
/// semantics against the runtime.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum Directive {
    /// Selectable by a custom decider.
    Resume,
    /// Returned by the default deciders defined in this file.
    Restart,
    /// Selectable by a custom decider.
    Stop,
    /// Returned by `SupervisorStrategy::on_max_retries`.
    Escalate,
}

/// Shared callback that maps a failure, passed as `&str`, to a [`Directive`].
///
/// The `Send + Sync` bounds let the `Arc` be cloned across threads.
pub type Decider = Arc<dyn Fn(&str) -> Directive + Send + Sync>;
53
54#[derive(Clone)]
58pub struct SupervisorStrategy {
59 pub kind: StrategyKind,
60 pub max_retries: Option<u32>,
61 pub within: Option<Duration>,
62 pub decider: Decider,
63}
64
/// Tag recording which strategy family a `SupervisorStrategy` came from.
///
/// NOTE(review): by the usual actor-supervision convention, one-for-one
/// affects only the failing child and all-for-one affects every sibling —
/// that behavior is implemented outside this file; confirm there.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum StrategyKind {
    /// Set when converting from `OneForOneStrategy`.
    OneForOne,
    /// Set when converting from `AllForOneStrategy`.
    AllForOne,
}
71
72impl std::fmt::Debug for SupervisorStrategy {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("SupervisorStrategy")
75 .field("kind", &self.kind)
76 .field("max_retries", &self.max_retries)
77 .field("within", &self.within)
78 .finish_non_exhaustive()
79 }
80}
81
82impl Default for SupervisorStrategy {
83 fn default() -> Self {
84 OneForOneStrategy::default().into()
85 }
86}
87
88impl SupervisorStrategy {
89 pub fn decide(&self, err: &str) -> Directive {
90 (self.decider)(err)
91 }
92
93 pub fn on_max_retries(&self) -> Directive {
101 Directive::Escalate
102 }
103}
104
105pub struct OneForOneStrategy {
107 pub max_retries: Option<u32>,
108 pub within: Option<Duration>,
109 pub decider: Decider,
110}
111
112impl Default for OneForOneStrategy {
113 fn default() -> Self {
114 Self {
115 max_retries: Some(10),
116 within: Some(Duration::from_secs(60)),
117 decider: Arc::new(|_| Directive::Restart),
118 }
119 }
120}
121
122impl OneForOneStrategy {
123 pub fn new() -> Self {
124 Self::default()
125 }
126
127 pub fn with_max_retries(mut self, n: u32) -> Self {
128 self.max_retries = Some(n);
129 self
130 }
131
132 pub fn with_within(mut self, d: Duration) -> Self {
133 self.within = Some(d);
134 self
135 }
136
137 pub fn with_decider(mut self, f: impl Fn(&str) -> Directive + Send + Sync + 'static) -> Self {
138 self.decider = Arc::new(f);
139 self
140 }
141}
142
143impl From<OneForOneStrategy> for SupervisorStrategy {
144 fn from(o: OneForOneStrategy) -> Self {
145 Self {
146 kind: StrategyKind::OneForOne,
147 max_retries: o.max_retries,
148 within: o.within,
149 decider: o.decider,
150 }
151 }
152}
153
154pub struct AllForOneStrategy {
156 pub max_retries: Option<u32>,
157 pub within: Option<Duration>,
158 pub decider: Decider,
159}
160
161impl Default for AllForOneStrategy {
162 fn default() -> Self {
163 Self {
164 max_retries: Some(10),
165 within: Some(Duration::from_secs(60)),
166 decider: Arc::new(|_| Directive::Restart),
167 }
168 }
169}
170
171impl From<AllForOneStrategy> for SupervisorStrategy {
172 fn from(o: AllForOneStrategy) -> Self {
173 Self {
174 kind: StrategyKind::AllForOne,
175 max_retries: o.max_retries,
176 within: o.within,
177 decider: o.decider,
178 }
179 }
180}
181
182use crate::actor::Actor;
211
212pub trait SupervisorOf<C: Actor> {
215 type ChildError: std::error::Error + Send + 'static;
219
220 fn decide(&self, _err: &Self::ChildError) -> Directive {
223 Directive::Restart
224 }
225}
226
227#[derive(Debug, thiserror::Error)]
232#[error("{message}")]
233pub struct SupervisionError {
234 pub message: String,
235}
236
237impl SupervisionError {
238 pub fn new(message: impl Into<String>) -> Self {
239 Self { message: message.into() }
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{Actor, Context};

    /// The default strategy is one-for-one and its decider answers `Restart`.
    #[test]
    fn default_is_one_for_one_restart() {
        let strategy = SupervisorStrategy::default();
        assert_eq!(strategy.kind, StrategyKind::OneForOne);
        assert_eq!(strategy.decide("boom"), Directive::Restart);
    }

    /// A decider installed through the builder is the one `decide` calls.
    #[test]
    fn custom_decider_runs() {
        let builder = OneForOneStrategy::new().with_decider(|e| match e {
            "stop" => Directive::Stop,
            _ => Directive::Resume,
        });
        let strategy = SupervisorStrategy::from(builder);
        assert_eq!(strategy.decide("stop"), Directive::Stop);
        assert_eq!(strategy.decide("keep"), Directive::Resume);
    }

    // Minimal parent/child actor pair used by the typed-supervision tests.
    #[derive(Default)]
    struct Boss;
    #[derive(Default)]
    struct Worker;

    #[async_trait::async_trait]
    impl Actor for Boss {
        type Msg = ();
        async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
    }

    #[async_trait::async_trait]
    impl Actor for Worker {
        type Msg = ();
        async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
    }

    #[derive(Debug, thiserror::Error)]
    #[error("worker died: {0}")]
    struct WorkerError(String);

    impl SupervisorOf<Worker> for Boss {
        type ChildError = WorkerError;
        fn decide(&self, _err: &WorkerError) -> Directive {
            Directive::Stop
        }
    }

    /// Compile-time check: `Boss: SupervisorOf<Worker, ChildError = WorkerError>`.
    #[test]
    fn explicit_impl_is_resolvable_with_typed_error() {
        fn assert_typed_decider<P, C>()
        where
            C: Actor,
            P: SupervisorOf<C, ChildError = WorkerError>,
        {
        }
        assert_typed_decider::<Boss, Worker>();
    }

    /// The overridden `decide` on `Boss` is the one that runs.
    #[test]
    fn typed_decider_runs() {
        let parent = Boss;
        let failure = WorkerError(String::from("oops"));
        let verdict = <Boss as SupervisorOf<Worker>>::decide(&parent, &failure);
        assert_eq!(verdict, Directive::Stop);
    }

    /// `SupervisionError` works as a `ChildError`, and the provided `decide`
    /// answers `Restart`.
    #[test]
    fn supervision_error_works_as_default_child_error() {
        struct Default42;
        #[async_trait::async_trait]
        impl Actor for Default42 {
            type Msg = ();
            async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
        }
        struct AnyParent;
        #[async_trait::async_trait]
        impl Actor for AnyParent {
            type Msg = ();
            async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
        }
        impl SupervisorOf<Default42> for AnyParent {
            type ChildError = SupervisionError;
        }

        let parent = AnyParent;
        let failure = SupervisionError::new("crash");
        let verdict = <AnyParent as SupervisorOf<Default42>>::decide(&parent, &failure);
        assert_eq!(verdict, Directive::Restart);
    }

    /// `Display` prints the stored message verbatim.
    #[test]
    fn supervision_error_displays_message() {
        let error = SupervisionError::new("halt");
        assert_eq!(error.to_string(), "halt");
    }
}