entelix_runnable/
fallback.rs1use std::marker::PhantomData;
12use std::sync::Arc;
13
14use async_trait::async_trait;
15
16use entelix_core::ExecutionContext;
17use entelix_core::error::{Error, Result};
18use entelix_core::transports::{DefaultRetryClassifier, RetryClassifier};
19
20use crate::runnable::Runnable;
21
22pub struct Fallback<R, F, I, O>
29where
30 R: Runnable<I, O> + 'static,
31 F: Runnable<I, O> + 'static,
32 I: Clone + Send + 'static,
33 O: Send + 'static,
34{
35 primary: Arc<R>,
36 fallbacks: Vec<Arc<F>>,
37 classifier: Arc<dyn RetryClassifier>,
38 _io: PhantomData<fn(I) -> O>,
39}
40
41impl<R, F, I, O> Fallback<R, F, I, O>
42where
43 R: Runnable<I, O> + 'static,
44 F: Runnable<I, O> + 'static,
45 I: Clone + Send + 'static,
46 O: Send + 'static,
47{
48 pub fn new(primary: R, fallbacks: Vec<F>) -> Self {
52 Self {
53 primary: Arc::new(primary),
54 fallbacks: fallbacks.into_iter().map(Arc::new).collect(),
55 classifier: Arc::new(DefaultRetryClassifier),
56 _io: PhantomData,
57 }
58 }
59
60 #[must_use]
64 pub fn with_classifier(mut self, classifier: Arc<dyn RetryClassifier>) -> Self {
65 self.classifier = classifier;
66 self
67 }
68}
69
70impl<R, F, I, O> std::fmt::Debug for Fallback<R, F, I, O>
71where
72 R: Runnable<I, O> + 'static,
73 F: Runnable<I, O> + 'static,
74 I: Clone + Send + 'static,
75 O: Send + 'static,
76{
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("Fallback")
79 .field("fallback_count", &self.fallbacks.len())
80 .finish_non_exhaustive()
81 }
82}
83
84#[async_trait]
85impl<R, F, I, O> Runnable<I, O> for Fallback<R, F, I, O>
86where
87 R: Runnable<I, O> + 'static,
88 F: Runnable<I, O> + 'static,
89 I: Clone + Send + 'static,
90 O: Send + 'static,
91{
92 async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O> {
93 if ctx.is_cancelled() {
94 return Err(Error::Cancelled);
95 }
96 let mut attempt: u32 = 0;
97 let primary_result = self.primary.invoke(input.clone(), ctx).await;
98 let mut last_err = match primary_result {
99 Ok(value) => return Ok(value),
100 Err(err) => {
101 if !self.classifier.should_retry(&err, attempt).retry {
105 return Err(err);
106 }
107 err
108 }
109 };
110 for fallback in &self.fallbacks {
111 attempt = attempt.saturating_add(1);
112 if ctx.is_cancelled() {
113 return Err(Error::Cancelled);
114 }
115 match fallback.invoke(input.clone(), ctx).await {
116 Ok(value) => return Ok(value),
117 Err(err) => {
118 if !self.classifier.should_retry(&err, attempt).retry {
119 return Err(err);
120 }
121 last_err = err;
122 }
123 }
124 }
125 Err(last_err)
126 }
127}