synaptic_runnables/
retry.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use synaptic_core::{RunnableConfig, SynapseError};
5
6use crate::runnable::{BoxRunnable, Runnable};
7
8pub struct RetryPolicy {
13 pub max_attempts: usize,
15 pub base_delay: Duration,
18 pub max_delay: Duration,
20 #[allow(clippy::type_complexity)]
23 retry_on: Option<Box<dyn Fn(&SynapseError) -> bool + Send + Sync>>,
24}
25
26impl std::fmt::Debug for RetryPolicy {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("RetryPolicy")
29 .field("max_attempts", &self.max_attempts)
30 .field("base_delay", &self.base_delay)
31 .field("max_delay", &self.max_delay)
32 .field("retry_on", &self.retry_on.as_ref().map(|_| "..."))
33 .finish()
34 }
35}
36
37impl Default for RetryPolicy {
38 fn default() -> Self {
39 Self {
40 max_attempts: 3,
41 base_delay: Duration::from_millis(100),
42 max_delay: Duration::from_secs(10),
43 retry_on: None,
44 }
45 }
46}
47
48impl RetryPolicy {
49 pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
51 self.max_attempts = max_attempts;
52 self
53 }
54
55 pub fn with_base_delay(mut self, base_delay: Duration) -> Self {
57 self.base_delay = base_delay;
58 self
59 }
60
61 pub fn with_max_delay(mut self, max_delay: Duration) -> Self {
63 self.max_delay = max_delay;
64 self
65 }
66
67 pub fn with_retry_on(
70 mut self,
71 predicate: impl Fn(&SynapseError) -> bool + Send + Sync + 'static,
72 ) -> Self {
73 self.retry_on = Some(Box::new(predicate));
74 self
75 }
76
77 fn delay_for_attempt(&self, attempt: usize) -> Duration {
79 let delay = self.base_delay.saturating_mul(1 << attempt);
80 std::cmp::min(delay, self.max_delay)
81 }
82
83 fn should_retry(&self, error: &SynapseError) -> bool {
85 match &self.retry_on {
86 Some(predicate) => predicate(error),
87 None => true,
88 }
89 }
90}
91
92pub struct RunnableRetry<I: Send + Clone + 'static, O: Send + 'static> {
104 inner: BoxRunnable<I, O>,
105 policy: RetryPolicy,
106}
107
108impl<I: Send + Clone + 'static, O: Send + 'static> RunnableRetry<I, O> {
109 pub fn new(inner: BoxRunnable<I, O>, policy: RetryPolicy) -> Self {
110 Self { inner, policy }
111 }
112}
113
114#[async_trait]
115impl<I: Send + Clone + 'static, O: Send + 'static> Runnable<I, O> for RunnableRetry<I, O> {
116 async fn invoke(&self, input: I, config: &RunnableConfig) -> Result<O, SynapseError> {
117 let mut last_error: Option<SynapseError> = None;
118
119 for attempt in 0..self.policy.max_attempts {
120 let input_clone = input.clone();
121 match self.inner.invoke(input_clone, config).await {
122 Ok(output) => return Ok(output),
123 Err(e) => {
124 let is_last_attempt = attempt + 1 >= self.policy.max_attempts;
125 if is_last_attempt || !self.policy.should_retry(&e) {
126 return Err(e);
127 }
128
129 let delay = self.policy.delay_for_attempt(attempt);
130 tokio::time::sleep(delay).await;
131 last_error = Some(e);
132 }
133 }
134 }
135
136 Err(last_error.unwrap_or_else(|| {
138 SynapseError::Config("RunnableRetry: max_attempts must be >= 1".into())
139 }))
140 }
141}