Skip to main content

datum/stream/
restart.rs

1use super::*;
2use crate::context::FlowWithContext;
3use crate::stream::error::panic_stream_error;
4use crate::stream::flow::FlowTransform;
5use std::{
6    marker::PhantomData,
7    sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
8    time::Instant,
9};
10
/// Shared generator state advanced by `next_random_fraction` on every call;
/// the initial value is a fixed, non-zero seed.
static BACKOFF_RANDOM_SEED: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
12
/// Tuning knobs shared by [`RestartSource`], [`RestartFlow`], and [`RestartSink`].
///
/// Restart accounting mirrors Akka's `maxRestartsWithin`: when the reset
/// window has passed, the restart count and the exponential backoff position
/// both start over. Jitter of at most `random_factor * computed_backoff` is
/// added on top of the computed delay.
#[derive(Clone, Debug)]
pub struct RestartSettings {
    min_backoff: Duration,
    max_backoff: Duration,
    random_factor: f64,
    max_restarts: usize,
    max_restarts_within: Duration,
}

impl RestartSettings {
    /// Builds settings with an unlimited restart budget (`usize::MAX`) and a
    /// reset window equal to `min_backoff`.
    ///
    /// # Panics
    ///
    /// Panics when `min_backoff > max_backoff`, or when `random_factor >= 0.0`
    /// does not hold (negative values and NaN).
    #[must_use]
    pub fn new(min_backoff: Duration, max_backoff: Duration, random_factor: f64) -> Self {
        assert!(min_backoff <= max_backoff);
        assert!(random_factor >= 0.0);
        Self {
            max_restarts: usize::MAX,
            max_restarts_within: min_backoff,
            min_backoff,
            max_backoff,
            random_factor,
        }
    }

    /// Returns a copy with `min_backoff` replaced.
    ///
    /// # Panics
    ///
    /// Panics when `value` exceeds the current `max_backoff`.
    #[must_use]
    pub fn with_min_backoff(self, value: Duration) -> Self {
        assert!(value <= self.max_backoff);
        Self {
            min_backoff: value,
            ..self
        }
    }

    /// Returns a copy with `max_backoff` replaced.
    ///
    /// # Panics
    ///
    /// Panics when `value` is below the current `min_backoff`.
    #[must_use]
    pub fn with_max_backoff(self, value: Duration) -> Self {
        assert!(self.min_backoff <= value);
        Self {
            max_backoff: value,
            ..self
        }
    }

    /// Returns a copy with `random_factor` replaced.
    ///
    /// # Panics
    ///
    /// Panics when `value >= 0.0` does not hold (negative values and NaN).
    #[must_use]
    pub fn with_random_factor(self, value: f64) -> Self {
        assert!(value >= 0.0);
        Self {
            random_factor: value,
            ..self
        }
    }

    /// Returns a copy that allows at most `count` restarts per `within` window.
    #[must_use]
    pub fn with_max_restarts(self, count: usize, within: Duration) -> Self {
        Self {
            max_restarts: count,
            max_restarts_within: within,
            ..self
        }
    }

    /// Smallest delay applied before a restart.
    #[must_use]
    pub fn min_backoff(&self) -> Duration {
        self.min_backoff
    }

    /// Cap applied to the exponentially growing delay (before jitter).
    #[must_use]
    pub fn max_backoff(&self) -> Duration {
        self.max_backoff
    }

    /// Upper bound of the jitter, as a fraction of the computed delay.
    #[must_use]
    pub fn random_factor(&self) -> f64 {
        self.random_factor
    }

    /// Maximum number of restarts permitted inside one reset window.
    #[must_use]
    pub fn max_restarts(&self) -> usize {
        self.max_restarts
    }

    /// Length of the reset window used for restart accounting.
    #[must_use]
    pub fn max_restarts_within(&self) -> Duration {
        self.max_restarts_within
    }
}
95
/// Reason the wrapped stage stopped, as handed to the restart policy.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum RestartCause {
    /// The stage ended with an error, or could not be rematerialized.
    Failure,
    /// The stage ran to completion without an error.
    Completion,
}
101
/// Verdict of the restart policy after the wrapped stage stopped.
enum RestartDecision<T> {
    /// The backoff wait finished; the caller should build a fresh wrapped
    /// stage and keep going.
    Restart,
    /// Stop restarting. The payload is what the caller's `finish` receives:
    /// an error to surface downstream, or `None` for plain completion.
    Finish(Option<StreamResult<T>>),
}
106
107struct RestartBackoff {
108    settings: RestartSettings,
109    window_start: Instant,
110    restarts_in_window: usize,
111}
112
113impl RestartBackoff {
114    fn new(settings: RestartSettings) -> Self {
115        Self {
116            settings,
117            window_start: Instant::now(),
118            restarts_in_window: 0,
119        }
120    }
121
122    fn next_delay(&mut self) -> Option<Duration> {
123        let now = Instant::now();
124        if now.duration_since(self.window_start) > self.settings.max_restarts_within {
125            self.window_start = now;
126            self.restarts_in_window = 0;
127        }
128        if self.restarts_in_window >= self.settings.max_restarts {
129            return None;
130        }
131
132        let exponent = (self.restarts_in_window.min(31) as i32).max(0);
133        let base = self
134            .settings
135            .min_backoff
136            .mul_f64(2_f64.powi(exponent))
137            .min(self.settings.max_backoff);
138        self.restarts_in_window += 1;
139        if self.settings.random_factor == 0.0 || base.is_zero() {
140            Some(base)
141        } else {
142            let jitter = base.mul_f64(next_random_fraction() * self.settings.random_factor);
143            Some(base.saturating_add(jitter))
144        }
145    }
146}
147
148fn next_random_fraction() -> f64 {
149    let mut current = BACKOFF_RANDOM_SEED.load(AtomicOrdering::Relaxed);
150    loop {
151        let mut next = current;
152        next ^= next << 13;
153        next ^= next >> 7;
154        next ^= next << 17;
155        match BACKOFF_RANDOM_SEED.compare_exchange_weak(
156            current,
157            next,
158            AtomicOrdering::Relaxed,
159            AtomicOrdering::Relaxed,
160        ) {
161            Ok(_) => return ((next >> 11) as f64) / ((1_u64 << 53) as f64),
162            Err(observed) => current = observed,
163        }
164    }
165}
166
167fn wait_backoff(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
168    if delay.is_zero() {
169        return Ok(());
170    }
171    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
172    let _timer = materializer.schedule_once(delay, move || {
173        let _ = sender.send(());
174    });
175
176    loop {
177        if materializer.is_shutdown() {
178            return Err(StreamError::AbruptTermination);
179        }
180        if super::runtime::current_stream_cancelled()
181            .as_ref()
182            .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
183        {
184            return Err(StreamError::Cancelled);
185        }
186        // The timer wake arrives on the channel; the bounded recv exists only
187        // so shutdown/cancellation are observed promptly. 25 ms keeps cancel
188        // latency low without busy-spinning a long backoff (a 30 s max
189        // backoff parks ~1.2k times instead of 30k at 1 ms).
190        match receiver.recv_timeout(Duration::from_millis(25)) {
191            Ok(()) => return Ok(()),
192            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
193            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
194                return Err(StreamError::AbruptTermination);
195            }
196        }
197    }
198}
199
200fn invoke_factory<T, F>(context: &str, factory: &F) -> StreamResult<T>
201where
202    F: Fn() -> T,
203{
204    catch_unwind(AssertUnwindSafe(factory)).map_err(|_| panic_stream_error(context))
205}
206
207/// Akka-style restarting source wrapper.
208///
209/// `with_backoff` restarts when the wrapped source fails or completes.
210/// `on_failures_with_backoff` restarts only on failure. During the backoff the
211/// downstream pull is backpressured by waiting on the materializer's central
212/// timer driver; no per-restart timer threads are spawned.
213pub struct RestartSource;
214
215impl RestartSource {
216    #[must_use]
217    pub fn with_backoff<Out, Mat, F>(settings: RestartSettings, factory: F) -> Source<Out>
218    where
219        Out: Send + 'static,
220        Mat: Send + 'static,
221        F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
222    {
223        restart_source(settings, factory, false)
224    }
225
226    #[must_use]
227    pub fn on_failures_with_backoff<Out, Mat, F>(
228        settings: RestartSettings,
229        factory: F,
230    ) -> Source<Out>
231    where
232        Out: Send + 'static,
233        Mat: Send + 'static,
234        F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
235    {
236        restart_source(settings, factory, true)
237    }
238}
239
240fn restart_source<Out, Mat, F>(
241    settings: RestartSettings,
242    factory: F,
243    only_on_failures: bool,
244) -> Source<Out>
245where
246    Out: Send + 'static,
247    Mat: Send + 'static,
248    F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
249{
250    let factory = Arc::new(factory);
251    Source::from_materialized_factory(move |materializer| {
252        Ok((
253            Box::new(RestartSourceStream {
254                factory: Arc::clone(&factory),
255                materializer: materializer.clone(),
256                backoff: RestartBackoff::new(settings.clone()),
257                current: None,
258                only_on_failures,
259                terminal: None,
260                _marker: PhantomData::<Mat>,
261            }),
262            NotUsed,
263        ))
264    })
265}
266
267struct RestartSourceStream<Out, Mat, F> {
268    factory: Arc<F>,
269    materializer: Materializer,
270    backoff: RestartBackoff,
271    current: Option<BoxStream<Out>>,
272    only_on_failures: bool,
273    terminal: Option<StreamResult<()>>,
274    _marker: PhantomData<Mat>,
275}
276
277impl<Out, Mat, F> RestartSourceStream<Out, Mat, F>
278where
279    Out: Send + 'static,
280    Mat: Send + 'static,
281    F: Fn() -> Source<Out, Mat>,
282{
283    fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
284        let source = invoke_factory("RestartSource factory", self.factory.as_ref())?;
285        Arc::clone(&source.factory)
286            .create(&self.materializer)
287            .map(|(stream, _)| stream)
288    }
289
290    fn restart_or_finish(
291        &mut self,
292        cause: RestartCause,
293        error: Option<StreamError>,
294    ) -> RestartDecision<Out> {
295        if self.only_on_failures && cause == RestartCause::Completion {
296            return RestartDecision::Finish(None);
297        }
298        let Some(delay) = self.backoff.next_delay() else {
299            return RestartDecision::Finish(error.map(Err));
300        };
301        if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
302            return RestartDecision::Finish(Some(Err(wait_error)));
303        }
304        RestartDecision::Restart
305    }
306
307    fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
308        match result {
309            Some(Err(error)) => {
310                self.terminal = Some(Err(error.clone()));
311                Some(Err(error))
312            }
313            Some(Ok(_)) | None => {
314                self.terminal = Some(Ok(()));
315                None
316            }
317        }
318    }
319}
320
321impl<Out, Mat, F> Iterator for RestartSourceStream<Out, Mat, F>
322where
323    Out: Send + 'static,
324    Mat: Send + 'static,
325    F: Fn() -> Source<Out, Mat>,
326{
327    type Item = StreamResult<Out>;
328
329    fn next(&mut self) -> Option<Self::Item> {
330        if let Some(terminal) = &self.terminal {
331            return match terminal {
332                Ok(()) => None,
333                Err(error) => Some(Err(error.clone())),
334            };
335        }
336        loop {
337            if self.current.is_none() {
338                match self.rematerialize() {
339                    Ok(stream) => self.current = Some(stream),
340                    Err(error) => {
341                        match self.restart_or_finish(RestartCause::Failure, Some(error)) {
342                            RestartDecision::Restart => {}
343                            RestartDecision::Finish(result) => return self.finish(result),
344                        }
345                        continue;
346                    }
347                }
348            }
349
350            let next = self
351                .current
352                .as_mut()
353                .expect("restart source child exists")
354                .next();
355            match next {
356                Some(Ok(item)) => return Some(Ok(item)),
357                Some(Err(error)) => {
358                    self.current = None;
359                    match self.restart_or_finish(RestartCause::Failure, Some(error)) {
360                        RestartDecision::Restart => {}
361                        RestartDecision::Finish(result) => return self.finish(result),
362                    }
363                }
364                None => {
365                    self.current = None;
366                    match self.restart_or_finish(RestartCause::Completion, None) {
367                        RestartDecision::Restart => {}
368                        RestartDecision::Finish(result) => return self.finish(result),
369                    }
370                }
371            }
372        }
373    }
374}
375
376struct SharedInput<In> {
377    inner: Arc<Mutex<SharedInputState<In>>>,
378}
379
380impl<In> Clone for SharedInput<In> {
381    fn clone(&self) -> Self {
382        Self {
383            inner: Arc::clone(&self.inner),
384        }
385    }
386}
387
388struct SharedInputState<In> {
389    input: Option<BoxStream<In>>,
390    exhausted: bool,
391}
392
393impl<In> SharedInput<In> {
394    fn new(input: BoxStream<In>) -> Self {
395        Self {
396            inner: Arc::new(Mutex::new(SharedInputState {
397                input: Some(input),
398                exhausted: false,
399            })),
400        }
401    }
402
403    fn stream(&self) -> SharedInputStream<In> {
404        SharedInputStream {
405            shared: self.clone(),
406        }
407    }
408
409    fn is_exhausted(&self) -> bool {
410        self.inner
411            .lock()
412            .unwrap_or_else(|poison| poison.into_inner())
413            .exhausted
414    }
415}
416
417struct SharedInputStream<In> {
418    shared: SharedInput<In>,
419}
420
421impl<In> Iterator for SharedInputStream<In> {
422    type Item = StreamResult<In>;
423
424    fn next(&mut self) -> Option<Self::Item> {
425        let mut input = {
426            let mut state = self
427                .shared
428                .inner
429                .lock()
430                .unwrap_or_else(|poison| poison.into_inner());
431            if state.exhausted {
432                return None;
433            }
434            state.input.take().expect("shared input present")
435        };
436        let next = input.next();
437        let mut state = self
438            .shared
439            .inner
440            .lock()
441            .unwrap_or_else(|poison| poison.into_inner());
442        if next.is_none() {
443            state.exhausted = true;
444        }
445        state.input = Some(input);
446        next
447    }
448}
449
450/// Akka-style restarting flow wrapper.
451///
452/// Restart flows are lossy across restarts: the element that triggers the
453/// wrapped flow failure, and any elements already pulled by that wrapped flow,
454/// are dropped. During backoff, downstream demand is held until the central
455/// materializer timer fires and the child flow is rematerialized.
456pub struct RestartFlow;
457
458impl RestartFlow {
459    #[must_use]
460    pub fn with_backoff<In, Out, Mat, F>(settings: RestartSettings, factory: F) -> Flow<In, Out>
461    where
462        In: Send + 'static,
463        Out: Send + 'static,
464        Mat: Send + 'static,
465        F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
466    {
467        restart_flow(settings, factory, false)
468    }
469
470    #[must_use]
471    pub fn on_failures_with_backoff<In, Out, Mat, F>(
472        settings: RestartSettings,
473        factory: F,
474    ) -> Flow<In, Out>
475    where
476        In: Send + 'static,
477        Out: Send + 'static,
478        Mat: Send + 'static,
479        F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
480    {
481        restart_flow(settings, factory, true)
482    }
483}
484
485fn restart_flow<In, Out, Mat, F>(
486    settings: RestartSettings,
487    factory: F,
488    only_on_failures: bool,
489) -> Flow<In, Out>
490where
491    In: Send + 'static,
492    Out: Send + 'static,
493    Mat: Send + 'static,
494    F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
495{
496    let factory = Arc::new(factory);
497    Flow::from_runtime_transform(move |input, materializer| {
498        let shared = SharedInput::new(input);
499        Ok(Box::new(RestartFlowStream {
500            factory: Arc::clone(&factory),
501            materializer: materializer.clone(),
502            shared,
503            backoff: RestartBackoff::new(settings.clone()),
504            current: None,
505            only_on_failures,
506            terminal: None,
507            _marker: PhantomData::<Mat>,
508        }))
509    })
510}
511
512struct RestartFlowStream<In, Out, Mat, F> {
513    factory: Arc<F>,
514    materializer: Materializer,
515    shared: SharedInput<In>,
516    backoff: RestartBackoff,
517    current: Option<BoxStream<Out>>,
518    only_on_failures: bool,
519    terminal: Option<StreamResult<()>>,
520    _marker: PhantomData<Mat>,
521}
522
523impl<In, Out, Mat, F> RestartFlowStream<In, Out, Mat, F>
524where
525    In: Send + 'static,
526    Out: Send + 'static,
527    Mat: Send + 'static,
528    F: Fn() -> Flow<In, Out, Mat>,
529{
530    fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
531        let flow = invoke_factory("RestartFlow factory", self.factory.as_ref())?;
532        (flow.materialize)()?;
533        let input = Box::new(self.shared.stream()) as BoxStream<In>;
534        match flow.transform {
535            FlowTransform::Pure(transform) => Ok(transform(input)),
536            FlowTransform::Runtime(transform) => transform(input, &self.materializer),
537        }
538    }
539
540    fn restart_or_finish(
541        &mut self,
542        cause: RestartCause,
543        error: Option<StreamError>,
544    ) -> RestartDecision<Out> {
545        if self.shared.is_exhausted() && cause == RestartCause::Completion {
546            return RestartDecision::Finish(None);
547        }
548        if self.only_on_failures && cause == RestartCause::Completion {
549            return RestartDecision::Finish(None);
550        }
551        let Some(delay) = self.backoff.next_delay() else {
552            return RestartDecision::Finish(error.map(Err));
553        };
554        if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
555            return RestartDecision::Finish(Some(Err(wait_error)));
556        }
557        RestartDecision::Restart
558    }
559
560    fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
561        match result {
562            Some(Err(error)) => {
563                self.terminal = Some(Err(error.clone()));
564                Some(Err(error))
565            }
566            Some(Ok(_)) | None => {
567                self.terminal = Some(Ok(()));
568                None
569            }
570        }
571    }
572}
573
574impl<In, Out, Mat, F> Iterator for RestartFlowStream<In, Out, Mat, F>
575where
576    In: Send + 'static,
577    Out: Send + 'static,
578    Mat: Send + 'static,
579    F: Fn() -> Flow<In, Out, Mat>,
580{
581    type Item = StreamResult<Out>;
582
583    fn next(&mut self) -> Option<Self::Item> {
584        if let Some(terminal) = &self.terminal {
585            return match terminal {
586                Ok(()) => None,
587                Err(error) => Some(Err(error.clone())),
588            };
589        }
590        loop {
591            if self.current.is_none() {
592                if self.shared.is_exhausted() {
593                    return None;
594                }
595                match self.rematerialize() {
596                    Ok(stream) => self.current = Some(stream),
597                    Err(error) => {
598                        match self.restart_or_finish(RestartCause::Failure, Some(error)) {
599                            RestartDecision::Restart => {}
600                            RestartDecision::Finish(result) => return self.finish(result),
601                        }
602                        continue;
603                    }
604                }
605            }
606
607            let next = self
608                .current
609                .as_mut()
610                .expect("restart flow child exists")
611                .next();
612            match next {
613                Some(Ok(item)) => return Some(Ok(item)),
614                Some(Err(error)) => {
615                    self.current = None;
616                    match self.restart_or_finish(RestartCause::Failure, Some(error)) {
617                        RestartDecision::Restart => {}
618                        RestartDecision::Finish(result) => return self.finish(result),
619                    }
620                }
621                None => {
622                    self.current = None;
623                    match self.restart_or_finish(RestartCause::Completion, None) {
624                        RestartDecision::Restart => {}
625                        RestartDecision::Finish(result) => return self.finish(result),
626                    }
627                }
628            }
629        }
630    }
631}
632
633/// Akka-style restarting sink wrapper.
634///
635/// Datum's current sink wrapper accepts factories that materialize
636/// `StreamCompletion<NotUsed>` so the outer sink can wait for each child sink
637/// before deciding whether to restart. Like Akka's restart sink, upstream is
638/// backpressured during backoff.
639pub struct RestartSink;
640
641impl RestartSink {
642    #[must_use]
643    pub fn with_backoff<In, F>(
644        settings: RestartSettings,
645        factory: F,
646    ) -> Sink<In, StreamCompletion<NotUsed>>
647    where
648        In: Send + 'static,
649        F: Fn() -> Sink<In, StreamCompletion<NotUsed>> + Send + Sync + 'static,
650    {
651        let factory = Arc::new(factory);
652        Sink::from_runner(move |input, materializer| {
653            let materializer = materializer.clone();
654            let factory = Arc::clone(&factory);
655            let settings = settings.clone();
656            let state = Arc::clone(&materializer.inner.state);
657            Ok(materializer.clone().spawn_stream(move |cancelled| {
658                let checked = runtime_checked_stream(input, state, Some(cancelled));
659                let shared = SharedInput::new(Box::new(checked) as BoxStream<In>);
660                let mut backoff = RestartBackoff::new(settings.clone());
661                loop {
662                    if shared.is_exhausted() {
663                        return Ok(NotUsed);
664                    }
665                    let sink = invoke_factory("RestartSink factory", factory.as_ref())?;
666                    let completion = sink.run(Box::new(shared.stream()), &materializer)?;
667                    match completion.wait() {
668                        Ok(_) if shared.is_exhausted() => return Ok(NotUsed),
669                        Ok(_) => {
670                            let Some(delay) = backoff.next_delay() else {
671                                return Ok(NotUsed);
672                            };
673                            wait_backoff(&materializer, delay)?;
674                        }
675                        Err(error) => {
676                            let Some(delay) = backoff.next_delay() else {
677                                return Err(error);
678                            };
679                            wait_backoff(&materializer, delay)?;
680                        }
681                    }
682                }
683            }))
684        })
685    }
686}
687
688/// Akka-style `RetryFlow.withBackoff` helpers.
689///
690/// The wrapped flow must be one-in/one-out. Datum validates that at runtime and
691/// fails the stream if the wrapped flow emits zero or multiple outputs for a
692/// retried element.
693pub struct RetryFlow;
694
695type RetryDecider<In, Out> = Arc<dyn Fn(&In, &Out) -> Option<In> + Send + Sync>;
696
697impl RetryFlow {
698    #[must_use]
699    pub fn with_backoff<In, Out, Mat, Decide>(
700        min_backoff: Duration,
701        max_backoff: Duration,
702        random_factor: f64,
703        max_retries: usize,
704        flow: Flow<In, Out, Mat>,
705        decide_retry: Decide,
706    ) -> Flow<In, Out>
707    where
708        In: Clone + Send + 'static,
709        Out: Send + 'static,
710        Mat: Send + 'static,
711        Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
712    {
713        retry_flow(
714            min_backoff,
715            max_backoff,
716            random_factor,
717            max_retries,
718            flow,
719            Arc::new(decide_retry),
720        )
721    }
722
723    #[must_use]
724    pub fn with_backoff_and_context<In, CtxIn, Out, CtxOut, Mat, Decide>(
725        min_backoff: Duration,
726        max_backoff: Duration,
727        random_factor: f64,
728        max_retries: usize,
729        flow: FlowWithContext<In, CtxIn, Out, CtxOut, Mat>,
730        decide_retry: Decide,
731    ) -> FlowWithContext<In, CtxIn, Out, CtxOut>
732    where
733        In: Clone + Send + 'static,
734        CtxIn: Clone + Send + 'static,
735        Out: Send + 'static,
736        CtxOut: Send + 'static,
737        Mat: Send + 'static,
738        Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
739    {
740        let decide_retry = Arc::new(decide_retry);
741        let delegate = retry_flow(
742            min_backoff,
743            max_backoff,
744            random_factor,
745            max_retries,
746            flow.as_flow(),
747            Arc::new(move |input: &(In, CtxIn), output: &(Out, CtxOut)| {
748                decide_retry(&input.0, &output.0).map(|next| (next, input.1.clone()))
749            }),
750        );
751        FlowWithContext::from_flow(delegate)
752    }
753}
754
755fn retry_flow<In, Out, Mat>(
756    min_backoff: Duration,
757    max_backoff: Duration,
758    random_factor: f64,
759    max_retries: usize,
760    flow: Flow<In, Out, Mat>,
761    decide_retry: RetryDecider<In, Out>,
762) -> Flow<In, Out>
763where
764    In: Clone + Send + 'static,
765    Out: Send + 'static,
766    Mat: Send + 'static,
767{
768    let settings = RestartSettings::new(min_backoff, max_backoff, random_factor)
769        .with_max_restarts(max_retries, max_backoff.max(min_backoff));
770    Flow::from_runtime_transform(move |input, materializer| {
771        Ok(Box::new(RetryFlowStream {
772            input,
773            materializer: materializer.clone(),
774            flow: flow.clone(),
775            decide_retry: Arc::clone(&decide_retry),
776            settings: settings.clone(),
777        }))
778    })
779}
780
781struct RetryFlowStream<In, Out, Mat> {
782    input: BoxStream<In>,
783    materializer: Materializer,
784    flow: Flow<In, Out, Mat>,
785    decide_retry: RetryDecider<In, Out>,
786    settings: RestartSettings,
787}
788
789impl<In, Out, Mat> RetryFlowStream<In, Out, Mat>
790where
791    In: Clone + Send + 'static,
792    Out: Send + 'static,
793    Mat: Send + 'static,
794{
795    fn run_once(&self, item: In) -> StreamResult<Out> {
796        (self.flow.materialize)()?;
797        let input = Box::new(std::iter::once(Ok(item))) as BoxStream<In>;
798        let mut stream = match &self.flow.transform {
799            FlowTransform::Pure(transform) => transform(input),
800            FlowTransform::Runtime(transform) => transform(input, &self.materializer)?,
801        };
802        let output = match stream.next() {
803            Some(Ok(output)) => output,
804            Some(Err(error)) => return Err(error),
805            None => {
806                return Err(StreamError::Failed(
807                    "RetryFlow inner flow produced no output".to_owned(),
808                ));
809            }
810        };
811        if stream.next().is_some() {
812            return Err(StreamError::Failed(
813                "RetryFlow inner flow produced more than one output".to_owned(),
814            ));
815        }
816        Ok(output)
817    }
818}
819
820impl<In, Out, Mat> Iterator for RetryFlowStream<In, Out, Mat>
821where
822    In: Clone + Send + 'static,
823    Out: Send + 'static,
824    Mat: Send + 'static,
825{
826    type Item = StreamResult<Out>;
827
828    fn next(&mut self) -> Option<Self::Item> {
829        let original = match self.input.next()? {
830            Ok(item) => item,
831            Err(error) => return Some(Err(error)),
832        };
833        let mut current = original.clone();
834        let mut retries = 0_usize;
835        let mut backoff = RestartBackoff::new(self.settings.clone());
836        loop {
837            let output = match self.run_once(current.clone()) {
838                Ok(output) => output,
839                Err(error) => return Some(Err(error)),
840            };
841            let retry =
842                match catch_unwind(AssertUnwindSafe(|| (self.decide_retry)(&original, &output))) {
843                    Ok(retry) => retry,
844                    Err(_) => return Some(Err(panic_stream_error("RetryFlow decider"))),
845                };
846            let Some(next_input) = retry else {
847                return Some(Ok(output));
848            };
849            if retries >= self.settings.max_restarts {
850                return Some(Ok(output));
851            }
852            let delay = backoff.next_delay().unwrap_or(self.settings.max_backoff);
853            if let Err(error) = wait_backoff(&self.materializer, delay) {
854                return Some(Err(error));
855            }
856            retries += 1;
857            current = next_input;
858        }
859    }
860}
861
862#[cfg(test)]
863mod tests {
864    use super::*;
865    use std::sync::{
866        Arc,
867        atomic::{AtomicUsize, Ordering},
868    };
869
870    fn boom() -> StreamError {
871        StreamError::Failed("boom".to_owned())
872    }
873
874    #[test]
875    fn supervised_map_result_stops_by_default_and_resumes_when_requested() {
876        let stopped = Source::from_iter([1, 2, 3, 4])
877            .map_result(|item| if item == 3 { Err(boom()) } else { Ok(item) })
878            .run_collect();
879        assert_eq!(stopped, Err(boom()));
880
881        let resumed = Source::from_iter([1, 2, 3, 4])
882            .map_result_with_supervision(
883                |item| if item == 3 { Err(boom()) } else { Ok(item) },
884                Supervision::resuming_decider(),
885            )
886            .run_collect()
887            .unwrap();
888        assert_eq!(resumed, vec![1, 2, 4]);
889    }
890
891    #[test]
892    fn supervised_scan_restart_resets_state_and_reemits_seed() {
893        let resumed = Source::from_iter([1, 3, -1, 5, 7])
894            .scan_result_with_supervision(
895                0,
896                |acc, item| {
897                    if item < 0 {
898                        Err(boom())
899                    } else {
900                        Ok(acc + item)
901                    }
902                },
903                Supervision::resuming_decider(),
904            )
905            .run_collect()
906            .unwrap();
907        assert_eq!(resumed, vec![0, 1, 4, 9, 16]);
908
909        let restarted = Source::from_iter([1, 3, -1, 5, 7])
910            .scan_result_with_supervision(
911                0,
912                |acc, item| {
913                    if item < 0 {
914                        Err(boom())
915                    } else {
916                        Ok(acc + item)
917                    }
918                },
919                Supervision::restarting_decider(),
920            )
921            .run_collect()
922            .unwrap();
923        assert_eq!(restarted, vec![0, 1, 4, 0, 5, 12]);
924    }
925
926    #[test]
927    fn supervised_fold_restart_resets_accumulator() {
928        let resumed = Source::from_iter(1..=5)
929            .run_with(Sink::fold_result_with_supervision(
930                0,
931                |acc, item| {
932                    if item == 3 {
933                        Err(boom())
934                    } else {
935                        Ok(acc + item)
936                    }
937                },
938                Supervision::resuming_decider(),
939            ))
940            .unwrap()
941            .wait()
942            .unwrap();
943        assert_eq!(resumed, 12);
944
945        let restarted = Source::from_iter(1..=5)
946            .run_with(Sink::fold_result_with_supervision(
947                0,
948                |acc, item| {
949                    if item == 3 {
950                        Err(boom())
951                    } else {
952                        Ok(acc + item)
953                    }
954                },
955                Supervision::restarting_decider(),
956            ))
957            .unwrap()
958            .wait()
959            .unwrap();
960        assert_eq!(restarted, 9);
961    }
962
963    #[test]
964    fn supervised_map_async_drops_failed_future_once() {
965        let decisions = Arc::new(AtomicUsize::new(0));
966        let decider = {
967            let decisions = Arc::clone(&decisions);
968            Arc::new(move |_: &StreamError| {
969                decisions.fetch_add(1, Ordering::SeqCst);
970                SupervisionDirective::Resume
971            }) as SupervisionDecider
972        };
973
974        let collected = Source::from_iter([1, 2, 3, 4])
975            .map_async_with_supervision(
976                2,
977                |item| async move { if item == 3 { Err(boom()) } else { Ok(item) } },
978                decider,
979            )
980            .run_collect()
981            .unwrap();
982
983        assert_eq!(collected, vec![1, 2, 4]);
984        assert_eq!(decisions.load(Ordering::SeqCst), 1);
985    }
986
987    #[test]
988    fn restart_source_restarts_on_completion_until_cap() {
989        let attempts = Arc::new(AtomicUsize::new(0));
990        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
991            .with_max_restarts(2, Duration::from_secs(1));
992        let source = RestartSource::with_backoff(settings, {
993            let attempts = Arc::clone(&attempts);
994            move || {
995                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
996                Source::single(attempt)
997            }
998        });
999
1000        let values = source.run_collect().unwrap();
1001        assert_eq!(values, vec![0, 1, 2]);
1002        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1003    }
1004
1005    #[test]
1006    fn restart_source_on_failures_does_not_restart_on_completion() {
1007        let attempts = Arc::new(AtomicUsize::new(0));
1008        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
1009            .with_max_restarts(3, Duration::from_secs(1));
1010        let source = RestartSource::on_failures_with_backoff(settings, {
1011            let attempts = Arc::clone(&attempts);
1012            move || {
1013                if attempts.fetch_add(1, Ordering::SeqCst) == 0 {
1014                    Source::failed(boom())
1015                } else {
1016                    Source::from_iter([7, 8])
1017                }
1018            }
1019        });
1020
1021        let values = source.run_collect().unwrap();
1022        assert_eq!(values, vec![7, 8]);
1023        assert_eq!(attempts.load(Ordering::SeqCst), 2);
1024    }
1025
1026    #[test]
1027    fn restart_source_cap_resets_after_within_window() {
1028        let attempts = Arc::new(AtomicUsize::new(0));
1029        let settings =
1030            RestartSettings::new(Duration::from_millis(8), Duration::from_millis(8), 0.0)
1031                .with_max_restarts(1, Duration::from_millis(1));
1032        let source = RestartSource::on_failures_with_backoff(settings, {
1033            let attempts = Arc::clone(&attempts);
1034            move || {
1035                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
1036                if attempt < 2 {
1037                    Source::failed(boom())
1038                } else {
1039                    Source::single(42)
1040                }
1041            }
1042        });
1043
1044        let started = Instant::now();
1045        let values = source.run_collect().unwrap();
1046        assert_eq!(values, vec![42]);
1047        assert!(started.elapsed() >= Duration::from_millis(8));
1048        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1049    }
1050
1051    #[test]
1052    fn restart_flow_drops_failed_in_flight_element_and_continues() {
1053        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
1054            .with_max_restarts(1, Duration::from_secs(1));
1055        let values = Source::from_iter([1, 2, 3, 4, 5])
1056            .via(RestartFlow::on_failures_with_backoff(settings, || {
1057                Flow::identity().map_result(|item| if item == 3 { Err(boom()) } else { Ok(item) })
1058            }))
1059            .run_collect()
1060            .unwrap();
1061
1062        assert_eq!(values, vec![1, 2, 4, 5]);
1063    }
1064
1065    #[test]
1066    fn retry_flow_retries_with_backoff_then_emits_last_output() {
1067        let flow = Flow::identity().map(|item: i32| item / 2);
1068        let values = Source::from_iter([5, 1])
1069            .via(RetryFlow::with_backoff(
1070                Duration::ZERO,
1071                Duration::ZERO,
1072                0.0,
1073                3,
1074                flow,
1075                |_, output| (*output > 0).then_some(*output),
1076            ))
1077            .run_collect()
1078            .unwrap();
1079
1080        assert_eq!(values, vec![0, 0]);
1081    }
1082}