// retry_patterns/retry_patterns.rs

1#![allow(clippy::uninlined_format_args)]
2//! Resilience and retry strategies for `OpenAI` API calls.
3//!
4//! This example demonstrates:
5//! - Exponential backoff
6//! - Retry with jitter
7//! - Circuit breaker pattern
8//! - Timeout and deadline management
9//! - Request hedging
10//! - Fallback strategies
11//! - Idempotency keys
12//!
13//! Run with: `cargo run --example retry_patterns`
14
15use openai_ergonomic::{Client, Config, Error, Result};
16use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19use tokio::time::{sleep, timeout};
20
#[tokio::main]
async fn main() -> Result<()> {
    println!("=== Retry and Resilience Patterns ===\n");

    // Build the client from environment configuration (e.g. OPENAI_API_KEY).
    let client = Client::from_env()?.build();

    // Each numbered section demonstrates one resilience technique against the
    // live API; they run sequentially and share the same client.

    // Example 1: simple retry (fixed 1s delay, bounded attempt count)
    println!("1. Simple Retry:");
    simple_retry(&client).await?;

    // Example 2: exponential backoff (doubling delay with an upper cap)
    println!("\n2. Exponential Backoff:");
    exponential_backoff(&client).await?;

    // Example 3: randomized backoff to avoid thundering-herd retries
    println!("\n3. Retry with Jitter:");
    retry_with_jitter(&client).await?;

    // Example 4: circuit breaker (fail fast after repeated failures)
    println!("\n4. Circuit Breaker:");
    circuit_breaker_example(&client).await?;

    // Example 5: timeouts (reports its own errors, hence no `?`)
    println!("\n5. Timeout Management:");
    timeout_management(&client).await;

    // Example 6: hedging (race a primary request against a delayed duplicate)
    println!("\n6. Request Hedging:");
    request_hedging(&client).await?;

    // Example 7: degrade gracefully across a chain of strategies
    println!("\n7. Fallback Chain:");
    fallback_chain(&client).await?;

    // Example 8: reuse one idempotency key across retries
    println!("\n8. Idempotency:");
    idempotency_example(&client).await?;

    Ok(())
}
62
63async fn simple_retry(client: &Client) -> Result<()> {
64    const MAX_RETRIES: u32 = 3;
65
66    for attempt in 1..=MAX_RETRIES {
67        println!("Attempt {}/{}", attempt, MAX_RETRIES);
68
69        match client.send_chat(client.chat_simple("Hello")).await {
70            Ok(response) => {
71                if let Some(content) = response.content() {
72                    println!("Success: {}", content);
73                } else {
74                    println!("Success: (no content)");
75                }
76                return Ok(());
77            }
78            Err(e) if attempt < MAX_RETRIES => {
79                println!("Failed (attempt {}): {}. Retrying...", attempt, e);
80                sleep(Duration::from_secs(1)).await;
81            }
82            Err(e) => {
83                println!("All retries exhausted");
84                return Err(e);
85            }
86        }
87    }
88
89    Ok(())
90}
91
92async fn exponential_backoff(client: &Client) -> Result<()> {
93    const MAX_RETRIES: u32 = 5;
94    const BASE_DELAY: Duration = Duration::from_millis(100);
95    const MAX_DELAY: Duration = Duration::from_secs(32);
96
97    let mut delay = BASE_DELAY;
98
99    for attempt in 1..=MAX_RETRIES {
100        match client
101            .send_chat(client.chat_simple("Hello with backoff"))
102            .await
103        {
104            Ok(response) => {
105                if let Some(content) = response.content() {
106                    println!("Success after {} attempts: {}", attempt, content);
107                } else {
108                    println!("Success after {} attempts: (no content)", attempt);
109                }
110                return Ok(());
111            }
112            Err(Error::RateLimit(_message)) => {
113                // Use default delay for rate limiting
114                let wait_time = delay;
115                println!(
116                    "Rate limited (attempt {}). Waiting {:?}...",
117                    attempt, wait_time
118                );
119                sleep(wait_time).await;
120
121                // Double the delay for next attempt
122                delay = (delay * 2).min(MAX_DELAY);
123            }
124            Err(e) if attempt < MAX_RETRIES => {
125                println!("Error (attempt {}): {}. Waiting {:?}...", attempt, e, delay);
126                sleep(delay).await;
127
128                // Exponential increase with cap
129                delay = (delay * 2).min(MAX_DELAY);
130            }
131            Err(e) => return Err(e),
132        }
133    }
134
135    Ok(())
136}
137
138async fn retry_with_jitter(client: &Client) -> Result<()> {
139    const MAX_RETRIES: u32 = 5;
140    const BASE_DELAY_MS: u64 = 100;
141
142    for attempt in 1..=MAX_RETRIES {
143        match client
144            .send_chat(client.chat_simple("Hello with jitter"))
145            .await
146        {
147            Ok(response) => {
148                if let Some(content) = response.content() {
149                    println!("Success: {}", content);
150                } else {
151                    println!("Success: (no content)");
152                }
153                return Ok(());
154            }
155            Err(e) if attempt < MAX_RETRIES => {
156                // Calculate delay with jitter using random() instead of thread_rng for Send compatibility
157                let base = BASE_DELAY_MS * 2_u64.pow(attempt - 1);
158                let jitter = rand::random::<u64>() % (base / 2 + 1);
159                let delay = Duration::from_millis(base + jitter);
160
161                println!(
162                    "Attempt {} failed: {}. Retrying in {:?} (with jitter)...",
163                    attempt, e, delay
164                );
165                sleep(delay).await;
166            }
167            Err(e) => return Err(e),
168        }
169    }
170
171    Ok(())
172}
173
174async fn circuit_breaker_example(client: &Client) -> Result<()> {
175    let circuit_breaker = Arc::new(CircuitBreaker::new(3, Duration::from_secs(5)));
176
177    for i in 1..=10 {
178        println!("Request {}: ", i);
179
180        // Check circuit state
181        match circuit_breaker
182            .call(|| async {
183                client
184                    .send_chat(client.chat_simple("Circuit breaker test"))
185                    .await
186            })
187            .await
188        {
189            Ok(response) => {
190                if let Some(content) = response.content() {
191                    println!("  Success: {}", content);
192                } else {
193                    println!("  Success: (no content)");
194                }
195            }
196            Err(CircuitBreakerError::Open) => {
197                println!("  Circuit is OPEN - skipping request");
198                sleep(Duration::from_secs(1)).await;
199            }
200            Err(CircuitBreakerError::RequestFailed(e)) => {
201                println!("  Request failed: {}", e);
202            }
203        }
204
205        // Small delay between requests
206        sleep(Duration::from_millis(500)).await;
207    }
208
209    Ok(())
210}
211
212async fn timeout_management(client: &Client) {
213    // Example 1: Per-request timeout
214    println!("Per-request timeout:");
215    match timeout(
216        Duration::from_secs(5),
217        client.send_chat(client.chat_simple("Hello")),
218    )
219    .await
220    {
221        Ok(Ok(response)) => {
222            if let Some(content) = response.content() {
223                println!("Response received: {}", content);
224            } else {
225                println!("Response received: (no content)");
226            }
227        }
228        Ok(Err(e)) => println!("API error: {}", e),
229        Err(_) => println!("Request timed out after 5 seconds"),
230    }
231
232    // Example 2: Deadline-based timeout
233    println!("\nDeadline-based timeout:");
234    let deadline = Instant::now() + Duration::from_secs(10);
235
236    while Instant::now() < deadline {
237        let remaining = deadline - Instant::now();
238        println!("Time remaining: {:?}", remaining);
239
240        match timeout(
241            remaining,
242            client.send_chat(client.chat_simple("Quick response")),
243        )
244        .await
245        {
246            Ok(Ok(response)) => {
247                if let Some(content) = response.content() {
248                    println!("Got response: {}", content);
249                } else {
250                    println!("Got response: (no content)");
251                }
252                break;
253            }
254            Ok(Err(e)) => {
255                println!("Error: {}. Retrying...", e);
256                sleep(Duration::from_secs(1)).await;
257            }
258            Err(_) => {
259                println!("Deadline exceeded");
260                break;
261            }
262        }
263    }
264
265    // Example 3: Adaptive timeout
266    println!("\nAdaptive timeout:");
267    let mut adaptive_timeout = Duration::from_secs(2);
268
269    for _attempt in 1..=3 {
270        let start = Instant::now();
271
272        match timeout(
273            adaptive_timeout,
274            client.send_chat(client.chat_simple("Adaptive")),
275        )
276        .await
277        {
278            Ok(Ok(response)) => {
279                let elapsed = start.elapsed();
280                println!(
281                    "Success in {:?}. Next timeout would be {:?}.",
282                    elapsed,
283                    elapsed * 2
284                );
285                // Adjust timeout based on actual response time for potential future requests
286                // adaptive_timeout = elapsed * 2; // Not used since we break out of the loop
287                if let Some(content) = response.content() {
288                    println!("Response: {}", content);
289                } else {
290                    println!("Response: (no content)");
291                }
292                break;
293            }
294            Ok(Err(e)) => println!("Error: {}", e),
295            Err(_) => {
296                println!(
297                    "Timeout after {:?}. Increasing for next attempt.",
298                    adaptive_timeout
299                );
300                adaptive_timeout *= 2;
301            }
302        }
303    }
304}
305
306async fn request_hedging(client: &Client) -> Result<()> {
307    use futures::future::{select, Either};
308    use std::pin::pin;
309
310    println!("Launching hedged requests...");
311
312    // Launch multiple requests with staggered starts
313    let request1 = async {
314        println!("Request 1 started");
315        client
316            .send_chat(client.chat_simple("Hedged request 1"))
317            .await
318    };
319
320    let request2 = async {
321        sleep(Duration::from_millis(200)).await;
322        println!("Request 2 started (200ms delay)");
323        client
324            .send_chat(client.chat_simple("Hedged request 2"))
325            .await
326    };
327
328    let fut1 = pin!(request1);
329    let fut2 = pin!(request2);
330
331    // Return first successful response
332    match select(fut1, fut2).await {
333        Either::Left((result, _)) => {
334            println!("Request 1 won the race");
335            result.map(|r| {
336                if let Some(content) = r.content() {
337                    println!("Result: {}", content);
338                } else {
339                    println!("Result: (no content)");
340                }
341            })
342        }
343        Either::Right((result, _)) => {
344            println!("Request 2 won the race");
345            result.map(|r| {
346                if let Some(content) = r.content() {
347                    println!("Result: {}", content);
348                } else {
349                    println!("Result: (no content)");
350                }
351            })
352        }
353    }
354}
355
356async fn fallback_chain(client: &Client) -> Result<()> {
357    // Define fallback chain
358    let strategies = vec![
359        ("GPT-4o", "gpt-4o", 1024),
360        ("GPT-4o-mini", "gpt-4o-mini", 512),
361        ("GPT-3.5", "gpt-3.5-turbo", 256),
362    ];
363
364    let prompt = "Explain quantum computing";
365
366    for (name, _model, max_tokens) in strategies {
367        println!("Trying {} (max_tokens: {})", name, max_tokens);
368
369        let builder = client.chat().user(prompt).max_completion_tokens(max_tokens);
370        match client.send_chat(builder).await {
371            Ok(response) => {
372                println!("Success with {}", name);
373                if let Some(content) = response.content() {
374                    println!("Response: {}...", &content[..content.len().min(100)]);
375                }
376                return Ok(());
377            }
378            Err(e) => {
379                println!("Failed with {}: {}", name, e);
380            }
381        }
382    }
383
384    println!("All fallback strategies exhausted");
385    Ok(())
386}
387
388async fn idempotency_example(_client: &Client) -> Result<()> {
389    // Generate idempotency key
390    let idempotency_key = generate_idempotency_key();
391    println!("Using idempotency key: {}", idempotency_key);
392
393    // Simulate retrying the same request
394    for attempt in 1..=3 {
395        println!("\nAttempt {} with same idempotency key", attempt);
396
397        // In a real implementation, you'd pass the idempotency key in headers
398        let mut headers = std::collections::HashMap::new();
399        headers.insert("Idempotency-Key".to_string(), idempotency_key.clone());
400        println!("  Would send {} headers", headers.len());
401
402        let config = Config::builder()
403            .api_key(std::env::var("OPENAI_API_KEY").unwrap_or_default())
404            .build();
405
406        // Note: Headers (including idempotency key) are not yet supported in current API
407
408        let client_with_idempotency = Client::builder(config)?.build();
409
410        match client_with_idempotency
411            .send_chat(client_with_idempotency.chat_simple("Idempotent request"))
412            .await
413        {
414            Ok(response) => {
415                if let Some(content) = response.content() {
416                    println!("Response: {}", content);
417                } else {
418                    println!("Response: (no content)");
419                }
420                // Server should return same response for same idempotency key
421            }
422            Err(e) => println!("Error: {}", e),
423        }
424
425        if attempt < 3 {
426            sleep(Duration::from_secs(1)).await;
427        }
428    }
429
430    Ok(())
431}
432
/// Build a unique idempotency key of the form `req-<unix-secs>-<random>`.
///
/// Uses only the standard library: the random component is drawn from a
/// freshly constructed `RandomState` hasher, which is randomly seeded per
/// instance, so keys stay unique even within the same second. Also replaces
/// the bare `unwrap()` on the system clock with a descriptive `expect`.
fn generate_idempotency_key() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock set before the Unix epoch")
        .as_secs();
    // RandomState::new() is seeded from OS randomness; finish() on the empty
    // input yields a key-dependent (i.e. random) value. Truncation to u32 is
    // fine — we only need a short random suffix.
    let random = RandomState::new().build_hasher().finish() as u32;
    format!("req-{}-{}", timestamp, random)
}
441
442// Circuit Breaker Implementation
/// The three classic circuit-breaker states.
#[derive(Debug)]
enum CircuitState {
    // Normal operation: requests pass through.
    Closed,
    // Failure threshold reached: requests are rejected without being sent.
    Open,
    // Cooldown elapsed: one probe request is allowed to test recovery.
    HalfOpen,
}
449
/// Minimal circuit breaker: counts consecutive failures, short-circuits calls
/// once `threshold` failures accumulate, and probes again after `timeout`.
struct CircuitBreaker {
    // Current state, behind an async RwLock so `call` can await while
    // inspecting/updating it.
    state: Arc<tokio::sync::RwLock<CircuitState>>,
    // Consecutive-failure counter; reset to 0 on any success.
    failure_count: Arc<AtomicU32>,
    // Unix seconds of the most recent failure (0 = never failed).
    last_failure_time: Arc<AtomicU64>,
    // NOTE(review): the inner Arcs look redundant — the whole breaker can be
    // shared via one outer Arc and the fields kept as plain values. Confirm
    // nothing clones the fields individually before simplifying.
    threshold: u32,
    timeout: Duration,
}
457
/// Errors surfaced by `CircuitBreaker::call`.
#[derive(Debug)]
enum CircuitBreakerError {
    // The circuit is open; the request was rejected without being attempted.
    Open,
    // The request was attempted and failed with the underlying API error.
    RequestFailed(Error),
}
463
464impl CircuitBreaker {
465    fn new(threshold: u32, timeout: Duration) -> Self {
466        Self {
467            state: Arc::new(tokio::sync::RwLock::new(CircuitState::Closed)),
468            failure_count: Arc::new(AtomicU32::new(0)),
469            last_failure_time: Arc::new(AtomicU64::new(0)),
470            threshold,
471            timeout,
472        }
473    }
474
475    async fn call<F, Fut, T>(&self, f: F) -> std::result::Result<T, CircuitBreakerError>
476    where
477        F: FnOnce() -> Fut,
478        Fut: std::future::Future<Output = Result<T>>,
479    {
480        // Check if circuit should transition from Open to HalfOpen
481        let mut state = self.state.write().await;
482        if matches!(*state, CircuitState::Open) {
483            let last_failure = self.last_failure_time.load(Ordering::Relaxed);
484            let now = SystemTime::now()
485                .duration_since(UNIX_EPOCH)
486                .unwrap()
487                .as_secs();
488
489            if now - last_failure > self.timeout.as_secs() {
490                println!("  Circuit transitioning to HALF-OPEN");
491                *state = CircuitState::HalfOpen;
492            } else {
493                return Err(CircuitBreakerError::Open);
494            }
495        }
496        drop(state);
497
498        // Execute the request
499        match f().await {
500            Ok(result) => {
501                {
502                    let mut state = self.state.write().await;
503                    if matches!(*state, CircuitState::HalfOpen) {
504                        println!("  Circuit transitioning to CLOSED");
505                        *state = CircuitState::Closed;
506                    }
507                }
508                self.failure_count.store(0, Ordering::Relaxed);
509                Ok(result)
510            }
511            Err(e) => {
512                let count = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
513                self.last_failure_time.store(
514                    SystemTime::now()
515                        .duration_since(UNIX_EPOCH)
516                        .unwrap()
517                        .as_secs(),
518                    Ordering::Relaxed,
519                );
520
521                {
522                    let mut state = self.state.write().await;
523                    if count >= self.threshold {
524                        println!("  Circuit transitioning to OPEN (failures: {})", count);
525                        *state = CircuitState::Open;
526                    }
527                }
528
529                Err(CircuitBreakerError::RequestFailed(e))
530            }
531        }
532    }
533}