1use super::*;
10use crate::context::FlowWithContext;
11use crate::stream::error::panic_stream_error;
12use crate::stream::flow::FlowTransform;
13use std::{
14 marker::PhantomData,
15 sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
16 time::Instant,
17};
18
19static BACKOFF_RANDOM_SEED: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
20
21#[derive(Clone, Debug)]
28pub struct RestartSettings {
29 min_backoff: Duration,
30 max_backoff: Duration,
31 random_factor: f64,
32 max_restarts: usize,
33 max_restarts_within: Duration,
34}
35
36impl RestartSettings {
37 #[must_use]
38 pub fn new(min_backoff: Duration, max_backoff: Duration, random_factor: f64) -> Self {
39 assert!(min_backoff <= max_backoff);
40 assert!(random_factor >= 0.0);
41 Self {
42 min_backoff,
43 max_backoff,
44 random_factor,
45 max_restarts: usize::MAX,
46 max_restarts_within: min_backoff,
47 }
48 }
49
50 #[must_use]
51 pub fn min_backoff(&self) -> Duration {
52 self.min_backoff
53 }
54
55 #[must_use]
56 pub fn max_backoff(&self) -> Duration {
57 self.max_backoff
58 }
59
60 #[must_use]
61 pub fn random_factor(&self) -> f64 {
62 self.random_factor
63 }
64
65 #[must_use]
66 pub fn max_restarts(&self) -> usize {
67 self.max_restarts
68 }
69
70 #[must_use]
71 pub fn max_restarts_within(&self) -> Duration {
72 self.max_restarts_within
73 }
74
75 #[must_use]
76 pub fn with_min_backoff(mut self, value: Duration) -> Self {
77 assert!(value <= self.max_backoff);
78 self.min_backoff = value;
79 self
80 }
81
82 #[must_use]
83 pub fn with_max_backoff(mut self, value: Duration) -> Self {
84 assert!(self.min_backoff <= value);
85 self.max_backoff = value;
86 self
87 }
88
89 #[must_use]
90 pub fn with_random_factor(mut self, value: f64) -> Self {
91 assert!(value >= 0.0);
92 self.random_factor = value;
93 self
94 }
95
96 #[must_use]
97 pub fn with_max_restarts(mut self, count: usize, within: Duration) -> Self {
98 self.max_restarts = count;
99 self.max_restarts_within = within;
100 self
101 }
102}
103
104#[derive(Clone, Copy, Debug, PartialEq, Eq)]
105enum RestartCause {
106 Failure,
107 Completion,
108}
109
110enum RestartDecision<T> {
111 Restart,
112 Finish(Option<StreamResult<T>>),
113}
114
115struct RestartBackoff {
116 settings: RestartSettings,
117 window_start: Instant,
118 restarts_in_window: usize,
119}
120
121impl RestartBackoff {
122 fn new(settings: RestartSettings) -> Self {
123 Self {
124 settings,
125 window_start: Instant::now(),
126 restarts_in_window: 0,
127 }
128 }
129
130 fn next_delay(&mut self) -> Option<Duration> {
131 let now = Instant::now();
132 if now.duration_since(self.window_start) > self.settings.max_restarts_within {
133 self.window_start = now;
134 self.restarts_in_window = 0;
135 }
136 if self.restarts_in_window >= self.settings.max_restarts {
137 return None;
138 }
139
140 let exponent = (self.restarts_in_window.min(31) as i32).max(0);
141 let base = self
142 .settings
143 .min_backoff
144 .mul_f64(2_f64.powi(exponent))
145 .min(self.settings.max_backoff);
146 self.restarts_in_window += 1;
147 if self.settings.random_factor == 0.0 || base.is_zero() {
148 Some(base)
149 } else {
150 let jitter = base.mul_f64(next_random_fraction() * self.settings.random_factor);
151 Some(base.saturating_add(jitter))
152 }
153 }
154}
155
156fn next_random_fraction() -> f64 {
157 let mut current = BACKOFF_RANDOM_SEED.load(AtomicOrdering::Relaxed);
158 loop {
159 let mut next = current;
160 next ^= next << 13;
161 next ^= next >> 7;
162 next ^= next << 17;
163 match BACKOFF_RANDOM_SEED.compare_exchange_weak(
164 current,
165 next,
166 AtomicOrdering::Relaxed,
167 AtomicOrdering::Relaxed,
168 ) {
169 Ok(_) => return ((next >> 11) as f64) / ((1_u64 << 53) as f64),
170 Err(observed) => current = observed,
171 }
172 }
173}
174
175fn wait_backoff(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
176 if delay.is_zero() {
177 return Ok(());
178 }
179 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
180 let _timer = materializer.schedule_once(delay, move || {
181 let _ = sender.send(());
182 });
183
184 loop {
185 if materializer.is_shutdown() {
186 return Err(StreamError::AbruptTermination);
187 }
188 if super::runtime::current_stream_cancelled()
189 .as_ref()
190 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
191 {
192 return Err(StreamError::Cancelled);
193 }
194 match receiver.recv_timeout(Duration::from_millis(25)) {
199 Ok(()) => return Ok(()),
200 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
201 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
202 return Err(StreamError::AbruptTermination);
203 }
204 }
205 }
206}
207
208fn invoke_factory<T, F>(context: &str, factory: &F) -> StreamResult<T>
209where
210 F: Fn() -> T,
211{
212 catch_unwind(AssertUnwindSafe(factory)).map_err(|_| panic_stream_error(context))
213}
214
215pub struct RestartSource;
222
223impl RestartSource {
224 #[must_use]
225 pub fn with_backoff<Out, Mat, F>(settings: RestartSettings, factory: F) -> Source<Out>
226 where
227 Out: Send + 'static,
228 Mat: Send + 'static,
229 F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
230 {
231 restart_source(settings, factory, false)
232 }
233
234 #[must_use]
235 pub fn on_failures_with_backoff<Out, Mat, F>(
236 settings: RestartSettings,
237 factory: F,
238 ) -> Source<Out>
239 where
240 Out: Send + 'static,
241 Mat: Send + 'static,
242 F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
243 {
244 restart_source(settings, factory, true)
245 }
246}
247
248fn restart_source<Out, Mat, F>(
249 settings: RestartSettings,
250 factory: F,
251 only_on_failures: bool,
252) -> Source<Out>
253where
254 Out: Send + 'static,
255 Mat: Send + 'static,
256 F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
257{
258 let factory = Arc::new(factory);
259 Source::from_materialized_factory(move |materializer| {
260 Ok((
261 Box::new(RestartSourceStream {
262 factory: Arc::clone(&factory),
263 materializer: materializer.clone(),
264 backoff: RestartBackoff::new(settings.clone()),
265 current: None,
266 only_on_failures,
267 terminal: None,
268 _marker: PhantomData::<Mat>,
269 }),
270 NotUsed,
271 ))
272 })
273}
274
275struct RestartSourceStream<Out, Mat, F> {
276 factory: Arc<F>,
277 materializer: Materializer,
278 backoff: RestartBackoff,
279 current: Option<BoxStream<Out>>,
280 only_on_failures: bool,
281 terminal: Option<StreamResult<()>>,
282 _marker: PhantomData<Mat>,
283}
284
285impl<Out, Mat, F> RestartSourceStream<Out, Mat, F>
286where
287 Out: Send + 'static,
288 Mat: Send + 'static,
289 F: Fn() -> Source<Out, Mat>,
290{
291 fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
292 let source = invoke_factory("RestartSource factory", self.factory.as_ref())?;
293 Arc::clone(&source.factory)
294 .create(&self.materializer)
295 .map(|(stream, _)| stream)
296 }
297
298 fn restart_or_finish(
299 &mut self,
300 cause: RestartCause,
301 error: Option<StreamError>,
302 ) -> RestartDecision<Out> {
303 if self.only_on_failures && cause == RestartCause::Completion {
304 return RestartDecision::Finish(None);
305 }
306 let Some(delay) = self.backoff.next_delay() else {
307 return RestartDecision::Finish(error.map(Err));
308 };
309 if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
310 return RestartDecision::Finish(Some(Err(wait_error)));
311 }
312 RestartDecision::Restart
313 }
314
315 fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
316 match result {
317 Some(Err(error)) => {
318 self.terminal = Some(Err(error.clone()));
319 Some(Err(error))
320 }
321 Some(Ok(_)) | None => {
322 self.terminal = Some(Ok(()));
323 None
324 }
325 }
326 }
327}
328
329impl<Out, Mat, F> Iterator for RestartSourceStream<Out, Mat, F>
330where
331 Out: Send + 'static,
332 Mat: Send + 'static,
333 F: Fn() -> Source<Out, Mat>,
334{
335 type Item = StreamResult<Out>;
336
337 fn next(&mut self) -> Option<Self::Item> {
338 if let Some(terminal) = &self.terminal {
339 return match terminal {
340 Ok(()) => None,
341 Err(error) => Some(Err(error.clone())),
342 };
343 }
344 loop {
345 if self.current.is_none() {
346 match self.rematerialize() {
347 Ok(stream) => self.current = Some(stream),
348 Err(error) => {
349 match self.restart_or_finish(RestartCause::Failure, Some(error)) {
350 RestartDecision::Restart => {}
351 RestartDecision::Finish(result) => return self.finish(result),
352 }
353 continue;
354 }
355 }
356 }
357
358 let next = self
359 .current
360 .as_mut()
361 .expect("restart source child exists")
362 .next();
363 match next {
364 Some(Ok(item)) => return Some(Ok(item)),
365 Some(Err(error)) => {
366 self.current = None;
367 match self.restart_or_finish(RestartCause::Failure, Some(error)) {
368 RestartDecision::Restart => {}
369 RestartDecision::Finish(result) => return self.finish(result),
370 }
371 }
372 None => {
373 self.current = None;
374 match self.restart_or_finish(RestartCause::Completion, None) {
375 RestartDecision::Restart => {}
376 RestartDecision::Finish(result) => return self.finish(result),
377 }
378 }
379 }
380 }
381 }
382}
383
384struct SharedInput<In> {
385 inner: Arc<Mutex<SharedInputState<In>>>,
386}
387
388impl<In> Clone for SharedInput<In> {
389 fn clone(&self) -> Self {
390 Self {
391 inner: Arc::clone(&self.inner),
392 }
393 }
394}
395
396struct SharedInputState<In> {
397 input: Option<BoxStream<In>>,
398 exhausted: bool,
399}
400
401impl<In> SharedInput<In> {
402 fn new(input: BoxStream<In>) -> Self {
403 Self {
404 inner: Arc::new(Mutex::new(SharedInputState {
405 input: Some(input),
406 exhausted: false,
407 })),
408 }
409 }
410
411 fn stream(&self) -> SharedInputStream<In> {
412 SharedInputStream {
413 shared: self.clone(),
414 }
415 }
416
417 fn is_exhausted(&self) -> bool {
418 self.inner
419 .lock()
420 .unwrap_or_else(|poison| poison.into_inner())
421 .exhausted
422 }
423}
424
425struct SharedInputStream<In> {
426 shared: SharedInput<In>,
427}
428
429impl<In> Iterator for SharedInputStream<In> {
430 type Item = StreamResult<In>;
431
432 fn next(&mut self) -> Option<Self::Item> {
433 let mut input = {
434 let mut state = self
435 .shared
436 .inner
437 .lock()
438 .unwrap_or_else(|poison| poison.into_inner());
439 if state.exhausted {
440 return None;
441 }
442 state.input.take().expect("shared input present")
443 };
444 let next = input.next();
445 let mut state = self
446 .shared
447 .inner
448 .lock()
449 .unwrap_or_else(|poison| poison.into_inner());
450 if next.is_none() {
451 state.exhausted = true;
452 }
453 state.input = Some(input);
454 next
455 }
456}
457
458pub struct RestartFlow;
465
466impl RestartFlow {
467 #[must_use]
468 pub fn with_backoff<In, Out, Mat, F>(settings: RestartSettings, factory: F) -> Flow<In, Out>
469 where
470 In: Send + 'static,
471 Out: Send + 'static,
472 Mat: Send + 'static,
473 F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
474 {
475 restart_flow(settings, factory, false)
476 }
477
478 #[must_use]
479 pub fn on_failures_with_backoff<In, Out, Mat, F>(
480 settings: RestartSettings,
481 factory: F,
482 ) -> Flow<In, Out>
483 where
484 In: Send + 'static,
485 Out: Send + 'static,
486 Mat: Send + 'static,
487 F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
488 {
489 restart_flow(settings, factory, true)
490 }
491}
492
493fn restart_flow<In, Out, Mat, F>(
494 settings: RestartSettings,
495 factory: F,
496 only_on_failures: bool,
497) -> Flow<In, Out>
498where
499 In: Send + 'static,
500 Out: Send + 'static,
501 Mat: Send + 'static,
502 F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
503{
504 let factory = Arc::new(factory);
505 Flow::from_runtime_transform(move |input, materializer| {
506 let shared = SharedInput::new(input);
507 Ok(Box::new(RestartFlowStream {
508 factory: Arc::clone(&factory),
509 materializer: materializer.clone(),
510 shared,
511 backoff: RestartBackoff::new(settings.clone()),
512 current: None,
513 only_on_failures,
514 terminal: None,
515 _marker: PhantomData::<Mat>,
516 }))
517 })
518}
519
520struct RestartFlowStream<In, Out, Mat, F> {
521 factory: Arc<F>,
522 materializer: Materializer,
523 shared: SharedInput<In>,
524 backoff: RestartBackoff,
525 current: Option<BoxStream<Out>>,
526 only_on_failures: bool,
527 terminal: Option<StreamResult<()>>,
528 _marker: PhantomData<Mat>,
529}
530
531impl<In, Out, Mat, F> RestartFlowStream<In, Out, Mat, F>
532where
533 In: Send + 'static,
534 Out: Send + 'static,
535 Mat: Send + 'static,
536 F: Fn() -> Flow<In, Out, Mat>,
537{
538 fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
539 let flow = invoke_factory("RestartFlow factory", self.factory.as_ref())?;
540 (flow.materialize)()?;
541 let input = Box::new(self.shared.stream()) as BoxStream<In>;
542 match flow.transform {
543 FlowTransform::Pure(transform) => Ok(transform(input)),
544 FlowTransform::Runtime(transform) => transform(input, &self.materializer),
545 }
546 }
547
548 fn restart_or_finish(
549 &mut self,
550 cause: RestartCause,
551 error: Option<StreamError>,
552 ) -> RestartDecision<Out> {
553 if self.shared.is_exhausted() && cause == RestartCause::Completion {
554 return RestartDecision::Finish(None);
555 }
556 if self.only_on_failures && cause == RestartCause::Completion {
557 return RestartDecision::Finish(None);
558 }
559 let Some(delay) = self.backoff.next_delay() else {
560 return RestartDecision::Finish(error.map(Err));
561 };
562 if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
563 return RestartDecision::Finish(Some(Err(wait_error)));
564 }
565 RestartDecision::Restart
566 }
567
568 fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
569 match result {
570 Some(Err(error)) => {
571 self.terminal = Some(Err(error.clone()));
572 Some(Err(error))
573 }
574 Some(Ok(_)) | None => {
575 self.terminal = Some(Ok(()));
576 None
577 }
578 }
579 }
580}
581
582impl<In, Out, Mat, F> Iterator for RestartFlowStream<In, Out, Mat, F>
583where
584 In: Send + 'static,
585 Out: Send + 'static,
586 Mat: Send + 'static,
587 F: Fn() -> Flow<In, Out, Mat>,
588{
589 type Item = StreamResult<Out>;
590
591 fn next(&mut self) -> Option<Self::Item> {
592 if let Some(terminal) = &self.terminal {
593 return match terminal {
594 Ok(()) => None,
595 Err(error) => Some(Err(error.clone())),
596 };
597 }
598 loop {
599 if self.current.is_none() {
600 if self.shared.is_exhausted() {
601 return None;
602 }
603 match self.rematerialize() {
604 Ok(stream) => self.current = Some(stream),
605 Err(error) => {
606 match self.restart_or_finish(RestartCause::Failure, Some(error)) {
607 RestartDecision::Restart => {}
608 RestartDecision::Finish(result) => return self.finish(result),
609 }
610 continue;
611 }
612 }
613 }
614
615 let next = self
616 .current
617 .as_mut()
618 .expect("restart flow child exists")
619 .next();
620 match next {
621 Some(Ok(item)) => return Some(Ok(item)),
622 Some(Err(error)) => {
623 self.current = None;
624 match self.restart_or_finish(RestartCause::Failure, Some(error)) {
625 RestartDecision::Restart => {}
626 RestartDecision::Finish(result) => return self.finish(result),
627 }
628 }
629 None => {
630 self.current = None;
631 match self.restart_or_finish(RestartCause::Completion, None) {
632 RestartDecision::Restart => {}
633 RestartDecision::Finish(result) => return self.finish(result),
634 }
635 }
636 }
637 }
638 }
639}
640
641pub struct RestartSink;
648
649impl RestartSink {
650 #[must_use]
651 pub fn with_backoff<In, F>(
652 settings: RestartSettings,
653 factory: F,
654 ) -> Sink<In, StreamCompletion<NotUsed>>
655 where
656 In: Send + 'static,
657 F: Fn() -> Sink<In, StreamCompletion<NotUsed>> + Send + Sync + 'static,
658 {
659 let factory = Arc::new(factory);
660 Sink::from_runner(move |input, materializer| {
661 let materializer = materializer.clone();
662 let factory = Arc::clone(&factory);
663 let settings = settings.clone();
664 let state = Arc::clone(&materializer.inner.state);
665 Ok(materializer.clone().spawn_stream(move |cancelled| {
666 let checked = runtime_checked_stream(input, state, Some(cancelled));
667 let shared = SharedInput::new(Box::new(checked) as BoxStream<In>);
668 let mut backoff = RestartBackoff::new(settings.clone());
669 loop {
670 if shared.is_exhausted() {
671 return Ok(NotUsed);
672 }
673 let sink = invoke_factory("RestartSink factory", factory.as_ref())?;
674 let completion = sink.run(Box::new(shared.stream()), &materializer)?;
675 match completion.wait() {
676 Ok(_) if shared.is_exhausted() => return Ok(NotUsed),
677 Ok(_) => {
678 let Some(delay) = backoff.next_delay() else {
679 return Ok(NotUsed);
680 };
681 wait_backoff(&materializer, delay)?;
682 }
683 Err(error) => {
684 let Some(delay) = backoff.next_delay() else {
685 return Err(error);
686 };
687 wait_backoff(&materializer, delay)?;
688 }
689 }
690 }
691 }))
692 })
693 }
694}
695
696pub struct RetryFlow;
702
703type RetryDecider<In, Out> = Arc<dyn Fn(&In, &Out) -> Option<In> + Send + Sync>;
704
705impl RetryFlow {
706 #[must_use]
707 pub fn with_backoff<In, Out, Mat, Decide>(
708 min_backoff: Duration,
709 max_backoff: Duration,
710 random_factor: f64,
711 max_retries: usize,
712 flow: Flow<In, Out, Mat>,
713 decide_retry: Decide,
714 ) -> Flow<In, Out>
715 where
716 In: Clone + Send + 'static,
717 Out: Send + 'static,
718 Mat: Send + 'static,
719 Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
720 {
721 retry_flow(
722 min_backoff,
723 max_backoff,
724 random_factor,
725 max_retries,
726 flow,
727 Arc::new(decide_retry),
728 )
729 }
730
731 #[must_use]
732 pub fn with_backoff_and_context<In, CtxIn, Out, CtxOut, Mat, Decide>(
733 min_backoff: Duration,
734 max_backoff: Duration,
735 random_factor: f64,
736 max_retries: usize,
737 flow: FlowWithContext<In, CtxIn, Out, CtxOut, Mat>,
738 decide_retry: Decide,
739 ) -> FlowWithContext<In, CtxIn, Out, CtxOut>
740 where
741 In: Clone + Send + 'static,
742 CtxIn: Clone + Send + 'static,
743 Out: Send + 'static,
744 CtxOut: Send + 'static,
745 Mat: Send + 'static,
746 Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
747 {
748 let decide_retry = Arc::new(decide_retry);
749 let delegate = retry_flow(
750 min_backoff,
751 max_backoff,
752 random_factor,
753 max_retries,
754 flow.as_flow(),
755 Arc::new(move |input: &(In, CtxIn), output: &(Out, CtxOut)| {
756 decide_retry(&input.0, &output.0).map(|next| (next, input.1.clone()))
757 }),
758 );
759 FlowWithContext::from_flow(delegate)
760 }
761}
762
763fn retry_flow<In, Out, Mat>(
764 min_backoff: Duration,
765 max_backoff: Duration,
766 random_factor: f64,
767 max_retries: usize,
768 flow: Flow<In, Out, Mat>,
769 decide_retry: RetryDecider<In, Out>,
770) -> Flow<In, Out>
771where
772 In: Clone + Send + 'static,
773 Out: Send + 'static,
774 Mat: Send + 'static,
775{
776 let settings = RestartSettings::new(min_backoff, max_backoff, random_factor)
777 .with_max_restarts(max_retries, max_backoff.max(min_backoff));
778 Flow::from_runtime_transform(move |input, materializer| {
779 Ok(Box::new(RetryFlowStream {
780 input,
781 materializer: materializer.clone(),
782 flow: flow.clone(),
783 decide_retry: Arc::clone(&decide_retry),
784 settings: settings.clone(),
785 }))
786 })
787}
788
789struct RetryFlowStream<In, Out, Mat> {
790 input: BoxStream<In>,
791 materializer: Materializer,
792 flow: Flow<In, Out, Mat>,
793 decide_retry: RetryDecider<In, Out>,
794 settings: RestartSettings,
795}
796
797impl<In, Out, Mat> RetryFlowStream<In, Out, Mat>
798where
799 In: Clone + Send + 'static,
800 Out: Send + 'static,
801 Mat: Send + 'static,
802{
803 fn run_once(&self, item: In) -> StreamResult<Out> {
804 (self.flow.materialize)()?;
805 let input = Box::new(std::iter::once(Ok(item))) as BoxStream<In>;
806 let mut stream = match &self.flow.transform {
807 FlowTransform::Pure(transform) => transform(input),
808 FlowTransform::Runtime(transform) => transform(input, &self.materializer)?,
809 };
810 let output = match stream.next() {
811 Some(Ok(output)) => output,
812 Some(Err(error)) => return Err(error),
813 None => {
814 return Err(StreamError::Failed(
815 "RetryFlow inner flow produced no output".to_owned(),
816 ));
817 }
818 };
819 if stream.next().is_some() {
820 return Err(StreamError::Failed(
821 "RetryFlow inner flow produced more than one output".to_owned(),
822 ));
823 }
824 Ok(output)
825 }
826}
827
828impl<In, Out, Mat> Iterator for RetryFlowStream<In, Out, Mat>
829where
830 In: Clone + Send + 'static,
831 Out: Send + 'static,
832 Mat: Send + 'static,
833{
834 type Item = StreamResult<Out>;
835
836 fn next(&mut self) -> Option<Self::Item> {
837 let original = match self.input.next()? {
838 Ok(item) => item,
839 Err(error) => return Some(Err(error)),
840 };
841 let mut current = original.clone();
842 let mut retries = 0_usize;
843 let mut backoff = RestartBackoff::new(self.settings.clone());
844 loop {
845 let output = match self.run_once(current.clone()) {
846 Ok(output) => output,
847 Err(error) => return Some(Err(error)),
848 };
849 let retry =
850 match catch_unwind(AssertUnwindSafe(|| (self.decide_retry)(&original, &output))) {
851 Ok(retry) => retry,
852 Err(_) => return Some(Err(panic_stream_error("RetryFlow decider"))),
853 };
854 let Some(next_input) = retry else {
855 return Some(Ok(output));
856 };
857 if retries >= self.settings.max_restarts {
858 return Some(Ok(output));
859 }
860 let delay = backoff.next_delay().unwrap_or(self.settings.max_backoff);
861 if let Err(error) = wait_backoff(&self.materializer, delay) {
862 return Some(Err(error));
863 }
864 retries += 1;
865 current = next_input;
866 }
867 }
868}
869
870#[cfg(test)]
871mod tests {
872 use super::*;
873 use std::sync::{
874 Arc,
875 atomic::{AtomicUsize, Ordering},
876 };
877
878 fn boom() -> StreamError {
879 StreamError::Failed("boom".to_owned())
880 }
881
882 #[test]
883 fn supervised_map_result_stops_by_default_and_resumes_when_requested() {
884 let stopped = Source::from_iter([1, 2, 3, 4])
885 .try_map(|item| if item == 3 { Err(boom()) } else { Ok(item) })
886 .run_collect();
887 assert_eq!(stopped, Err(boom()));
888
889 let resumed = Source::from_iter([1, 2, 3, 4])
890 .map_result_with_supervision(
891 |item| if item == 3 { Err(boom()) } else { Ok(item) },
892 Supervision::resuming_decider(),
893 )
894 .run_collect()
895 .unwrap();
896 assert_eq!(resumed, vec![1, 2, 4]);
897 }
898
899 #[test]
900 fn supervised_scan_restart_resets_state_and_reemits_seed() {
901 let resumed = Source::from_iter([1, 3, -1, 5, 7])
902 .scan_result_with_supervision(
903 0,
904 |acc, item| {
905 if item < 0 {
906 Err(boom())
907 } else {
908 Ok(acc + item)
909 }
910 },
911 Supervision::resuming_decider(),
912 )
913 .run_collect()
914 .unwrap();
915 assert_eq!(resumed, vec![0, 1, 4, 9, 16]);
916
917 let restarted = Source::from_iter([1, 3, -1, 5, 7])
918 .scan_result_with_supervision(
919 0,
920 |acc, item| {
921 if item < 0 {
922 Err(boom())
923 } else {
924 Ok(acc + item)
925 }
926 },
927 Supervision::restarting_decider(),
928 )
929 .run_collect()
930 .unwrap();
931 assert_eq!(restarted, vec![0, 1, 4, 0, 5, 12]);
932 }
933
934 #[test]
935 fn supervised_fold_restart_resets_accumulator() {
936 let resumed = Source::from_iter(1..=5)
937 .run_with(Sink::fold_result_with_supervision(
938 0,
939 |acc, item| {
940 if item == 3 {
941 Err(boom())
942 } else {
943 Ok(acc + item)
944 }
945 },
946 Supervision::resuming_decider(),
947 ))
948 .unwrap()
949 .wait()
950 .unwrap();
951 assert_eq!(resumed, 12);
952
953 let restarted = Source::from_iter(1..=5)
954 .run_with(Sink::fold_result_with_supervision(
955 0,
956 |acc, item| {
957 if item == 3 {
958 Err(boom())
959 } else {
960 Ok(acc + item)
961 }
962 },
963 Supervision::restarting_decider(),
964 ))
965 .unwrap()
966 .wait()
967 .unwrap();
968 assert_eq!(restarted, 9);
969 }
970
971 #[test]
972 fn supervised_map_async_drops_failed_future_once() {
973 let decisions = Arc::new(AtomicUsize::new(0));
974 let decider = {
975 let decisions = Arc::clone(&decisions);
976 Arc::new(move |_: &StreamError| {
977 decisions.fetch_add(1, Ordering::SeqCst);
978 SupervisionDirective::Resume
979 }) as SupervisionDecider
980 };
981
982 let collected = Source::from_iter([1, 2, 3, 4])
983 .map_async_with_supervision(
984 2,
985 |item| async move { if item == 3 { Err(boom()) } else { Ok(item) } },
986 decider,
987 )
988 .run_collect()
989 .unwrap();
990
991 assert_eq!(collected, vec![1, 2, 4]);
992 assert_eq!(decisions.load(Ordering::SeqCst), 1);
993 }
994
995 #[test]
996 fn restart_source_restarts_on_completion_until_cap() {
997 let attempts = Arc::new(AtomicUsize::new(0));
998 let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
999 .with_max_restarts(2, Duration::from_secs(1));
1000 let source = RestartSource::with_backoff(settings, {
1001 let attempts = Arc::clone(&attempts);
1002 move || {
1003 let attempt = attempts.fetch_add(1, Ordering::SeqCst);
1004 Source::single(attempt)
1005 }
1006 });
1007
1008 let values = source.run_collect().unwrap();
1009 assert_eq!(values, vec![0, 1, 2]);
1010 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1011 }
1012
1013 #[test]
1014 fn restart_source_on_failures_does_not_restart_on_completion() {
1015 let attempts = Arc::new(AtomicUsize::new(0));
1016 let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
1017 .with_max_restarts(3, Duration::from_secs(1));
1018 let source = RestartSource::on_failures_with_backoff(settings, {
1019 let attempts = Arc::clone(&attempts);
1020 move || {
1021 if attempts.fetch_add(1, Ordering::SeqCst) == 0 {
1022 Source::failed(boom())
1023 } else {
1024 Source::from_iter([7, 8])
1025 }
1026 }
1027 });
1028
1029 let values = source.run_collect().unwrap();
1030 assert_eq!(values, vec![7, 8]);
1031 assert_eq!(attempts.load(Ordering::SeqCst), 2);
1032 }
1033
1034 #[test]
1035 fn restart_source_cap_resets_after_within_window() {
1036 let attempts = Arc::new(AtomicUsize::new(0));
1037 let settings =
1038 RestartSettings::new(Duration::from_millis(8), Duration::from_millis(8), 0.0)
1039 .with_max_restarts(1, Duration::from_millis(1));
1040 let source = RestartSource::on_failures_with_backoff(settings, {
1041 let attempts = Arc::clone(&attempts);
1042 move || {
1043 let attempt = attempts.fetch_add(1, Ordering::SeqCst);
1044 if attempt < 2 {
1045 Source::failed(boom())
1046 } else {
1047 Source::single(42)
1048 }
1049 }
1050 });
1051
1052 let started = Instant::now();
1053 let values = source.run_collect().unwrap();
1054 assert_eq!(values, vec![42]);
1055 assert!(started.elapsed() >= Duration::from_millis(8));
1056 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1057 }
1058
1059 #[test]
1060 fn restart_flow_drops_failed_in_flight_element_and_continues() {
1061 let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
1062 .with_max_restarts(1, Duration::from_secs(1));
1063 let values = Source::from_iter([1, 2, 3, 4, 5])
1064 .via(RestartFlow::on_failures_with_backoff(settings, || {
1065 Flow::identity().try_map(|item| if item == 3 { Err(boom()) } else { Ok(item) })
1066 }))
1067 .run_collect()
1068 .unwrap();
1069
1070 assert_eq!(values, vec![1, 2, 4, 5]);
1071 }
1072
1073 #[test]
1074 fn retry_flow_retries_with_backoff_then_emits_last_output() {
1075 let flow = Flow::identity().map(|item: i32| item / 2);
1076 let values = Source::from_iter([5, 1])
1077 .via(RetryFlow::with_backoff(
1078 Duration::ZERO,
1079 Duration::ZERO,
1080 0.0,
1081 3,
1082 flow,
1083 |_, output| (*output > 0).then_some(*output),
1084 ))
1085 .run_collect()
1086 .unwrap();
1087
1088 assert_eq!(values, vec![0, 0]);
1089 }
1090}