Skip to main content

atomr_streams/
supervision.rs

1//! Stream-level supervision deciders.
2//!
3//! Phase 12.4 of `docs/full-port-plan.md`. Akka.NET / Akka Streams
4//! parity: `Supervision.Decider` + `withAttributes(supervisionStrategy(…))`.
5//!
6//! Stream operators on `Source<Result<T, E>>` consult a [`Decider`]
7//! to decide what to do on each `Err`:
8//!
9//! * `Stop` — terminate the stream (and propagate the error to the
10//!   downstream `Sink::collect_with_status` if used).
11//! * `Resume` — drop the failing element and continue.
12//! * `Restart` — drop element and conceptually reset operator state
13//!   (we surface this as `Resume` for stateless operators).
14//!
15//! `with_decider(src, decider)` returns a `Source<T>` (Result-stripped)
16//! by applying the decider to each `Err` element and emitting only
17//! the `Ok` payloads downstream.
18
19use crate::source::Source;
20use futures::stream::StreamExt;
21
/// The verdict a [`Decider`] hands back after looking at an error.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SupervisionDirective {
    /// End the stream; the caller should regard this as terminal.
    Stop,
    /// Discard the element that failed and move on to the next one.
    Resume,
    /// Discard the element that failed and conceptually reset the
    /// operator's state.
    Restart,
}
33
34/// A decider is a closure mapping `&E → SupervisionDirective`.
35pub type Decider<E> = std::sync::Arc<dyn Fn(&E) -> SupervisionDirective + Send + Sync>;
36
37/// Conventional decider helpers.
38pub mod deciders {
39    use super::{Decider, SupervisionDirective};
40    use std::sync::Arc;
41
42    /// Always `Resume` — never lets a single bad element kill the
43    /// stream.
44    pub fn resuming<E: Send + Sync + 'static>() -> Decider<E> {
45        Arc::new(|_| SupervisionDirective::Resume)
46    }
47
48    /// Always `Stop` — first error tears the stream down (akka.net
49    /// default).
50    pub fn stopping<E: Send + Sync + 'static>() -> Decider<E> {
51        Arc::new(|_| SupervisionDirective::Stop)
52    }
53
54    /// Always `Restart`.
55    pub fn restarting<E: Send + Sync + 'static>() -> Decider<E> {
56        Arc::new(|_| SupervisionDirective::Restart)
57    }
58}
59
60/// Apply `decider` to each error in `src`, emitting only the
61/// surviving `Ok` payloads.
62pub fn with_decider<T, E>(src: Source<Result<T, E>>, decider: Decider<E>) -> Source<T>
63where
64    T: Send + 'static,
65    E: Send + 'static,
66{
67    let inner = src.into_boxed();
68    let mut stopped = false;
69    let stream = inner
70        .take_while(move |item| {
71            let cont = !stopped;
72            if let Err(e) = item {
73                if let SupervisionDirective::Stop = decider(e) {
74                    stopped = true;
75                }
76            }
77            futures::future::ready(cont)
78        })
79        .filter_map(|item| futures::future::ready(item.ok()));
80    Source { inner: stream.boxed() }
81}
82
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;

    /// A `Resume` decider filters every error out and keeps all payloads.
    #[tokio::test]
    async fn resuming_decider_drops_errors() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Ok(1), Err("bad"), Ok(2), Err("worse"), Ok(3)]);
        let supervised = with_decider(input, deciders::resuming());
        let seen = Sink::collect(supervised).await;
        assert_eq!(seen, vec![1, 2, 3]);
    }

    /// A `Stop` decider ends the stream at the first error; nothing
    /// after it is emitted.
    #[tokio::test]
    async fn stopping_decider_terminates_at_first_error() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Ok(1), Ok(2), Err("boom"), Ok(99)]);
        let supervised = with_decider(input, deciders::stopping());
        let seen = Sink::collect(supervised).await;
        assert_eq!(seen, vec![1, 2]);
    }

    /// `Restart` is indistinguishable from `Resume` here: errors are
    /// dropped and payloads keep flowing.
    #[tokio::test]
    async fn restarting_decider_behaves_like_resume_for_stateless() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Err("x"), Ok(7), Err("y"), Ok(8)]);
        let supervised = with_decider(input, deciders::restarting());
        let seen = Sink::collect(supervised).await;
        assert_eq!(seen, vec![7, 8]);
    }

    /// A hand-written decider can branch on the error value: negative
    /// errors stop the stream, all others are skipped.
    #[tokio::test]
    async fn custom_decider_can_inspect_error() {
        let stop_on_negative: Decider<i32> = std::sync::Arc::new(|err: &i32| match *err {
            code if code < 0 => SupervisionDirective::Stop,
            _ => SupervisionDirective::Resume,
        });
        let input: Source<Result<i32, i32>> =
            Source::from_iter(vec![Ok(1), Err(5), Ok(2), Err(-1), Ok(99)]);
        let supervised = with_decider(input, stop_on_negative);
        let seen = Sink::collect(supervised).await;
        assert_eq!(seen, vec![1, 2]);
    }
}