Skip to main content

ironflow_engine/notify/
retry.rs

1//! Retry with exponential backoff for outbound HTTP deliveries.
2//!
3//! Shared by all built-in subscribers ([`WebhookSubscriber`](super::WebhookSubscriber),
4//! [`BetterStackSubscriber`](super::BetterStackSubscriber), etc.) so that
5//! retry logic is defined once.
6
7use std::fmt::Display;
8use std::time::Duration;
9
10use reqwest::{Client, RequestBuilder, Response, StatusCode};
11use tokio::time::sleep;
12use tracing::{error, info, warn};
13
/// Configuration for retry behaviour on outbound HTTP calls.
///
/// All built-in subscribers share this configuration. Construct with
/// [`RetryConfig::new`] or use [`RetryConfig::default`] for sensible
/// defaults (3 attempts, 5 s timeout, 500 ms base backoff).
///
/// # Examples
///
/// ```
/// use ironflow_engine::notify::RetryConfig;
///
/// let config = RetryConfig::default();
/// assert_eq!(config.max_retries(), 3);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryConfig {
    /// Total number of delivery attempts, including the first one.
    max_retries: u32,
    /// Timeout applied to each individual HTTP request.
    timeout: Duration,
    /// Delay before the first retry; doubled for every following retry.
    base_backoff: Duration,
}
34
35impl RetryConfig {
36    /// Default timeout for outbound HTTP calls.
37    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
38
39    /// Default maximum number of retry attempts.
40    const DEFAULT_MAX_RETRIES: u32 = 3;
41
42    /// Default base delay for exponential backoff (doubled each retry).
43    const DEFAULT_BASE_BACKOFF: Duration = Duration::from_millis(500);
44
45    /// Create a new retry configuration.
46    ///
47    /// # Examples
48    ///
49    /// ```
50    /// use std::time::Duration;
51    /// use ironflow_engine::notify::RetryConfig;
52    ///
53    /// let config = RetryConfig::new(5, Duration::from_secs(10), Duration::from_secs(1));
54    /// assert_eq!(config.max_retries(), 5);
55    /// assert_eq!(config.timeout(), Duration::from_secs(10));
56    /// assert_eq!(config.base_backoff(), Duration::from_secs(1));
57    /// ```
58    pub fn new(max_retries: u32, timeout: Duration, base_backoff: Duration) -> Self {
59        Self {
60            max_retries,
61            timeout,
62            base_backoff,
63        }
64    }
65
66    /// Maximum number of retry attempts.
67    pub fn max_retries(&self) -> u32 {
68        self.max_retries
69    }
70
71    /// Timeout per HTTP request.
72    pub fn timeout(&self) -> Duration {
73        self.timeout
74    }
75
76    /// Base delay for exponential backoff (doubled each retry).
77    pub fn base_backoff(&self) -> Duration {
78        self.base_backoff
79    }
80
81    /// Build an HTTP client configured with this timeout.
82    ///
83    /// # Panics
84    ///
85    /// Panics if the TLS backend is unavailable.
86    pub fn build_client(&self) -> Client {
87        Client::builder()
88            .timeout(self.timeout)
89            .build()
90            .expect("failed to build HTTP client")
91    }
92}
93
94impl Default for RetryConfig {
95    fn default() -> Self {
96        Self {
97            max_retries: Self::DEFAULT_MAX_RETRIES,
98            timeout: Self::DEFAULT_TIMEOUT,
99            base_backoff: Self::DEFAULT_BASE_BACKOFF,
100        }
101    }
102}
103
104/// Predicate that decides whether an HTTP response counts as success.
105///
106/// The default for webhooks is `Response::status().is_success()` (2xx),
107/// while BetterStack expects exactly `202 Accepted`.
108pub type SuccessPredicate = fn(&Response) -> bool;
109
110/// Returns `true` when the response status is 2xx.
111pub fn is_success_2xx(response: &Response) -> bool {
112    response.status().is_success()
113}
114
115/// Returns `true` when the response status is exactly `202 Accepted`.
116pub fn is_accepted_202(response: &Response) -> bool {
117    response.status() == StatusCode::ACCEPTED
118}
119
120/// Execute an HTTP request with retry and exponential backoff.
121///
122/// Calls `build_request` before each attempt to produce a fresh
123/// [`RequestBuilder`] (request builders are consumed on send).
124/// `is_success` determines whether a given response is acceptable.
125///
126/// `subscriber_name` and `context` are used only for structured logging.
127///
128/// # Examples
129///
130/// ```no_run
131/// use ironflow_engine::notify::{RetryConfig, deliver_with_retry, is_success_2xx};
132/// use reqwest::Client;
133///
134/// # async fn example() {
135/// let config = RetryConfig::default();
136/// let client = config.build_client();
137/// let url = "https://example.com/hook";
138///
139/// deliver_with_retry(
140///     &config,
141///     || client.post(url).body("{}"),
142///     is_success_2xx,
143///     "webhook",
144///     url,
145/// ).await;
146/// # }
147/// ```
148pub async fn deliver_with_retry(
149    config: &RetryConfig,
150    build_request: impl Fn() -> RequestBuilder,
151    is_success: SuccessPredicate,
152    subscriber_name: &str,
153    context: &(impl Display + ?Sized),
154) {
155    for attempt in 0..config.max_retries {
156        let result = build_request().send().await;
157
158        match result {
159            Ok(resp) if is_success(&resp) => {
160                info!(
161                    subscriber = subscriber_name,
162                    context = %context,
163                    "delivery succeeded"
164                );
165                return;
166            }
167            Ok(resp) => {
168                let status = resp.status();
169                log_retry_or_fail(
170                    config,
171                    attempt,
172                    subscriber_name,
173                    context,
174                    &format!("HTTP {status}"),
175                );
176            }
177            Err(err) => {
178                log_retry_or_fail(config, attempt, subscriber_name, context, &err.to_string());
179            }
180        }
181
182        if attempt + 1 < config.max_retries {
183            let delay = config.base_backoff * 2u32.pow(attempt);
184            sleep(delay).await;
185        }
186    }
187}
188
189fn log_retry_or_fail(
190    config: &RetryConfig,
191    attempt: u32,
192    subscriber_name: &str,
193    context: &(impl Display + ?Sized),
194    err_msg: &str,
195) {
196    let remaining = config.max_retries - attempt - 1;
197    if remaining > 0 {
198        warn!(
199            subscriber = subscriber_name,
200            context = %context,
201            attempt = attempt + 1,
202            remaining,
203            error = %err_msg,
204            "delivery failed, retrying"
205        );
206    } else {
207        error!(
208            subscriber = subscriber_name,
209            context = %context,
210            error = %err_msg,
211            "delivery failed after all retries"
212        );
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use axum::Router;
    use axum::http::Response as HttpResponse;
    use axum::http::StatusCode as AxumStatus;
    use axum::routing::post;
    use tokio::net::TcpListener;

    /// Build a `reqwest::Response` with the given status and an empty body.
    fn response_with_status(status: u16) -> Response {
        Response::from(HttpResponse::builder().status(status).body("").unwrap())
    }

    /// Config with a short backoff so that retry tests finish quickly.
    fn fast_config() -> RetryConfig {
        RetryConfig::new(3, Duration::from_secs(5), Duration::from_millis(10))
    }

    /// Spawn a local server whose `POST /` handler replies with
    /// `status_for(n)` on the n-th (1-based) call.
    ///
    /// Returns the server URL and a counter of handled calls.
    async fn spawn_server(status_for: fn(u32) -> AxumStatus) -> (String, Arc<AtomicU32>) {
        let calls = Arc::new(AtomicU32::new(0));
        let handler_calls = Arc::clone(&calls);

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let app = Router::new().route(
            "/",
            post(move || {
                let handler_calls = Arc::clone(&handler_calls);
                async move {
                    let call = handler_calls.fetch_add(1, Ordering::SeqCst) + 1;
                    status_for(call)
                }
            }),
        );
        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        (format!("http://{}", addr), calls)
    }

    #[test]
    fn default_config_values() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries(), 3);
        assert_eq!(config.timeout(), Duration::from_secs(5));
        assert_eq!(config.base_backoff(), Duration::from_millis(500));
    }

    #[test]
    fn custom_config_values() {
        let config = RetryConfig::new(5, Duration::from_secs(10), Duration::from_secs(1));
        assert_eq!(config.max_retries(), 5);
        assert_eq!(config.timeout(), Duration::from_secs(10));
        assert_eq!(config.base_backoff(), Duration::from_secs(1));
    }

    #[test]
    fn build_client_succeeds() {
        let config = RetryConfig::default();
        let _client = config.build_client();
    }

    #[test]
    fn is_success_2xx_predicate() {
        assert!(is_success_2xx(&response_with_status(200)));
    }

    #[test]
    fn is_success_2xx_rejects_4xx() {
        assert!(!is_success_2xx(&response_with_status(400)));
    }

    #[test]
    fn is_accepted_202_predicate() {
        assert!(is_accepted_202(&response_with_status(202)));
    }

    #[test]
    fn is_accepted_202_rejects_200() {
        assert!(!is_accepted_202(&response_with_status(200)));
    }

    #[tokio::test]
    async fn deliver_succeeds_on_first_try() {
        let (url, calls) = spawn_server(|_| AxumStatus::OK).await;
        let config = RetryConfig::default();
        let client = config.build_client();

        deliver_with_retry(
            &config,
            || client.post(&url).body("{}"),
            is_success_2xx,
            "test",
            &url,
        )
        .await;

        // Exactly one request: success must stop further attempts.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn deliver_retries_on_server_error() {
        let (url, calls) = spawn_server(|_| AxumStatus::INTERNAL_SERVER_ERROR).await;
        let config = fast_config();
        let client = config.build_client();

        deliver_with_retry(
            &config,
            || client.post(&url).body("{}"),
            is_success_2xx,
            "test",
            &url,
        )
        .await;

        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn deliver_stops_once_a_retry_succeeds() {
        let (url, calls) = spawn_server(|call| {
            if call == 1 {
                AxumStatus::INTERNAL_SERVER_ERROR
            } else {
                AxumStatus::OK
            }
        })
        .await;
        let config = fast_config();
        let client = config.build_client();

        deliver_with_retry(
            &config,
            || client.post(&url).body("{}"),
            is_success_2xx,
            "test",
            &url,
        )
        .await;

        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn deliver_uses_custom_success_predicate() {
        // 200 is not `202 Accepted`, so every attempt counts as a failure.
        let (url, calls) = spawn_server(|_| AxumStatus::OK).await;
        let config = fast_config();
        let client = config.build_client();

        deliver_with_retry(
            &config,
            || client.post(&url).body("{}"),
            is_accepted_202,
            "test",
            &url,
        )
        .await;

        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}