progenitor_extras/retry.rs
1//! Retry logic for operations against Progenitor-generated API clients.
2//!
3//! The primary entry points are:
4//!
5//! * [`retry_operation`] and [`retry_operation_indefinitely`], for retries; and
6//! * [`retry_operation_while`] and [`retry_operation_while_indefinitely`], for
7//! retries with an additional "gone check" that can abort the loop when the
8//! target is permanently unavailable.
9//!
10//! The `_indefinitely` variants never produce a "retries exhausted" error and
11//! will retry transient failures forever (or until the gone check aborts).
12//!
13//! Retry uses a backoff policy via the [`backon`] crate. Call
14//! [`default_retry_policy`] or [`default_indefinite_retry_policy`] for
15//! reasonable defaults, or construct your own [`backon::BackoffBuilder`] for
16//! custom behavior.
17//!
18//! Note that the retry operations currently assume a Tokio backend, matching
19//! Progenitor.
20
21use backon::{BackoffBuilder, ExponentialBuilder};
22use std::{
23 convert::Infallible, error::Error, fmt, future::Future, time::Duration,
24};
25
/// Result of a gone check passed to [`retry_operation_while`] and
/// [`retry_operation_while_indefinitely`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GoneCheckResult {
    /// The target is still available; continue retrying.
    StillAvailable,

    /// The target is permanently gone; abort the retry loop.
    ///
    /// Returning this causes the retry functions to fail with their
    /// respective `Gone` error kinds.
    Gone,
}
36
/// Error produced by [`retry_operation`].
///
/// Implements [`std::error::Error`]; the wrapped
/// `progenitor_client::Error` is exposed through `source()`.
#[derive(Debug)]
pub struct RetryOperationError<E> {
    /// One-indexed attempt number at which the error occurred.
    pub attempt: usize,
    /// The kind of error.
    pub kind: RetryOperationErrorKind<E>,
}
45
/// The kind of error in a [`RetryOperationError`].
///
/// Retryability is determined by `progenitor_client::Error::is_retryable`.
#[derive(Debug)]
pub enum RetryOperationErrorKind<E> {
    /// The operation failed with a non-retryable error.
    OperationError(progenitor_client::Error<E>),

    /// All retry attempts were exhausted without success.
    ///
    /// The contained error is the last retryable error encountered.
    RetriesExhausted(progenitor_client::Error<E>),
}
57
58impl<E> fmt::Display for RetryOperationErrorKind<E> {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 match self {
61 Self::OperationError(_) => {
62 f.write_str("progenitor API operation failed")
63 }
64 Self::RetriesExhausted(_) => f.write_str("retries exhausted"),
65 }
66 }
67}
68
69impl<E> fmt::Display for RetryOperationError<E> {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 write!(f, "failed at attempt {}: ", self.attempt)?;
72 self.kind.fmt(f)
73 }
74}
75
76impl<E> Error for RetryOperationError<E>
77where
78 E: fmt::Debug + 'static,
79{
80 fn source(&self) -> Option<&(dyn Error + 'static)> {
81 match &self.kind {
82 RetryOperationErrorKind::OperationError(e)
83 | RetryOperationErrorKind::RetriesExhausted(e) => Some(e),
84 }
85 }
86}
87
88impl<E> RetryOperationError<E> {
89 /// Returns true if the underlying operation error is a 404 Not Found.
90 pub fn is_not_found(&self) -> bool {
91 match &self.kind {
92 // In practice, 404 is not retryable, so this will only match
93 // OperationError. But something outside of this crate can
94 // artificially construct a RetriesExhausted with a 404, so match
95 // against that as well.
96 RetryOperationErrorKind::OperationError(e)
97 | RetryOperationErrorKind::RetriesExhausted(e) => {
98 e.status() == Some(http::StatusCode::NOT_FOUND)
99 }
100 }
101 }
102}
103
/// Error produced by [`retry_operation_while`].
///
/// `GoneErr` is the error type of the gone check; it defaults to
/// [`Infallible`] for gone checks that cannot fail.
#[derive(Debug)]
pub struct RetryOperationWhileError<E, GoneErr = Infallible> {
    /// One-indexed attempt number at which the error occurred.
    pub attempt: usize,
    /// The kind of error.
    pub kind: RetryOperationWhileErrorKind<E, GoneErr>,
}
112
/// The kind of error in a [`RetryOperationWhileError`].
///
/// `GoneErr` is the error type of the gone check; it defaults to
/// [`Infallible`] for gone checks that cannot fail.
#[derive(Debug)]
pub enum RetryOperationWhileErrorKind<E, GoneErr = Infallible> {
    /// The gone check indicated that the remote server is permanently
    /// unavailable.
    Gone,

    /// The gone check itself failed.
    GoneCheckError(GoneErr),

    /// The operation failed with a non-retryable error.
    OperationError(progenitor_client::Error<E>),

    /// All retry attempts were exhausted without success.
    ///
    /// The contained error is the last retryable error encountered.
    RetriesExhausted(progenitor_client::Error<E>),
}
131
132impl<E, GoneErr> fmt::Display for RetryOperationWhileErrorKind<E, GoneErr> {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 match self {
135 Self::Gone => f.write_str("remote server is gone"),
136 Self::GoneCheckError(_) => {
137 f.write_str("failed to determine whether remote server is gone")
138 }
139 Self::OperationError(_) => {
140 f.write_str("progenitor API operation failed")
141 }
142 Self::RetriesExhausted(_) => f.write_str("retries exhausted"),
143 }
144 }
145}
146
147impl<E, GoneErr> fmt::Display for RetryOperationWhileError<E, GoneErr> {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 write!(f, "failed at attempt {}: ", self.attempt)?;
150 self.kind.fmt(f)
151 }
152}
153
154impl<E, GoneErr> Error for RetryOperationWhileError<E, GoneErr>
155where
156 E: fmt::Debug + 'static,
157 GoneErr: Error + 'static,
158{
159 fn source(&self) -> Option<&(dyn Error + 'static)> {
160 match &self.kind {
161 RetryOperationWhileErrorKind::Gone => None,
162 RetryOperationWhileErrorKind::GoneCheckError(e) => Some(e),
163 RetryOperationWhileErrorKind::OperationError(e)
164 | RetryOperationWhileErrorKind::RetriesExhausted(e) => Some(e),
165 }
166 }
167}
168
169impl<E, GoneErr> RetryOperationWhileError<E, GoneErr> {
170 /// Returns true if the underlying operation error is a 404 Not Found.
171 pub fn is_not_found(&self) -> bool {
172 match &self.kind {
173 // In practice, 404 is not retryable, so this will only match
174 // OperationError. But something outside of this crate can
175 // artificially construct a RetriesExhausted with a 404, so match
176 // against that as well.
177 RetryOperationWhileErrorKind::OperationError(e)
178 | RetryOperationWhileErrorKind::RetriesExhausted(e) => {
179 e.status() == Some(http::StatusCode::NOT_FOUND)
180 }
181 RetryOperationWhileErrorKind::Gone
182 | RetryOperationWhileErrorKind::GoneCheckError(_) => false,
183 }
184 }
185
186 /// Returns `true` if the remote server is gone.
187 pub fn is_gone(&self) -> bool {
188 match &self.kind {
189 RetryOperationWhileErrorKind::Gone => true,
190 RetryOperationWhileErrorKind::GoneCheckError(_)
191 | RetryOperationWhileErrorKind::OperationError(_)
192 | RetryOperationWhileErrorKind::RetriesExhausted(_) => false,
193 }
194 }
195}
196
/// Error produced by [`retry_operation_indefinitely`].
///
/// Unlike [`RetryOperationError`], this error has no `RetriesExhausted`
/// variant: the indefinite retry function retries transient failures forever,
/// so the only way it can fail is with a non-retryable operation error.
/// Because there is a single failure mode, no separate "kind" type exists;
/// the error is stored directly.
#[derive(Debug)]
pub struct IndefiniteRetryOperationError<E> {
    /// One-indexed attempt number at which the error occurred.
    pub attempt: usize,
    /// The non-retryable operation error.
    pub error: progenitor_client::Error<E>,
}
209
210impl<E> fmt::Display for IndefiniteRetryOperationError<E> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 write!(
213 f,
214 "failed at attempt {}: progenitor API operation failed",
215 self.attempt,
216 )
217 }
218}
219
220impl<E> Error for IndefiniteRetryOperationError<E>
221where
222 E: fmt::Debug + 'static,
223{
224 fn source(&self) -> Option<&(dyn Error + 'static)> {
225 Some(&self.error)
226 }
227}
228
229impl<E> IndefiniteRetryOperationError<E> {
230 /// Returns true if the underlying operation error is a 404 Not Found.
231 pub fn is_not_found(&self) -> bool {
232 self.error.status() == Some(http::StatusCode::NOT_FOUND)
233 }
234}
235
/// Error produced by [`retry_operation_while_indefinitely`].
///
/// This error has no `RetriesExhausted` kind: transient failures are retried
/// forever, so the loop can only end via the gone check or a non-retryable
/// operation error. `GoneErr` is the error type of the gone check; it
/// defaults to [`Infallible`] for gone checks that cannot fail.
#[derive(Debug)]
pub struct IndefiniteRetryOperationWhileError<E, GoneErr = Infallible> {
    /// One-indexed attempt number at which the error occurred.
    pub attempt: usize,
    /// The kind of error.
    pub kind: IndefiniteRetryOperationWhileErrorKind<E, GoneErr>,
}
244
/// The kind of error in an [`IndefiniteRetryOperationWhileError`].
///
/// Unlike [`RetryOperationWhileErrorKind`], there is no `RetriesExhausted`
/// variant: the indefinite retry function retries transient failures forever.
#[derive(Debug)]
pub enum IndefiniteRetryOperationWhileErrorKind<E, GoneErr = Infallible> {
    /// The gone check indicated that the remote server is permanently
    /// unavailable.
    Gone,

    /// The gone check itself failed.
    GoneCheckError(GoneErr),

    /// The operation failed with a non-retryable error.
    OperationError(progenitor_client::Error<E>),
}
258
259impl<E, GoneErr> fmt::Display
260 for IndefiniteRetryOperationWhileErrorKind<E, GoneErr>
261{
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 match self {
264 Self::Gone => f.write_str("remote server is gone"),
265 Self::GoneCheckError(_) => {
266 f.write_str("failed to determine whether remote server is gone")
267 }
268 Self::OperationError(_) => {
269 f.write_str("progenitor API operation failed")
270 }
271 }
272 }
273}
274
275impl<E, GoneErr> fmt::Display
276 for IndefiniteRetryOperationWhileError<E, GoneErr>
277{
278 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
279 write!(f, "failed at attempt {}: ", self.attempt)?;
280 self.kind.fmt(f)
281 }
282}
283
284impl<E, GoneErr> Error for IndefiniteRetryOperationWhileError<E, GoneErr>
285where
286 E: fmt::Debug + 'static,
287 GoneErr: Error + 'static,
288{
289 fn source(&self) -> Option<&(dyn Error + 'static)> {
290 match &self.kind {
291 IndefiniteRetryOperationWhileErrorKind::Gone => None,
292 IndefiniteRetryOperationWhileErrorKind::GoneCheckError(e) => {
293 Some(e)
294 }
295 IndefiniteRetryOperationWhileErrorKind::OperationError(e) => {
296 Some(e)
297 }
298 }
299 }
300}
301
302impl<E, GoneErr> IndefiniteRetryOperationWhileError<E, GoneErr> {
303 /// Returns true if the underlying operation error is a 404 Not Found.
304 pub fn is_not_found(&self) -> bool {
305 match &self.kind {
306 IndefiniteRetryOperationWhileErrorKind::OperationError(e) => {
307 e.status() == Some(http::StatusCode::NOT_FOUND)
308 }
309 IndefiniteRetryOperationWhileErrorKind::Gone
310 | IndefiniteRetryOperationWhileErrorKind::GoneCheckError(_) => {
311 false
312 }
313 }
314 }
315
316 /// Returns `true` if the remote server is gone.
317 pub fn is_gone(&self) -> bool {
318 match &self.kind {
319 IndefiniteRetryOperationWhileErrorKind::Gone => true,
320 IndefiniteRetryOperationWhileErrorKind::GoneCheckError(_)
321 | IndefiniteRetryOperationWhileErrorKind::OperationError(_) => {
322 false
323 }
324 }
325 }
326}
327
328/// Returns a reasonable default retry policy.
329///
330/// This policy is an exponential backoff that sets:
331///
332/// * the initial retry interval to ~250ms (mean, with jitter)
333/// * the maximum interval to 3 minutes
334/// * a backoff multiplier of 2.0
335/// * up to 13 retries
336/// * with jitter enabled
337///
338/// The base delay is set to 167ms rather than 250ms to compensate for
339/// `backon`'s additive jitter, which distributes each delay `d` over
340/// `[d, 2d)` (mean = 1.5d). With a 167ms base, the mean first retry
341/// delay is ~250ms.
342pub fn default_retry_policy() -> ExponentialBuilder {
343 ExponentialBuilder::default()
344 .with_factor(2.0)
345 .with_min_delay(Duration::from_millis(167))
346 .with_max_delay(Duration::from_secs(60 * 3))
347 .with_max_times(13)
348 .with_jitter()
349}
350
/// Exponential backoff parameters for the indefinite retry functions.
///
/// All fields must be specified explicitly. Use
/// [`default_indefinite_retry_policy`] for reasonable defaults. These
/// parameters are translated into a `backon` exponential builder internally.
#[derive(Clone, Copy, Debug)]
pub struct IndefiniteBackoffParams {
    /// Multiplier applied to the delay after each retry.
    pub factor: f32,
    /// The minimum (initial) delay between retries.
    pub min_delay: Duration,
    /// The maximum delay between retries.
    pub max_delay: Duration,
    /// Whether to add random jitter to each delay.
    ///
    /// When enabled, `backon` distributes each computed delay `d` uniformly
    /// over `[d, 2d)`. Account for this when choosing `min_delay` if you
    /// need a specific mean initial interval.
    pub jitter: bool,
}
370
371impl IndefiniteBackoffParams {
372 /// Builds an infinite delay iterator from these parameters.
373 fn build(self) -> impl Iterator<Item = Duration> {
374 let mut builder = ExponentialBuilder::default()
375 .with_factor(self.factor)
376 .with_min_delay(self.min_delay)
377 .with_max_delay(self.max_delay)
378 // backon 1.6.0 sets a max times of 3.
379 .without_max_times()
380 // backon 1.6.0 does not set a max total delay, but set one
381 // explicitly anyway.
382 .with_total_delay(None);
383
384 if self.jitter {
385 builder = builder.with_jitter();
386 }
387 builder.build()
388 }
389}
390
391/// Returns a reasonable default indefinite retry policy.
392///
393/// This policy is an exponential backoff that sets:
394///
395/// * the initial retry interval to ~250ms (mean, with jitter)
396/// * the maximum interval to 3 minutes
397/// * a backoff multiplier of 2.0
398/// * with jitter enabled
399///
400/// The base delay is set to 167ms rather than 250ms to compensate for
401/// `backon`'s additive jitter, which distributes each delay `d` over
402/// `[d, 2d)` (mean = 1.5d). With a 167ms base, the mean first retry
403/// delay is ~250ms.
404pub fn default_indefinite_retry_policy() -> IndefiniteBackoffParams {
405 IndefiniteBackoffParams {
406 factor: 2.0,
407 min_delay: Duration::from_millis(167),
408 max_delay: Duration::from_secs(60 * 3),
409 jitter: true,
410 }
411}
412
/// Data passed into notify functions.
///
/// Marked `#[non_exhaustive]`, so additional fields may be added without a
/// breaking change; construct instances only within this crate.
#[derive(Debug)]
#[non_exhaustive]
pub struct RetryNotification<E> {
    /// One-indexed attempt number. The first transient failure produces
    /// `attempt = 1`, the second produces `attempt = 2`, and so on.
    ///
    /// For finite retry functions ([`retry_operation`],
    /// [`retry_operation_while`]), the notify function is not called when
    /// retries are exhausted. Instead, an error is returned.
    pub attempt: usize,
    /// The retryable error that caused this retry. This error is always
    /// retryable (i.e., `error.is_retryable()` returns `true`).
    pub error: progenitor_client::Error<E>,
    /// The delay before the next attempt.
    pub delay: Duration,
}
430
431/// Retries a progenitor client operation until it succeeds or fails
432/// permanently.
433///
434/// Transient (retryable) errors are retried according to the supplied backoff
435/// policy. All other errors are returned immediately as
436/// [`RetryOperationErrorKind::OperationError`].
437///
438/// If all retries are exhausted, the last transient error is returned as
439/// [`RetryOperationErrorKind::RetriesExhausted`].
440///
441/// `notify` is called on each transient failure with the error and the delay
442/// before the next attempt. It is not called when retries are exhausted;
443/// the terminal failure is communicated through the
444/// [`RetriesExhausted`](RetryOperationErrorKind::RetriesExhausted) return variant.
445///
446/// The `operation` must be idempotent.
447///
448/// # Examples
449///
450/// ```
451/// # #[tokio::main(flavor = "current_thread")]
452/// # async fn main() {
453/// use progenitor_extras::retry::{default_retry_policy, retry_operation};
454///
455/// // In practice, replace the closure body with a progenitor client
456/// // call, e.g. `|| async { client.some_endpoint().send().await }`.
457/// let result = retry_operation(
458/// default_retry_policy(),
459/// || async { Ok::<_, progenitor_client::Error<()>>(42u32) },
460/// |notification| {
461/// eprintln!(
462/// "transient error ({:?}), retrying in {:?}",
463/// notification.error, notification.delay,
464/// );
465/// },
466/// )
467/// .await;
468///
469/// assert_eq!(result.unwrap(), 42);
470/// # }
471/// ```
472pub async fn retry_operation<T, E, B, N, F, Fut>(
473 backoff: B,
474 mut operation: F,
475 mut notify: N,
476) -> Result<T, RetryOperationError<E>>
477where
478 B: BackoffBuilder,
479 N: FnMut(RetryNotification<E>),
480 F: FnMut() -> Fut,
481 Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
482{
483 // This function implements its own retry loop rather than delegating to
484 // backon's `Retryable` trait so that the notify function can be called with
485 // an owned error.
486 let mut delays = backoff.build();
487 let mut attempt = 1;
488
489 loop {
490 match (operation)().await {
491 Ok(v) => return Ok(v),
492 Err(error) => {
493 if !error.is_retryable() {
494 return Err(RetryOperationError {
495 attempt,
496 kind: RetryOperationErrorKind::OperationError(error),
497 });
498 }
499 match delays.next() {
500 Some(delay) => {
501 notify(RetryNotification { attempt, error, delay });
502 tokio::time::sleep(delay).await;
503 attempt += 1;
504 }
505 None => {
506 return Err(RetryOperationError {
507 attempt,
508 kind: RetryOperationErrorKind::RetriesExhausted(
509 error,
510 ),
511 });
512 }
513 }
514 }
515 }
516 }
517}
518
519/// Retries a progenitor client operation with an additional "gone check."
520///
521/// This function is intended for service mesh-type scenarios, where a
522/// service being gone is determined independently of the operation itself.
523///
524/// Before each attempt, `gone_check` is called. If it returns
525/// `Ok(GoneCheckResult::Gone)`, the loop is aborted with
526/// [`RetryOperationWhileErrorKind::Gone`]. If the gone check itself fails, the
527/// loop is aborted with [`RetryOperationWhileErrorKind::GoneCheckError`].
528///
529/// Transient errors (as classified by
530/// [`progenitor_client::Error::is_retryable`]) are retried according to the
531/// supplied `backoff` policy. Non-retryable errors are returned as
532/// [`RetryOperationWhileErrorKind::OperationError`]. If all retries are
533/// exhausted, the last transient error is returned as
534/// [`RetryOperationWhileErrorKind::RetriesExhausted`].
535///
536/// Gone-check errors ([`RetryOperationWhileErrorKind::GoneCheckError`]) are
537/// treated as permanent and abort the loop immediately. If the gone check
538/// itself can fail transiently, handle retries within the `gone_check`
539/// closure.
540///
541/// `notify` is called on each transient failure with the error and the
542/// delay before the next attempt. It is not called when retries are
543/// exhausted; the terminal failure is communicated through the
544/// [`RetryOperationWhileErrorKind::RetriesExhausted`] return variant.
545///
546/// The `operation` must be idempotent.
547///
548/// # Examples
549///
550/// ```
551/// # #[tokio::main(flavor = "current_thread")]
552/// # async fn main() {
553/// use progenitor_extras::retry::{
554/// GoneCheckResult, default_retry_policy, retry_operation_while,
555/// };
556///
557/// // In practice, replace these closure bodies with real client calls
558/// // and a real gone check (e.g. querying whether a sled is in service).
559/// let result = retry_operation_while(
560/// default_retry_policy(),
561/// || async { Ok::<_, progenitor_client::Error<()>>(42u32) },
562/// || async {
563/// Ok::<_, std::convert::Infallible>(GoneCheckResult::StillAvailable)
564/// },
565/// |notification| {
566/// eprintln!(
567/// "transient error ({:?}), retrying in {:?}",
568/// notification.error, notification.delay,
569/// );
570/// },
571/// )
572/// .await;
573///
574/// assert_eq!(result.unwrap(), 42);
575/// # }
576/// ```
577///
578/// The gone check can abort the loop early:
579///
580/// ```
581/// # #[tokio::main(flavor = "current_thread")]
582/// # async fn main() {
583/// use progenitor_extras::retry::{
584/// GoneCheckResult, RetryOperationWhileError, default_retry_policy,
585/// retry_operation_while,
586/// };
587///
588/// let result: Result<(), RetryOperationWhileError<()>> =
589/// retry_operation_while(
590/// default_retry_policy(),
591/// || async { Ok::<_, progenitor_client::Error<()>>(()) },
592/// // Target is gone; abort immediately.
593/// || async {
594/// Ok::<_, std::convert::Infallible>(GoneCheckResult::Gone)
595/// },
596/// |_notification| {},
597/// )
598/// .await;
599///
600/// assert!(result.unwrap_err().is_gone());
601/// # }
602/// ```
603pub async fn retry_operation_while<T, E, GoneErr, B, N, F, Fut, GF, GFut>(
604 backoff: B,
605 mut operation: F,
606 mut gone_check: GF,
607 mut notify: N,
608) -> Result<T, RetryOperationWhileError<E, GoneErr>>
609where
610 B: BackoffBuilder,
611 N: FnMut(RetryNotification<E>),
612 F: FnMut() -> Fut,
613 Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
614 GF: FnMut() -> GFut,
615 GFut: Future<Output = Result<GoneCheckResult, GoneErr>>,
616{
617 // This function implements its own retry loop rather than delegating
618 // to backon's `Retryable` trait so that:
619 //
620 // * the gone check can be interleaved before each operation attempt.
621 // * the notify function can be called with an owned error.
622 let mut delays = backoff.build();
623
624 let mut attempt = 1;
625 loop {
626 // Check if the target is still available before attempting
627 // the operation.
628 //
629 // An interesting question is: in this loop, should `gone_check` be
630 // called before or after `operation`? There is an inherent TOCTTOU race
631 // between `gone_check` and `operation`, and both before and after are
632 // defensible. We call `gone_check` before `operation` to maintain
633 // parity with Omicron from which this code was adapted, but we may want
634 // to change this in the future.
635 match (gone_check)().await {
636 Ok(GoneCheckResult::Gone) => {
637 return Err(RetryOperationWhileError {
638 attempt,
639 kind: RetryOperationWhileErrorKind::Gone,
640 });
641 }
642 Ok(GoneCheckResult::StillAvailable) => {}
643 Err(e) => {
644 return Err(RetryOperationWhileError {
645 attempt,
646 kind: RetryOperationWhileErrorKind::GoneCheckError(e),
647 });
648 }
649 }
650
651 match (operation)().await {
652 Ok(v) => return Ok(v),
653 Err(error) => {
654 if !error.is_retryable() {
655 return Err(RetryOperationWhileError {
656 attempt,
657 kind: RetryOperationWhileErrorKind::OperationError(
658 error,
659 ),
660 });
661 }
662 match delays.next() {
663 Some(delay) => {
664 notify(RetryNotification { attempt, error, delay });
665 tokio::time::sleep(delay).await;
666 attempt += 1;
667 }
668 None => {
669 return Err(RetryOperationWhileError {
670 attempt,
671 kind:
672 RetryOperationWhileErrorKind::RetriesExhausted(
673 error,
674 ),
675 });
676 }
677 }
678 }
679 }
680 }
681}
682
683/// Retries a progenitor client operation indefinitely until it succeeds or
684/// fails permanently.
685///
686/// Unlike [`retry_operation`], this function never returns a "retries
687/// exhausted" error. Transient (retryable) errors are retried according to
688/// the supplied [`IndefiniteBackoffParams`], indefinitely.
689///
690/// All non-retryable errors are returned immediately as
691/// [`IndefiniteRetryOperationError`].
692///
693/// `notify` is called on each transient failure with the error and the delay
694/// before the next attempt.
695///
696/// The `operation` must be idempotent.
697///
698/// # Examples
699///
700/// ```
701/// # #[tokio::main(flavor = "current_thread")]
702/// # async fn main() {
703/// use progenitor_extras::retry::{
704/// default_indefinite_retry_policy, retry_operation_indefinitely,
705/// };
706///
707/// // In practice, replace the closure body with a progenitor client
708/// // call, e.g. `|| async { client.some_endpoint().send().await }`.
709/// let result = retry_operation_indefinitely(
710/// default_indefinite_retry_policy(),
711/// || async { Ok::<_, progenitor_client::Error<()>>(42u32) },
712/// |notification| {
713/// eprintln!(
714/// "transient error ({:?}), retrying in {:?}",
715/// notification.error, notification.delay,
716/// );
717/// },
718/// )
719/// .await;
720///
721/// assert_eq!(result.unwrap(), 42);
722/// # }
723/// ```
724pub async fn retry_operation_indefinitely<T, E, N, F, Fut>(
725 backoff: IndefiniteBackoffParams,
726 mut operation: F,
727 mut notify: N,
728) -> Result<T, IndefiniteRetryOperationError<E>>
729where
730 N: FnMut(RetryNotification<E>),
731 F: FnMut() -> Fut,
732 Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
733{
734 // The iterator is infinite by construction.
735 let mut delays = backoff.build();
736 let mut attempt = 1;
737
738 loop {
739 match (operation)().await {
740 Ok(v) => return Ok(v),
741 Err(error) => {
742 if !error.is_retryable() {
743 return Err(IndefiniteRetryOperationError {
744 attempt,
745 error,
746 });
747 }
748 let delay = delays.next().unwrap_or_else(|| {
749 panic!(
750 "infinite backoff iterator produced a delay \
751 at attempt {attempt} (was usize::MAX exceeded?)"
752 )
753 });
754 notify(RetryNotification { attempt, error, delay });
755 tokio::time::sleep(delay).await;
756 attempt += 1;
757 }
758 }
759 }
760}
761
762/// Retries a progenitor client operation indefinitely with an additional
763/// "gone check."
764///
765/// Unlike [`retry_operation_while`], this function never returns a "retries
766/// exhausted" error. Transient errors are retried according to the supplied
767/// [`IndefiniteBackoffParams`], indefinitely.
768///
769/// Before each attempt, `gone_check` is called. If it returns
770/// `Ok(GoneCheckResult::Gone)`, the loop is aborted with
771/// [`IndefiniteRetryOperationWhileErrorKind::Gone`]. If the gone check
772/// itself fails, the loop is aborted with
773/// [`IndefiniteRetryOperationWhileErrorKind::GoneCheckError`].
774///
775/// Non-retryable errors are returned as
776/// [`IndefiniteRetryOperationWhileErrorKind::OperationError`].
777///
778/// Gone-check errors
779/// ([`IndefiniteRetryOperationWhileErrorKind::GoneCheckError`]) are treated
780/// as permanent and abort the loop immediately. If the gone check itself can
781/// fail transiently, handle retries within the `gone_check` closure.
782///
783/// `notify` is called on each transient failure with the error and the delay
784/// before the next attempt.
785///
786/// The `operation` must be idempotent.
787///
788/// # Examples
789///
790/// ```
791/// # #[tokio::main(flavor = "current_thread")]
792/// # async fn main() {
793/// use progenitor_extras::retry::{
794/// GoneCheckResult, default_indefinite_retry_policy,
795/// retry_operation_while_indefinitely,
796/// };
797///
798/// // In practice, replace these closure bodies with real client calls
799/// // and a real gone check (e.g. querying whether a sled is in service).
800/// let result = retry_operation_while_indefinitely(
801/// default_indefinite_retry_policy(),
802/// || async { Ok::<_, progenitor_client::Error<()>>(42u32) },
803/// || async {
804/// Ok::<_, std::convert::Infallible>(GoneCheckResult::StillAvailable)
805/// },
806/// |notification| {
807/// eprintln!(
808/// "transient error ({:?}), retrying in {:?}",
809/// notification.error, notification.delay,
810/// );
811/// },
812/// )
813/// .await;
814///
815/// assert_eq!(result.unwrap(), 42);
816/// # }
817/// ```
818pub async fn retry_operation_while_indefinitely<
819 T,
820 E,
821 GoneErr,
822 N,
823 F,
824 Fut,
825 GF,
826 GFut,
827>(
828 backoff: IndefiniteBackoffParams,
829 mut operation: F,
830 mut gone_check: GF,
831 mut notify: N,
832) -> Result<T, IndefiniteRetryOperationWhileError<E, GoneErr>>
833where
834 N: FnMut(RetryNotification<E>),
835 F: FnMut() -> Fut,
836 Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
837 GF: FnMut() -> GFut,
838 GFut: Future<Output = Result<GoneCheckResult, GoneErr>>,
839{
840 // The iterator is infinite by construction.
841 let mut delays = backoff.build();
842 let mut attempt = 1;
843
844 loop {
845 // Check if the target is still available before attempting
846 // the operation. See the comment in `retry_operation_while` for
847 // discussion of the before-vs-after ordering.
848 match (gone_check)().await {
849 Ok(GoneCheckResult::Gone) => {
850 return Err(IndefiniteRetryOperationWhileError {
851 attempt,
852 kind: IndefiniteRetryOperationWhileErrorKind::Gone,
853 });
854 }
855 Ok(GoneCheckResult::StillAvailable) => {}
856 Err(e) => {
857 return Err(IndefiniteRetryOperationWhileError {
858 attempt,
859 kind:
860 IndefiniteRetryOperationWhileErrorKind::GoneCheckError(
861 e,
862 ),
863 });
864 }
865 }
866
867 match (operation)().await {
868 Ok(v) => return Ok(v),
869 Err(error) => {
870 if !error.is_retryable() {
871 return Err(IndefiniteRetryOperationWhileError {
872 attempt,
873 kind:
874 IndefiniteRetryOperationWhileErrorKind::OperationError(
875 error,
876 ),
877 });
878 }
879 let delay = delays.next().unwrap_or_else(|| {
880 panic!(
881 "infinite backoff iterator produced a delay \
882 at attempt {attempt} (was usize::MAX exceeded?)"
883 )
884 });
885 notify(RetryNotification { attempt, error, delay });
886 tokio::time::sleep(delay).await;
887 attempt += 1;
888 }
889 }
890 }
891}