1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::Arc;
3use std::time::Duration;
4
5use backoff::{backoff::Backoff, ExponentialBackoff};
6use reqwest::{Client, RequestBuilder, Response};
7use tokio::time::timeout;
8use tracing::{debug, info, instrument, warn};
9
10use crate::error::{OdosError, Result};
11
/// Tunable settings for [`OdosHttpClient`] and its circuit breaker.
///
/// Use [`ClientConfig::default`] together with struct-update syntax to
/// override only the fields you care about.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Request timeout. It is handed to the `reqwest` client builder and is
    /// also used as the per-attempt deadline inside `execute_with_retry`.
    pub timeout: Duration,
    /// Connect timeout handed to the `reqwest` client builder.
    pub connect_timeout: Duration,
    /// Number of attempts `execute_with_retry` makes before giving up; the
    /// first request counts toward this total.
    pub max_retries: u32,
    /// Initial interval of the exponential backoff between attempts.
    pub initial_retry_delay: Duration,
    /// Upper bound for a single backoff interval.
    pub max_retry_delay: Duration,
    /// Failure count at which the circuit breaker switches to the open state.
    pub circuit_breaker_threshold: u32,
    /// Time that must pass after the most recent failure before an open
    /// breaker lets a request through again.
    pub circuit_breaker_reset_timeout: Duration,
    /// Passed to the builder's `pool_max_idle_per_host`, i.e. the cap on idle
    /// pooled connections per host.
    pub max_connections: usize,
    /// Passed to the builder's `pool_idle_timeout`.
    pub pool_idle_timeout: Duration,
}
34
35impl Default for ClientConfig {
36 fn default() -> Self {
37 Self {
38 timeout: Duration::from_secs(30),
39 connect_timeout: Duration::from_secs(10),
40 max_retries: 3,
41 initial_retry_delay: Duration::from_millis(100),
42 max_retry_delay: Duration::from_secs(5),
43 circuit_breaker_threshold: 5,
44 circuit_breaker_reset_timeout: Duration::from_secs(60),
45 max_connections: 20,
46 pool_idle_timeout: Duration::from_secs(90),
47 }
48 }
49}
50
/// The three states of the circuit breaker state machine.
///
/// The enum has no payload, so equality is total and `Eq` is derived
/// alongside `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitBreakerState {
    /// Requests are allowed; failures are being counted.
    Closed,
    /// Requests are rejected until the reset timeout has elapsed.
    Open,
    /// The reset timeout has elapsed and requests are let through again; the
    /// next recorded success closes the breaker.
    HalfOpen,
}
58
59#[derive(Debug, Clone)]
61pub struct OdosHttpClient {
62 client: Client,
63 config: ClientConfig,
64 circuit_breaker: Arc<CircuitBreaker>,
65}
66
67#[derive(Debug)]
68struct CircuitBreaker {
69 state: std::sync::RwLock<CircuitBreakerState>,
70 failure_count: AtomicU64,
71 last_failure_time: std::sync::RwLock<Option<std::time::Instant>>,
72 config: ClientConfig,
73}
74
75impl CircuitBreaker {
76 fn new(config: ClientConfig) -> Self {
77 Self {
78 state: std::sync::RwLock::new(CircuitBreakerState::Closed),
79 failure_count: AtomicU64::new(0),
80 last_failure_time: std::sync::RwLock::new(None),
81 config,
82 }
83 }
84
85 fn can_execute(&self) -> Result<()> {
86 let state = *self.state.read().unwrap();
87 match state {
88 CircuitBreakerState::Closed => Ok(()),
89 CircuitBreakerState::Open => {
90 if let Some(last_failure) = *self.last_failure_time.read().unwrap() {
92 if last_failure.elapsed() > self.config.circuit_breaker_reset_timeout {
93 *self.state.write().unwrap() = CircuitBreakerState::HalfOpen;
94 info!("Circuit breaker transitioning to half-open state");
95 Ok(())
96 } else {
97 Err(OdosError::circuit_breaker_error("Circuit breaker is open"))
98 }
99 } else {
100 Ok(())
101 }
102 }
103 CircuitBreakerState::HalfOpen => Ok(()),
104 }
105 }
106
107 fn record_success(&self) {
108 let current_state = *self.state.read().unwrap();
109 match current_state {
110 CircuitBreakerState::HalfOpen => {
111 *self.state.write().unwrap() = CircuitBreakerState::Closed;
112 self.failure_count.store(0, Ordering::SeqCst);
113 info!("Circuit breaker closed after successful request");
114 }
115 CircuitBreakerState::Closed => {
116 self.failure_count.store(0, Ordering::SeqCst);
118 }
119 CircuitBreakerState::Open => {
120 warn!("Recorded success while circuit breaker is open");
122 }
123 }
124 }
125
126 fn record_failure(&self) {
127 let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
128 *self.last_failure_time.write().unwrap() = Some(std::time::Instant::now());
129
130 if failure_count >= self.config.circuit_breaker_threshold as u64 {
131 *self.state.write().unwrap() = CircuitBreakerState::Open;
132 warn!("Circuit breaker opened after {} failures", failure_count);
133 }
134 }
135}
136
137impl OdosHttpClient {
138 pub fn new() -> Result<Self> {
140 Self::with_config(ClientConfig::default())
141 }
142
143 pub fn with_config(config: ClientConfig) -> Result<Self> {
145 let client = Client::builder()
146 .timeout(config.timeout)
147 .connect_timeout(config.connect_timeout)
148 .pool_max_idle_per_host(config.max_connections)
149 .pool_idle_timeout(config.pool_idle_timeout)
150 .build()
151 .map_err(OdosError::Http)?;
152
153 Ok(Self {
154 client,
155 config: config.clone(),
156 circuit_breaker: Arc::new(CircuitBreaker::new(config)),
157 })
158 }
159
160 #[instrument(skip(self, request_builder_fn), level = "debug")]
162 pub async fn execute_with_retry<F>(&self, request_builder_fn: F) -> Result<Response>
163 where
164 F: Fn() -> RequestBuilder + Clone,
165 {
166 self.circuit_breaker.can_execute()?;
168
169 let mut backoff = ExponentialBackoff {
171 initial_interval: self.config.initial_retry_delay,
172 max_interval: self.config.max_retry_delay,
173 max_elapsed_time: Some(self.config.timeout),
174 ..Default::default()
175 };
176
177 let mut attempt = 0;
178 let mut last_error = None;
179
180 loop {
181 if attempt >= self.config.max_retries {
182 break;
183 }
184
185 attempt += 1;
186 debug!(attempt = attempt, "Executing HTTP request");
187
188 let request = request_builder_fn().build().map_err(OdosError::Http)?;
190 let request_timeout = timeout(self.config.timeout, self.client.execute(request));
191
192 match request_timeout.await {
193 Ok(Ok(response)) => {
194 if response.status().is_success() {
196 debug!(attempt = attempt, status = %response.status(), "Request successful");
197 self.circuit_breaker.record_success();
198 return Ok(response);
199 } else {
200 let status = response.status();
202 let error_text = response
203 .text()
204 .await
205 .unwrap_or_else(|_| "Unknown error".to_string());
206 let error = OdosError::api_error(status, error_text);
207
208 if !error.is_retryable() {
209 self.circuit_breaker.record_failure();
210 return Err(error);
211 }
212
213 warn!(
214 attempt = attempt,
215 status = %status,
216 "Request failed with retryable error, retrying"
217 );
218 last_error = Some(error);
219 }
220 }
221 Ok(Err(reqwest_error)) => {
222 let error = OdosError::Http(reqwest_error);
223 if !error.is_retryable() {
224 self.circuit_breaker.record_failure();
225 return Err(error);
226 }
227
228 warn!(
229 attempt = attempt,
230 error = %error,
231 "Request failed with retryable error, retrying"
232 );
233 last_error = Some(error);
234 }
235 Err(_) => {
236 let error = OdosError::timeout_error("Request timed out");
238 warn!(
239 attempt = attempt,
240 timeout = ?self.config.timeout,
241 "Request timed out, retrying"
242 );
243 last_error = Some(error);
244 }
245 }
246
247 if attempt < self.config.max_retries {
249 if let Some(delay) = backoff.next_backoff() {
250 debug!(delay = ?delay, "Waiting before retry");
251 tokio::time::sleep(delay).await;
252 } else {
253 break; }
255 }
256 }
257
258 self.circuit_breaker.record_failure();
260 Err(last_error.unwrap_or_else(|| OdosError::internal_error("All retry attempts failed")))
261 }
262
263 pub fn inner(&self) -> &Client {
265 &self.client
266 }
267
268 pub fn config(&self) -> &ClientConfig {
270 &self.config
271 }
272
273 pub fn circuit_breaker_status(&self) -> String {
275 let state = *self.circuit_breaker.state.read().unwrap();
276 let failure_count = self.circuit_breaker.failure_count.load(Ordering::SeqCst);
277 format!("State: {state:?}, Failures: {failure_count}")
278 }
279}
280
281impl Default for OdosHttpClient {
282 fn default() -> Self {
283 Self::new().expect("Failed to create default HTTP client")
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Snapshot of the breaker's failure counter.
    fn failures_of(breaker: &CircuitBreaker) -> u64 {
        breaker.failure_count.load(Ordering::SeqCst)
    }

    /// Snapshot of the breaker's current state.
    fn state_of(breaker: &CircuitBreaker) -> CircuitBreakerState {
        *breaker.state.read().unwrap()
    }

    #[test]
    fn test_client_config_default() {
        let defaults = ClientConfig::default();
        assert_eq!(defaults.timeout, Duration::from_secs(30));
        assert_eq!(defaults.max_retries, 3);
        assert_eq!(defaults.circuit_breaker_threshold, 5);
    }

    #[test]
    fn test_circuit_breaker_creation() {
        let breaker = CircuitBreaker::new(ClientConfig::default());
        assert_eq!(failures_of(&breaker), 0);
        assert_eq!(state_of(&breaker), CircuitBreakerState::Closed);
    }

    #[test]
    fn test_circuit_breaker_can_execute() {
        let breaker = CircuitBreaker::new(ClientConfig::default());
        assert!(breaker.can_execute().is_ok());
    }

    #[test]
    fn test_circuit_breaker_record_success() {
        let breaker = CircuitBreaker::new(ClientConfig::default());
        breaker.record_success();
        assert_eq!(failures_of(&breaker), 0);
    }

    #[test]
    fn test_circuit_breaker_record_failure() {
        let breaker = CircuitBreaker::new(ClientConfig {
            circuit_breaker_threshold: 2,
            ..Default::default()
        });

        // Below the threshold the breaker stays closed.
        breaker.record_failure();
        assert_eq!(failures_of(&breaker), 1);
        assert_eq!(state_of(&breaker), CircuitBreakerState::Closed);

        // Reaching the threshold opens it.
        breaker.record_failure();
        assert_eq!(failures_of(&breaker), 2);
        assert_eq!(state_of(&breaker), CircuitBreakerState::Open);
    }

    #[tokio::test]
    async fn test_client_creation() {
        let built = OdosHttpClient::new();
        assert!(built.is_ok());
    }

    #[tokio::test]
    async fn test_client_with_custom_config() {
        let custom = ClientConfig {
            timeout: Duration::from_secs(60),
            max_retries: 5,
            ..Default::default()
        };
        let built = OdosHttpClient::with_config(custom.clone());
        assert!(built.is_ok());

        let client = built.unwrap();
        assert_eq!(client.config().timeout, Duration::from_secs(60));
        assert_eq!(client.config().max_retries, 5);
    }
}