// ai_lib_rust/pipeline/retry.rs
//! Retry Operator
//!
//! This operator handles automatic retries for transient errors.
//! It wraps the source stream logic rather than just transforming the output.

use async_trait::async_trait;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use tokio::time::Duration;

/// Configuration for retry logic.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts before giving up.
    pub max_retries: u32,
    /// Base delay for the first retry; doubles on each subsequent attempt.
    pub min_delay: Duration,
    /// Upper bound on any single backoff delay.
    pub max_delay: Duration,
    /// When true, the computed delay is randomized within +/- 10%.
    pub jitter: bool,
    /// HTTP status codes considered retryable.
    /// NOTE(review): not read anywhere in this file — presumably consumed by
    /// the transport/execution layer; confirm against the caller.
    pub retry_on_status: Vec<u16>,
}

/// Applies a `RetryConfig` to decide how long to wait before a retry.
pub struct RetryOperator {
    config: RetryConfig,
}

impl RetryOperator {
    /// Create an operator from the given retry configuration.
    pub fn new(config: RetryConfig) -> Self {
        Self { config }
    }

    /// Calculate the delay before retry number `attempt` (0-based).
    ///
    /// Uses exponential backoff (`min_delay * 2^attempt`) capped at
    /// `max_delay`. When `config.jitter` is set, the result is randomized
    /// within +/- 10% of the nominal delay to avoid thundering herds.
    fn backoff(&self, attempt: u32) -> Duration {
        let base = self.config.min_delay.as_millis() as u64;
        let cap = self.config.max_delay.as_millis() as u64;

        // Exponential backoff: base * 2^attempt.
        // `checked_shl` avoids the shift-overflow panic that the former
        // `1u64 << attempt` hit for attempt >= 64; `saturating_mul` pins
        // huge products at u64::MAX, and `min` applies the cap.
        let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
        let delay = base.saturating_mul(factor).min(cap);

        if self.config.jitter {
            // +/- 10% jitter. `RandomState::new()` seeds itself with fresh
            // per-instance keys, so a finished empty hasher yields a cheap
            // stdlib pseudo-random u64 — no extra RNG crate needed.
            let rand = RandomState::new().build_hasher().finish();
            let spread = delay / 10; // 10% of the nominal delay
            // Uniform pick in [delay - spread, delay + spread].
            let jittered = (delay - spread).saturating_add(rand % (2 * spread + 1));
            Duration::from_millis(jittered)
        } else {
            Duration::from_millis(delay)
        }
    }
}

// Logic note:
// A retry operator in a stream pipeline is complex because if the *source* fails,
// the stream is already dead — we cannot simply "retry the stream" from the middle.
// True retries must happen at the *request execution* level, not at the data
// processing level. We therefore model this as a resilience wrapper around
// transport execution: the struct is defined here, but integration happens in
// the execution layer (client core). Per the request to "abstract logic as a
// pipeline operator", we interpret "pipeline" broadly as the request-processing
// pipeline, not just the response-data pipeline.

/// Decision hook for retry behaviour, consulted once per failed attempt.
///
/// Implementations return `Some(delay)` when the attempt should be retried
/// after sleeping `delay`, or `None` when the error should be surfaced to
/// the caller.
#[async_trait]
pub trait ResiliencePolicy: Send + Sync {
    async fn should_retry(&self, attempt: u32, error: &crate::Error) -> Option<Duration>;
}

65#[async_trait]
66impl ResiliencePolicy for RetryOperator {
67    async fn should_retry(&self, attempt: u32, error: &crate::Error) -> Option<Duration> {
68        if attempt >= self.config.max_retries {
69            return None;
70        }
71
72        // Check if error is retryable
73        // Priority 1: Use improved ErrorContext 2.0 flags if available
74        if let Some(ctx) = error.context() {
75            if let Some(retryable) = ctx.retryable {
76                if retryable {
77                    return Some(self.backoff(attempt));
78                } else {
79                    return None;
80                }
81            }
82        }
83
84        // For Remote: use its retryable flag when ErrorContext does not apply
85        if let crate::Error::Remote {
86            retryable: true, ..
87        } = error
88        {
89            return Some(self.backoff(attempt));
90        }
91
92        // Priority 2: Fallback to heuristic classification
93        if matches!(error, crate::Error::Runtime { .. })
94            || matches!(error, crate::Error::Transport(_))
95        {
96            return Some(self.backoff(attempt));
97        }
98
99        None
100    }
101}