Skip to main content

libdd_trace_utils/send_with_retry/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
//! Provides the [`send_with_retry`] utility, which sends a payload to an [`Endpoint`] and retries
//! if the request fails.
6
7mod retry_strategy;
8pub use retry_strategy::{RetryBackoffType, RetryStrategy};
9
10use bytes::Bytes;
11use http::HeaderMap;
12use libdd_capabilities::{HttpClientCapability, HttpError, SleepCapability};
13use libdd_common::Endpoint;
14use std::time::Duration;
15use tracing::{debug, error};
16
17pub type Attempts = u32;
18
19pub type SendWithRetryResult = Result<(http::Response<Bytes>, Attempts), SendWithRetryError>;
20
21/// All errors contain the number of attempts after which the final error was returned
22#[derive(Debug)]
23pub enum SendWithRetryError {
24    /// The request received an error HTTP code.
25    Http(http::Response<Bytes>, Attempts),
26    /// Treats timeout errors originated in the transport layer.
27    Timeout(Attempts),
28    /// Treats errors coming from networking.
29    Network(HttpError, Attempts),
30    /// Treats errors while reading the response body.
31    ResponseBody(Attempts),
32    /// Treats errors coming from building the request
33    Build(Attempts),
34}
35
36impl std::fmt::Display for SendWithRetryError {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        match self {
39            SendWithRetryError::Http(_, _) => write!(f, "Http error code received"),
40            SendWithRetryError::Timeout(_) => write!(f, "Request timed out"),
41            SendWithRetryError::Network(error, _) => write!(f, "Network error: {error}"),
42            SendWithRetryError::ResponseBody(_) => write!(f, "Failed to read response body"),
43            SendWithRetryError::Build(_) => {
44                write!(f, "Failed to build request due to invalid property")
45            }
46        }
47    }
48}
49
50impl std::error::Error for SendWithRetryError {}
51
52/// Send the `payload` with a POST request to `target` using the provided `retry_strategy` if the
53/// request fails.
54///
55/// Standard endpoint headers (user-agent, api-key, test-token, entity headers) are set
56/// automatically via [`Endpoint::set_standard_headers`]. Additional `headers` are appended to the
57/// request. The request is executed with a timeout of [`Endpoint::timeout_ms`].
58///
59/// # Returns
60///
61/// Return a [`SendWithRetryResult`] containing the response and the number of attempts or an error
62/// describing the last attempt failure.
63///
64/// # Errors
65/// Fail if the request didn't succeed after applying the retry strategy.
66///
67/// # Example
68///
69/// ```rust, no_run
70/// # use libdd_common::Endpoint;
71/// # use libdd_capabilities::{HttpClientCapability, SleepCapability};
72/// # use libdd_trace_utils::send_with_retry::*;
73/// # async fn run() -> SendWithRetryResult {
74/// let payload: Vec<u8> = vec![0, 1, 2, 3];
75/// let target = Endpoint {
76///     url: "localhost:8126/v04/traces".parse::<hyper::Uri>().unwrap(),
77///     ..Endpoint::default()
78/// };
79/// let mut headers = http::HeaderMap::new();
80/// headers.insert(
81///     http::HeaderName::from_static("content-type"),
82///     http::HeaderValue::from_static("application/msgpack"),
83/// );
84/// let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5));
85/// let capabilities = libdd_capabilities_impl::NativeCapabilities::new_client();
86/// send_with_retry(&capabilities, &target, payload, &headers, &retry_strategy).await
87/// # }
88/// ```
89pub async fn send_with_retry<C: HttpClientCapability + SleepCapability>(
90    capabilities: &C,
91    target: &Endpoint,
92    payload: Vec<u8>,
93    headers: &HeaderMap,
94    retry_strategy: &RetryStrategy,
95) -> SendWithRetryResult {
96    let mut request_attempt = 0;
97    let timeout = Duration::from_millis(target.timeout_ms);
98
99    debug!(
100        url = %target.url,
101        payload_size = payload.len(),
102        max_retries = retry_strategy.max_retries(),
103        "Sending with retry"
104    );
105
106    let payload = Bytes::from(payload);
107    loop {
108        request_attempt += 1;
109
110        debug!(
111            attempt = request_attempt,
112            max_retries = retry_strategy.max_retries(),
113            "Attempting request"
114        );
115
116        let mut builder = http::Request::builder()
117            .method(http::Method::POST)
118            .uri(target.url.clone());
119        builder =
120            target.set_standard_headers(builder, concat!("Tracer/", env!("CARGO_PKG_VERSION")));
121        for (key, value) in headers {
122            builder = builder.header(key, value);
123        }
124        let req = match builder.body(payload.clone()) {
125            Ok(r) => r,
126            Err(_) => {
127                return Err(SendWithRetryError::Build(request_attempt));
128            }
129        };
130
131        let result = tokio::select! {
132            biased;
133            r = capabilities.request(req) => Ok(r),
134            _ = capabilities.sleep(timeout) => Err(()),
135        };
136
137        match result {
138            Ok(Ok(response)) => {
139                let status = response.status();
140                debug!(
141                    status = status.as_u16(),
142                    attempt = request_attempt,
143                    "Received response"
144                );
145
146                if status.is_client_error() || status.is_server_error() {
147                    debug!(
148                        status = status.as_u16(),
149                        attempt = request_attempt,
150                        max_retries = retry_strategy.max_retries(),
151                        "Received error status code"
152                    );
153
154                    if request_attempt < retry_strategy.max_retries() {
155                        debug!(
156                            attempt = request_attempt,
157                            remaining_retries = retry_strategy.max_retries() - request_attempt,
158                            "Retrying after error status code"
159                        );
160                        retry_strategy.delay(request_attempt, capabilities).await;
161                        continue;
162                    } else {
163                        error!(
164                            status = status.as_u16(),
165                            attempts = request_attempt,
166                            "Max retries exceeded, returning HTTP error"
167                        );
168                        return Err(SendWithRetryError::Http(response, request_attempt));
169                    }
170                } else {
171                    debug!(
172                        status = status.as_u16(),
173                        attempts = request_attempt,
174                        "Request succeeded"
175                    );
176                    return Ok((response, request_attempt));
177                }
178            }
179            Ok(Err(e)) => {
180                debug!(
181                    error = ?e,
182                    attempt = request_attempt,
183                    max_retries = retry_strategy.max_retries(),
184                    "Request failed with error"
185                );
186
187                if request_attempt < retry_strategy.max_retries() {
188                    debug!(
189                        attempt = request_attempt,
190                        remaining_retries = retry_strategy.max_retries() - request_attempt,
191                        "Retrying after request error"
192                    );
193                    retry_strategy.delay(request_attempt, capabilities).await;
194                    continue;
195                } else {
196                    let classified_error = match e {
197                        HttpError::Timeout => SendWithRetryError::Timeout(request_attempt),
198                        HttpError::InvalidRequest(_) => SendWithRetryError::Build(request_attempt),
199                        HttpError::ResponseBody(_) => {
200                            SendWithRetryError::ResponseBody(request_attempt)
201                        }
202                        other => SendWithRetryError::Network(other, request_attempt),
203                    };
204                    error!(
205                        error = ?classified_error,
206                        attempts = request_attempt,
207                        "Max retries exceeded, returning request error"
208                    );
209                    return Err(classified_error);
210                }
211            }
212            Err(_) => {
213                debug!(
214                    attempt = request_attempt,
215                    max_retries = retry_strategy.max_retries(),
216                    "Request timed out"
217                );
218
219                if request_attempt < retry_strategy.max_retries() {
220                    debug!(
221                        attempt = request_attempt,
222                        remaining_retries = retry_strategy.max_retries() - request_attempt,
223                        "Retrying after timeout"
224                    );
225                    retry_strategy.delay(request_attempt, capabilities).await;
226                    continue;
227                } else {
228                    error!(
229                        attempts = request_attempt,
230                        "Max retries exceeded, returning timeout error"
231                    );
232                    return Err(SendWithRetryError::Timeout(request_attempt));
233                }
234            }
235        }
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::poll_for_mock_hit;
    use httpmock::MockServer;
    use libdd_capabilities::HttpClientCapability;
    use libdd_capabilities_impl::NativeCapabilities;

    // Each test runs `send_with_retry` in a spawned task so the test body can poll the mock server
    // concurrently. A panic inside a spawned task is only reported through its `JoinHandle`, so
    // every test awaits the handle at the end; otherwise the task's assertions could never fail
    // the test.

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_zero_retries_on_error() {
        let server = MockServer::start();

        let mut mock_503 = server
            .mock_async(|_when, then| {
                then.status(503)
                    .header("content-type", "application/json")
                    .body(r#"{"status":"error"}"#);
            })
            .await;

        let _mock_202 = server
            .mock_async(|_when, then| {
                then.status(202)
                    .header("content-type", "application/json")
                    .body(r#"{"status":"ok"}"#);
            })
            .await;

        let target_endpoint = Endpoint {
            url: server.url("").to_owned().parse().unwrap(),
            api_key: Some("test-key".into()),
            ..Default::default()
        };

        let strategy = RetryStrategy::new(0, 2, RetryBackoffType::Constant, None);
        let capabilities = NativeCapabilities::new_client();

        let send = tokio::spawn(async move {
            let result = send_with_retry(
                &capabilities,
                &target_endpoint,
                vec![0, 1, 2, 3],
                &HeaderMap::new(),
                &strategy,
            )
            .await;
            assert!(result.is_err(), "Expected an error result");
            assert!(
                matches!(result.unwrap_err(), SendWithRetryError::Http(_, 1)),
                "Expected an http error with one attempt"
            );
        });

        assert!(poll_for_mock_hit(&mut mock_503, 10, 100, 1, true).await);
        send.await.expect("send_with_retry task panicked");
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_retry_logic_error_then_success() {
        let server = MockServer::start();

        let mut mock_503 = server
            .mock_async(|_when, then| {
                then.status(503)
                    .header("content-type", "application/json")
                    .body(r#"{"status":"error"}"#);
            })
            .await;

        let mut mock_202 = server
            .mock_async(|_when, then| {
                then.status(202)
                    .header("content-type", "application/json")
                    .body(r#"{"status":"ok"}"#);
            })
            .await;

        let target_endpoint = Endpoint {
            url: server.url("").to_owned().parse().unwrap(),
            api_key: Some("test-key".into()),
            ..Default::default()
        };

        let strategy = RetryStrategy::new(2, 250, RetryBackoffType::Constant, None);
        let capabilities = NativeCapabilities::new_client();

        let send = tokio::spawn(async move {
            let result = send_with_retry(
                &capabilities,
                &target_endpoint,
                vec![0, 1, 2, 3],
                &HeaderMap::new(),
                &strategy,
            )
            .await;
            assert!(
                matches!(result.unwrap(), (_, 2)),
                "Expected an ok result after two attempts"
            );
        });

        assert!(poll_for_mock_hit(&mut mock_503, 10, 100, 1, true).await);
        assert!(
            poll_for_mock_hit(&mut mock_202, 10, 100, 1, true).await,
            "Expected a retry request after a 5xx error"
        );
        send.await.expect("send_with_retry task panicked");
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_retry_logic_max_errors() {
        let server = MockServer::start();
        let expected_retry_attempts = 3;
        let mut mock_503 = server
            .mock_async(|_when, then| {
                then.status(503)
                    .header("content-type", "application/json")
                    .body(r#"{"status":"error"}"#);
            })
            .await;

        let target_endpoint = Endpoint {
            url: server.url("").to_owned().parse().unwrap(),
            api_key: Some("test-key".into()),
            ..Default::default()
        };

        let strategy = RetryStrategy::new(
            expected_retry_attempts,
            10,
            RetryBackoffType::Constant,
            None,
        );
        let capabilities = NativeCapabilities::new_client();

        let send = tokio::spawn(async move {
            let result = send_with_retry(
                &capabilities,
                &target_endpoint,
                vec![0, 1, 2, 3],
                &HeaderMap::new(),
                &strategy,
            )
            .await;
            assert!(
                matches!(result.unwrap_err(), SendWithRetryError::Http(_, attempts) if attempts == expected_retry_attempts),
                "Expected an error result after max retry attempts"
            );
        });

        assert!(
            poll_for_mock_hit(
                &mut mock_503,
                10,
                100,
                expected_retry_attempts as usize,
                true
            )
            .await,
            "Expected max retry attempts"
        );
        send.await.expect("send_with_retry task panicked");
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_retry_logic_no_errors() {
        let server = MockServer::start();
        let mut mock_202 = server
            .mock_async(|_when, then| {
                then.status(202)
                    .header("content-type", "application/json")
                    .body(r#"{"status":"Ok"}"#);
            })
            .await;

        let target_endpoint = Endpoint {
            url: server.url("").to_owned().parse().unwrap(),
            api_key: Some("test-key".into()),
            ..Default::default()
        };

        let strategy = RetryStrategy::new(2, 10, RetryBackoffType::Constant, None);
        let capabilities = NativeCapabilities::new_client();

        let send = tokio::spawn(async move {
            let result = send_with_retry(
                &capabilities,
                &target_endpoint,
                vec![0, 1, 2, 3],
                &HeaderMap::new(),
                &strategy,
            )
            .await;
            assert!(
                matches!(result, Ok((_, attempts)) if attempts == 1),
                "Expected an ok result after one attempts"
            );
        });

        assert!(
            poll_for_mock_hit(&mut mock_202, 10, 250, 1, true).await,
            "Expected only one request attempt"
        );
        send.await.expect("send_with_retry task panicked");
    }
}