Skip to main content

datum/stream/
restart.rs

1//! Backoff supervision wrappers. `RestartSource`/`RestartFlow`/`RestartSink`
2//! rematerialize a failed (or, for `with_backoff`, also completed) inner component
3//! with exponential backoff; `RetryFlow` retries individual elements through a
4//! one-in/one-out flow. All are configured via `RestartSettings`.
5//!
6//! Restart wrappers are lossy across a restart: the triggering element and any
7//! elements buffered in the restarted component are dropped.
8
9use super::*;
10use crate::context::FlowWithContext;
11use crate::stream::error::panic_stream_error;
12use crate::stream::flow::FlowTransform;
13use std::{
14    marker::PhantomData,
15    sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
16    time::Instant,
17};
18
/// Process-wide xorshift64 state used to jitter restart delays. It starts
/// non-zero so the xorshift sequence never collapses to zero.
static BACKOFF_RANDOM_SEED: AtomicU64 = AtomicU64::new(0x9E37_79B9_7F4A_7C15);
20
/// Backoff settings for [`RestartSource`], [`RestartFlow`], and [`RestartSink`].
///
/// The delay before the `n`-th restart of a window (counting from zero) is
/// `min_backoff * 2^n`, capped at `max_backoff`. The random factor then adds up
/// to `random_factor * computed_backoff` of extra delay.
///
/// The restart counter follows Akka's `maxRestartsWithin` rule: once the reset
/// window elapses, both the restart count and exponential backoff position are
/// reset. By default the restart count is unbounded and the window equals
/// `min_backoff`.
#[derive(Clone, Debug)]
pub struct RestartSettings {
    /// Delay before the first restart of a window.
    min_backoff: Duration,
    /// Upper bound for the exponentially growing part of the delay.
    max_backoff: Duration,
    /// Fraction of the computed delay that may be added as random jitter.
    random_factor: f64,
    /// Restarts allowed within one `max_restarts_within` window.
    max_restarts: usize,
    /// Length of the window after which the restart count resets.
    max_restarts_within: Duration,
}

impl RestartSettings {
    /// Creates settings with an unlimited restart count and a reset window of
    /// `min_backoff`.
    ///
    /// # Panics
    ///
    /// Panics if `min_backoff > max_backoff`, or if `random_factor` is negative,
    /// NaN, or infinite.
    #[must_use]
    pub fn new(min_backoff: Duration, max_backoff: Duration, random_factor: f64) -> Self {
        assert!(min_backoff <= max_backoff);
        assert!(random_factor >= 0.0);
        // An infinite factor would make every jittered delay overflow `Duration`
        // (a panic inside the running stream), so reject it up front.
        assert!(
            random_factor.is_finite(),
            "random_factor must be finite, got {random_factor}"
        );
        Self {
            min_backoff,
            max_backoff,
            random_factor,
            max_restarts: usize::MAX,
            max_restarts_within: min_backoff,
        }
    }

    /// Delay before the first restart of a window.
    #[must_use]
    pub fn min_backoff(&self) -> Duration {
        self.min_backoff
    }

    /// Upper bound for the exponential part of the delay.
    #[must_use]
    pub fn max_backoff(&self) -> Duration {
        self.max_backoff
    }

    /// Fraction of the computed delay that may be added as jitter.
    #[must_use]
    pub fn random_factor(&self) -> f64 {
        self.random_factor
    }

    /// Restarts allowed within one reset window.
    #[must_use]
    pub fn max_restarts(&self) -> usize {
        self.max_restarts
    }

    /// Length of the reset window.
    #[must_use]
    pub fn max_restarts_within(&self) -> Duration {
        self.max_restarts_within
    }

    /// Replaces `min_backoff`. The reset window is left unchanged.
    ///
    /// # Panics
    ///
    /// Panics if `value` exceeds the current `max_backoff`.
    #[must_use]
    pub fn with_min_backoff(mut self, value: Duration) -> Self {
        assert!(value <= self.max_backoff);
        self.min_backoff = value;
        self
    }

    /// Replaces `max_backoff`.
    ///
    /// # Panics
    ///
    /// Panics if `value` is below the current `min_backoff`.
    #[must_use]
    pub fn with_max_backoff(mut self, value: Duration) -> Self {
        assert!(self.min_backoff <= value);
        self.max_backoff = value;
        self
    }

    /// Replaces the random factor.
    ///
    /// # Panics
    ///
    /// Panics if `value` is negative, NaN, or infinite.
    #[must_use]
    pub fn with_random_factor(mut self, value: f64) -> Self {
        assert!(value >= 0.0);
        assert!(value.is_finite(), "random_factor must be finite, got {value}");
        self.random_factor = value;
        self
    }

    /// Allows at most `count` restarts within any `within` window.
    #[must_use]
    pub fn with_max_restarts(mut self, count: usize, within: Duration) -> Self {
        self.max_restarts = count;
        self.max_restarts_within = within;
        self
    }
}
103
/// Why a wrapped child stopped.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RestartCause {
    /// The child emitted an error or could not be materialized.
    Failure,
    /// The child finished without an error.
    Completion,
}
109
110enum RestartDecision<T> {
111    Restart,
112    Finish(Option<StreamResult<T>>),
113}
114
115struct RestartBackoff {
116    settings: RestartSettings,
117    window_start: Instant,
118    restarts_in_window: usize,
119}
120
121impl RestartBackoff {
122    fn new(settings: RestartSettings) -> Self {
123        Self {
124            settings,
125            window_start: Instant::now(),
126            restarts_in_window: 0,
127        }
128    }
129
130    fn next_delay(&mut self) -> Option<Duration> {
131        let now = Instant::now();
132        if now.duration_since(self.window_start) > self.settings.max_restarts_within {
133            self.window_start = now;
134            self.restarts_in_window = 0;
135        }
136        if self.restarts_in_window >= self.settings.max_restarts {
137            return None;
138        }
139
140        let exponent = (self.restarts_in_window.min(31) as i32).max(0);
141        let base = self
142            .settings
143            .min_backoff
144            .mul_f64(2_f64.powi(exponent))
145            .min(self.settings.max_backoff);
146        self.restarts_in_window += 1;
147        if self.settings.random_factor == 0.0 || base.is_zero() {
148            Some(base)
149        } else {
150            let jitter = base.mul_f64(next_random_fraction() * self.settings.random_factor);
151            Some(base.saturating_add(jitter))
152        }
153    }
154}
155
156fn next_random_fraction() -> f64 {
157    let mut current = BACKOFF_RANDOM_SEED.load(AtomicOrdering::Relaxed);
158    loop {
159        let mut next = current;
160        next ^= next << 13;
161        next ^= next >> 7;
162        next ^= next << 17;
163        match BACKOFF_RANDOM_SEED.compare_exchange_weak(
164            current,
165            next,
166            AtomicOrdering::Relaxed,
167            AtomicOrdering::Relaxed,
168        ) {
169            Ok(_) => return ((next >> 11) as f64) / ((1_u64 << 53) as f64),
170            Err(observed) => current = observed,
171        }
172    }
173}
174
175fn wait_backoff(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
176    if delay.is_zero() {
177        return Ok(());
178    }
179    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
180    let _timer = materializer.schedule_once(delay, move || {
181        let _ = sender.send(());
182    });
183
184    loop {
185        if materializer.is_shutdown() {
186            return Err(StreamError::AbruptTermination);
187        }
188        if super::runtime::current_stream_cancelled()
189            .as_ref()
190            .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
191        {
192            return Err(StreamError::Cancelled);
193        }
194        // The timer wake arrives on the channel; the bounded recv exists only
195        // so shutdown/cancellation are observed promptly. 25 ms keeps cancel
196        // latency low without busy-spinning a long backoff (a 30 s max
197        // backoff parks ~1.2k times instead of 30k at 1 ms).
198        match receiver.recv_timeout(Duration::from_millis(25)) {
199            Ok(()) => return Ok(()),
200            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
201            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
202                return Err(StreamError::AbruptTermination);
203            }
204        }
205    }
206}
207
208fn invoke_factory<T, F>(context: &str, factory: &F) -> StreamResult<T>
209where
210    F: Fn() -> T,
211{
212    catch_unwind(AssertUnwindSafe(factory)).map_err(|_| panic_stream_error(context))
213}
214
215/// Akka-style restarting source wrapper.
216///
217/// `with_backoff` restarts when the wrapped source fails or completes.
218/// `on_failures_with_backoff` restarts only on failure. During the backoff the
219/// downstream pull is backpressured by waiting on the materializer's central
220/// timer driver; no per-restart timer threads are spawned.
221pub struct RestartSource;
222
223impl RestartSource {
224    #[must_use]
225    pub fn with_backoff<Out, Mat, F>(settings: RestartSettings, factory: F) -> Source<Out>
226    where
227        Out: Send + 'static,
228        Mat: Send + 'static,
229        F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
230    {
231        restart_source(settings, factory, false)
232    }
233
234    #[must_use]
235    pub fn on_failures_with_backoff<Out, Mat, F>(
236        settings: RestartSettings,
237        factory: F,
238    ) -> Source<Out>
239    where
240        Out: Send + 'static,
241        Mat: Send + 'static,
242        F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
243    {
244        restart_source(settings, factory, true)
245    }
246}
247
248fn restart_source<Out, Mat, F>(
249    settings: RestartSettings,
250    factory: F,
251    only_on_failures: bool,
252) -> Source<Out>
253where
254    Out: Send + 'static,
255    Mat: Send + 'static,
256    F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
257{
258    let factory = Arc::new(factory);
259    Source::from_materialized_factory(move |materializer| {
260        Ok((
261            Box::new(RestartSourceStream {
262                factory: Arc::clone(&factory),
263                materializer: materializer.clone(),
264                backoff: RestartBackoff::new(settings.clone()),
265                current: None,
266                only_on_failures,
267                terminal: None,
268                _marker: PhantomData::<Mat>,
269            }),
270            NotUsed,
271        ))
272    })
273}
274
275struct RestartSourceStream<Out, Mat, F> {
276    factory: Arc<F>,
277    materializer: Materializer,
278    backoff: RestartBackoff,
279    current: Option<BoxStream<Out>>,
280    only_on_failures: bool,
281    terminal: Option<StreamResult<()>>,
282    _marker: PhantomData<Mat>,
283}
284
285impl<Out, Mat, F> RestartSourceStream<Out, Mat, F>
286where
287    Out: Send + 'static,
288    Mat: Send + 'static,
289    F: Fn() -> Source<Out, Mat>,
290{
291    fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
292        let source = invoke_factory("RestartSource factory", self.factory.as_ref())?;
293        Arc::clone(&source.factory)
294            .create(&self.materializer)
295            .map(|(stream, _)| stream)
296    }
297
298    fn restart_or_finish(
299        &mut self,
300        cause: RestartCause,
301        error: Option<StreamError>,
302    ) -> RestartDecision<Out> {
303        if self.only_on_failures && cause == RestartCause::Completion {
304            return RestartDecision::Finish(None);
305        }
306        let Some(delay) = self.backoff.next_delay() else {
307            return RestartDecision::Finish(error.map(Err));
308        };
309        if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
310            return RestartDecision::Finish(Some(Err(wait_error)));
311        }
312        RestartDecision::Restart
313    }
314
315    fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
316        match result {
317            Some(Err(error)) => {
318                self.terminal = Some(Err(error.clone()));
319                Some(Err(error))
320            }
321            Some(Ok(_)) | None => {
322                self.terminal = Some(Ok(()));
323                None
324            }
325        }
326    }
327}
328
329impl<Out, Mat, F> Iterator for RestartSourceStream<Out, Mat, F>
330where
331    Out: Send + 'static,
332    Mat: Send + 'static,
333    F: Fn() -> Source<Out, Mat>,
334{
335    type Item = StreamResult<Out>;
336
337    fn next(&mut self) -> Option<Self::Item> {
338        if let Some(terminal) = &self.terminal {
339            return match terminal {
340                Ok(()) => None,
341                Err(error) => Some(Err(error.clone())),
342            };
343        }
344        loop {
345            if self.current.is_none() {
346                match self.rematerialize() {
347                    Ok(stream) => self.current = Some(stream),
348                    Err(error) => {
349                        match self.restart_or_finish(RestartCause::Failure, Some(error)) {
350                            RestartDecision::Restart => {}
351                            RestartDecision::Finish(result) => return self.finish(result),
352                        }
353                        continue;
354                    }
355                }
356            }
357
358            let next = self
359                .current
360                .as_mut()
361                .expect("restart source child exists")
362                .next();
363            match next {
364                Some(Ok(item)) => return Some(Ok(item)),
365                Some(Err(error)) => {
366                    self.current = None;
367                    match self.restart_or_finish(RestartCause::Failure, Some(error)) {
368                        RestartDecision::Restart => {}
369                        RestartDecision::Finish(result) => return self.finish(result),
370                    }
371                }
372                None => {
373                    self.current = None;
374                    match self.restart_or_finish(RestartCause::Completion, None) {
375                        RestartDecision::Restart => {}
376                        RestartDecision::Finish(result) => return self.finish(result),
377                    }
378                }
379            }
380        }
381    }
382}
383
384struct SharedInput<In> {
385    inner: Arc<Mutex<SharedInputState<In>>>,
386}
387
388impl<In> Clone for SharedInput<In> {
389    fn clone(&self) -> Self {
390        Self {
391            inner: Arc::clone(&self.inner),
392        }
393    }
394}
395
396struct SharedInputState<In> {
397    input: Option<BoxStream<In>>,
398    exhausted: bool,
399}
400
401impl<In> SharedInput<In> {
402    fn new(input: BoxStream<In>) -> Self {
403        Self {
404            inner: Arc::new(Mutex::new(SharedInputState {
405                input: Some(input),
406                exhausted: false,
407            })),
408        }
409    }
410
411    fn stream(&self) -> SharedInputStream<In> {
412        SharedInputStream {
413            shared: self.clone(),
414        }
415    }
416
417    fn is_exhausted(&self) -> bool {
418        self.inner
419            .lock()
420            .unwrap_or_else(|poison| poison.into_inner())
421            .exhausted
422    }
423}
424
425struct SharedInputStream<In> {
426    shared: SharedInput<In>,
427}
428
429impl<In> Iterator for SharedInputStream<In> {
430    type Item = StreamResult<In>;
431
432    fn next(&mut self) -> Option<Self::Item> {
433        let mut input = {
434            let mut state = self
435                .shared
436                .inner
437                .lock()
438                .unwrap_or_else(|poison| poison.into_inner());
439            if state.exhausted {
440                return None;
441            }
442            state.input.take().expect("shared input present")
443        };
444        let next = input.next();
445        let mut state = self
446            .shared
447            .inner
448            .lock()
449            .unwrap_or_else(|poison| poison.into_inner());
450        if next.is_none() {
451            state.exhausted = true;
452        }
453        state.input = Some(input);
454        next
455    }
456}
457
458/// Akka-style restarting flow wrapper.
459///
460/// Restart flows are lossy across restarts: the element that triggers the
461/// wrapped flow failure, and any elements already pulled by that wrapped flow,
462/// are dropped. During backoff, downstream demand is held until the central
463/// materializer timer fires and the child flow is rematerialized.
464pub struct RestartFlow;
465
466impl RestartFlow {
467    #[must_use]
468    pub fn with_backoff<In, Out, Mat, F>(settings: RestartSettings, factory: F) -> Flow<In, Out>
469    where
470        In: Send + 'static,
471        Out: Send + 'static,
472        Mat: Send + 'static,
473        F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
474    {
475        restart_flow(settings, factory, false)
476    }
477
478    #[must_use]
479    pub fn on_failures_with_backoff<In, Out, Mat, F>(
480        settings: RestartSettings,
481        factory: F,
482    ) -> Flow<In, Out>
483    where
484        In: Send + 'static,
485        Out: Send + 'static,
486        Mat: Send + 'static,
487        F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
488    {
489        restart_flow(settings, factory, true)
490    }
491}
492
493fn restart_flow<In, Out, Mat, F>(
494    settings: RestartSettings,
495    factory: F,
496    only_on_failures: bool,
497) -> Flow<In, Out>
498where
499    In: Send + 'static,
500    Out: Send + 'static,
501    Mat: Send + 'static,
502    F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
503{
504    let factory = Arc::new(factory);
505    Flow::from_runtime_transform(move |input, materializer| {
506        let shared = SharedInput::new(input);
507        Ok(Box::new(RestartFlowStream {
508            factory: Arc::clone(&factory),
509            materializer: materializer.clone(),
510            shared,
511            backoff: RestartBackoff::new(settings.clone()),
512            current: None,
513            only_on_failures,
514            terminal: None,
515            _marker: PhantomData::<Mat>,
516        }))
517    })
518}
519
520struct RestartFlowStream<In, Out, Mat, F> {
521    factory: Arc<F>,
522    materializer: Materializer,
523    shared: SharedInput<In>,
524    backoff: RestartBackoff,
525    current: Option<BoxStream<Out>>,
526    only_on_failures: bool,
527    terminal: Option<StreamResult<()>>,
528    _marker: PhantomData<Mat>,
529}
530
531impl<In, Out, Mat, F> RestartFlowStream<In, Out, Mat, F>
532where
533    In: Send + 'static,
534    Out: Send + 'static,
535    Mat: Send + 'static,
536    F: Fn() -> Flow<In, Out, Mat>,
537{
538    fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
539        let flow = invoke_factory("RestartFlow factory", self.factory.as_ref())?;
540        (flow.materialize)()?;
541        let input = Box::new(self.shared.stream()) as BoxStream<In>;
542        match flow.transform {
543            FlowTransform::Pure(transform) => Ok(transform(input)),
544            FlowTransform::Runtime(transform) => transform(input, &self.materializer),
545        }
546    }
547
548    fn restart_or_finish(
549        &mut self,
550        cause: RestartCause,
551        error: Option<StreamError>,
552    ) -> RestartDecision<Out> {
553        if self.shared.is_exhausted() && cause == RestartCause::Completion {
554            return RestartDecision::Finish(None);
555        }
556        if self.only_on_failures && cause == RestartCause::Completion {
557            return RestartDecision::Finish(None);
558        }
559        let Some(delay) = self.backoff.next_delay() else {
560            return RestartDecision::Finish(error.map(Err));
561        };
562        if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
563            return RestartDecision::Finish(Some(Err(wait_error)));
564        }
565        RestartDecision::Restart
566    }
567
568    fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
569        match result {
570            Some(Err(error)) => {
571                self.terminal = Some(Err(error.clone()));
572                Some(Err(error))
573            }
574            Some(Ok(_)) | None => {
575                self.terminal = Some(Ok(()));
576                None
577            }
578        }
579    }
580}
581
582impl<In, Out, Mat, F> Iterator for RestartFlowStream<In, Out, Mat, F>
583where
584    In: Send + 'static,
585    Out: Send + 'static,
586    Mat: Send + 'static,
587    F: Fn() -> Flow<In, Out, Mat>,
588{
589    type Item = StreamResult<Out>;
590
591    fn next(&mut self) -> Option<Self::Item> {
592        if let Some(terminal) = &self.terminal {
593            return match terminal {
594                Ok(()) => None,
595                Err(error) => Some(Err(error.clone())),
596            };
597        }
598        loop {
599            if self.current.is_none() {
600                if self.shared.is_exhausted() {
601                    return None;
602                }
603                match self.rematerialize() {
604                    Ok(stream) => self.current = Some(stream),
605                    Err(error) => {
606                        match self.restart_or_finish(RestartCause::Failure, Some(error)) {
607                            RestartDecision::Restart => {}
608                            RestartDecision::Finish(result) => return self.finish(result),
609                        }
610                        continue;
611                    }
612                }
613            }
614
615            let next = self
616                .current
617                .as_mut()
618                .expect("restart flow child exists")
619                .next();
620            match next {
621                Some(Ok(item)) => return Some(Ok(item)),
622                Some(Err(error)) => {
623                    self.current = None;
624                    match self.restart_or_finish(RestartCause::Failure, Some(error)) {
625                        RestartDecision::Restart => {}
626                        RestartDecision::Finish(result) => return self.finish(result),
627                    }
628                }
629                None => {
630                    self.current = None;
631                    match self.restart_or_finish(RestartCause::Completion, None) {
632                        RestartDecision::Restart => {}
633                        RestartDecision::Finish(result) => return self.finish(result),
634                    }
635                }
636            }
637        }
638    }
639}
640
641/// Akka-style restarting sink wrapper.
642///
643/// Datum's current sink wrapper accepts factories that materialize
644/// `StreamCompletion<NotUsed>` so the outer sink can wait for each child sink
645/// before deciding whether to restart. Like Akka's restart sink, upstream is
646/// backpressured during backoff.
647pub struct RestartSink;
648
649impl RestartSink {
650    #[must_use]
651    pub fn with_backoff<In, F>(
652        settings: RestartSettings,
653        factory: F,
654    ) -> Sink<In, StreamCompletion<NotUsed>>
655    where
656        In: Send + 'static,
657        F: Fn() -> Sink<In, StreamCompletion<NotUsed>> + Send + Sync + 'static,
658    {
659        let factory = Arc::new(factory);
660        Sink::from_runner(move |input, materializer| {
661            let materializer = materializer.clone();
662            let factory = Arc::clone(&factory);
663            let settings = settings.clone();
664            let state = Arc::clone(&materializer.inner.state);
665            Ok(materializer.clone().spawn_stream(move |cancelled| {
666                let checked = runtime_checked_stream(input, state, Some(cancelled));
667                let shared = SharedInput::new(Box::new(checked) as BoxStream<In>);
668                let mut backoff = RestartBackoff::new(settings.clone());
669                loop {
670                    if shared.is_exhausted() {
671                        return Ok(NotUsed);
672                    }
673                    let sink = invoke_factory("RestartSink factory", factory.as_ref())?;
674                    let completion = sink.run(Box::new(shared.stream()), &materializer)?;
675                    match completion.wait() {
676                        Ok(_) if shared.is_exhausted() => return Ok(NotUsed),
677                        Ok(_) => {
678                            let Some(delay) = backoff.next_delay() else {
679                                return Ok(NotUsed);
680                            };
681                            wait_backoff(&materializer, delay)?;
682                        }
683                        Err(error) => {
684                            let Some(delay) = backoff.next_delay() else {
685                                return Err(error);
686                            };
687                            wait_backoff(&materializer, delay)?;
688                        }
689                    }
690                }
691            }))
692        })
693    }
694}
695
696/// Akka-style `RetryFlow.withBackoff` helpers.
697///
698/// The wrapped flow must be one-in/one-out. Datum validates that at runtime and
699/// fails the stream if the wrapped flow emits zero or multiple outputs for a
700/// retried element.
701pub struct RetryFlow;
702
703type RetryDecider<In, Out> = Arc<dyn Fn(&In, &Out) -> Option<In> + Send + Sync>;
704
705impl RetryFlow {
706    #[must_use]
707    pub fn with_backoff<In, Out, Mat, Decide>(
708        min_backoff: Duration,
709        max_backoff: Duration,
710        random_factor: f64,
711        max_retries: usize,
712        flow: Flow<In, Out, Mat>,
713        decide_retry: Decide,
714    ) -> Flow<In, Out>
715    where
716        In: Clone + Send + 'static,
717        Out: Send + 'static,
718        Mat: Send + 'static,
719        Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
720    {
721        retry_flow(
722            min_backoff,
723            max_backoff,
724            random_factor,
725            max_retries,
726            flow,
727            Arc::new(decide_retry),
728        )
729    }
730
731    #[must_use]
732    pub fn with_backoff_and_context<In, CtxIn, Out, CtxOut, Mat, Decide>(
733        min_backoff: Duration,
734        max_backoff: Duration,
735        random_factor: f64,
736        max_retries: usize,
737        flow: FlowWithContext<In, CtxIn, Out, CtxOut, Mat>,
738        decide_retry: Decide,
739    ) -> FlowWithContext<In, CtxIn, Out, CtxOut>
740    where
741        In: Clone + Send + 'static,
742        CtxIn: Clone + Send + 'static,
743        Out: Send + 'static,
744        CtxOut: Send + 'static,
745        Mat: Send + 'static,
746        Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
747    {
748        let decide_retry = Arc::new(decide_retry);
749        let delegate = retry_flow(
750            min_backoff,
751            max_backoff,
752            random_factor,
753            max_retries,
754            flow.as_flow(),
755            Arc::new(move |input: &(In, CtxIn), output: &(Out, CtxOut)| {
756                decide_retry(&input.0, &output.0).map(|next| (next, input.1.clone()))
757            }),
758        );
759        FlowWithContext::from_flow(delegate)
760    }
761}
762
763fn retry_flow<In, Out, Mat>(
764    min_backoff: Duration,
765    max_backoff: Duration,
766    random_factor: f64,
767    max_retries: usize,
768    flow: Flow<In, Out, Mat>,
769    decide_retry: RetryDecider<In, Out>,
770) -> Flow<In, Out>
771where
772    In: Clone + Send + 'static,
773    Out: Send + 'static,
774    Mat: Send + 'static,
775{
776    let settings = RestartSettings::new(min_backoff, max_backoff, random_factor)
777        .with_max_restarts(max_retries, max_backoff.max(min_backoff));
778    Flow::from_runtime_transform(move |input, materializer| {
779        Ok(Box::new(RetryFlowStream {
780            input,
781            materializer: materializer.clone(),
782            flow: flow.clone(),
783            decide_retry: Arc::clone(&decide_retry),
784            settings: settings.clone(),
785        }))
786    })
787}
788
789struct RetryFlowStream<In, Out, Mat> {
790    input: BoxStream<In>,
791    materializer: Materializer,
792    flow: Flow<In, Out, Mat>,
793    decide_retry: RetryDecider<In, Out>,
794    settings: RestartSettings,
795}
796
797impl<In, Out, Mat> RetryFlowStream<In, Out, Mat>
798where
799    In: Clone + Send + 'static,
800    Out: Send + 'static,
801    Mat: Send + 'static,
802{
803    fn run_once(&self, item: In) -> StreamResult<Out> {
804        (self.flow.materialize)()?;
805        let input = Box::new(std::iter::once(Ok(item))) as BoxStream<In>;
806        let mut stream = match &self.flow.transform {
807            FlowTransform::Pure(transform) => transform(input),
808            FlowTransform::Runtime(transform) => transform(input, &self.materializer)?,
809        };
810        let output = match stream.next() {
811            Some(Ok(output)) => output,
812            Some(Err(error)) => return Err(error),
813            None => {
814                return Err(StreamError::Failed(
815                    "RetryFlow inner flow produced no output".to_owned(),
816                ));
817            }
818        };
819        if stream.next().is_some() {
820            return Err(StreamError::Failed(
821                "RetryFlow inner flow produced more than one output".to_owned(),
822            ));
823        }
824        Ok(output)
825    }
826}
827
828impl<In, Out, Mat> Iterator for RetryFlowStream<In, Out, Mat>
829where
830    In: Clone + Send + 'static,
831    Out: Send + 'static,
832    Mat: Send + 'static,
833{
834    type Item = StreamResult<Out>;
835
836    fn next(&mut self) -> Option<Self::Item> {
837        let original = match self.input.next()? {
838            Ok(item) => item,
839            Err(error) => return Some(Err(error)),
840        };
841        let mut current = original.clone();
842        let mut retries = 0_usize;
843        let mut backoff = RestartBackoff::new(self.settings.clone());
844        loop {
845            let output = match self.run_once(current.clone()) {
846                Ok(output) => output,
847                Err(error) => return Some(Err(error)),
848            };
849            let retry =
850                match catch_unwind(AssertUnwindSafe(|| (self.decide_retry)(&original, &output))) {
851                    Ok(retry) => retry,
852                    Err(_) => return Some(Err(panic_stream_error("RetryFlow decider"))),
853                };
854            let Some(next_input) = retry else {
855                return Some(Ok(output));
856            };
857            if retries >= self.settings.max_restarts {
858                return Some(Ok(output));
859            }
860            let delay = backoff.next_delay().unwrap_or(self.settings.max_backoff);
861            if let Err(error) = wait_backoff(&self.materializer, delay) {
862                return Some(Err(error));
863            }
864            retries += 1;
865            current = next_input;
866        }
867    }
868}
869
870#[cfg(test)]
871mod tests {
872    use super::*;
873    use std::sync::{
874        Arc,
875        atomic::{AtomicUsize, Ordering},
876    };
877
878    fn boom() -> StreamError {
879        StreamError::Failed("boom".to_owned())
880    }
881
882    #[test]
883    fn supervised_map_result_stops_by_default_and_resumes_when_requested() {
884        let stopped = Source::from_iter([1, 2, 3, 4])
885            .try_map(|item| if item == 3 { Err(boom()) } else { Ok(item) })
886            .run_collect();
887        assert_eq!(stopped, Err(boom()));
888
889        let resumed = Source::from_iter([1, 2, 3, 4])
890            .map_result_with_supervision(
891                |item| if item == 3 { Err(boom()) } else { Ok(item) },
892                Supervision::resuming_decider(),
893            )
894            .run_collect()
895            .unwrap();
896        assert_eq!(resumed, vec![1, 2, 4]);
897    }
898
899    #[test]
900    fn supervised_scan_restart_resets_state_and_reemits_seed() {
901        let resumed = Source::from_iter([1, 3, -1, 5, 7])
902            .scan_result_with_supervision(
903                0,
904                |acc, item| {
905                    if item < 0 {
906                        Err(boom())
907                    } else {
908                        Ok(acc + item)
909                    }
910                },
911                Supervision::resuming_decider(),
912            )
913            .run_collect()
914            .unwrap();
915        assert_eq!(resumed, vec![0, 1, 4, 9, 16]);
916
917        let restarted = Source::from_iter([1, 3, -1, 5, 7])
918            .scan_result_with_supervision(
919                0,
920                |acc, item| {
921                    if item < 0 {
922                        Err(boom())
923                    } else {
924                        Ok(acc + item)
925                    }
926                },
927                Supervision::restarting_decider(),
928            )
929            .run_collect()
930            .unwrap();
931        assert_eq!(restarted, vec![0, 1, 4, 0, 5, 12]);
932    }
933
934    #[test]
935    fn supervised_fold_restart_resets_accumulator() {
936        let resumed = Source::from_iter(1..=5)
937            .run_with(Sink::fold_result_with_supervision(
938                0,
939                |acc, item| {
940                    if item == 3 {
941                        Err(boom())
942                    } else {
943                        Ok(acc + item)
944                    }
945                },
946                Supervision::resuming_decider(),
947            ))
948            .unwrap()
949            .wait()
950            .unwrap();
951        assert_eq!(resumed, 12);
952
953        let restarted = Source::from_iter(1..=5)
954            .run_with(Sink::fold_result_with_supervision(
955                0,
956                |acc, item| {
957                    if item == 3 {
958                        Err(boom())
959                    } else {
960                        Ok(acc + item)
961                    }
962                },
963                Supervision::restarting_decider(),
964            ))
965            .unwrap()
966            .wait()
967            .unwrap();
968        assert_eq!(restarted, 9);
969    }
970
971    #[test]
972    fn supervised_map_async_drops_failed_future_once() {
973        let decisions = Arc::new(AtomicUsize::new(0));
974        let decider = {
975            let decisions = Arc::clone(&decisions);
976            Arc::new(move |_: &StreamError| {
977                decisions.fetch_add(1, Ordering::SeqCst);
978                SupervisionDirective::Resume
979            }) as SupervisionDecider
980        };
981
982        let collected = Source::from_iter([1, 2, 3, 4])
983            .map_async_with_supervision(
984                2,
985                |item| async move { if item == 3 { Err(boom()) } else { Ok(item) } },
986                decider,
987            )
988            .run_collect()
989            .unwrap();
990
991        assert_eq!(collected, vec![1, 2, 4]);
992        assert_eq!(decisions.load(Ordering::SeqCst), 1);
993    }
994
995    #[test]
996    fn restart_source_restarts_on_completion_until_cap() {
997        let attempts = Arc::new(AtomicUsize::new(0));
998        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
999            .with_max_restarts(2, Duration::from_secs(1));
1000        let source = RestartSource::with_backoff(settings, {
1001            let attempts = Arc::clone(&attempts);
1002            move || {
1003                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
1004                Source::single(attempt)
1005            }
1006        });
1007
1008        let values = source.run_collect().unwrap();
1009        assert_eq!(values, vec![0, 1, 2]);
1010        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1011    }
1012
1013    #[test]
1014    fn restart_source_on_failures_does_not_restart_on_completion() {
1015        let attempts = Arc::new(AtomicUsize::new(0));
1016        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
1017            .with_max_restarts(3, Duration::from_secs(1));
1018        let source = RestartSource::on_failures_with_backoff(settings, {
1019            let attempts = Arc::clone(&attempts);
1020            move || {
1021                if attempts.fetch_add(1, Ordering::SeqCst) == 0 {
1022                    Source::failed(boom())
1023                } else {
1024                    Source::from_iter([7, 8])
1025                }
1026            }
1027        });
1028
1029        let values = source.run_collect().unwrap();
1030        assert_eq!(values, vec![7, 8]);
1031        assert_eq!(attempts.load(Ordering::SeqCst), 2);
1032    }
1033
1034    #[test]
1035    fn restart_source_cap_resets_after_within_window() {
1036        let attempts = Arc::new(AtomicUsize::new(0));
1037        let settings =
1038            RestartSettings::new(Duration::from_millis(8), Duration::from_millis(8), 0.0)
1039                .with_max_restarts(1, Duration::from_millis(1));
1040        let source = RestartSource::on_failures_with_backoff(settings, {
1041            let attempts = Arc::clone(&attempts);
1042            move || {
1043                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
1044                if attempt < 2 {
1045                    Source::failed(boom())
1046                } else {
1047                    Source::single(42)
1048                }
1049            }
1050        });
1051
1052        let started = Instant::now();
1053        let values = source.run_collect().unwrap();
1054        assert_eq!(values, vec![42]);
1055        assert!(started.elapsed() >= Duration::from_millis(8));
1056        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1057    }
1058
1059    #[test]
1060    fn restart_flow_drops_failed_in_flight_element_and_continues() {
1061        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
1062            .with_max_restarts(1, Duration::from_secs(1));
1063        let values = Source::from_iter([1, 2, 3, 4, 5])
1064            .via(RestartFlow::on_failures_with_backoff(settings, || {
1065                Flow::identity().try_map(|item| if item == 3 { Err(boom()) } else { Ok(item) })
1066            }))
1067            .run_collect()
1068            .unwrap();
1069
1070        assert_eq!(values, vec![1, 2, 4, 5]);
1071    }
1072
1073    #[test]
1074    fn retry_flow_retries_with_backoff_then_emits_last_output() {
1075        let flow = Flow::identity().map(|item: i32| item / 2);
1076        let values = Source::from_iter([5, 1])
1077            .via(RetryFlow::with_backoff(
1078                Duration::ZERO,
1079                Duration::ZERO,
1080                0.0,
1081                3,
1082                flow,
1083                |_, output| (*output > 0).then_some(*output),
1084            ))
1085            .run_collect()
1086            .unwrap();
1087
1088        assert_eq!(values, vec![0, 0]);
1089    }
1090}