1mod retry_strategy;
8pub use retry_strategy::{RetryBackoffType, RetryStrategy};
9
10use bytes::Bytes;
11use http::HeaderMap;
12use libdd_capabilities::{HttpClientCapability, HttpError, SleepCapability};
13use libdd_common::Endpoint;
14use std::time::Duration;
15use tracing::{debug, error};
16
17pub type Attempts = u32;
18
19pub type SendWithRetryResult = Result<(http::Response<Bytes>, Attempts), SendWithRetryError>;
20
21#[derive(Debug)]
23pub enum SendWithRetryError {
24 Http(http::Response<Bytes>, Attempts),
26 Timeout(Attempts),
28 Network(HttpError, Attempts),
30 ResponseBody(Attempts),
32 Build(Attempts),
34}
35
36impl std::fmt::Display for SendWithRetryError {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 match self {
39 SendWithRetryError::Http(_, _) => write!(f, "Http error code received"),
40 SendWithRetryError::Timeout(_) => write!(f, "Request timed out"),
41 SendWithRetryError::Network(error, _) => write!(f, "Network error: {error}"),
42 SendWithRetryError::ResponseBody(_) => write!(f, "Failed to read response body"),
43 SendWithRetryError::Build(_) => {
44 write!(f, "Failed to build request due to invalid property")
45 }
46 }
47 }
48}
49
50impl std::error::Error for SendWithRetryError {}
51
52pub async fn send_with_retry<C: HttpClientCapability + SleepCapability>(
90 capabilities: &C,
91 target: &Endpoint,
92 payload: Vec<u8>,
93 headers: &HeaderMap,
94 retry_strategy: &RetryStrategy,
95) -> SendWithRetryResult {
96 let mut request_attempt = 0;
97 let timeout = Duration::from_millis(target.timeout_ms);
98
99 debug!(
100 url = %target.url,
101 payload_size = payload.len(),
102 max_retries = retry_strategy.max_retries(),
103 "Sending with retry"
104 );
105
106 let payload = Bytes::from(payload);
107 loop {
108 request_attempt += 1;
109
110 debug!(
111 attempt = request_attempt,
112 max_retries = retry_strategy.max_retries(),
113 "Attempting request"
114 );
115
116 let mut builder = http::Request::builder()
117 .method(http::Method::POST)
118 .uri(target.url.clone());
119 builder =
120 target.set_standard_headers(builder, concat!("Tracer/", env!("CARGO_PKG_VERSION")));
121 for (key, value) in headers {
122 builder = builder.header(key, value);
123 }
124 let req = match builder.body(payload.clone()) {
125 Ok(r) => r,
126 Err(_) => {
127 return Err(SendWithRetryError::Build(request_attempt));
128 }
129 };
130
131 let result = tokio::select! {
132 biased;
133 r = capabilities.request(req) => Ok(r),
134 _ = capabilities.sleep(timeout) => Err(()),
135 };
136
137 match result {
138 Ok(Ok(response)) => {
139 let status = response.status();
140 debug!(
141 status = status.as_u16(),
142 attempt = request_attempt,
143 "Received response"
144 );
145
146 if status.is_client_error() || status.is_server_error() {
147 debug!(
148 status = status.as_u16(),
149 attempt = request_attempt,
150 max_retries = retry_strategy.max_retries(),
151 "Received error status code"
152 );
153
154 if request_attempt < retry_strategy.max_retries() {
155 debug!(
156 attempt = request_attempt,
157 remaining_retries = retry_strategy.max_retries() - request_attempt,
158 "Retrying after error status code"
159 );
160 retry_strategy.delay(request_attempt, capabilities).await;
161 continue;
162 } else {
163 error!(
164 status = status.as_u16(),
165 attempts = request_attempt,
166 "Max retries exceeded, returning HTTP error"
167 );
168 return Err(SendWithRetryError::Http(response, request_attempt));
169 }
170 } else {
171 debug!(
172 status = status.as_u16(),
173 attempts = request_attempt,
174 "Request succeeded"
175 );
176 return Ok((response, request_attempt));
177 }
178 }
179 Ok(Err(e)) => {
180 debug!(
181 error = ?e,
182 attempt = request_attempt,
183 max_retries = retry_strategy.max_retries(),
184 "Request failed with error"
185 );
186
187 if request_attempt < retry_strategy.max_retries() {
188 debug!(
189 attempt = request_attempt,
190 remaining_retries = retry_strategy.max_retries() - request_attempt,
191 "Retrying after request error"
192 );
193 retry_strategy.delay(request_attempt, capabilities).await;
194 continue;
195 } else {
196 let classified_error = match e {
197 HttpError::Timeout => SendWithRetryError::Timeout(request_attempt),
198 HttpError::InvalidRequest(_) => SendWithRetryError::Build(request_attempt),
199 HttpError::ResponseBody(_) => {
200 SendWithRetryError::ResponseBody(request_attempt)
201 }
202 other => SendWithRetryError::Network(other, request_attempt),
203 };
204 error!(
205 error = ?classified_error,
206 attempts = request_attempt,
207 "Max retries exceeded, returning request error"
208 );
209 return Err(classified_error);
210 }
211 }
212 Err(_) => {
213 debug!(
214 attempt = request_attempt,
215 max_retries = retry_strategy.max_retries(),
216 "Request timed out"
217 );
218
219 if request_attempt < retry_strategy.max_retries() {
220 debug!(
221 attempt = request_attempt,
222 remaining_retries = retry_strategy.max_retries() - request_attempt,
223 "Retrying after timeout"
224 );
225 retry_strategy.delay(request_attempt, capabilities).await;
226 continue;
227 } else {
228 error!(
229 attempts = request_attempt,
230 "Max retries exceeded, returning timeout error"
231 );
232 return Err(SendWithRetryError::Timeout(request_attempt));
233 }
234 }
235 }
236 }
237}
238
239#[cfg(test)]
240mod tests {
241 use super::*;
242 use crate::test_utils::poll_for_mock_hit;
243 use httpmock::MockServer;
244 use libdd_capabilities::HttpClientCapability;
245 use libdd_capabilities_impl::NativeCapabilities;
246
247 #[cfg_attr(miri, ignore)]
248 #[tokio::test]
249 async fn test_zero_retries_on_error() {
250 let server = MockServer::start();
251
252 let mut mock_503 = server
253 .mock_async(|_when, then| {
254 then.status(503)
255 .header("content-type", "application/json")
256 .body(r#"{"status":"error"}"#);
257 })
258 .await;
259
260 let _mock_202 = server
261 .mock_async(|_when, then| {
262 then.status(202)
263 .header("content-type", "application/json")
264 .body(r#"{"status":"ok"}"#);
265 })
266 .await;
267
268 let target_endpoint = Endpoint {
269 url: server.url("").to_owned().parse().unwrap(),
270 api_key: Some("test-key".into()),
271 ..Default::default()
272 };
273
274 let strategy = RetryStrategy::new(0, 2, RetryBackoffType::Constant, None);
275 let capabilities = NativeCapabilities::new_client();
276
277 tokio::spawn(async move {
278 let result = send_with_retry(
279 &capabilities,
280 &target_endpoint,
281 vec![0, 1, 2, 3],
282 &HeaderMap::new(),
283 &strategy,
284 )
285 .await;
286 assert!(result.is_err(), "Expected an error result");
287 assert!(
288 matches!(result.unwrap_err(), SendWithRetryError::Http(_, 1)),
289 "Expected an http error with one attempt"
290 );
291 });
292
293 assert!(poll_for_mock_hit(&mut mock_503, 10, 100, 1, true).await);
294 }
295
296 #[cfg_attr(miri, ignore)]
297 #[tokio::test]
298 async fn test_retry_logic_error_then_success() {
299 let server = MockServer::start();
300
301 let mut mock_503 = server
302 .mock_async(|_when, then| {
303 then.status(503)
304 .header("content-type", "application/json")
305 .body(r#"{"status":"error"}"#);
306 })
307 .await;
308
309 let mut mock_202 = server
310 .mock_async(|_when, then| {
311 then.status(202)
312 .header("content-type", "application/json")
313 .body(r#"{"status":"ok"}"#);
314 })
315 .await;
316
317 let target_endpoint = Endpoint {
318 url: server.url("").to_owned().parse().unwrap(),
319 api_key: Some("test-key".into()),
320 ..Default::default()
321 };
322
323 let strategy = RetryStrategy::new(2, 250, RetryBackoffType::Constant, None);
324 let capabilities = NativeCapabilities::new_client();
325
326 tokio::spawn(async move {
327 let result = send_with_retry(
328 &capabilities,
329 &target_endpoint,
330 vec![0, 1, 2, 3],
331 &HeaderMap::new(),
332 &strategy,
333 )
334 .await;
335 assert!(
336 matches!(result.unwrap(), (_, 2)),
337 "Expected an ok result after two attempts"
338 );
339 });
340
341 assert!(poll_for_mock_hit(&mut mock_503, 10, 100, 1, true).await);
342 assert!(
343 poll_for_mock_hit(&mut mock_202, 10, 100, 1, true).await,
344 "Expected a retry request after a 5xx error"
345 );
346 }
347
348 #[cfg_attr(miri, ignore)]
349 #[tokio::test]
350 async fn test_retry_logic_max_errors() {
351 let server = MockServer::start();
352 let expected_retry_attempts = 3;
353 let mut mock_503 = server
354 .mock_async(|_when, then| {
355 then.status(503)
356 .header("content-type", "application/json")
357 .body(r#"{"status":"error"}"#);
358 })
359 .await;
360
361 let target_endpoint = Endpoint {
362 url: server.url("").to_owned().parse().unwrap(),
363 api_key: Some("test-key".into()),
364 ..Default::default()
365 };
366
367 let strategy = RetryStrategy::new(
368 expected_retry_attempts,
369 10,
370 RetryBackoffType::Constant,
371 None,
372 );
373 let capabilities = NativeCapabilities::new_client();
374
375 tokio::spawn(async move {
376 let result = send_with_retry(
377 &capabilities,
378 &target_endpoint,
379 vec![0, 1, 2, 3],
380 &HeaderMap::new(),
381 &strategy,
382 )
383 .await;
384 assert!(
385 matches!(result.unwrap_err(), SendWithRetryError::Http(_, attempts) if attempts == expected_retry_attempts),
386 "Expected an error result after max retry attempts"
387 );
388 });
389
390 assert!(
391 poll_for_mock_hit(
392 &mut mock_503,
393 10,
394 100,
395 expected_retry_attempts as usize,
396 true
397 )
398 .await,
399 "Expected max retry attempts"
400 );
401 }
402
403 #[cfg_attr(miri, ignore)]
404 #[tokio::test]
405 async fn test_retry_logic_no_errors() {
406 let server = MockServer::start();
407 let mut mock_202 = server
408 .mock_async(|_when, then| {
409 then.status(202)
410 .header("content-type", "application/json")
411 .body(r#"{"status":"Ok"}"#);
412 })
413 .await;
414
415 let target_endpoint = Endpoint {
416 url: server.url("").to_owned().parse().unwrap(),
417 api_key: Some("test-key".into()),
418 ..Default::default()
419 };
420
421 let strategy = RetryStrategy::new(2, 10, RetryBackoffType::Constant, None);
422 let capabilities = NativeCapabilities::new_client();
423
424 tokio::spawn(async move {
425 let result = send_with_retry(
426 &capabilities,
427 &target_endpoint,
428 vec![0, 1, 2, 3],
429 &HeaderMap::new(),
430 &strategy,
431 )
432 .await;
433 assert!(
434 matches!(result, Ok((_, attempts)) if attempts == 1),
435 "Expected an ok result after one attempts"
436 );
437 });
438
439 assert!(
440 poll_for_mock_hit(&mut mock_202, 10, 250, 1, true).await,
441 "Expected only one request attempt"
442 );
443 }
444}