Skip to main content

atomr_streams/
supervision.rs

1//! Stream-level supervision deciders.
2//!
3//! Stream operators on `Source<Result<T, E>>` consult a [`Decider`]
4//! to decide what to do on each `Err`:
5//!
6//! * `Stop` — terminate the stream (and propagate the error to the
7//!   downstream `Sink::collect_with_status` if used).
8//! * `Resume` — drop the failing element and continue.
9//! * `Restart` — drop element and conceptually reset operator state
10//!   (we surface this as `Resume` for stateless operators).
11//!
12//! `with_decider(src, decider)` returns a `Source<T>` (Result-stripped)
13//! by applying the decider to each `Err` element and emitting only
14//! the `Ok` payloads downstream.
15
16use crate::source::Source;
17use futures::stream::StreamExt;
18
/// Verdict produced by a [`Decider`] for a single failed stream element.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SupervisionDirective {
    /// Terminate the stream; the caller should treat the error as terminal.
    Stop,
    /// Discard the failing element and carry on with the next one.
    Resume,
    /// Discard the failing element and conceptually reset operator state.
    Restart,
}
30
31/// A decider is a closure mapping `&E → SupervisionDirective`.
32pub type Decider<E> = std::sync::Arc<dyn Fn(&E) -> SupervisionDirective + Send + Sync>;
33
34/// Conventional decider helpers.
35pub mod deciders {
36    use super::{Decider, SupervisionDirective};
37    use std::sync::Arc;
38
39    /// Always `Resume` — never lets a single bad element kill the
40    /// stream.
41    pub fn resuming<E: Send + Sync + 'static>() -> Decider<E> {
42        Arc::new(|_| SupervisionDirective::Resume)
43    }
44
45    /// Always `Stop` — first error tears the stream down (
46    /// default).
47    pub fn stopping<E: Send + Sync + 'static>() -> Decider<E> {
48        Arc::new(|_| SupervisionDirective::Stop)
49    }
50
51    /// Always `Restart`.
52    pub fn restarting<E: Send + Sync + 'static>() -> Decider<E> {
53        Arc::new(|_| SupervisionDirective::Restart)
54    }
55}
56
57/// Apply `decider` to each error in `src`, emitting only the
58/// surviving `Ok` payloads.
59pub fn with_decider<T, E>(src: Source<Result<T, E>>, decider: Decider<E>) -> Source<T>
60where
61    T: Send + 'static,
62    E: Send + 'static,
63{
64    let inner = src.into_boxed();
65    let mut stopped = false;
66    let stream = inner
67        .take_while(move |item| {
68            let cont = !stopped;
69            if let Err(e) = item {
70                if let SupervisionDirective::Stop = decider(e) {
71                    stopped = true;
72                }
73            }
74            futures::future::ready(cont)
75        })
76        .filter_map(|item| futures::future::ready(item.ok()));
77    Source { inner: stream.boxed() }
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;

    /// Every `Err` is skipped and all `Ok` payloads survive, in order.
    #[tokio::test]
    async fn resuming_decider_drops_errors() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Ok(1), Err("bad"), Ok(2), Err("worse"), Ok(3)]);
        let supervised = with_decider(input, deciders::resuming());
        let seen = Sink::collect(supervised).await;
        assert_eq!(seen, vec![1, 2, 3]);
    }

    /// Nothing after the first `Err` is emitted when the decider stops.
    #[tokio::test]
    async fn stopping_decider_terminates_at_first_error() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Ok(1), Ok(2), Err("boom"), Ok(99)]);
        let supervised = with_decider(input, deciders::stopping());
        let seen = Sink::collect(supervised).await;
        assert_eq!(seen, vec![1, 2]);
    }

    /// `Restart` drops the failing elements just like `Resume` does here.
    #[tokio::test]
    async fn restarting_decider_behaves_like_resume_for_stateless() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Err("x"), Ok(7), Err("y"), Ok(8)]);
        let supervised = with_decider(input, deciders::restarting());
        let seen = Sink::collect(supervised).await;
        assert_eq!(seen, vec![7, 8]);
    }

    /// A hand-written decider can branch on the error value: non-negative
    /// errors are skipped, the first negative one ends the stream.
    #[tokio::test]
    async fn custom_decider_can_inspect_error() {
        use std::sync::Arc;
        let stop_on_negative: Decider<i32> = Arc::new(|code: &i32| {
            if code.is_negative() {
                SupervisionDirective::Stop
            } else {
                SupervisionDirective::Resume
            }
        });
        let input: Source<Result<i32, i32>> =
            Source::from_iter(vec![Ok(1), Err(5), Ok(2), Err(-1), Ok(99)]);
        let supervised = with_decider(input, stop_on_negative);
        let seen = Sink::collect(supervised).await;
        assert_eq!(seen, vec![1, 2]);
    }
}