// agentik_sdk/http/retry.rs
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::time::{Duration, Instant};

use crate::types::{AnthropicError, Result};

5#[derive(Debug, Clone)]
7pub struct RetryPolicy {
8 pub max_retries: u32,
10 pub initial_delay: Duration,
12 pub max_delay: Duration,
14 pub multiplier: f64,
16 pub jitter: bool,
18 pub max_elapsed_time: Option<Duration>,
20 pub retry_conditions: Vec<RetryCondition>,
22}
23
/// A class of error that a `RetryPolicy` may treat as retryable.
#[derive(Clone, Debug, PartialEq)]
pub enum RetryCondition {
    /// Matches `AnthropicError::Timeout`.
    Timeout,
    /// Matches `AnthropicError::NetworkError`.
    ConnectionError,
    /// Matches an HTTP error carrying exactly this status code.
    HttpStatus(u16),
    /// Matches an HTTP error with status 429.
    RateLimit,
    /// Matches an HTTP error with a 5xx status (500 through 599).
    ServerError,
    /// Matches an HTTP error with status 401.
    AuthenticationError,
    /// Matches every error unconditionally.
    All,
}

43#[derive(Debug)]
45pub struct RetryExecutor {
46 policy: RetryPolicy,
47}
48
49#[derive(Debug)]
51pub enum RetryResult<T> {
52 Success(T),
54 Failed(AnthropicError),
56}
57
58impl Default for RetryPolicy {
59 fn default() -> Self {
60 Self {
61 max_retries: 3,
62 initial_delay: Duration::from_millis(100),
63 max_delay: Duration::from_secs(30),
64 multiplier: 2.0,
65 jitter: true,
66 max_elapsed_time: Some(Duration::from_secs(60)),
67 retry_conditions: vec![
68 RetryCondition::Timeout,
69 RetryCondition::ConnectionError,
70 RetryCondition::RateLimit,
71 RetryCondition::ServerError,
72 ],
73 }
74 }
75}
76
77impl RetryPolicy {
78 pub fn exponential() -> Self {
80 Self::default()
81 }
82
83 pub fn max_retries(mut self, max_retries: u32) -> Self {
85 self.max_retries = max_retries;
86 self
87 }
88
89 pub fn initial_delay(mut self, delay: Duration) -> Self {
91 self.initial_delay = delay;
92 self
93 }
94
95 pub fn max_delay(mut self, delay: Duration) -> Self {
97 self.max_delay = delay;
98 self
99 }
100
101 pub fn multiplier(mut self, multiplier: f64) -> Self {
103 self.multiplier = multiplier;
104 self
105 }
106
107 pub fn jitter(mut self, jitter: bool) -> Self {
109 self.jitter = jitter;
110 self
111 }
112
113 pub fn max_elapsed_time(mut self, max_elapsed: Duration) -> Self {
115 self.max_elapsed_time = Some(max_elapsed);
116 self
117 }
118
119 pub fn retry_conditions(mut self, conditions: Vec<RetryCondition>) -> Self {
121 self.retry_conditions = conditions;
122 self
123 }
124
125 pub fn should_retry(&self, error: &AnthropicError) -> bool {
127 for condition in &self.retry_conditions {
128 match condition {
129 RetryCondition::All => return true,
130 RetryCondition::Timeout => {
131 if matches!(error, AnthropicError::Timeout) {
132 return true;
133 }
134 }
135 RetryCondition::ConnectionError => {
136 if matches!(error, AnthropicError::NetworkError(_)) {
137 return true;
138 }
139 }
140 RetryCondition::HttpStatus(code) => {
141 if let AnthropicError::HttpError { status, .. } = error {
142 if status == code {
143 return true;
144 }
145 }
146 }
147 RetryCondition::RateLimit => {
148 if let AnthropicError::HttpError { status, .. } = error {
149 if *status == 429 {
150 return true;
151 }
152 }
153 }
154 RetryCondition::ServerError => {
155 if let AnthropicError::HttpError { status, .. } = error {
156 if *status >= 500 && *status < 600 {
157 return true;
158 }
159 }
160 }
161 RetryCondition::AuthenticationError => {
162 if let AnthropicError::HttpError { status, .. } = error {
163 if *status == 401 {
164 return true;
165 }
166 }
167 }
168 }
169 }
170 false
171 }
172
173 pub fn calculate_delay(&self, attempt: u32) -> Duration {
175 let base_delay = self.initial_delay.as_millis() as f64;
176 let delay_ms = base_delay * self.multiplier.powi(attempt as i32);
177 let delay = Duration::from_millis(delay_ms as u64);
178
179 let delay = std::cmp::min(delay, self.max_delay);
180
181 if self.jitter {
182 self.add_jitter(delay)
183 } else {
184 delay
185 }
186 }
187
188 fn add_jitter(&self, delay: Duration) -> Duration {
189 let jitter_range = delay.as_millis() as f64 * 0.1; let jitter = (std::ptr::addr_of!(self) as usize % 100) as f64 / 100.0 * jitter_range;
192 let jittered_ms = (delay.as_millis() as f64 + jitter) as u64;
193 Duration::from_millis(jittered_ms)
194 }
195}
196
197impl RetryExecutor {
198 pub fn new(policy: RetryPolicy) -> Self {
200 Self { policy }
201 }
202
203 pub async fn execute<T, F, Fut>(&self, operation: F) -> RetryResult<T>
205 where
206 F: Fn() -> Fut,
207 Fut: std::future::Future<Output = Result<T>>,
208 {
209 let start_time = Instant::now();
210 let mut last_error = None;
211
212 for attempt in 0..=self.policy.max_retries {
213 if let Some(max_elapsed) = self.policy.max_elapsed_time {
215 if start_time.elapsed() >= max_elapsed {
216 break;
217 }
218 }
219
220 match operation().await {
221 Ok(result) => {
222 return RetryResult::Success(result);
223 }
224 Err(error) => {
225 last_error = Some(error.clone());
226
227 if attempt < self.policy.max_retries && self.policy.should_retry(&error) {
229 let delay = self.policy.calculate_delay(attempt);
230 tracing::debug!(
231 "Request failed (attempt {}/{}): {}. Retrying in {:?}",
232 attempt + 1,
233 self.policy.max_retries + 1,
234 error,
235 delay
236 );
237 tokio::time::sleep(delay).await;
238 } else {
239 break;
240 }
241 }
242 }
243 }
244
245 RetryResult::Failed(last_error.unwrap_or_else(|| {
246 AnthropicError::Other("Unknown error in retry executor".to_string())
247 }))
248 }
249
250 pub fn get_policy(&self) -> &RetryPolicy {
252 &self.policy
253 }
254}
255
256pub fn default_retry() -> RetryExecutor {
258 RetryExecutor::new(RetryPolicy::default())
259}
260
261pub fn api_retry() -> RetryExecutor {
263 RetryExecutor::new(
264 RetryPolicy::exponential()
265 .max_retries(3)
266 .initial_delay(Duration::from_millis(500))
267 .max_delay(Duration::from_secs(30))
268 .retry_conditions(vec![
269 RetryCondition::RateLimit,
270 RetryCondition::ServerError,
271 RetryCondition::Timeout,
272 RetryCondition::ConnectionError,
273 ])
274 )
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// The default policy retries timeouts, 429 and 5xx, and nothing else.
    #[test]
    fn test_retry_policy_should_retry() {
        let policy = RetryPolicy::default();

        assert!(policy.should_retry(&AnthropicError::Timeout));
        assert!(policy.should_retry(&AnthropicError::HttpError {
            status: 429,
            message: "Rate limited".to_string(),
        }));
        assert!(policy.should_retry(&AnthropicError::HttpError {
            status: 500,
            message: "Server error".to_string(),
        }));
        assert!(!policy.should_retry(&AnthropicError::HttpError {
            status: 404,
            message: "Not found".to_string(),
        }));
        assert!(!policy.should_retry(&AnthropicError::InvalidApiKey));
    }

    /// Explicit status codes and the catch-all condition are honoured.
    #[test]
    fn test_retry_policy_custom_conditions() {
        let by_status =
            RetryPolicy::default().retry_conditions(vec![RetryCondition::HttpStatus(404)]);
        assert!(by_status.should_retry(&AnthropicError::HttpError {
            status: 404,
            message: "Not found".to_string(),
        }));
        assert!(!by_status.should_retry(&AnthropicError::Timeout));

        let everything = RetryPolicy::default().retry_conditions(vec![RetryCondition::All]);
        assert!(everything.should_retry(&AnthropicError::InvalidApiKey));
    }

    /// Without jitter the delay doubles on each attempt.
    #[test]
    fn test_delay_calculation() {
        let policy = RetryPolicy::exponential()
            .initial_delay(Duration::from_millis(100))
            .multiplier(2.0)
            .jitter(false);

        assert_eq!(policy.calculate_delay(0), Duration::from_millis(100));
        assert_eq!(policy.calculate_delay(1), Duration::from_millis(200));
        assert_eq!(policy.calculate_delay(2), Duration::from_millis(400));
    }

    /// The computed delay never exceeds `max_delay` when jitter is off.
    #[test]
    fn test_delay_is_capped_at_max_delay() {
        let policy = RetryPolicy::exponential()
            .initial_delay(Duration::from_millis(100))
            .max_delay(Duration::from_millis(250))
            .multiplier(2.0)
            .jitter(false);

        assert_eq!(policy.calculate_delay(1), Duration::from_millis(200));
        assert_eq!(policy.calculate_delay(2), Duration::from_millis(250));
        assert_eq!(policy.calculate_delay(10), Duration::from_millis(250));
    }

    /// Jitter only ever adds to the delay, and by at most 10%.
    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = RetryPolicy::exponential()
            .initial_delay(Duration::from_millis(100))
            .multiplier(2.0)
            .jitter(true);

        for _ in 0..32 {
            let delay = policy.calculate_delay(0);
            assert!(delay >= Duration::from_millis(100));
            assert!(delay <= Duration::from_millis(110));
        }
    }

    #[tokio::test]
    async fn test_retry_executor_success() {
        let policy = RetryPolicy::exponential().max_retries(2);
        let executor = RetryExecutor::new(policy);

        let result = executor
            .execute(|| async { Ok::<i32, AnthropicError>(42) })
            .await;

        match result {
            RetryResult::Success(value) => assert_eq!(value, 42),
            _ => panic!("Expected success"),
        }
    }

    /// A non-retryable error is returned as-is after a single attempt.
    #[tokio::test]
    async fn test_retry_executor_failure() {
        let policy = RetryPolicy::exponential()
            .max_retries(1)
            .initial_delay(Duration::from_millis(1));
        let executor = RetryExecutor::new(policy);

        let calls = Arc::new(AtomicU32::new(0));
        let counter = Arc::clone(&calls);
        let result = executor
            .execute(move || {
                let counter = Arc::clone(&counter);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Err::<i32, AnthropicError>(AnthropicError::InvalidApiKey)
                }
            })
            .await;

        match result {
            RetryResult::Failed(error) => {
                assert!(matches!(error, AnthropicError::InvalidApiKey));
            }
            _ => panic!("Expected failure"),
        }
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// A retryable error is attempted `max_retries + 1` times before giving up.
    #[tokio::test]
    async fn test_retry_executor_exhausts_retries() {
        let policy = RetryPolicy::exponential()
            .max_retries(2)
            .initial_delay(Duration::from_millis(1))
            .jitter(false);
        let executor = RetryExecutor::new(policy);

        let calls = Arc::new(AtomicU32::new(0));
        let counter = Arc::clone(&calls);
        let result = executor
            .execute(move || {
                let counter = Arc::clone(&counter);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Err::<i32, AnthropicError>(AnthropicError::Timeout)
                }
            })
            .await;

        match result {
            RetryResult::Failed(error) => assert!(matches!(error, AnthropicError::Timeout)),
            _ => panic!("Expected failure"),
        }
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}