Skip to main content

atomr_core/
supervision.rs

1//! Supervision.
2
3use std::sync::Arc;
4use std::time::Duration;
5
/// Typed failure metadata that travels as a panic payload through
/// `std::panic::panic_any`.
///
/// Language-binding actors (Python, etc.) attach it so that a decider can
/// inspect the failure by class name. For the legacy `Decider: Fn(&str)`
/// surface the `actor_cell` panic-catch path renders it as
/// `"<module>.<qualname>: <repr>"`; a decider that wants the class can split
/// on the first `": "` boundary.
#[derive(Debug, Clone)]
pub struct PanicPayload {
    /// Module part of the class path; may be empty.
    pub module: String,
    /// Class name part of the class path.
    pub qualname: String,
    /// Text placed after the class path in the wire format.
    pub repr: String,
}

impl PanicPayload {
    /// Builds a payload from anything convertible into owned strings.
    pub fn new(module: impl Into<String>, qualname: impl Into<String>, repr: impl Into<String>) -> Self {
        let module = module.into();
        let qualname = qualname.into();
        let repr = repr.into();
        Self { module, qualname, repr }
    }

    /// Fully-qualified class path, e.g. `"builtins.ValueError"`.
    ///
    /// When `module` is empty the bare `qualname` is returned, without a
    /// leading dot.
    pub fn class_path(&self) -> String {
        match self.module.as_str() {
            "" => self.qualname.clone(),
            module => [module, self.qualname.as_str()].join("."),
        }
    }

    /// Wire format used by `actor_cell::panic_payload_to_string` so
    /// that legacy `Decider: Fn(&str) -> Directive` deciders can still
    /// inspect the payload: the class path, then `": "`, then `repr`.
    pub fn to_wire(&self) -> String {
        let mut wire = self.class_path();
        wire.push_str(": ");
        wire.push_str(&self.repr);
        wire
    }
}
41
/// Operating mode a [`Directive::Suspend`] places a child into. Ordered from
/// least to most restrictive; an actor's [`on_directive`](crate::actor::Actor::on_directive)
/// hook is expected to gate its outbound effects accordingly.
///
/// NOTE(review): the declaration lists `FlatOnly` before `RiskReducingOnly`,
/// yet `RiskReducingOnly` is described as a superset of flat-only intent —
/// confirm which of the two is meant to be the less restrictive mode. The
/// enum derives neither `PartialOrd` nor `Ord`, so the ordering is
/// documentation-only and cannot be compared in code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum SuspendMode {
    /// Accept only actions that move toward a flat / zero position.
    FlatOnly,
    /// Accept only risk-reducing actions (a superset of flat-only intent).
    RiskReducingOnly,
    /// Reject all outbound effects until resumed.
    FullHalt,
}
55
56/// What the supervisor decides when a child fails — or, for the graded
57/// variants, how it should degrade a *still-running* child.
58///
59/// `Resume`/`Restart`/`Stop`/`Escalate` are the classic crash-recovery
60/// directives. `Throttle`/`Suspend`/`ResumeFrom` are graded operating-mode
61/// changes (FR-6): the supervisor pushes them into a running child via
62/// [`Actor::on_directive`](crate::actor::Actor::on_directive) *without* a
63/// restart, so the child's state is preserved. This lets a risk circuit
64/// breaker reduce order rate/size or move to flat-only instead of bouncing the
65/// actor.
66#[derive(Debug, Clone, Copy, PartialEq)]
67#[non_exhaustive]
68pub enum Directive {
69    Resume,
70    Restart,
71    Stop,
72    Escalate,
73    /// Reduce the child's effective rate/size by `factor` (e.g. `0.25` = quarter)
74    /// for at least `window`. Applied without restart.
75    Throttle {
76        factor: f32,
77        window: Duration,
78    },
79    /// Move the child into a restricted operating `mode` without restart.
80    Suspend {
81        mode: SuspendMode,
82    },
83    /// Step the child back up the ladder to a less-restrictive `mode`.
84    ResumeFrom(SuspendMode),
85}
86
87impl Eq for Directive {}
88
/// Shared decision callback: takes the failure as a `&str` and returns the
/// [`Directive`] to apply. Stored behind an `Arc` and bounded by
/// `Send + Sync`.
pub type Decider = Arc<dyn Fn(&str) -> Directive + Send + Sync>;
90
/// Strategy applied to children of a supervising actor. Splits into
/// `OneForOne` (each child handled independently) and `AllForOne`
/// (one child's failure restarts all siblings).
///
/// Can be obtained by converting an [`OneForOneStrategy`] or
/// [`AllForOneStrategy`] with `.into()`.
#[derive(Clone)]
pub struct SupervisorStrategy {
    /// Whether a failure is handled per child or across all siblings.
    pub kind: StrategyKind,
    /// Retry budget; [`SupervisorStrategy::on_max_retries`] gives the
    /// directive used once it is exceeded within `within`.
    /// NOTE(review): presumably `None` means unlimited — confirm against the
    /// code that counts restarts.
    pub max_retries: Option<u32>,
    /// Time window the retry budget is measured over.
    /// NOTE(review): presumably `None` means no window — confirm.
    pub within: Option<Duration>,
    /// Callback that maps a failure string to a [`Directive`]; invoked by
    /// [`SupervisorStrategy::decide`].
    pub decider: Decider,
}
101
/// Which supervision scope a [`SupervisorStrategy`] uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StrategyKind {
    /// Each child is handled independently.
    OneForOne,
    /// One child's failure restarts all siblings.
    AllForOne,
}
108
109impl std::fmt::Debug for SupervisorStrategy {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct("SupervisorStrategy")
112            .field("kind", &self.kind)
113            .field("max_retries", &self.max_retries)
114            .field("within", &self.within)
115            .finish_non_exhaustive()
116    }
117}
118
119impl Default for SupervisorStrategy {
120    fn default() -> Self {
121        OneForOneStrategy::default().into()
122    }
123}
124
125impl SupervisorStrategy {
126    pub fn decide(&self, err: &str) -> Directive {
127        (self.decider)(err)
128    }
129
130    /// Directive applied when [`Self::max_retries`] within
131    /// [`Self::within`] is exceeded.
132    ///
133    /// Defaults to [`Directive::Escalate`] — matching Akka's behaviour:
134    /// the parent supervisor decides the next step, and at the user-guardian
135    /// root that is equivalent to stopping the cell. Override via a future
136    /// builder method when per-strategy on-overflow behaviour is needed.
137    pub fn on_max_retries(&self) -> Directive {
138        Directive::Escalate
139    }
140}
141
142/// Builder for `OneForOne` — the default.
143pub struct OneForOneStrategy {
144    pub max_retries: Option<u32>,
145    pub within: Option<Duration>,
146    pub decider: Decider,
147}
148
149impl Default for OneForOneStrategy {
150    fn default() -> Self {
151        Self {
152            max_retries: Some(10),
153            within: Some(Duration::from_secs(60)),
154            decider: Arc::new(|_| Directive::Restart),
155        }
156    }
157}
158
159impl OneForOneStrategy {
160    pub fn new() -> Self {
161        Self::default()
162    }
163
164    pub fn with_max_retries(mut self, n: u32) -> Self {
165        self.max_retries = Some(n);
166        self
167    }
168
169    pub fn with_within(mut self, d: Duration) -> Self {
170        self.within = Some(d);
171        self
172    }
173
174    pub fn with_decider(mut self, f: impl Fn(&str) -> Directive + Send + Sync + 'static) -> Self {
175        self.decider = Arc::new(f);
176        self
177    }
178}
179
180impl From<OneForOneStrategy> for SupervisorStrategy {
181    fn from(o: OneForOneStrategy) -> Self {
182        Self {
183            kind: StrategyKind::OneForOne,
184            max_retries: o.max_retries,
185            within: o.within,
186            decider: o.decider,
187        }
188    }
189}
190
191/// Builder for `AllForOne`.
192pub struct AllForOneStrategy {
193    pub max_retries: Option<u32>,
194    pub within: Option<Duration>,
195    pub decider: Decider,
196}
197
198impl Default for AllForOneStrategy {
199    fn default() -> Self {
200        Self {
201            max_retries: Some(10),
202            within: Some(Duration::from_secs(60)),
203            decider: Arc::new(|_| Directive::Restart),
204        }
205    }
206}
207
208impl From<AllForOneStrategy> for SupervisorStrategy {
209    fn from(o: AllForOneStrategy) -> Self {
210        Self {
211            kind: StrategyKind::AllForOne,
212            max_retries: o.max_retries,
213            within: o.within,
214            decider: o.decider,
215        }
216    }
217}
218
219// -- Phase 1.D: typed `SupervisorOf<C>` trait ---------------------------
220//
221// Compile-time supervision contract. The legacy `SupervisorStrategy`
222// value above (a closure-based decider held on `Props`) stays in
223// place as the default runtime policy. `SupervisorOf<C>` layers an
224// **opt-in** per-(parent, child) typed policy on top — see P-8 of
225// `docs/idiomatic-rust.md` and Phase 1.D of
226// `docs/full-port-plan.md`.
227//
228// Rust's coherence rules forbid a blanket-with-override pattern (no
229// stable specialization), so `SupervisorOf<C>` is **not** auto-impl'd
230// for every `(P, C)` pair. Actors that want compile-time-typed child
231// supervision implement it explicitly:
232//
233// ```ignore
234// impl SupervisorOf<Worker> for Boss {
235//     type ChildError = WorkerError;
236//     fn decide(&self, err: &WorkerError) -> Directive { … }
237// }
238// ```
239//
240// Actors without an explicit impl fall through to the legacy
241// closure-based `Props::supervisor_strategy` at runtime — exactly
242// the pre-Phase-1 behaviour. The forthcoming `Context::
243// spawn_supervised::<C>(…)` (Phase 1.D follow-on) will require
244// `Self: SupervisorOf<C>` so that opting into typed supervision is
245// enforced at the call site.
246
247use crate::actor::Actor;
248
/// A parent actor's typed supervision policy for a specific child
/// type `C`. Opt-in only: there is no blanket impl, so a parent implements
/// it explicitly for each child type it supervises — see the Phase 1.D
/// notes that precede this trait in the file.
pub trait SupervisorOf<C: Actor> {
    /// The child's error type. Implementations choose this; the
    /// recommended pattern is one error enum per supervised child
    /// type. It must implement `std::error::Error` and be
    /// `Send + 'static`.
    type ChildError: std::error::Error + Send + 'static;

    /// Decide what to do when the child fails with `err`. Defaults to
    /// `Restart`; the default implementation ignores `err`.
    fn decide(&self, _err: &Self::ChildError) -> Directive {
        Directive::Restart
    }
}
263
/// Generic string-message error suitable for `SupervisorOf` impls that
/// don't yet have a typed error story (e.g. wrapping a panic
/// payload). New code should prefer a domain-specific
/// `#[derive(thiserror::Error)]` enum instead.
///
/// Its `Display` output is the `message` field on its own.
#[derive(Debug, thiserror::Error)]
#[error("{message}")]
pub struct SupervisionError {
    /// Text shown when the error is displayed.
    pub message: String,
}

impl SupervisionError {
    /// Creates an error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        Self { message: message.into() }
    }
}
279
// Unit tests for the closure-based strategy values and for the typed
// `SupervisorOf<C>` trait.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::actor::{Actor, Context};

    /// `SupervisorStrategy::default()` is one-for-one and its decider
    /// answers `Restart`.
    #[test]
    fn default_is_one_for_one_restart() {
        let s = SupervisorStrategy::default();
        assert_eq!(s.kind, StrategyKind::OneForOne);
        assert_eq!(s.decide("boom"), Directive::Restart);
    }

    /// A decider installed through the builder is the one `decide` calls.
    #[test]
    fn custom_decider_runs() {
        let s: SupervisorStrategy = OneForOneStrategy::new()
            .with_decider(|e| if e == "stop" { Directive::Stop } else { Directive::Resume })
            .into();
        assert_eq!(s.decide("stop"), Directive::Stop);
        assert_eq!(s.decide("keep"), Directive::Resume);
    }

    // ---- SupervisorOf<C> --------------------------------------

    // Minimal parent/child actor pair used by the typed-supervision tests.
    #[derive(Default)]
    struct Boss;
    #[derive(Default)]
    struct Worker;

    #[async_trait::async_trait]
    impl Actor for Boss {
        type Msg = ();
        async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
    }

    #[async_trait::async_trait]
    impl Actor for Worker {
        type Msg = ();
        async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
    }

    // Typed child error for `Worker`.
    #[derive(Debug, thiserror::Error)]
    #[error("worker died: {0}")]
    struct WorkerError(String);

    // Explicit typed policy: `Boss` always stops a failed `Worker`.
    impl SupervisorOf<Worker> for Boss {
        type ChildError = WorkerError;
        fn decide(&self, _err: &WorkerError) -> Directive {
            Directive::Stop
        }
    }

    /// Compile-time check: the `(Boss, Worker)` impl resolves with
    /// `WorkerError` as its child error type.
    #[test]
    fn explicit_impl_is_resolvable_with_typed_error() {
        fn assert_typed_decider<P: SupervisorOf<C, ChildError = WorkerError>, C: Actor>() {}
        assert_typed_decider::<Boss, Worker>();
    }

    /// The overridden `decide` is the one that runs.
    #[test]
    fn typed_decider_runs() {
        let boss = Boss;
        let err = WorkerError("oops".into());
        let d = SupervisorOf::<Worker>::decide(&boss, &err);
        assert_eq!(d, Directive::Stop);
    }

    /// Demonstrates the recommended pattern of using
    /// [`SupervisionError`] for actors that don't yet have a typed
    /// child-error enum. Future PRs replace this with the domain
    /// error.
    #[test]
    fn supervision_error_works_as_default_child_error() {
        struct Default42;
        #[async_trait::async_trait]
        impl Actor for Default42 {
            type Msg = ();
            async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
        }
        struct AnyParent;
        #[async_trait::async_trait]
        impl Actor for AnyParent {
            type Msg = ();
            async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: ()) {}
        }
        // No `decide` override here, so the trait's default (`Restart`) applies.
        impl SupervisorOf<Default42> for AnyParent {
            type ChildError = SupervisionError;
        }
        let p = AnyParent;
        let err = SupervisionError::new("crash");
        assert_eq!(SupervisorOf::<Default42>::decide(&p, &err), Directive::Restart);
    }

    /// `Display` for `SupervisionError` prints just the message.
    #[test]
    fn supervision_error_displays_message() {
        let e = SupervisionError::new("halt");
        assert_eq!(e.to_string(), "halt");
    }
}