1use std::sync::Arc;
4use std::time::Duration;
5
6#[derive(Debug, Clone)]
14pub struct PanicPayload {
15 pub module: String,
16 pub qualname: String,
17 pub repr: String,
18}
19
20impl PanicPayload {
21 pub fn new(module: impl Into<String>, qualname: impl Into<String>, repr: impl Into<String>) -> Self {
22 Self { module: module.into(), qualname: qualname.into(), repr: repr.into() }
23 }
24
25 pub fn class_path(&self) -> String {
27 if self.module.is_empty() {
28 self.qualname.clone()
29 } else {
30 format!("{}.{}", self.module, self.qualname)
31 }
32 }
33
34 pub fn to_wire(&self) -> String {
38 format!("{}: {}", self.class_path(), self.repr)
39 }
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46#[non_exhaustive]
47pub enum SuspendMode {
48 FlatOnly,
50 RiskReducingOnly,
52 FullHalt,
54}
55
56#[derive(Debug, Clone, Copy, PartialEq)]
67#[non_exhaustive]
68pub enum Directive {
69 Resume,
70 Restart,
71 Stop,
72 Escalate,
73 Throttle {
76 factor: f32,
77 window: Duration,
78 },
79 Suspend {
81 mode: SuspendMode,
82 },
83 ResumeFrom(SuspendMode),
85}
86
87impl Eq for Directive {}
88
89pub type Decider = Arc<dyn Fn(&str) -> Directive + Send + Sync>;
90
91#[derive(Clone)]
95pub struct SupervisorStrategy {
96 pub kind: StrategyKind,
97 pub max_retries: Option<u32>,
98 pub within: Option<Duration>,
99 pub decider: Decider,
100}
101
102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103#[non_exhaustive]
104pub enum StrategyKind {
105 OneForOne,
106 AllForOne,
107}
108
109impl std::fmt::Debug for SupervisorStrategy {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct("SupervisorStrategy")
112 .field("kind", &self.kind)
113 .field("max_retries", &self.max_retries)
114 .field("within", &self.within)
115 .finish_non_exhaustive()
116 }
117}
118
119impl Default for SupervisorStrategy {
120 fn default() -> Self {
121 OneForOneStrategy::default().into()
122 }
123}
124
125impl SupervisorStrategy {
126 pub fn decide(&self, err: &str) -> Directive {
127 (self.decider)(err)
128 }
129
130 pub fn on_max_retries(&self) -> Directive {
138 Directive::Escalate
139 }
140}
141
142pub struct OneForOneStrategy {
144 pub max_retries: Option<u32>,
145 pub within: Option<Duration>,
146 pub decider: Decider,
147}
148
149impl Default for OneForOneStrategy {
150 fn default() -> Self {
151 Self {
152 max_retries: Some(10),
153 within: Some(Duration::from_secs(60)),
154 decider: Arc::new(|_| Directive::Restart),
155 }
156 }
157}
158
159impl OneForOneStrategy {
160 pub fn new() -> Self {
161 Self::default()
162 }
163
164 pub fn with_max_retries(mut self, n: u32) -> Self {
165 self.max_retries = Some(n);
166 self
167 }
168
169 pub fn with_within(mut self, d: Duration) -> Self {
170 self.within = Some(d);
171 self
172 }
173
174 pub fn with_decider(mut self, f: impl Fn(&str) -> Directive + Send + Sync + 'static) -> Self {
175 self.decider = Arc::new(f);
176 self
177 }
178}
179
180impl From<OneForOneStrategy> for SupervisorStrategy {
181 fn from(o: OneForOneStrategy) -> Self {
182 Self {
183 kind: StrategyKind::OneForOne,
184 max_retries: o.max_retries,
185 within: o.within,
186 decider: o.decider,
187 }
188 }
189}
190
191pub struct AllForOneStrategy {
193 pub max_retries: Option<u32>,
194 pub within: Option<Duration>,
195 pub decider: Decider,
196}
197
198impl Default for AllForOneStrategy {
199 fn default() -> Self {
200 Self {
201 max_retries: Some(10),
202 within: Some(Duration::from_secs(60)),
203 decider: Arc::new(|_| Directive::Restart),
204 }
205 }
206}
207
208impl From<AllForOneStrategy> for SupervisorStrategy {
209 fn from(o: AllForOneStrategy) -> Self {
210 Self {
211 kind: StrategyKind::AllForOne,
212 max_retries: o.max_retries,
213 within: o.within,
214 decider: o.decider,
215 }
216 }
217}
218
219use crate::actor::Actor;
248
249pub trait SupervisorOf<C: Actor> {
252 type ChildError: std::error::Error + Send + 'static;
256
257 fn decide(&self, _err: &Self::ChildError) -> Directive {
260 Directive::Restart
261 }
262}
263
264#[derive(Debug, thiserror::Error)]
269#[error("{message}")]
270pub struct SupervisionError {
271 pub message: String,
272}
273
274impl SupervisionError {
275 pub fn new(message: impl Into<String>) -> Self {
276 Self { message: message.into() }
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283 use crate::actor::{Actor, Context};
284
285 #[test]
286 fn default_is_one_for_one_restart() {
287 let s = SupervisorStrategy::default();
288 assert_eq!(s.kind, StrategyKind::OneForOne);
289 assert_eq!(s.decide("boom"), Directive::Restart);
290 }
291
292 #[test]
293 fn custom_decider_runs() {
294 let s: SupervisorStrategy = OneForOneStrategy::new()
295 .with_decider(|e| if e == "stop" { Directive::Stop } else { Directive::Resume })
296 .into();
297 assert_eq!(s.decide("stop"), Directive::Stop);
298 assert_eq!(s.decide("keep"), Directive::Resume);
299 }
300
301 #[derive(Default)]
304 struct Boss;
305 #[derive(Default)]
306 struct Worker;
307
308 #[async_trait::async_trait]
309 impl Actor for Boss {
310 type Msg = ();
311 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
312 }
313
314 #[async_trait::async_trait]
315 impl Actor for Worker {
316 type Msg = ();
317 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
318 }
319
320 #[derive(Debug, thiserror::Error)]
321 #[error("worker died: {0}")]
322 struct WorkerError(String);
323
324 impl SupervisorOf<Worker> for Boss {
325 type ChildError = WorkerError;
326 fn decide(&self, _err: &WorkerError) -> Directive {
327 Directive::Stop
328 }
329 }
330
331 #[test]
332 fn explicit_impl_is_resolvable_with_typed_error() {
333 fn assert_typed_decider<P: SupervisorOf<C, ChildError = WorkerError>, C: Actor>() {}
334 assert_typed_decider::<Boss, Worker>();
335 }
336
337 #[test]
338 fn typed_decider_runs() {
339 let boss = Boss;
340 let err = WorkerError("oops".into());
341 let d = SupervisorOf::<Worker>::decide(&boss, &err);
342 assert_eq!(d, Directive::Stop);
343 }
344
345 #[test]
350 fn supervision_error_works_as_default_child_error() {
351 struct Default42;
352 #[async_trait::async_trait]
353 impl Actor for Default42 {
354 type Msg = ();
355 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
356 }
357 struct AnyParent;
358 #[async_trait::async_trait]
359 impl Actor for AnyParent {
360 type Msg = ();
361 async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
362 }
363 impl SupervisorOf<Default42> for AnyParent {
364 type ChildError = SupervisionError;
365 }
366 let p = AnyParent;
367 let err = SupervisionError::new("crash");
368 assert_eq!(SupervisorOf::<Default42>::decide(&p, &err), Directive::Restart);
369 }
370
371 #[test]
372 fn supervision_error_displays_message() {
373 let e = SupervisionError::new("halt");
374 assert_eq!(e.to_string(), "halt");
375 }
376}