atomr_streams/
supervision.rs1use crate::source::Source;
17use futures::stream::StreamExt;
18
/// Directive a [`Decider`] returns to say how a stream should react to a
/// failed element.
///
/// As applied by [`with_decider`]: `Stop` ends the stream, while any other
/// directive discards the failed element and lets the stream continue.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SupervisionDirective {
    /// Terminate the stream; the failed element is not emitted.
    Stop,
    /// Discard the failed element and keep processing.
    Resume,
    /// Discard the failed element and keep processing.
    ///
    /// NOTE(review): presumably intended to also reset operator state; the
    /// only consumer visible here (`with_decider`) treats it like `Resume`.
    Restart,
}
30
/// Shared, thread-safe callback that inspects an error of type `E` by
/// reference and picks the [`SupervisionDirective`] to apply.
///
/// The `Arc` wrapper makes a decider cheap to clone and hand to several
/// stream stages.
pub type Decider<E> = std::sync::Arc<dyn Fn(&E) -> SupervisionDirective + Send + Sync>;
33
34pub mod deciders {
36 use super::{Decider, SupervisionDirective};
37 use std::sync::Arc;
38
39 pub fn resuming<E: Send + Sync + 'static>() -> Decider<E> {
42 Arc::new(|_| SupervisionDirective::Resume)
43 }
44
45 pub fn stopping<E: Send + Sync + 'static>() -> Decider<E> {
48 Arc::new(|_| SupervisionDirective::Stop)
49 }
50
51 pub fn restarting<E: Send + Sync + 'static>() -> Decider<E> {
53 Arc::new(|_| SupervisionDirective::Restart)
54 }
55}
56
57pub fn with_decider<T, E>(src: Source<Result<T, E>>, decider: Decider<E>) -> Source<T>
60where
61 T: Send + 'static,
62 E: Send + 'static,
63{
64 let inner = src.into_boxed();
65 let mut stopped = false;
66 let stream = inner
67 .take_while(move |item| {
68 let cont = !stopped;
69 if let Err(e) = item {
70 if let SupervisionDirective::Stop = decider(e) {
71 stopped = true;
72 }
73 }
74 futures::future::ready(cont)
75 })
76 .filter_map(|item| futures::future::ready(item.ok()));
77 Source { inner: stream.boxed() }
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;

    // Errors are skipped and every `Ok` value survives, in order.
    #[tokio::test]
    async fn resuming_decider_drops_errors() {
        let input: Vec<Result<i32, &'static str>> =
            vec![Ok(1), Err("bad"), Ok(2), Err("worse"), Ok(3)];
        let supervised = with_decider(Source::from_iter(input), deciders::resuming());
        assert_eq!(Sink::collect(supervised).await, vec![1, 2, 3]);
    }

    // Nothing after the first error is emitted.
    #[tokio::test]
    async fn stopping_decider_terminates_at_first_error() {
        let input: Vec<Result<i32, &'static str>> = vec![Ok(1), Ok(2), Err("boom"), Ok(99)];
        let supervised = with_decider(Source::from_iter(input), deciders::stopping());
        assert_eq!(Sink::collect(supervised).await, vec![1, 2]);
    }

    // With no operator state to reset, `Restart` just skips the errors.
    #[tokio::test]
    async fn restarting_decider_behaves_like_resume_for_stateless() {
        let input: Vec<Result<i32, &'static str>> = vec![Err("x"), Ok(7), Err("y"), Ok(8)];
        let supervised = with_decider(Source::from_iter(input), deciders::restarting());
        assert_eq!(Sink::collect(supervised).await, vec![7, 8]);
    }

    // A hand-written decider may choose per error: negative codes are fatal,
    // everything else is skipped.
    #[tokio::test]
    async fn custom_decider_can_inspect_error() {
        use std::sync::Arc;
        let negative_is_fatal: Decider<i32> = Arc::new(|code: &i32| match *code {
            c if c < 0 => SupervisionDirective::Stop,
            _ => SupervisionDirective::Resume,
        });
        let input: Vec<Result<i32, i32>> = vec![Ok(1), Err(5), Ok(2), Err(-1), Ok(99)];
        let supervised = with_decider(Source::from_iter(input), negative_is_fatal);
        assert_eq!(Sink::collect(supervised).await, vec![1, 2]);
    }
}