//! HTTP client for the Odos SDK (odos_sdk/client.rs).

1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use backoff::{backoff::Backoff, ExponentialBackoff};
6use reqwest::{Client, RequestBuilder, Response};
7use tokio::time::timeout;
8use tracing::{debug, info, instrument, warn};
9
10use crate::error::{OdosError, Result};
11
/// Configuration for the HTTP client
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Request timeout duration (also used as the per-attempt timeout and
    /// the total backoff budget in `execute_with_retry`)
    pub timeout: Duration,
    /// Connection timeout duration
    pub connect_timeout: Duration,
    /// Maximum number of retry attempts
    pub max_retries: u32,
    /// Initial retry delay
    pub initial_retry_delay: Duration,
    /// Maximum retry delay
    pub max_retry_delay: Duration,
    /// Circuit breaker failure threshold
    pub circuit_breaker_threshold: u32,
    /// Maximum idle connections kept per host in the connection pool
    /// (passed to reqwest's `pool_max_idle_per_host`, so this is not a cap
    /// on total concurrency)
    pub max_connections: usize,
    /// Circuit breaker reset timeout
    pub circuit_breaker_reset_timeout: Duration,
    /// Connection pool idle timeout
    pub pool_idle_timeout: Duration,
}
34
35impl Default for ClientConfig {
36    fn default() -> Self {
37        Self {
38            timeout: Duration::from_secs(30),
39            connect_timeout: Duration::from_secs(10),
40            max_retries: 3,
41            initial_retry_delay: Duration::from_millis(100),
42            max_retry_delay: Duration::from_secs(5),
43            circuit_breaker_threshold: 5,
44            circuit_breaker_reset_timeout: Duration::from_secs(60),
45            max_connections: 20,
46            pool_idle_timeout: Duration::from_secs(90),
47        }
48    }
49}
50
/// Circuit breaker state
///
/// - `Closed`: requests flow normally; failures are counted.
/// - `Open`: requests are rejected until the reset timeout elapses.
/// - `HalfOpen`: a probe request is allowed; success closes the breaker,
///   another failure re-opens it (see `CircuitBreaker::record_*`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}
58
/// Enhanced HTTP client with retry logic, timeouts, and circuit breaker
#[derive(Debug, Clone)]
pub struct OdosHttpClient {
    // Underlying reqwest client, configured in `with_config`.
    client: Client,
    // Timeout/retry/breaker settings; a clone of what the breaker holds.
    config: ClientConfig,
    // Shared via Arc so clones of this client see the same breaker state.
    circuit_breaker: Arc<CircuitBreaker>,
}
66
/// Failure-tracking state behind `OdosHttpClient`'s circuit breaker.
#[derive(Debug)]
struct CircuitBreaker {
    // Current state of the breaker state machine.
    state: std::sync::RwLock<CircuitBreakerState>,
    // Running failure count; reset to 0 on success (see `record_success`).
    failure_count: AtomicU64,
    // Timestamp of the most recent failure, if any; used to decide when an
    // open breaker may transition to half-open.
    last_failure_time: std::sync::RwLock<Option<std::time::Instant>>,
    // Own copy of the config (threshold and reset timeout are read here).
    config: ClientConfig,
}
74
75impl CircuitBreaker {
76    fn new(config: ClientConfig) -> Self {
77        Self {
78            state: std::sync::RwLock::new(CircuitBreakerState::Closed),
79            failure_count: AtomicU64::new(0),
80            last_failure_time: std::sync::RwLock::new(None),
81            config,
82        }
83    }
84
85    fn can_execute(&self) -> Result<()> {
86        let state = *self.state.read().unwrap();
87        match state {
88            CircuitBreakerState::Closed => Ok(()),
89            CircuitBreakerState::Open => {
90                // Check if we should transition to half-open
91                if let Some(last_failure) = *self.last_failure_time.read().unwrap() {
92                    if last_failure.elapsed() > self.config.circuit_breaker_reset_timeout {
93                        *self.state.write().unwrap() = CircuitBreakerState::HalfOpen;
94                        info!("Circuit breaker transitioning to half-open state");
95                        Ok(())
96                    } else {
97                        Err(OdosError::circuit_breaker_error("Circuit breaker is open"))
98                    }
99                } else {
100                    Ok(())
101                }
102            }
103            CircuitBreakerState::HalfOpen => Ok(()),
104        }
105    }
106
107    fn record_success(&self) {
108        let current_state = *self.state.read().unwrap();
109        match current_state {
110            CircuitBreakerState::HalfOpen => {
111                *self.state.write().unwrap() = CircuitBreakerState::Closed;
112                self.failure_count.store(0, Ordering::SeqCst);
113                info!("Circuit breaker closed after successful request");
114            }
115            CircuitBreakerState::Closed => {
116                // Reset failure count on successful request
117                self.failure_count.store(0, Ordering::SeqCst);
118            }
119            CircuitBreakerState::Open => {
120                // Should not happen, but handle gracefully
121                warn!("Recorded success while circuit breaker is open");
122            }
123        }
124    }
125
126    fn record_failure(&self) {
127        let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
128        *self.last_failure_time.write().unwrap() = Some(std::time::Instant::now());
129
130        if failure_count >= self.config.circuit_breaker_threshold as u64 {
131            *self.state.write().unwrap() = CircuitBreakerState::Open;
132            warn!("Circuit breaker opened after {} failures", failure_count);
133        }
134    }
135}
136
137impl OdosHttpClient {
138    /// Create a new HTTP client with default configuration
139    pub fn new() -> Result<Self> {
140        Self::with_config(ClientConfig::default())
141    }
142
143    /// Create a new HTTP client with custom configuration
144    pub fn with_config(config: ClientConfig) -> Result<Self> {
145        let client = Client::builder()
146            .timeout(config.timeout)
147            .connect_timeout(config.connect_timeout)
148            .pool_max_idle_per_host(config.max_connections)
149            .pool_idle_timeout(config.pool_idle_timeout)
150            .build()
151            .map_err(OdosError::Http)?;
152
153        Ok(Self {
154            client,
155            config: config.clone(),
156            circuit_breaker: Arc::new(CircuitBreaker::new(config)),
157        })
158    }
159
160    /// Execute a request with retry logic and circuit breaker
161    #[instrument(skip(self, request_builder_fn), level = "debug")]
162    pub async fn execute_with_retry<F>(&self, request_builder_fn: F) -> Result<Response>
163    where
164        F: Fn() -> RequestBuilder + Clone,
165    {
166        // Check circuit breaker
167        self.circuit_breaker.can_execute()?;
168
169        // Configure backoff strategy
170        let mut backoff = ExponentialBackoff {
171            initial_interval: self.config.initial_retry_delay,
172            max_interval: self.config.max_retry_delay,
173            max_elapsed_time: Some(self.config.timeout),
174            ..Default::default()
175        };
176
177        let mut attempt = 0;
178        let mut last_error = None;
179
180        loop {
181            if attempt >= self.config.max_retries {
182                break;
183            }
184
185            attempt += 1;
186            debug!(attempt = attempt, "Executing HTTP request");
187
188            // Execute request with timeout
189            let request = request_builder_fn().build().map_err(OdosError::Http)?;
190            let request_timeout = timeout(self.config.timeout, self.client.execute(request));
191
192            match request_timeout.await {
193                Ok(Ok(response)) => {
194                    // Check if response indicates success
195                    if response.status().is_success() {
196                        debug!(attempt = attempt, status = %response.status(), "Request successful");
197                        self.circuit_breaker.record_success();
198                        return Ok(response);
199                    } else {
200                        // API error - check if retryable
201                        let status = response.status();
202                        let error_text = response
203                            .text()
204                            .await
205                            .unwrap_or_else(|_| "Unknown error".to_string());
206                        let error = OdosError::api_error(status, error_text);
207
208                        if !error.is_retryable() {
209                            self.circuit_breaker.record_failure();
210                            return Err(error);
211                        }
212
213                        warn!(
214                            attempt = attempt,
215                            status = %status,
216                            "Request failed with retryable error, retrying"
217                        );
218                        last_error = Some(error);
219                    }
220                }
221                Ok(Err(reqwest_error)) => {
222                    let error = OdosError::Http(reqwest_error);
223                    if !error.is_retryable() {
224                        self.circuit_breaker.record_failure();
225                        return Err(error);
226                    }
227
228                    warn!(
229                        attempt = attempt,
230                        error = %error,
231                        "Request failed with retryable error, retrying"
232                    );
233                    last_error = Some(error);
234                }
235                Err(_) => {
236                    // Timeout
237                    let error = OdosError::timeout_error("Request timed out");
238                    warn!(
239                        attempt = attempt,
240                        timeout = ?self.config.timeout,
241                        "Request timed out, retrying"
242                    );
243                    last_error = Some(error);
244                }
245            }
246
247            // Wait before retry
248            if attempt < self.config.max_retries {
249                if let Some(delay) = backoff.next_backoff() {
250                    debug!(delay = ?delay, "Waiting before retry");
251                    tokio::time::sleep(delay).await;
252                } else {
253                    break; // Backoff expired
254                }
255            }
256        }
257
258        // All retries exhausted
259        self.circuit_breaker.record_failure();
260        Err(last_error.unwrap_or_else(|| OdosError::internal_error("All retry attempts failed")))
261    }
262
263    /// Get a reference to the underlying reqwest client
264    pub fn inner(&self) -> &Client {
265        &self.client
266    }
267
268    /// Get the client configuration
269    pub fn config(&self) -> &ClientConfig {
270        &self.config
271    }
272
273    /// Get circuit breaker status
274    pub fn circuit_breaker_status(&self) -> String {
275        let state = *self.circuit_breaker.state.read().unwrap();
276        let failure_count = self.circuit_breaker.failure_count.load(Ordering::SeqCst);
277        format!("State: {state:?}, Failures: {failure_count}")
278    }
279}
280
impl Default for OdosHttpClient {
    /// Build a client with the default configuration.
    ///
    /// # Panics
    ///
    /// Panics if the underlying reqwest client cannot be constructed, since
    /// `Default::default` cannot return a `Result`. Use `OdosHttpClient::new`
    /// to handle that error instead.
    fn default() -> Self {
        Self::new().expect("Failed to create default HTTP client")
    }
}
286
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_client_config_default() {
        // Spot-check the documented defaults.
        let defaults = ClientConfig::default();
        assert_eq!(defaults.timeout, Duration::from_secs(30));
        assert_eq!(defaults.max_retries, 3);
        assert_eq!(defaults.circuit_breaker_threshold, 5);
    }

    #[test]
    fn test_circuit_breaker_creation() {
        // A fresh breaker starts closed with no recorded failures.
        let breaker = CircuitBreaker::new(ClientConfig::default());
        assert_eq!(breaker.failure_count.load(Ordering::SeqCst), 0);
        assert_eq!(*breaker.state.read().unwrap(), CircuitBreakerState::Closed);
    }

    #[test]
    fn test_circuit_breaker_can_execute() {
        // A closed breaker admits requests.
        let breaker = CircuitBreaker::new(ClientConfig::default());
        assert!(breaker.can_execute().is_ok());
    }

    #[test]
    fn test_circuit_breaker_record_success() {
        // Success while closed keeps the failure count at zero.
        let breaker = CircuitBreaker::new(ClientConfig::default());
        breaker.record_success();
        assert_eq!(breaker.failure_count.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn test_circuit_breaker_record_failure() {
        // With a threshold of 2, the second failure opens the circuit.
        let breaker = CircuitBreaker::new(ClientConfig {
            circuit_breaker_threshold: 2,
            ..Default::default()
        });

        breaker.record_failure();
        assert_eq!(breaker.failure_count.load(Ordering::SeqCst), 1);
        assert_eq!(*breaker.state.read().unwrap(), CircuitBreakerState::Closed);

        breaker.record_failure();
        assert_eq!(breaker.failure_count.load(Ordering::SeqCst), 2);
        assert_eq!(*breaker.state.read().unwrap(), CircuitBreakerState::Open);
    }

    #[tokio::test]
    async fn test_client_creation() {
        // Default construction should succeed.
        assert!(OdosHttpClient::new().is_ok());
    }

    #[tokio::test]
    async fn test_client_with_custom_config() {
        // Custom values must survive into the stored configuration.
        let custom = ClientConfig {
            timeout: Duration::from_secs(60),
            max_retries: 5,
            ..Default::default()
        };
        let built = OdosHttpClient::with_config(custom.clone());
        assert!(built.is_ok());

        let client = built.unwrap();
        assert_eq!(client.config().timeout, Duration::from_secs(60));
        assert_eq!(client.config().max_retries, 5);
    }
}