atomr_streams/
supervision.rs1use crate::source::Source;
20use futures::stream::StreamExt;
21
/// Decision returned by a [`Decider`] when a stream element fails.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SupervisionDirective {
    /// End the stream at the failing element.
    Stop,
    /// Drop the failing element and keep processing.
    Resume,
    /// Handled by [`with_decider`] exactly like [`SupervisionDirective::Resume`]:
    /// the failing element is dropped and processing continues.
    Restart,
}
33
34pub type Decider<E> = std::sync::Arc<dyn Fn(&E) -> SupervisionDirective + Send + Sync>;
36
37pub mod deciders {
39 use super::{Decider, SupervisionDirective};
40 use std::sync::Arc;
41
42 pub fn resuming<E: Send + Sync + 'static>() -> Decider<E> {
45 Arc::new(|_| SupervisionDirective::Resume)
46 }
47
48 pub fn stopping<E: Send + Sync + 'static>() -> Decider<E> {
51 Arc::new(|_| SupervisionDirective::Stop)
52 }
53
54 pub fn restarting<E: Send + Sync + 'static>() -> Decider<E> {
56 Arc::new(|_| SupervisionDirective::Restart)
57 }
58}
59
60pub fn with_decider<T, E>(src: Source<Result<T, E>>, decider: Decider<E>) -> Source<T>
63where
64 T: Send + 'static,
65 E: Send + 'static,
66{
67 let inner = src.into_boxed();
68 let mut stopped = false;
69 let stream = inner
70 .take_while(move |item| {
71 let cont = !stopped;
72 if let Err(e) = item {
73 if let SupervisionDirective::Stop = decider(e) {
74 stopped = true;
75 }
76 }
77 futures::future::ready(cont)
78 })
79 .filter_map(|item| futures::future::ready(item.ok()));
80 Source { inner: stream.boxed() }
81}
82
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;

    /// Errors are skipped and every `Ok` value survives when resuming.
    #[tokio::test]
    async fn resuming_decider_drops_errors() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Ok(1), Err("bad"), Ok(2), Err("worse"), Ok(3)]);
        let survivors = Sink::collect(with_decider(input, deciders::resuming())).await;
        assert_eq!(survivors, vec![1, 2, 3]);
    }

    /// Nothing after the first error is emitted when stopping.
    #[tokio::test]
    async fn stopping_decider_terminates_at_first_error() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Ok(1), Ok(2), Err("boom"), Ok(99)]);
        let survivors = Sink::collect(with_decider(input, deciders::stopping())).await;
        assert_eq!(survivors, vec![1, 2]);
    }

    /// Restart drops errors and continues, just like resume.
    #[tokio::test]
    async fn restarting_decider_behaves_like_resume_for_stateless() {
        let input: Source<Result<i32, &'static str>> =
            Source::from_iter(vec![Err("x"), Ok(7), Err("y"), Ok(8)]);
        let survivors = Sink::collect(with_decider(input, deciders::restarting())).await;
        assert_eq!(survivors, vec![7, 8]);
    }

    /// A custom decider can choose per error: negative codes stop, others resume.
    #[tokio::test]
    async fn custom_decider_can_inspect_error() {
        use std::sync::Arc;

        let stop_on_negative: Decider<i32> = Arc::new(|code: &i32| match *code {
            c if c < 0 => SupervisionDirective::Stop,
            _ => SupervisionDirective::Resume,
        });
        let input: Source<Result<i32, i32>> =
            Source::from_iter(vec![Ok(1), Err(5), Ok(2), Err(-1), Ok(99)]);
        let survivors = Sink::collect(with_decider(input, stop_on_negative)).await;
        assert_eq!(survivors, vec![1, 2]);
    }
}