1use super::*;
2use crate::context::FlowWithContext;
3use crate::stream::error::panic_stream_error;
4use crate::stream::flow::FlowTransform;
5use std::{
6 marker::PhantomData,
7 sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
8 time::Instant,
9};
10
/// Shared state of the xorshift generator that `next_random_fraction` advances to jitter
/// backoff delays.
///
/// The initial value must be non-zero: XOR-ing a zero state with shifted copies of itself
/// yields zero again, so a zero seed would only ever produce a zero jitter fraction.
static BACKOFF_RANDOM_SEED: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
12
/// Backoff and restart-limit configuration shared by the restart and retry stages.
///
/// Delays start at `min_backoff`, grow exponentially with the number of restarts counted in
/// the current window and are capped at `max_backoff`; a random jitter of up to
/// `random_factor` times the capped delay is then added on top.
#[derive(Clone, Debug)]
pub struct RestartSettings {
    /// Delay used for the first restart of a window.
    min_backoff: Duration,
    /// Upper bound of the exponentially growing delay (before jitter).
    max_backoff: Duration,
    /// Non-negative, finite multiplier bounding the jitter added to each delay.
    random_factor: f64,
    /// Maximum number of restarts granted within `max_restarts_within`.
    max_restarts: usize,
    /// Length of the window over which `max_restarts` is counted.
    max_restarts_within: Duration,
}

impl RestartSettings {
    /// Creates settings with an unlimited restart budget (`usize::MAX`) and a restart window
    /// equal to `min_backoff`.
    ///
    /// # Panics
    ///
    /// Panics if `min_backoff > max_backoff`, or if `random_factor` is negative, NaN or
    /// infinite.
    #[must_use]
    pub fn new(min_backoff: Duration, max_backoff: Duration, random_factor: f64) -> Self {
        assert!(min_backoff <= max_backoff);
        assert!(random_factor >= 0.0);
        // An infinite factor passes the comparison above but cannot be turned into a
        // `Duration` when the jitter is computed, so reject it up front.
        assert!(random_factor.is_finite(), "random_factor must be finite");
        Self {
            min_backoff,
            max_backoff,
            random_factor,
            max_restarts: usize::MAX,
            max_restarts_within: min_backoff,
        }
    }

    /// Returns the delay used for the first restart of a window.
    #[must_use]
    pub fn min_backoff(&self) -> Duration {
        self.min_backoff
    }

    /// Returns the cap applied to the exponentially growing delay.
    #[must_use]
    pub fn max_backoff(&self) -> Duration {
        self.max_backoff
    }

    /// Returns the jitter multiplier.
    #[must_use]
    pub fn random_factor(&self) -> f64 {
        self.random_factor
    }

    /// Returns the maximum number of restarts allowed per window.
    #[must_use]
    pub fn max_restarts(&self) -> usize {
        self.max_restarts
    }

    /// Returns the length of the window the restart budget applies to.
    #[must_use]
    pub fn max_restarts_within(&self) -> Duration {
        self.max_restarts_within
    }

    /// Replaces the minimum backoff.
    ///
    /// # Panics
    ///
    /// Panics if `value` exceeds the current maximum backoff.
    #[must_use]
    pub fn with_min_backoff(mut self, value: Duration) -> Self {
        assert!(value <= self.max_backoff);
        self.min_backoff = value;
        self
    }

    /// Replaces the maximum backoff.
    ///
    /// # Panics
    ///
    /// Panics if `value` is below the current minimum backoff.
    #[must_use]
    pub fn with_max_backoff(mut self, value: Duration) -> Self {
        assert!(self.min_backoff <= value);
        self.max_backoff = value;
        self
    }

    /// Replaces the jitter multiplier.
    ///
    /// # Panics
    ///
    /// Panics if `value` is negative, NaN or infinite.
    #[must_use]
    pub fn with_random_factor(mut self, value: f64) -> Self {
        assert!(value >= 0.0);
        // Same reasoning as in `new`: infinity would only fail later, at restart time.
        assert!(value.is_finite(), "random_factor must be finite");
        self.random_factor = value;
        self
    }

    /// Limits restarts to `count` within each window of length `within`.
    #[must_use]
    pub fn with_max_restarts(mut self, count: usize, within: Duration) -> Self {
        self.max_restarts = count;
        self.max_restarts_within = within;
        self
    }
}
95
/// Why the wrapped stage stopped and a restart is being considered.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RestartCause {
    /// The wrapped stage emitted an error or could not be materialized.
    Failure,
    /// The wrapped stage ended without an error.
    Completion,
}
101
/// Outcome of evaluating the restart policy after the wrapped stage stopped.
enum RestartDecision<T> {
    /// Materialize the wrapped stage again.
    Restart,
    /// Stop restarting. The payload is handed to `finish`: an error is emitted as the final
    /// element, anything else ends the stream normally.
    Finish(Option<StreamResult<T>>),
}
106
107struct RestartBackoff {
108 settings: RestartSettings,
109 window_start: Instant,
110 restarts_in_window: usize,
111}
112
113impl RestartBackoff {
114 fn new(settings: RestartSettings) -> Self {
115 Self {
116 settings,
117 window_start: Instant::now(),
118 restarts_in_window: 0,
119 }
120 }
121
122 fn next_delay(&mut self) -> Option<Duration> {
123 let now = Instant::now();
124 if now.duration_since(self.window_start) > self.settings.max_restarts_within {
125 self.window_start = now;
126 self.restarts_in_window = 0;
127 }
128 if self.restarts_in_window >= self.settings.max_restarts {
129 return None;
130 }
131
132 let exponent = (self.restarts_in_window.min(31) as i32).max(0);
133 let base = self
134 .settings
135 .min_backoff
136 .mul_f64(2_f64.powi(exponent))
137 .min(self.settings.max_backoff);
138 self.restarts_in_window += 1;
139 if self.settings.random_factor == 0.0 || base.is_zero() {
140 Some(base)
141 } else {
142 let jitter = base.mul_f64(next_random_fraction() * self.settings.random_factor);
143 Some(base.saturating_add(jitter))
144 }
145 }
146}
147
148fn next_random_fraction() -> f64 {
149 let mut current = BACKOFF_RANDOM_SEED.load(AtomicOrdering::Relaxed);
150 loop {
151 let mut next = current;
152 next ^= next << 13;
153 next ^= next >> 7;
154 next ^= next << 17;
155 match BACKOFF_RANDOM_SEED.compare_exchange_weak(
156 current,
157 next,
158 AtomicOrdering::Relaxed,
159 AtomicOrdering::Relaxed,
160 ) {
161 Ok(_) => return ((next >> 11) as f64) / ((1_u64 << 53) as f64),
162 Err(observed) => current = observed,
163 }
164 }
165}
166
167fn wait_backoff(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
168 if delay.is_zero() {
169 return Ok(());
170 }
171 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
172 let _timer = materializer.schedule_once(delay, move || {
173 let _ = sender.send(());
174 });
175
176 loop {
177 if materializer.is_shutdown() {
178 return Err(StreamError::AbruptTermination);
179 }
180 if super::runtime::current_stream_cancelled()
181 .as_ref()
182 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
183 {
184 return Err(StreamError::Cancelled);
185 }
186 match receiver.recv_timeout(Duration::from_millis(25)) {
191 Ok(()) => return Ok(()),
192 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
193 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
194 return Err(StreamError::AbruptTermination);
195 }
196 }
197 }
198}
199
200fn invoke_factory<T, F>(context: &str, factory: &F) -> StreamResult<T>
201where
202 F: Fn() -> T,
203{
204 catch_unwind(AssertUnwindSafe(factory)).map_err(|_| panic_stream_error(context))
205}
206
207pub struct RestartSource;
214
215impl RestartSource {
216 #[must_use]
217 pub fn with_backoff<Out, Mat, F>(settings: RestartSettings, factory: F) -> Source<Out>
218 where
219 Out: Send + 'static,
220 Mat: Send + 'static,
221 F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
222 {
223 restart_source(settings, factory, false)
224 }
225
226 #[must_use]
227 pub fn on_failures_with_backoff<Out, Mat, F>(
228 settings: RestartSettings,
229 factory: F,
230 ) -> Source<Out>
231 where
232 Out: Send + 'static,
233 Mat: Send + 'static,
234 F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
235 {
236 restart_source(settings, factory, true)
237 }
238}
239
240fn restart_source<Out, Mat, F>(
241 settings: RestartSettings,
242 factory: F,
243 only_on_failures: bool,
244) -> Source<Out>
245where
246 Out: Send + 'static,
247 Mat: Send + 'static,
248 F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
249{
250 let factory = Arc::new(factory);
251 Source::from_materialized_factory(move |materializer| {
252 Ok((
253 Box::new(RestartSourceStream {
254 factory: Arc::clone(&factory),
255 materializer: materializer.clone(),
256 backoff: RestartBackoff::new(settings.clone()),
257 current: None,
258 only_on_failures,
259 terminal: None,
260 _marker: PhantomData::<Mat>,
261 }),
262 NotUsed,
263 ))
264 })
265}
266
267struct RestartSourceStream<Out, Mat, F> {
268 factory: Arc<F>,
269 materializer: Materializer,
270 backoff: RestartBackoff,
271 current: Option<BoxStream<Out>>,
272 only_on_failures: bool,
273 terminal: Option<StreamResult<()>>,
274 _marker: PhantomData<Mat>,
275}
276
277impl<Out, Mat, F> RestartSourceStream<Out, Mat, F>
278where
279 Out: Send + 'static,
280 Mat: Send + 'static,
281 F: Fn() -> Source<Out, Mat>,
282{
283 fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
284 let source = invoke_factory("RestartSource factory", self.factory.as_ref())?;
285 Arc::clone(&source.factory)
286 .create(&self.materializer)
287 .map(|(stream, _)| stream)
288 }
289
290 fn restart_or_finish(
291 &mut self,
292 cause: RestartCause,
293 error: Option<StreamError>,
294 ) -> RestartDecision<Out> {
295 if self.only_on_failures && cause == RestartCause::Completion {
296 return RestartDecision::Finish(None);
297 }
298 let Some(delay) = self.backoff.next_delay() else {
299 return RestartDecision::Finish(error.map(Err));
300 };
301 if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
302 return RestartDecision::Finish(Some(Err(wait_error)));
303 }
304 RestartDecision::Restart
305 }
306
307 fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
308 match result {
309 Some(Err(error)) => {
310 self.terminal = Some(Err(error.clone()));
311 Some(Err(error))
312 }
313 Some(Ok(_)) | None => {
314 self.terminal = Some(Ok(()));
315 None
316 }
317 }
318 }
319}
320
321impl<Out, Mat, F> Iterator for RestartSourceStream<Out, Mat, F>
322where
323 Out: Send + 'static,
324 Mat: Send + 'static,
325 F: Fn() -> Source<Out, Mat>,
326{
327 type Item = StreamResult<Out>;
328
329 fn next(&mut self) -> Option<Self::Item> {
330 if let Some(terminal) = &self.terminal {
331 return match terminal {
332 Ok(()) => None,
333 Err(error) => Some(Err(error.clone())),
334 };
335 }
336 loop {
337 if self.current.is_none() {
338 match self.rematerialize() {
339 Ok(stream) => self.current = Some(stream),
340 Err(error) => {
341 match self.restart_or_finish(RestartCause::Failure, Some(error)) {
342 RestartDecision::Restart => {}
343 RestartDecision::Finish(result) => return self.finish(result),
344 }
345 continue;
346 }
347 }
348 }
349
350 let next = self
351 .current
352 .as_mut()
353 .expect("restart source child exists")
354 .next();
355 match next {
356 Some(Ok(item)) => return Some(Ok(item)),
357 Some(Err(error)) => {
358 self.current = None;
359 match self.restart_or_finish(RestartCause::Failure, Some(error)) {
360 RestartDecision::Restart => {}
361 RestartDecision::Finish(result) => return self.finish(result),
362 }
363 }
364 None => {
365 self.current = None;
366 match self.restart_or_finish(RestartCause::Completion, None) {
367 RestartDecision::Restart => {}
368 RestartDecision::Finish(result) => return self.finish(result),
369 }
370 }
371 }
372 }
373 }
374}
375
376struct SharedInput<In> {
377 inner: Arc<Mutex<SharedInputState<In>>>,
378}
379
380impl<In> Clone for SharedInput<In> {
381 fn clone(&self) -> Self {
382 Self {
383 inner: Arc::clone(&self.inner),
384 }
385 }
386}
387
388struct SharedInputState<In> {
389 input: Option<BoxStream<In>>,
390 exhausted: bool,
391}
392
393impl<In> SharedInput<In> {
394 fn new(input: BoxStream<In>) -> Self {
395 Self {
396 inner: Arc::new(Mutex::new(SharedInputState {
397 input: Some(input),
398 exhausted: false,
399 })),
400 }
401 }
402
403 fn stream(&self) -> SharedInputStream<In> {
404 SharedInputStream {
405 shared: self.clone(),
406 }
407 }
408
409 fn is_exhausted(&self) -> bool {
410 self.inner
411 .lock()
412 .unwrap_or_else(|poison| poison.into_inner())
413 .exhausted
414 }
415}
416
417struct SharedInputStream<In> {
418 shared: SharedInput<In>,
419}
420
421impl<In> Iterator for SharedInputStream<In> {
422 type Item = StreamResult<In>;
423
424 fn next(&mut self) -> Option<Self::Item> {
425 let mut input = {
426 let mut state = self
427 .shared
428 .inner
429 .lock()
430 .unwrap_or_else(|poison| poison.into_inner());
431 if state.exhausted {
432 return None;
433 }
434 state.input.take().expect("shared input present")
435 };
436 let next = input.next();
437 let mut state = self
438 .shared
439 .inner
440 .lock()
441 .unwrap_or_else(|poison| poison.into_inner());
442 if next.is_none() {
443 state.exhausted = true;
444 }
445 state.input = Some(input);
446 next
447 }
448}
449
450pub struct RestartFlow;
457
458impl RestartFlow {
459 #[must_use]
460 pub fn with_backoff<In, Out, Mat, F>(settings: RestartSettings, factory: F) -> Flow<In, Out>
461 where
462 In: Send + 'static,
463 Out: Send + 'static,
464 Mat: Send + 'static,
465 F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
466 {
467 restart_flow(settings, factory, false)
468 }
469
470 #[must_use]
471 pub fn on_failures_with_backoff<In, Out, Mat, F>(
472 settings: RestartSettings,
473 factory: F,
474 ) -> Flow<In, Out>
475 where
476 In: Send + 'static,
477 Out: Send + 'static,
478 Mat: Send + 'static,
479 F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
480 {
481 restart_flow(settings, factory, true)
482 }
483}
484
485fn restart_flow<In, Out, Mat, F>(
486 settings: RestartSettings,
487 factory: F,
488 only_on_failures: bool,
489) -> Flow<In, Out>
490where
491 In: Send + 'static,
492 Out: Send + 'static,
493 Mat: Send + 'static,
494 F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
495{
496 let factory = Arc::new(factory);
497 Flow::from_runtime_transform(move |input, materializer| {
498 let shared = SharedInput::new(input);
499 Ok(Box::new(RestartFlowStream {
500 factory: Arc::clone(&factory),
501 materializer: materializer.clone(),
502 shared,
503 backoff: RestartBackoff::new(settings.clone()),
504 current: None,
505 only_on_failures,
506 terminal: None,
507 _marker: PhantomData::<Mat>,
508 }))
509 })
510}
511
512struct RestartFlowStream<In, Out, Mat, F> {
513 factory: Arc<F>,
514 materializer: Materializer,
515 shared: SharedInput<In>,
516 backoff: RestartBackoff,
517 current: Option<BoxStream<Out>>,
518 only_on_failures: bool,
519 terminal: Option<StreamResult<()>>,
520 _marker: PhantomData<Mat>,
521}
522
523impl<In, Out, Mat, F> RestartFlowStream<In, Out, Mat, F>
524where
525 In: Send + 'static,
526 Out: Send + 'static,
527 Mat: Send + 'static,
528 F: Fn() -> Flow<In, Out, Mat>,
529{
530 fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
531 let flow = invoke_factory("RestartFlow factory", self.factory.as_ref())?;
532 (flow.materialize)()?;
533 let input = Box::new(self.shared.stream()) as BoxStream<In>;
534 match flow.transform {
535 FlowTransform::Pure(transform) => Ok(transform(input)),
536 FlowTransform::Runtime(transform) => transform(input, &self.materializer),
537 }
538 }
539
540 fn restart_or_finish(
541 &mut self,
542 cause: RestartCause,
543 error: Option<StreamError>,
544 ) -> RestartDecision<Out> {
545 if self.shared.is_exhausted() && cause == RestartCause::Completion {
546 return RestartDecision::Finish(None);
547 }
548 if self.only_on_failures && cause == RestartCause::Completion {
549 return RestartDecision::Finish(None);
550 }
551 let Some(delay) = self.backoff.next_delay() else {
552 return RestartDecision::Finish(error.map(Err));
553 };
554 if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
555 return RestartDecision::Finish(Some(Err(wait_error)));
556 }
557 RestartDecision::Restart
558 }
559
560 fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
561 match result {
562 Some(Err(error)) => {
563 self.terminal = Some(Err(error.clone()));
564 Some(Err(error))
565 }
566 Some(Ok(_)) | None => {
567 self.terminal = Some(Ok(()));
568 None
569 }
570 }
571 }
572}
573
574impl<In, Out, Mat, F> Iterator for RestartFlowStream<In, Out, Mat, F>
575where
576 In: Send + 'static,
577 Out: Send + 'static,
578 Mat: Send + 'static,
579 F: Fn() -> Flow<In, Out, Mat>,
580{
581 type Item = StreamResult<Out>;
582
583 fn next(&mut self) -> Option<Self::Item> {
584 if let Some(terminal) = &self.terminal {
585 return match terminal {
586 Ok(()) => None,
587 Err(error) => Some(Err(error.clone())),
588 };
589 }
590 loop {
591 if self.current.is_none() {
592 if self.shared.is_exhausted() {
593 return None;
594 }
595 match self.rematerialize() {
596 Ok(stream) => self.current = Some(stream),
597 Err(error) => {
598 match self.restart_or_finish(RestartCause::Failure, Some(error)) {
599 RestartDecision::Restart => {}
600 RestartDecision::Finish(result) => return self.finish(result),
601 }
602 continue;
603 }
604 }
605 }
606
607 let next = self
608 .current
609 .as_mut()
610 .expect("restart flow child exists")
611 .next();
612 match next {
613 Some(Ok(item)) => return Some(Ok(item)),
614 Some(Err(error)) => {
615 self.current = None;
616 match self.restart_or_finish(RestartCause::Failure, Some(error)) {
617 RestartDecision::Restart => {}
618 RestartDecision::Finish(result) => return self.finish(result),
619 }
620 }
621 None => {
622 self.current = None;
623 match self.restart_or_finish(RestartCause::Completion, None) {
624 RestartDecision::Restart => {}
625 RestartDecision::Finish(result) => return self.finish(result),
626 }
627 }
628 }
629 }
630 }
631}
632
633pub struct RestartSink;
640
641impl RestartSink {
642 #[must_use]
643 pub fn with_backoff<In, F>(
644 settings: RestartSettings,
645 factory: F,
646 ) -> Sink<In, StreamCompletion<NotUsed>>
647 where
648 In: Send + 'static,
649 F: Fn() -> Sink<In, StreamCompletion<NotUsed>> + Send + Sync + 'static,
650 {
651 let factory = Arc::new(factory);
652 Sink::from_runner(move |input, materializer| {
653 let materializer = materializer.clone();
654 let factory = Arc::clone(&factory);
655 let settings = settings.clone();
656 let state = Arc::clone(&materializer.inner.state);
657 Ok(materializer.clone().spawn_stream(move |cancelled| {
658 let checked = runtime_checked_stream(input, state, Some(cancelled));
659 let shared = SharedInput::new(Box::new(checked) as BoxStream<In>);
660 let mut backoff = RestartBackoff::new(settings.clone());
661 loop {
662 if shared.is_exhausted() {
663 return Ok(NotUsed);
664 }
665 let sink = invoke_factory("RestartSink factory", factory.as_ref())?;
666 let completion = sink.run(Box::new(shared.stream()), &materializer)?;
667 match completion.wait() {
668 Ok(_) if shared.is_exhausted() => return Ok(NotUsed),
669 Ok(_) => {
670 let Some(delay) = backoff.next_delay() else {
671 return Ok(NotUsed);
672 };
673 wait_backoff(&materializer, delay)?;
674 }
675 Err(error) => {
676 let Some(delay) = backoff.next_delay() else {
677 return Err(error);
678 };
679 wait_backoff(&materializer, delay)?;
680 }
681 }
682 }
683 }))
684 })
685 }
686}
687
688pub struct RetryFlow;
694
695type RetryDecider<In, Out> = Arc<dyn Fn(&In, &Out) -> Option<In> + Send + Sync>;
696
697impl RetryFlow {
698 #[must_use]
699 pub fn with_backoff<In, Out, Mat, Decide>(
700 min_backoff: Duration,
701 max_backoff: Duration,
702 random_factor: f64,
703 max_retries: usize,
704 flow: Flow<In, Out, Mat>,
705 decide_retry: Decide,
706 ) -> Flow<In, Out>
707 where
708 In: Clone + Send + 'static,
709 Out: Send + 'static,
710 Mat: Send + 'static,
711 Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
712 {
713 retry_flow(
714 min_backoff,
715 max_backoff,
716 random_factor,
717 max_retries,
718 flow,
719 Arc::new(decide_retry),
720 )
721 }
722
723 #[must_use]
724 pub fn with_backoff_and_context<In, CtxIn, Out, CtxOut, Mat, Decide>(
725 min_backoff: Duration,
726 max_backoff: Duration,
727 random_factor: f64,
728 max_retries: usize,
729 flow: FlowWithContext<In, CtxIn, Out, CtxOut, Mat>,
730 decide_retry: Decide,
731 ) -> FlowWithContext<In, CtxIn, Out, CtxOut>
732 where
733 In: Clone + Send + 'static,
734 CtxIn: Clone + Send + 'static,
735 Out: Send + 'static,
736 CtxOut: Send + 'static,
737 Mat: Send + 'static,
738 Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
739 {
740 let decide_retry = Arc::new(decide_retry);
741 let delegate = retry_flow(
742 min_backoff,
743 max_backoff,
744 random_factor,
745 max_retries,
746 flow.as_flow(),
747 Arc::new(move |input: &(In, CtxIn), output: &(Out, CtxOut)| {
748 decide_retry(&input.0, &output.0).map(|next| (next, input.1.clone()))
749 }),
750 );
751 FlowWithContext::from_flow(delegate)
752 }
753}
754
755fn retry_flow<In, Out, Mat>(
756 min_backoff: Duration,
757 max_backoff: Duration,
758 random_factor: f64,
759 max_retries: usize,
760 flow: Flow<In, Out, Mat>,
761 decide_retry: RetryDecider<In, Out>,
762) -> Flow<In, Out>
763where
764 In: Clone + Send + 'static,
765 Out: Send + 'static,
766 Mat: Send + 'static,
767{
768 let settings = RestartSettings::new(min_backoff, max_backoff, random_factor)
769 .with_max_restarts(max_retries, max_backoff.max(min_backoff));
770 Flow::from_runtime_transform(move |input, materializer| {
771 Ok(Box::new(RetryFlowStream {
772 input,
773 materializer: materializer.clone(),
774 flow: flow.clone(),
775 decide_retry: Arc::clone(&decide_retry),
776 settings: settings.clone(),
777 }))
778 })
779}
780
781struct RetryFlowStream<In, Out, Mat> {
782 input: BoxStream<In>,
783 materializer: Materializer,
784 flow: Flow<In, Out, Mat>,
785 decide_retry: RetryDecider<In, Out>,
786 settings: RestartSettings,
787}
788
789impl<In, Out, Mat> RetryFlowStream<In, Out, Mat>
790where
791 In: Clone + Send + 'static,
792 Out: Send + 'static,
793 Mat: Send + 'static,
794{
795 fn run_once(&self, item: In) -> StreamResult<Out> {
796 (self.flow.materialize)()?;
797 let input = Box::new(std::iter::once(Ok(item))) as BoxStream<In>;
798 let mut stream = match &self.flow.transform {
799 FlowTransform::Pure(transform) => transform(input),
800 FlowTransform::Runtime(transform) => transform(input, &self.materializer)?,
801 };
802 let output = match stream.next() {
803 Some(Ok(output)) => output,
804 Some(Err(error)) => return Err(error),
805 None => {
806 return Err(StreamError::Failed(
807 "RetryFlow inner flow produced no output".to_owned(),
808 ));
809 }
810 };
811 if stream.next().is_some() {
812 return Err(StreamError::Failed(
813 "RetryFlow inner flow produced more than one output".to_owned(),
814 ));
815 }
816 Ok(output)
817 }
818}
819
820impl<In, Out, Mat> Iterator for RetryFlowStream<In, Out, Mat>
821where
822 In: Clone + Send + 'static,
823 Out: Send + 'static,
824 Mat: Send + 'static,
825{
826 type Item = StreamResult<Out>;
827
828 fn next(&mut self) -> Option<Self::Item> {
829 let original = match self.input.next()? {
830 Ok(item) => item,
831 Err(error) => return Some(Err(error)),
832 };
833 let mut current = original.clone();
834 let mut retries = 0_usize;
835 let mut backoff = RestartBackoff::new(self.settings.clone());
836 loop {
837 let output = match self.run_once(current.clone()) {
838 Ok(output) => output,
839 Err(error) => return Some(Err(error)),
840 };
841 let retry =
842 match catch_unwind(AssertUnwindSafe(|| (self.decide_retry)(&original, &output))) {
843 Ok(retry) => retry,
844 Err(_) => return Some(Err(panic_stream_error("RetryFlow decider"))),
845 };
846 let Some(next_input) = retry else {
847 return Some(Ok(output));
848 };
849 if retries >= self.settings.max_restarts {
850 return Some(Ok(output));
851 }
852 let delay = backoff.next_delay().unwrap_or(self.settings.max_backoff);
853 if let Err(error) = wait_backoff(&self.materializer, delay) {
854 return Some(Err(error));
855 }
856 retries += 1;
857 current = next_input;
858 }
859 }
860}
861
// Unit tests for the supervision-aware operators and the restart/retry stages.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    // Canonical failure used by every test in this module.
    fn boom() -> StreamError {
        StreamError::Failed("boom".to_owned())
    }

    // Without a decider the first error fails the stream; with a resuming decider the
    // failing element is dropped and processing continues.
    #[test]
    fn supervised_map_result_stops_by_default_and_resumes_when_requested() {
        let stopped = Source::from_iter([1, 2, 3, 4])
            .map_result(|item| if item == 3 { Err(boom()) } else { Ok(item) })
            .run_collect();
        assert_eq!(stopped, Err(boom()));

        let resumed = Source::from_iter([1, 2, 3, 4])
            .map_result_with_supervision(
                |item| if item == 3 { Err(boom()) } else { Ok(item) },
                Supervision::resuming_decider(),
            )
            .run_collect()
            .unwrap();
        assert_eq!(resumed, vec![1, 2, 4]);
    }

    // Resuming keeps the accumulator across the failing element; restarting resets it and
    // emits the seed again.
    #[test]
    fn supervised_scan_restart_resets_state_and_reemits_seed() {
        let resumed = Source::from_iter([1, 3, -1, 5, 7])
            .scan_result_with_supervision(
                0,
                |acc, item| {
                    if item < 0 {
                        Err(boom())
                    } else {
                        Ok(acc + item)
                    }
                },
                Supervision::resuming_decider(),
            )
            .run_collect()
            .unwrap();
        assert_eq!(resumed, vec![0, 1, 4, 9, 16]);

        let restarted = Source::from_iter([1, 3, -1, 5, 7])
            .scan_result_with_supervision(
                0,
                |acc, item| {
                    if item < 0 {
                        Err(boom())
                    } else {
                        Ok(acc + item)
                    }
                },
                Supervision::restarting_decider(),
            )
            .run_collect()
            .unwrap();
        assert_eq!(restarted, vec![0, 1, 4, 0, 5, 12]);
    }

    // Resuming skips the failing element (1 + 2 + 4 + 5); restarting additionally discards
    // what was accumulated before the failure (4 + 5).
    #[test]
    fn supervised_fold_restart_resets_accumulator() {
        let resumed = Source::from_iter(1..=5)
            .run_with(Sink::fold_result_with_supervision(
                0,
                |acc, item| {
                    if item == 3 {
                        Err(boom())
                    } else {
                        Ok(acc + item)
                    }
                },
                Supervision::resuming_decider(),
            ))
            .unwrap()
            .wait()
            .unwrap();
        assert_eq!(resumed, 12);

        let restarted = Source::from_iter(1..=5)
            .run_with(Sink::fold_result_with_supervision(
                0,
                |acc, item| {
                    if item == 3 {
                        Err(boom())
                    } else {
                        Ok(acc + item)
                    }
                },
                Supervision::restarting_decider(),
            ))
            .unwrap()
            .wait()
            .unwrap();
        assert_eq!(restarted, 9);
    }

    // The decider must be consulted exactly once for the single failing future.
    #[test]
    fn supervised_map_async_drops_failed_future_once() {
        let decisions = Arc::new(AtomicUsize::new(0));
        let decider = {
            let decisions = Arc::clone(&decisions);
            Arc::new(move |_: &StreamError| {
                decisions.fetch_add(1, Ordering::SeqCst);
                SupervisionDirective::Resume
            }) as SupervisionDecider
        };

        let collected = Source::from_iter([1, 2, 3, 4])
            .map_async_with_supervision(
                2,
                |item| async move { if item == 3 { Err(boom()) } else { Ok(item) } },
                decider,
            )
            .run_collect()
            .unwrap();

        assert_eq!(collected, vec![1, 2, 4]);
        assert_eq!(decisions.load(Ordering::SeqCst), 1);
    }

    // With a budget of two restarts the factory runs three times: the initial start plus
    // two restarts after completion.
    #[test]
    fn restart_source_restarts_on_completion_until_cap() {
        let attempts = Arc::new(AtomicUsize::new(0));
        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
            .with_max_restarts(2, Duration::from_secs(1));
        let source = RestartSource::with_backoff(settings, {
            let attempts = Arc::clone(&attempts);
            move || {
                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
                Source::single(attempt)
            }
        });

        let values = source.run_collect().unwrap();
        assert_eq!(values, vec![0, 1, 2]);
        assert_eq!(attempts.load(Ordering::SeqCst), 3);
    }

    // The first incarnation fails and is restarted; the second completes normally, which
    // ends the stream even though restarts remain.
    #[test]
    fn restart_source_on_failures_does_not_restart_on_completion() {
        let attempts = Arc::new(AtomicUsize::new(0));
        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
            .with_max_restarts(3, Duration::from_secs(1));
        let source = RestartSource::on_failures_with_backoff(settings, {
            let attempts = Arc::clone(&attempts);
            move || {
                if attempts.fetch_add(1, Ordering::SeqCst) == 0 {
                    Source::failed(boom())
                } else {
                    Source::from_iter([7, 8])
                }
            }
        });

        let values = source.run_collect().unwrap();
        assert_eq!(values, vec![7, 8]);
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    // The 8 ms backoff outlasts the 1 ms restart window, so the single-restart budget is
    // available again by the time the second failure occurs.
    #[test]
    fn restart_source_cap_resets_after_within_window() {
        let attempts = Arc::new(AtomicUsize::new(0));
        let settings =
            RestartSettings::new(Duration::from_millis(8), Duration::from_millis(8), 0.0)
                .with_max_restarts(1, Duration::from_millis(1));
        let source = RestartSource::on_failures_with_backoff(settings, {
            let attempts = Arc::clone(&attempts);
            move || {
                let attempt = attempts.fetch_add(1, Ordering::SeqCst);
                if attempt < 2 {
                    Source::failed(boom())
                } else {
                    Source::single(42)
                }
            }
        });

        let started = Instant::now();
        let values = source.run_collect().unwrap();
        assert_eq!(values, vec![42]);
        assert!(started.elapsed() >= Duration::from_millis(8));
        assert_eq!(attempts.load(Ordering::SeqCst), 3);
    }

    // The element that made the inner flow fail is lost; the restarted flow continues with
    // the remaining upstream elements.
    #[test]
    fn restart_flow_drops_failed_in_flight_element_and_continues() {
        let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
            .with_max_restarts(1, Duration::from_secs(1));
        let values = Source::from_iter([1, 2, 3, 4, 5])
            .via(RestartFlow::on_failures_with_backoff(settings, || {
                Flow::identity().map_result(|item| if item == 3 { Err(boom()) } else { Ok(item) })
            }))
            .run_collect()
            .unwrap();

        assert_eq!(values, vec![1, 2, 4, 5]);
    }

    // Each positive output is fed back as the next request until halving reaches zero
    // (5 -> 2 -> 1 -> 0 uses two retries; 1 -> 0 uses none).
    #[test]
    fn retry_flow_retries_with_backoff_then_emits_last_output() {
        let flow = Flow::identity().map(|item: i32| item / 2);
        let values = Source::from_iter([5, 1])
            .via(RetryFlow::with_backoff(
                Duration::ZERO,
                Duration::ZERO,
                0.0,
                3,
                flow,
                |_, output| (*output > 0).then_some(*output),
            ))
            .run_collect()
            .unwrap();

        assert_eq!(values, vec![0, 0]);
    }
}