1#![allow(clippy::uninlined_format_args)]
2use openai_ergonomic::{Client, Config, Error, Result};
16use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
17use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19use tokio::time::{sleep, timeout};
20
/// Entry point: runs each resilience-pattern demo in sequence against a
/// client built from environment configuration (`Client::from_env`).
///
/// Any error propagated by a demo (via `?`) aborts the remaining demos.
#[tokio::main]
async fn main() -> Result<()> {
    println!("=== Retry and Resilience Patterns ===\n");

    let client = Client::from_env()?.build();

    println!("1. Simple Retry:");
    simple_retry(&client).await?;

    println!("\n2. Exponential Backoff:");
    exponential_backoff(&client).await?;

    println!("\n3. Retry with Jitter:");
    retry_with_jitter(&client).await?;

    println!("\n4. Circuit Breaker:");
    circuit_breaker_example(&client).await?;

    println!("\n5. Timeout Management:");
    timeout_management(&client).await;

    println!("\n6. Request Hedging:");
    request_hedging(&client).await?;

    println!("\n7. Fallback Chain:");
    fallback_chain(&client).await?;

    println!("\n8. Idempotency:");
    idempotency_example(&client).await?;

    Ok(())
}
62
63async fn simple_retry(client: &Client) -> Result<()> {
64 const MAX_RETRIES: u32 = 3;
65
66 for attempt in 1..=MAX_RETRIES {
67 println!("Attempt {}/{}", attempt, MAX_RETRIES);
68
69 match client.send_chat(client.chat_simple("Hello")).await {
70 Ok(response) => {
71 if let Some(content) = response.content() {
72 println!("Success: {}", content);
73 } else {
74 println!("Success: (no content)");
75 }
76 return Ok(());
77 }
78 Err(e) if attempt < MAX_RETRIES => {
79 println!("Failed (attempt {}): {}. Retrying...", attempt, e);
80 sleep(Duration::from_secs(1)).await;
81 }
82 Err(e) => {
83 println!("All retries exhausted");
84 return Err(e);
85 }
86 }
87 }
88
89 Ok(())
90}
91
92async fn exponential_backoff(client: &Client) -> Result<()> {
93 const MAX_RETRIES: u32 = 5;
94 const BASE_DELAY: Duration = Duration::from_millis(100);
95 const MAX_DELAY: Duration = Duration::from_secs(32);
96
97 let mut delay = BASE_DELAY;
98
99 for attempt in 1..=MAX_RETRIES {
100 match client
101 .send_chat(client.chat_simple("Hello with backoff"))
102 .await
103 {
104 Ok(response) => {
105 if let Some(content) = response.content() {
106 println!("Success after {} attempts: {}", attempt, content);
107 } else {
108 println!("Success after {} attempts: (no content)", attempt);
109 }
110 return Ok(());
111 }
112 Err(Error::RateLimit(_message)) => {
113 let wait_time = delay;
115 println!(
116 "Rate limited (attempt {}). Waiting {:?}...",
117 attempt, wait_time
118 );
119 sleep(wait_time).await;
120
121 delay = (delay * 2).min(MAX_DELAY);
123 }
124 Err(e) if attempt < MAX_RETRIES => {
125 println!("Error (attempt {}): {}. Waiting {:?}...", attempt, e, delay);
126 sleep(delay).await;
127
128 delay = (delay * 2).min(MAX_DELAY);
130 }
131 Err(e) => return Err(e),
132 }
133 }
134
135 Ok(())
136}
137
138async fn retry_with_jitter(client: &Client) -> Result<()> {
139 const MAX_RETRIES: u32 = 5;
140 const BASE_DELAY_MS: u64 = 100;
141
142 for attempt in 1..=MAX_RETRIES {
143 match client
144 .send_chat(client.chat_simple("Hello with jitter"))
145 .await
146 {
147 Ok(response) => {
148 if let Some(content) = response.content() {
149 println!("Success: {}", content);
150 } else {
151 println!("Success: (no content)");
152 }
153 return Ok(());
154 }
155 Err(e) if attempt < MAX_RETRIES => {
156 let base = BASE_DELAY_MS * 2_u64.pow(attempt - 1);
158 let jitter = rand::random::<u64>() % (base / 2 + 1);
159 let delay = Duration::from_millis(base + jitter);
160
161 println!(
162 "Attempt {} failed: {}. Retrying in {:?} (with jitter)...",
163 attempt, e, delay
164 );
165 sleep(delay).await;
166 }
167 Err(e) => return Err(e),
168 }
169 }
170
171 Ok(())
172}
173
174async fn circuit_breaker_example(client: &Client) -> Result<()> {
175 let circuit_breaker = Arc::new(CircuitBreaker::new(3, Duration::from_secs(5)));
176
177 for i in 1..=10 {
178 println!("Request {}: ", i);
179
180 match circuit_breaker
182 .call(|| async {
183 client
184 .send_chat(client.chat_simple("Circuit breaker test"))
185 .await
186 })
187 .await
188 {
189 Ok(response) => {
190 if let Some(content) = response.content() {
191 println!(" Success: {}", content);
192 } else {
193 println!(" Success: (no content)");
194 }
195 }
196 Err(CircuitBreakerError::Open) => {
197 println!(" Circuit is OPEN - skipping request");
198 sleep(Duration::from_secs(1)).await;
199 }
200 Err(CircuitBreakerError::RequestFailed(e)) => {
201 println!(" Request failed: {}", e);
202 }
203 }
204
205 sleep(Duration::from_millis(500)).await;
207 }
208
209 Ok(())
210}
211
/// Demonstrates three timeout strategies: a fixed per-request timeout, an
/// absolute deadline shared across retries, and an adaptive timeout that
/// doubles after each timed-out attempt.
///
/// All failures are printed rather than propagated, so this function is
/// infallible.
async fn timeout_management(client: &Client) {
    println!("Per-request timeout:");
    // Strategy 1: wrap a single request in a hard 5-second timeout.
    // `timeout` yields Ok(inner result) on completion, Err(_) on expiry.
    match timeout(
        Duration::from_secs(5),
        client.send_chat(client.chat_simple("Hello")),
    )
    .await
    {
        Ok(Ok(response)) => {
            if let Some(content) = response.content() {
                println!("Response received: {}", content);
            } else {
                println!("Response received: (no content)");
            }
        }
        Ok(Err(e)) => println!("API error: {}", e),
        Err(_) => println!("Request timed out after 5 seconds"),
    }

    println!("\nDeadline-based timeout:");
    // Strategy 2: retry until success, but never past an absolute deadline;
    // each attempt's timeout is whatever time remains on the clock.
    let deadline = Instant::now() + Duration::from_secs(10);

    while Instant::now() < deadline {
        let remaining = deadline - Instant::now();
        println!("Time remaining: {:?}", remaining);

        match timeout(
            remaining,
            client.send_chat(client.chat_simple("Quick response")),
        )
        .await
        {
            Ok(Ok(response)) => {
                if let Some(content) = response.content() {
                    println!("Got response: {}", content);
                } else {
                    println!("Got response: (no content)");
                }
                break;
            }
            Ok(Err(e)) => {
                // API error (not a timeout): back off briefly and retry.
                println!("Error: {}. Retrying...", e);
                sleep(Duration::from_secs(1)).await;
            }
            Err(_) => {
                // The remaining-time window elapsed: stop retrying entirely.
                println!("Deadline exceeded");
                break;
            }
        }
    }

    println!("\nAdaptive timeout:");
    // Strategy 3: start with a tight 2-second timeout and double it after
    // each timed-out attempt, for up to three attempts. Non-timeout API
    // errors are printed and retried immediately without widening the window.
    let mut adaptive_timeout = Duration::from_secs(2);

    for _attempt in 1..=3 {
        let start = Instant::now();

        match timeout(
            adaptive_timeout,
            client.send_chat(client.chat_simple("Adaptive")),
        )
        .await
        {
            Ok(Ok(response)) => {
                let elapsed = start.elapsed();
                println!(
                    "Success in {:?}. Next timeout would be {:?}.",
                    elapsed,
                    elapsed * 2
                );
                if let Some(content) = response.content() {
                    println!("Response: {}", content);
                } else {
                    println!("Response: (no content)");
                }
                break;
            }
            Ok(Err(e)) => println!("Error: {}", e),
            Err(_) => {
                println!(
                    "Timeout after {:?}. Increasing for next attempt.",
                    adaptive_timeout
                );
                adaptive_timeout *= 2;
            }
        }
    }
}
305
306async fn request_hedging(client: &Client) -> Result<()> {
307 use futures::future::{select, Either};
308 use std::pin::pin;
309
310 println!("Launching hedged requests...");
311
312 let request1 = async {
314 println!("Request 1 started");
315 client
316 .send_chat(client.chat_simple("Hedged request 1"))
317 .await
318 };
319
320 let request2 = async {
321 sleep(Duration::from_millis(200)).await;
322 println!("Request 2 started (200ms delay)");
323 client
324 .send_chat(client.chat_simple("Hedged request 2"))
325 .await
326 };
327
328 let fut1 = pin!(request1);
329 let fut2 = pin!(request2);
330
331 match select(fut1, fut2).await {
333 Either::Left((result, _)) => {
334 println!("Request 1 won the race");
335 result.map(|r| {
336 if let Some(content) = r.content() {
337 println!("Result: {}", content);
338 } else {
339 println!("Result: (no content)");
340 }
341 })
342 }
343 Either::Right((result, _)) => {
344 println!("Request 2 won the race");
345 result.map(|r| {
346 if let Some(content) = r.content() {
347 println!("Result: {}", content);
348 } else {
349 println!("Result: (no content)");
350 }
351 })
352 }
353 }
354}
355
356async fn fallback_chain(client: &Client) -> Result<()> {
357 let strategies = vec![
359 ("GPT-4o", "gpt-4o", 1024),
360 ("GPT-4o-mini", "gpt-4o-mini", 512),
361 ("GPT-3.5", "gpt-3.5-turbo", 256),
362 ];
363
364 let prompt = "Explain quantum computing";
365
366 for (name, _model, max_tokens) in strategies {
367 println!("Trying {} (max_tokens: {})", name, max_tokens);
368
369 let builder = client.chat().user(prompt).max_completion_tokens(max_tokens);
370 match client.send_chat(builder).await {
371 Ok(response) => {
372 println!("Success with {}", name);
373 if let Some(content) = response.content() {
374 println!("Response: {}...", &content[..content.len().min(100)]);
375 }
376 return Ok(());
377 }
378 Err(e) => {
379 println!("Failed with {}: {}", name, e);
380 }
381 }
382 }
383
384 println!("All fallback strategies exhausted");
385 Ok(())
386}
387
/// Demonstrates idempotency keys: a single key is generated once and reused
/// across three attempts so a server could deduplicate retried requests.
///
/// The headers map built below is illustrative only — it is constructed and
/// counted but never attached to any outgoing request.
async fn idempotency_example(_client: &Client) -> Result<()> {
    let idempotency_key = generate_idempotency_key();
    println!("Using idempotency key: {}", idempotency_key);

    for attempt in 1..=3 {
        println!("\nAttempt {} with same idempotency key", attempt);

        // Shows where an Idempotency-Key header would go; not actually sent.
        let mut headers = std::collections::HashMap::new();
        headers.insert("Idempotency-Key".to_string(), idempotency_key.clone());
        println!(" Would send {} headers", headers.len());

        // A fresh client is built per attempt from the same API key.
        // NOTE(review): nothing here wires `headers` into this client —
        // presumably the real crate offers a header hook; confirm.
        let config = Config::builder()
            .api_key(std::env::var("OPENAI_API_KEY").unwrap_or_default())
            .build();

        let client_with_idempotency = Client::builder(config)?.build();

        match client_with_idempotency
            .send_chat(client_with_idempotency.chat_simple("Idempotent request"))
            .await
        {
            Ok(response) => {
                if let Some(content) = response.content() {
                    println!("Response: {}", content);
                } else {
                    println!("Response: (no content)");
                }
            }
            Err(e) => println!("Error: {}", e),
        }

        // Brief pause between retries (skipped after the last attempt).
        if attempt < 3 {
            sleep(Duration::from_secs(1)).await;
        }
    }

    Ok(())
}
432
433fn generate_idempotency_key() -> String {
434 let timestamp = SystemTime::now()
435 .duration_since(UNIX_EPOCH)
436 .unwrap()
437 .as_secs();
438 let random: u32 = rand::random();
439 format!("req-{}-{}", timestamp, random)
440}
441
/// The classic circuit-breaker states.
#[derive(Debug)]
enum CircuitState {
    /// Requests flow normally; failures are counted.
    Closed,
    /// Requests are rejected without being attempted until the cooldown
    /// elapses.
    Open,
    /// Cooldown elapsed — requests are allowed through to probe recovery;
    /// a success closes the circuit again.
    HalfOpen,
}
449
/// A minimal circuit breaker: opens after `threshold` consecutive failures
/// and allows a trial request once `timeout` has elapsed since the last
/// failure.
struct CircuitBreaker {
    // Current state; the async RwLock serializes state transitions.
    state: Arc<tokio::sync::RwLock<CircuitState>>,
    // Consecutive-failure count (reset to zero on any success).
    failure_count: Arc<AtomicU32>,
    // Unix timestamp (seconds) of the most recent failure.
    last_failure_time: Arc<AtomicU64>,
    // Failures required before the circuit opens.
    threshold: u32,
    // Cooldown an open circuit waits before permitting a trial request.
    timeout: Duration,
    // NOTE(review): the Arc wrappers on the first three fields look
    // unnecessary when the breaker itself is shared via Arc (as in the
    // example); confirm no caller relies on cloning the inner handles.
}
457
/// Distinguishes "the breaker refused the call" from "the call ran and
/// failed".
#[derive(Debug)]
enum CircuitBreakerError {
    /// The circuit is open; the request was not attempted.
    Open,
    /// The request was attempted and returned this underlying error.
    RequestFailed(Error),
}
463
impl CircuitBreaker {
    /// Creates a closed breaker that opens after `threshold` consecutive
    /// failures and stays open for at least `timeout` before allowing a
    /// trial request.
    fn new(threshold: u32, timeout: Duration) -> Self {
        Self {
            state: Arc::new(tokio::sync::RwLock::new(CircuitState::Closed)),
            failure_count: Arc::new(AtomicU32::new(0)),
            last_failure_time: Arc::new(AtomicU64::new(0)),
            threshold,
            timeout,
        }
    }

    /// Runs `f` through the breaker.
    ///
    /// Returns `CircuitBreakerError::Open` without invoking `f` while the
    /// circuit is open and the cooldown has not elapsed; otherwise invokes
    /// `f`, tracking success/failure to drive state transitions, and wraps
    /// any error from `f` in `CircuitBreakerError::RequestFailed`.
    async fn call<F, Fut, T>(&self, f: F) -> std::result::Result<T, CircuitBreakerError>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        // Take the write lock up front so that, if the cooldown has elapsed,
        // the Open -> HalfOpen flip happens in the same critical section as
        // the inspection.
        let mut state = self.state.write().await;
        if matches!(*state, CircuitState::Open) {
            let last_failure = self.last_failure_time.load(Ordering::Relaxed);
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            if now - last_failure > self.timeout.as_secs() {
                println!(" Circuit transitioning to HALF-OPEN");
                *state = CircuitState::HalfOpen;
            } else {
                // Still cooling down: reject without attempting the call.
                return Err(CircuitBreakerError::Open);
            }
        }
        // Release the lock before awaiting the request so concurrent calls
        // are not serialized behind it.
        drop(state);

        match f().await {
            Ok(result) => {
                {
                    // A successful trial request closes a half-open circuit.
                    let mut state = self.state.write().await;
                    if matches!(*state, CircuitState::HalfOpen) {
                        println!(" Circuit transitioning to CLOSED");
                        *state = CircuitState::Closed;
                    }
                }
                // Any success resets the consecutive-failure count.
                self.failure_count.store(0, Ordering::Relaxed);
                Ok(result)
            }
            Err(e) => {
                let count = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
                // Record when this failure happened so the Open state knows
                // when the cooldown has elapsed.
                self.last_failure_time.store(
                    SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap()
                        .as_secs(),
                    Ordering::Relaxed,
                );

                {
                    // Reaching the threshold (or failing while half-open,
                    // since the count was never reset) opens the circuit.
                    let mut state = self.state.write().await;
                    if count >= self.threshold {
                        println!(" Circuit transitioning to OPEN (failures: {})", count);
                        *state = CircuitState::Open;
                    }
                }

                Err(CircuitBreakerError::RequestFailed(e))
            }
        }
    }
}