ai_lib_rust/pipeline/retry.rs
//! Retry Operator
//!
//! This operator handles automatic retries for transient errors.
//! It wraps the underlying request execution rather than just transforming
//! the output stream; see the Logic Note below for how it fits the pipeline.

use async_trait::async_trait;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::Duration;

/// Configuration for retry logic.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts before giving up.
    pub max_retries: u32,
    /// Initial backoff delay for the first retry.
    pub min_delay: Duration,
    /// Upper bound on any single backoff delay.
    pub max_delay: Duration,
    /// Whether to randomize delays slightly to spread out concurrent retries.
    pub jitter: bool,
    /// HTTP status codes that should be treated as retryable.
    pub retry_on_status: Vec<u16>,
}

pub struct RetryOperator {
    config: RetryConfig,
}

impl RetryOperator {
    pub fn new(config: RetryConfig) -> Self {
        Self { config }
    }

    /// Calculate the exponential backoff delay for a given attempt,
    /// optionally applying jitter.
    fn backoff(&self, attempt: u32) -> Duration {
        let base = self.config.min_delay.as_millis() as u64;
        let cap = self.config.max_delay.as_millis() as u64;

        // Exponential backoff: base * 2^attempt, clamped to the configured cap.
        // `checked_shl` guards against shift overflow for very large attempt counts.
        let growth = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
        let mut delay = base.saturating_mul(growth).min(cap);

        if self.config.jitter {
            // Simple jitter: scale the delay by roughly +/- 10%, using the clock's
            // sub-second nanoseconds as a lightweight entropy source.
            // In production, use a proper RNG.
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| u64::from(d.subsec_nanos()))
                .unwrap_or(0);
            let percent = 90 + nanos % 21; // 90..=110
            delay = delay.saturating_mul(percent) / 100;
        }

        Duration::from_millis(delay)
    }
}

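// Worked example of the backoff schedule (illustrative values, not recommended
// defaults): with min_delay = 100ms and max_delay = 2s, successive attempts back
// off as 100ms, 200ms, 400ms, ... until they hit the 2s cap. The test below pins
// that schedule down with jitter disabled.
#[cfg(test)]
mod backoff_tests {
    use super::*;

    #[test]
    fn backoff_doubles_and_caps_at_max_delay() {
        let op = RetryOperator::new(RetryConfig {
            max_retries: 10,
            min_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(2),
            jitter: false,
            retry_on_status: vec![429, 500, 502, 503, 504],
        });

        assert_eq!(op.backoff(0), Duration::from_millis(100));
        assert_eq!(op.backoff(1), Duration::from_millis(200));
        assert_eq!(op.backoff(2), Duration::from_millis(400));
        // Attempt 5 would be 3.2s, which is clamped to max_delay.
        assert_eq!(op.backoff(5), Duration::from_secs(2));
    }
}
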
// Logic Note:
// A "Retry Operator" in a stream pipeline is tricky: if the *source* fails, the
// stream is already dead, so we cannot simply "retry the stream" from the middle.
// True retries have to happen at the *request execution* level, not just at the
// data processing level. We can, however, model this as a "Resilience Wrapper"
// around the transport execution. For the purposes of this refactor, the struct is
// defined here, but integration happens in the Execution layer (Client Core), since
// the user requested "Abstract logic as a Pipeline Operator". We interpret
// "Pipeline" broadly as the "Request Processing Pipeline", not just the
// "Response Data Pipeline". An illustrative sketch of the execution-layer wrapper
// appears at the end of this file.

#[async_trait]
pub trait ResiliencePolicy: Send + Sync {
    async fn should_retry(&self, attempt: u32, error: &crate::Error) -> Option<Duration>;
}

#[async_trait]
impl ResiliencePolicy for RetryOperator {
    async fn should_retry(&self, attempt: u32, error: &crate::Error) -> Option<Duration> {
        if attempt >= self.config.max_retries {
            return None;
        }

        // Check whether the error is retryable.
        // Priority 1: use the ErrorContext 2.0 flags if available.
        if let Some(ctx) = error.context() {
            if let Some(retryable) = ctx.retryable {
                return if retryable {
                    Some(self.backoff(attempt))
                } else {
                    None
                };
            }
        }

        // For remote errors, use their retryable flag when no ErrorContext applies.
        if let crate::Error::Remote {
            retryable: true, ..
        } = error
        {
            return Some(self.backoff(attempt));
        }

        // Priority 2: fall back to heuristic classification.
        if matches!(
            error,
            crate::Error::Runtime { .. } | crate::Error::Transport(_)
        ) {
            return Some(self.backoff(attempt));
        }

        None
    }
}
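
// Illustrative sketch of the execution-layer wrapper described in the Logic Note
// above. This is a minimal example, not the client's actual integration point: the
// name `execute_with_retry` and the `run_request` closure are hypothetical stand-ins
// for whatever the Client Core uses to issue a transport request.
#[allow(dead_code)]
async fn execute_with_retry<F, Fut, T>(
    policy: &dyn ResiliencePolicy,
    mut run_request: F,
) -> Result<T, crate::Error>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, crate::Error>>,
{
    let mut attempt = 0;
    loop {
        // Issue the request; success ends the loop immediately.
        let err = match run_request().await {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };

        // Ask the policy whether to retry and how long to back off first.
        let delay = policy.should_retry(attempt, &err).await;
        match delay {
            Some(delay) => {
                tokio::time::sleep(delay).await;
                attempt += 1;
            }
            // Non-retryable error, or the retry budget is exhausted.
            None => return Err(err),
        }
    }
}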