use super::*;
use crate::context::FlowWithContext;
use crate::stream::error::panic_stream_error;
use crate::stream::flow::FlowTransform;
use std::{
marker::PhantomData,
sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
time::Instant,
};
static BACKOFF_RANDOM_SEED: AtomicU64 = AtomicU64::new(0x9e37_79b9_7f4a_7c15);
#[derive(Clone, Debug)]
pub struct RestartSettings {
min_backoff: Duration,
max_backoff: Duration,
random_factor: f64,
max_restarts: usize,
max_restarts_within: Duration,
}
impl RestartSettings {
#[must_use]
pub fn new(min_backoff: Duration, max_backoff: Duration, random_factor: f64) -> Self {
assert!(min_backoff <= max_backoff);
assert!(random_factor >= 0.0);
Self {
min_backoff,
max_backoff,
random_factor,
max_restarts: usize::MAX,
max_restarts_within: min_backoff,
}
}
#[must_use]
pub fn min_backoff(&self) -> Duration {
self.min_backoff
}
#[must_use]
pub fn max_backoff(&self) -> Duration {
self.max_backoff
}
#[must_use]
pub fn random_factor(&self) -> f64 {
self.random_factor
}
#[must_use]
pub fn max_restarts(&self) -> usize {
self.max_restarts
}
#[must_use]
pub fn max_restarts_within(&self) -> Duration {
self.max_restarts_within
}
#[must_use]
pub fn with_min_backoff(mut self, value: Duration) -> Self {
assert!(value <= self.max_backoff);
self.min_backoff = value;
self
}
#[must_use]
pub fn with_max_backoff(mut self, value: Duration) -> Self {
assert!(self.min_backoff <= value);
self.max_backoff = value;
self
}
#[must_use]
pub fn with_random_factor(mut self, value: f64) -> Self {
assert!(value >= 0.0);
self.random_factor = value;
self
}
#[must_use]
pub fn with_max_restarts(mut self, count: usize, within: Duration) -> Self {
self.max_restarts = count;
self.max_restarts_within = within;
self
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RestartCause {
Failure,
Completion,
}
enum RestartDecision<T> {
Restart,
Finish(Option<StreamResult<T>>),
}
struct RestartBackoff {
settings: RestartSettings,
window_start: Instant,
restarts_in_window: usize,
}
impl RestartBackoff {
fn new(settings: RestartSettings) -> Self {
Self {
settings,
window_start: Instant::now(),
restarts_in_window: 0,
}
}
fn next_delay(&mut self) -> Option<Duration> {
let now = Instant::now();
if now.duration_since(self.window_start) > self.settings.max_restarts_within {
self.window_start = now;
self.restarts_in_window = 0;
}
if self.restarts_in_window >= self.settings.max_restarts {
return None;
}
let exponent = (self.restarts_in_window.min(31) as i32).max(0);
let base = self
.settings
.min_backoff
.mul_f64(2_f64.powi(exponent))
.min(self.settings.max_backoff);
self.restarts_in_window += 1;
if self.settings.random_factor == 0.0 || base.is_zero() {
Some(base)
} else {
let jitter = base.mul_f64(next_random_fraction() * self.settings.random_factor);
Some(base.saturating_add(jitter))
}
}
}
fn next_random_fraction() -> f64 {
let mut current = BACKOFF_RANDOM_SEED.load(AtomicOrdering::Relaxed);
loop {
let mut next = current;
next ^= next << 13;
next ^= next >> 7;
next ^= next << 17;
match BACKOFF_RANDOM_SEED.compare_exchange_weak(
current,
next,
AtomicOrdering::Relaxed,
AtomicOrdering::Relaxed,
) {
Ok(_) => return ((next >> 11) as f64) / ((1_u64 << 53) as f64),
Err(observed) => current = observed,
}
}
}
fn wait_backoff(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
if delay.is_zero() {
return Ok(());
}
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let _timer = materializer.schedule_once(delay, move || {
let _ = sender.send(());
});
loop {
if materializer.is_shutdown() {
return Err(StreamError::AbruptTermination);
}
if super::runtime::current_stream_cancelled()
.as_ref()
.is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
{
return Err(StreamError::Cancelled);
}
match receiver.recv_timeout(Duration::from_millis(25)) {
Ok(()) => return Ok(()),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
return Err(StreamError::AbruptTermination);
}
}
}
}
fn invoke_factory<T, F>(context: &str, factory: &F) -> StreamResult<T>
where
F: Fn() -> T,
{
catch_unwind(AssertUnwindSafe(factory)).map_err(|_| panic_stream_error(context))
}
pub struct RestartSource;
impl RestartSource {
#[must_use]
pub fn with_backoff<Out, Mat, F>(settings: RestartSettings, factory: F) -> Source<Out>
where
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
{
restart_source(settings, factory, false)
}
#[must_use]
pub fn on_failures_with_backoff<Out, Mat, F>(
settings: RestartSettings,
factory: F,
) -> Source<Out>
where
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
{
restart_source(settings, factory, true)
}
}
fn restart_source<Out, Mat, F>(
settings: RestartSettings,
factory: F,
only_on_failures: bool,
) -> Source<Out>
where
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Source<Out, Mat> + Send + Sync + 'static,
{
let factory = Arc::new(factory);
Source::from_materialized_factory(move |materializer| {
Ok((
Box::new(RestartSourceStream {
factory: Arc::clone(&factory),
materializer: materializer.clone(),
backoff: RestartBackoff::new(settings.clone()),
current: None,
only_on_failures,
terminal: None,
_marker: PhantomData::<Mat>,
}),
NotUsed,
))
})
}
struct RestartSourceStream<Out, Mat, F> {
factory: Arc<F>,
materializer: Materializer,
backoff: RestartBackoff,
current: Option<BoxStream<Out>>,
only_on_failures: bool,
terminal: Option<StreamResult<()>>,
_marker: PhantomData<Mat>,
}
impl<Out, Mat, F> RestartSourceStream<Out, Mat, F>
where
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Source<Out, Mat>,
{
fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
let source = invoke_factory("RestartSource factory", self.factory.as_ref())?;
Arc::clone(&source.factory)
.create(&self.materializer)
.map(|(stream, _)| stream)
}
fn restart_or_finish(
&mut self,
cause: RestartCause,
error: Option<StreamError>,
) -> RestartDecision<Out> {
if self.only_on_failures && cause == RestartCause::Completion {
return RestartDecision::Finish(None);
}
let Some(delay) = self.backoff.next_delay() else {
return RestartDecision::Finish(error.map(Err));
};
if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
return RestartDecision::Finish(Some(Err(wait_error)));
}
RestartDecision::Restart
}
fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
match result {
Some(Err(error)) => {
self.terminal = Some(Err(error.clone()));
Some(Err(error))
}
Some(Ok(_)) | None => {
self.terminal = Some(Ok(()));
None
}
}
}
}
impl<Out, Mat, F> Iterator for RestartSourceStream<Out, Mat, F>
where
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Source<Out, Mat>,
{
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(terminal) = &self.terminal {
return match terminal {
Ok(()) => None,
Err(error) => Some(Err(error.clone())),
};
}
loop {
if self.current.is_none() {
match self.rematerialize() {
Ok(stream) => self.current = Some(stream),
Err(error) => {
match self.restart_or_finish(RestartCause::Failure, Some(error)) {
RestartDecision::Restart => {}
RestartDecision::Finish(result) => return self.finish(result),
}
continue;
}
}
}
let next = self
.current
.as_mut()
.expect("restart source child exists")
.next();
match next {
Some(Ok(item)) => return Some(Ok(item)),
Some(Err(error)) => {
self.current = None;
match self.restart_or_finish(RestartCause::Failure, Some(error)) {
RestartDecision::Restart => {}
RestartDecision::Finish(result) => return self.finish(result),
}
}
None => {
self.current = None;
match self.restart_or_finish(RestartCause::Completion, None) {
RestartDecision::Restart => {}
RestartDecision::Finish(result) => return self.finish(result),
}
}
}
}
}
}
struct SharedInput<In> {
inner: Arc<Mutex<SharedInputState<In>>>,
}
impl<In> Clone for SharedInput<In> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
struct SharedInputState<In> {
input: Option<BoxStream<In>>,
exhausted: bool,
}
impl<In> SharedInput<In> {
fn new(input: BoxStream<In>) -> Self {
Self {
inner: Arc::new(Mutex::new(SharedInputState {
input: Some(input),
exhausted: false,
})),
}
}
fn stream(&self) -> SharedInputStream<In> {
SharedInputStream {
shared: self.clone(),
}
}
fn is_exhausted(&self) -> bool {
self.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner())
.exhausted
}
}
struct SharedInputStream<In> {
shared: SharedInput<In>,
}
impl<In> Iterator for SharedInputStream<In> {
type Item = StreamResult<In>;
fn next(&mut self) -> Option<Self::Item> {
let mut input = {
let mut state = self
.shared
.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.exhausted {
return None;
}
state.input.take().expect("shared input present")
};
let next = input.next();
let mut state = self
.shared
.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if next.is_none() {
state.exhausted = true;
}
state.input = Some(input);
next
}
}
pub struct RestartFlow;
impl RestartFlow {
#[must_use]
pub fn with_backoff<In, Out, Mat, F>(settings: RestartSettings, factory: F) -> Flow<In, Out>
where
In: Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
{
restart_flow(settings, factory, false)
}
#[must_use]
pub fn on_failures_with_backoff<In, Out, Mat, F>(
settings: RestartSettings,
factory: F,
) -> Flow<In, Out>
where
In: Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
{
restart_flow(settings, factory, true)
}
}
fn restart_flow<In, Out, Mat, F>(
settings: RestartSettings,
factory: F,
only_on_failures: bool,
) -> Flow<In, Out>
where
In: Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Flow<In, Out, Mat> + Send + Sync + 'static,
{
let factory = Arc::new(factory);
Flow::from_runtime_transform(move |input, materializer| {
let shared = SharedInput::new(input);
Ok(Box::new(RestartFlowStream {
factory: Arc::clone(&factory),
materializer: materializer.clone(),
shared,
backoff: RestartBackoff::new(settings.clone()),
current: None,
only_on_failures,
terminal: None,
_marker: PhantomData::<Mat>,
}))
})
}
struct RestartFlowStream<In, Out, Mat, F> {
factory: Arc<F>,
materializer: Materializer,
shared: SharedInput<In>,
backoff: RestartBackoff,
current: Option<BoxStream<Out>>,
only_on_failures: bool,
terminal: Option<StreamResult<()>>,
_marker: PhantomData<Mat>,
}
impl<In, Out, Mat, F> RestartFlowStream<In, Out, Mat, F>
where
In: Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Flow<In, Out, Mat>,
{
fn rematerialize(&self) -> StreamResult<BoxStream<Out>> {
let flow = invoke_factory("RestartFlow factory", self.factory.as_ref())?;
(flow.materialize)()?;
let input = Box::new(self.shared.stream()) as BoxStream<In>;
match flow.transform {
FlowTransform::Pure(transform) => Ok(transform(input)),
FlowTransform::Runtime(transform) => transform(input, &self.materializer),
}
}
fn restart_or_finish(
&mut self,
cause: RestartCause,
error: Option<StreamError>,
) -> RestartDecision<Out> {
if self.shared.is_exhausted() && cause == RestartCause::Completion {
return RestartDecision::Finish(None);
}
if self.only_on_failures && cause == RestartCause::Completion {
return RestartDecision::Finish(None);
}
let Some(delay) = self.backoff.next_delay() else {
return RestartDecision::Finish(error.map(Err));
};
if let Err(wait_error) = wait_backoff(&self.materializer, delay) {
return RestartDecision::Finish(Some(Err(wait_error)));
}
RestartDecision::Restart
}
fn finish(&mut self, result: Option<StreamResult<Out>>) -> Option<StreamResult<Out>> {
match result {
Some(Err(error)) => {
self.terminal = Some(Err(error.clone()));
Some(Err(error))
}
Some(Ok(_)) | None => {
self.terminal = Some(Ok(()));
None
}
}
}
}
impl<In, Out, Mat, F> Iterator for RestartFlowStream<In, Out, Mat, F>
where
In: Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
F: Fn() -> Flow<In, Out, Mat>,
{
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(terminal) = &self.terminal {
return match terminal {
Ok(()) => None,
Err(error) => Some(Err(error.clone())),
};
}
loop {
if self.current.is_none() {
if self.shared.is_exhausted() {
return None;
}
match self.rematerialize() {
Ok(stream) => self.current = Some(stream),
Err(error) => {
match self.restart_or_finish(RestartCause::Failure, Some(error)) {
RestartDecision::Restart => {}
RestartDecision::Finish(result) => return self.finish(result),
}
continue;
}
}
}
let next = self
.current
.as_mut()
.expect("restart flow child exists")
.next();
match next {
Some(Ok(item)) => return Some(Ok(item)),
Some(Err(error)) => {
self.current = None;
match self.restart_or_finish(RestartCause::Failure, Some(error)) {
RestartDecision::Restart => {}
RestartDecision::Finish(result) => return self.finish(result),
}
}
None => {
self.current = None;
match self.restart_or_finish(RestartCause::Completion, None) {
RestartDecision::Restart => {}
RestartDecision::Finish(result) => return self.finish(result),
}
}
}
}
}
}
pub struct RestartSink;
impl RestartSink {
#[must_use]
pub fn with_backoff<In, F>(
settings: RestartSettings,
factory: F,
) -> Sink<In, StreamCompletion<NotUsed>>
where
In: Send + 'static,
F: Fn() -> Sink<In, StreamCompletion<NotUsed>> + Send + Sync + 'static,
{
let factory = Arc::new(factory);
Sink::from_runner(move |input, materializer| {
let materializer = materializer.clone();
let factory = Arc::clone(&factory);
let settings = settings.clone();
let state = Arc::clone(&materializer.inner.state);
Ok(materializer.clone().spawn_stream(move |cancelled| {
let checked = runtime_checked_stream(input, state, Some(cancelled));
let shared = SharedInput::new(Box::new(checked) as BoxStream<In>);
let mut backoff = RestartBackoff::new(settings.clone());
loop {
if shared.is_exhausted() {
return Ok(NotUsed);
}
let sink = invoke_factory("RestartSink factory", factory.as_ref())?;
let completion = sink.run(Box::new(shared.stream()), &materializer)?;
match completion.wait() {
Ok(_) if shared.is_exhausted() => return Ok(NotUsed),
Ok(_) => {
let Some(delay) = backoff.next_delay() else {
return Ok(NotUsed);
};
wait_backoff(&materializer, delay)?;
}
Err(error) => {
let Some(delay) = backoff.next_delay() else {
return Err(error);
};
wait_backoff(&materializer, delay)?;
}
}
}
}))
})
}
}
pub struct RetryFlow;
type RetryDecider<In, Out> = Arc<dyn Fn(&In, &Out) -> Option<In> + Send + Sync>;
impl RetryFlow {
#[must_use]
pub fn with_backoff<In, Out, Mat, Decide>(
min_backoff: Duration,
max_backoff: Duration,
random_factor: f64,
max_retries: usize,
flow: Flow<In, Out, Mat>,
decide_retry: Decide,
) -> Flow<In, Out>
where
In: Clone + Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
{
retry_flow(
min_backoff,
max_backoff,
random_factor,
max_retries,
flow,
Arc::new(decide_retry),
)
}
#[must_use]
pub fn with_backoff_and_context<In, CtxIn, Out, CtxOut, Mat, Decide>(
min_backoff: Duration,
max_backoff: Duration,
random_factor: f64,
max_retries: usize,
flow: FlowWithContext<In, CtxIn, Out, CtxOut, Mat>,
decide_retry: Decide,
) -> FlowWithContext<In, CtxIn, Out, CtxOut>
where
In: Clone + Send + 'static,
CtxIn: Clone + Send + 'static,
Out: Send + 'static,
CtxOut: Send + 'static,
Mat: Send + 'static,
Decide: Fn(&In, &Out) -> Option<In> + Send + Sync + 'static,
{
let decide_retry = Arc::new(decide_retry);
let delegate = retry_flow(
min_backoff,
max_backoff,
random_factor,
max_retries,
flow.as_flow(),
Arc::new(move |input: &(In, CtxIn), output: &(Out, CtxOut)| {
decide_retry(&input.0, &output.0).map(|next| (next, input.1.clone()))
}),
);
FlowWithContext::from_flow(delegate)
}
}
fn retry_flow<In, Out, Mat>(
min_backoff: Duration,
max_backoff: Duration,
random_factor: f64,
max_retries: usize,
flow: Flow<In, Out, Mat>,
decide_retry: RetryDecider<In, Out>,
) -> Flow<In, Out>
where
In: Clone + Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
{
let settings = RestartSettings::new(min_backoff, max_backoff, random_factor)
.with_max_restarts(max_retries, max_backoff.max(min_backoff));
Flow::from_runtime_transform(move |input, materializer| {
Ok(Box::new(RetryFlowStream {
input,
materializer: materializer.clone(),
flow: flow.clone(),
decide_retry: Arc::clone(&decide_retry),
settings: settings.clone(),
}))
})
}
struct RetryFlowStream<In, Out, Mat> {
input: BoxStream<In>,
materializer: Materializer,
flow: Flow<In, Out, Mat>,
decide_retry: RetryDecider<In, Out>,
settings: RestartSettings,
}
impl<In, Out, Mat> RetryFlowStream<In, Out, Mat>
where
In: Clone + Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
{
fn run_once(&self, item: In) -> StreamResult<Out> {
(self.flow.materialize)()?;
let input = Box::new(std::iter::once(Ok(item))) as BoxStream<In>;
let mut stream = match &self.flow.transform {
FlowTransform::Pure(transform) => transform(input),
FlowTransform::Runtime(transform) => transform(input, &self.materializer)?,
};
let output = match stream.next() {
Some(Ok(output)) => output,
Some(Err(error)) => return Err(error),
None => {
return Err(StreamError::Failed(
"RetryFlow inner flow produced no output".to_owned(),
));
}
};
if stream.next().is_some() {
return Err(StreamError::Failed(
"RetryFlow inner flow produced more than one output".to_owned(),
));
}
Ok(output)
}
}
impl<In, Out, Mat> Iterator for RetryFlowStream<In, Out, Mat>
where
In: Clone + Send + 'static,
Out: Send + 'static,
Mat: Send + 'static,
{
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
let original = match self.input.next()? {
Ok(item) => item,
Err(error) => return Some(Err(error)),
};
let mut current = original.clone();
let mut retries = 0_usize;
let mut backoff = RestartBackoff::new(self.settings.clone());
loop {
let output = match self.run_once(current.clone()) {
Ok(output) => output,
Err(error) => return Some(Err(error)),
};
let retry =
match catch_unwind(AssertUnwindSafe(|| (self.decide_retry)(&original, &output))) {
Ok(retry) => retry,
Err(_) => return Some(Err(panic_stream_error("RetryFlow decider"))),
};
let Some(next_input) = retry else {
return Some(Ok(output));
};
if retries >= self.settings.max_restarts {
return Some(Ok(output));
}
let delay = backoff.next_delay().unwrap_or(self.settings.max_backoff);
if let Err(error) = wait_backoff(&self.materializer, delay) {
return Some(Err(error));
}
retries += 1;
current = next_input;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
fn boom() -> StreamError {
StreamError::Failed("boom".to_owned())
}
#[test]
fn supervised_map_result_stops_by_default_and_resumes_when_requested() {
let stopped = Source::from_iter([1, 2, 3, 4])
.map_result(|item| if item == 3 { Err(boom()) } else { Ok(item) })
.run_collect();
assert_eq!(stopped, Err(boom()));
let resumed = Source::from_iter([1, 2, 3, 4])
.map_result_with_supervision(
|item| if item == 3 { Err(boom()) } else { Ok(item) },
Supervision::resuming_decider(),
)
.run_collect()
.unwrap();
assert_eq!(resumed, vec![1, 2, 4]);
}
#[test]
fn supervised_scan_restart_resets_state_and_reemits_seed() {
let resumed = Source::from_iter([1, 3, -1, 5, 7])
.scan_result_with_supervision(
0,
|acc, item| {
if item < 0 {
Err(boom())
} else {
Ok(acc + item)
}
},
Supervision::resuming_decider(),
)
.run_collect()
.unwrap();
assert_eq!(resumed, vec![0, 1, 4, 9, 16]);
let restarted = Source::from_iter([1, 3, -1, 5, 7])
.scan_result_with_supervision(
0,
|acc, item| {
if item < 0 {
Err(boom())
} else {
Ok(acc + item)
}
},
Supervision::restarting_decider(),
)
.run_collect()
.unwrap();
assert_eq!(restarted, vec![0, 1, 4, 0, 5, 12]);
}
#[test]
fn supervised_fold_restart_resets_accumulator() {
let resumed = Source::from_iter(1..=5)
.run_with(Sink::fold_result_with_supervision(
0,
|acc, item| {
if item == 3 {
Err(boom())
} else {
Ok(acc + item)
}
},
Supervision::resuming_decider(),
))
.unwrap()
.wait()
.unwrap();
assert_eq!(resumed, 12);
let restarted = Source::from_iter(1..=5)
.run_with(Sink::fold_result_with_supervision(
0,
|acc, item| {
if item == 3 {
Err(boom())
} else {
Ok(acc + item)
}
},
Supervision::restarting_decider(),
))
.unwrap()
.wait()
.unwrap();
assert_eq!(restarted, 9);
}
#[test]
fn supervised_map_async_drops_failed_future_once() {
let decisions = Arc::new(AtomicUsize::new(0));
let decider = {
let decisions = Arc::clone(&decisions);
Arc::new(move |_: &StreamError| {
decisions.fetch_add(1, Ordering::SeqCst);
SupervisionDirective::Resume
}) as SupervisionDecider
};
let collected = Source::from_iter([1, 2, 3, 4])
.map_async_with_supervision(
2,
|item| async move { if item == 3 { Err(boom()) } else { Ok(item) } },
decider,
)
.run_collect()
.unwrap();
assert_eq!(collected, vec![1, 2, 4]);
assert_eq!(decisions.load(Ordering::SeqCst), 1);
}
#[test]
fn restart_source_restarts_on_completion_until_cap() {
let attempts = Arc::new(AtomicUsize::new(0));
let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
.with_max_restarts(2, Duration::from_secs(1));
let source = RestartSource::with_backoff(settings, {
let attempts = Arc::clone(&attempts);
move || {
let attempt = attempts.fetch_add(1, Ordering::SeqCst);
Source::single(attempt)
}
});
let values = source.run_collect().unwrap();
assert_eq!(values, vec![0, 1, 2]);
assert_eq!(attempts.load(Ordering::SeqCst), 3);
}
#[test]
fn restart_source_on_failures_does_not_restart_on_completion() {
let attempts = Arc::new(AtomicUsize::new(0));
let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
.with_max_restarts(3, Duration::from_secs(1));
let source = RestartSource::on_failures_with_backoff(settings, {
let attempts = Arc::clone(&attempts);
move || {
if attempts.fetch_add(1, Ordering::SeqCst) == 0 {
Source::failed(boom())
} else {
Source::from_iter([7, 8])
}
}
});
let values = source.run_collect().unwrap();
assert_eq!(values, vec![7, 8]);
assert_eq!(attempts.load(Ordering::SeqCst), 2);
}
#[test]
fn restart_source_cap_resets_after_within_window() {
let attempts = Arc::new(AtomicUsize::new(0));
let settings =
RestartSettings::new(Duration::from_millis(8), Duration::from_millis(8), 0.0)
.with_max_restarts(1, Duration::from_millis(1));
let source = RestartSource::on_failures_with_backoff(settings, {
let attempts = Arc::clone(&attempts);
move || {
let attempt = attempts.fetch_add(1, Ordering::SeqCst);
if attempt < 2 {
Source::failed(boom())
} else {
Source::single(42)
}
}
});
let started = Instant::now();
let values = source.run_collect().unwrap();
assert_eq!(values, vec![42]);
assert!(started.elapsed() >= Duration::from_millis(8));
assert_eq!(attempts.load(Ordering::SeqCst), 3);
}
#[test]
fn restart_flow_drops_failed_in_flight_element_and_continues() {
let settings = RestartSettings::new(Duration::ZERO, Duration::ZERO, 0.0)
.with_max_restarts(1, Duration::from_secs(1));
let values = Source::from_iter([1, 2, 3, 4, 5])
.via(RestartFlow::on_failures_with_backoff(settings, || {
Flow::identity().map_result(|item| if item == 3 { Err(boom()) } else { Ok(item) })
}))
.run_collect()
.unwrap();
assert_eq!(values, vec![1, 2, 4, 5]);
}
#[test]
fn retry_flow_retries_with_backoff_then_emits_last_output() {
let flow = Flow::identity().map(|item: i32| item / 2);
let values = Source::from_iter([5, 1])
.via(RetryFlow::with_backoff(
Duration::ZERO,
Duration::ZERO,
0.0,
3,
flow,
|_, output| (*output > 0).then_some(*output),
))
.run_collect()
.unwrap();
assert_eq!(values, vec![0, 0]);
}
}