ironflow_engine/notify/
retry.rs1use std::fmt::Display;
8use std::time::Duration;
9
10use reqwest::{Client, RequestBuilder, Response, StatusCode};
11use tokio::time::sleep;
12use tracing::{error, info, warn};
13
14#[derive(Debug, Clone)]
29pub struct RetryConfig {
30 max_retries: u32,
31 timeout: Duration,
32 base_backoff: Duration,
33}
34
35impl RetryConfig {
36 const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
38
39 const DEFAULT_MAX_RETRIES: u32 = 3;
41
42 const DEFAULT_BASE_BACKOFF: Duration = Duration::from_millis(500);
44
45 pub fn new(max_retries: u32, timeout: Duration, base_backoff: Duration) -> Self {
59 Self {
60 max_retries,
61 timeout,
62 base_backoff,
63 }
64 }
65
66 pub fn max_retries(&self) -> u32 {
68 self.max_retries
69 }
70
71 pub fn timeout(&self) -> Duration {
73 self.timeout
74 }
75
76 pub fn base_backoff(&self) -> Duration {
78 self.base_backoff
79 }
80
81 pub fn build_client(&self) -> Client {
87 Client::builder()
88 .timeout(self.timeout)
89 .build()
90 .expect("failed to build HTTP client")
91 }
92}
93
94impl Default for RetryConfig {
95 fn default() -> Self {
96 Self {
97 max_retries: Self::DEFAULT_MAX_RETRIES,
98 timeout: Self::DEFAULT_TIMEOUT,
99 base_backoff: Self::DEFAULT_BASE_BACKOFF,
100 }
101 }
102}
103
104pub type SuccessPredicate = fn(&Response) -> bool;
109
110pub fn is_success_2xx(response: &Response) -> bool {
112 response.status().is_success()
113}
114
115pub fn is_accepted_202(response: &Response) -> bool {
117 response.status() == StatusCode::ACCEPTED
118}
119
120pub async fn deliver_with_retry(
149 config: &RetryConfig,
150 build_request: impl Fn() -> RequestBuilder,
151 is_success: SuccessPredicate,
152 subscriber_name: &str,
153 context: &(impl Display + ?Sized),
154) {
155 for attempt in 0..config.max_retries {
156 let result = build_request().send().await;
157
158 match result {
159 Ok(resp) if is_success(&resp) => {
160 info!(
161 subscriber = subscriber_name,
162 context = %context,
163 "delivery succeeded"
164 );
165 return;
166 }
167 Ok(resp) => {
168 let status = resp.status();
169 log_retry_or_fail(
170 config,
171 attempt,
172 subscriber_name,
173 context,
174 &format!("HTTP {status}"),
175 );
176 }
177 Err(err) => {
178 log_retry_or_fail(config, attempt, subscriber_name, context, &err.to_string());
179 }
180 }
181
182 if attempt + 1 < config.max_retries {
183 let delay = config.base_backoff * 2u32.pow(attempt);
184 sleep(delay).await;
185 }
186 }
187}
188
189fn log_retry_or_fail(
190 config: &RetryConfig,
191 attempt: u32,
192 subscriber_name: &str,
193 context: &(impl Display + ?Sized),
194 err_msg: &str,
195) {
196 let remaining = config.max_retries - attempt - 1;
197 if remaining > 0 {
198 warn!(
199 subscriber = subscriber_name,
200 context = %context,
201 attempt = attempt + 1,
202 remaining,
203 error = %err_msg,
204 "delivery failed, retrying"
205 );
206 } else {
207 error!(
208 subscriber = subscriber_name,
209 context = %context,
210 error = %err_msg,
211 "delivery failed after all retries"
212 );
213 }
214}
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219
220 #[test]
221 fn default_config_values() {
222 let config = RetryConfig::default();
223 assert_eq!(config.max_retries(), 3);
224 assert_eq!(config.timeout(), Duration::from_secs(5));
225 assert_eq!(config.base_backoff(), Duration::from_millis(500));
226 }
227
228 #[test]
229 fn custom_config_values() {
230 let config = RetryConfig::new(5, Duration::from_secs(10), Duration::from_secs(1));
231 assert_eq!(config.max_retries(), 5);
232 assert_eq!(config.timeout(), Duration::from_secs(10));
233 assert_eq!(config.base_backoff(), Duration::from_secs(1));
234 }
235
236 #[test]
237 fn build_client_succeeds() {
238 let config = RetryConfig::default();
239 let _client = config.build_client();
240 }
241
242 use axum::http::Response as HttpResponse;
243
244 #[test]
245 fn is_success_2xx_predicate() {
246 let response = HttpResponse::builder().status(200).body("").unwrap();
247 let reqwest_resp = Response::from(response);
248 assert!(is_success_2xx(&reqwest_resp));
249 }
250
251 #[test]
252 fn is_success_2xx_rejects_4xx() {
253 let response = HttpResponse::builder().status(400).body("").unwrap();
254 let reqwest_resp = Response::from(response);
255 assert!(!is_success_2xx(&reqwest_resp));
256 }
257
258 #[test]
259 fn is_accepted_202_predicate() {
260 let response = HttpResponse::builder().status(202).body("").unwrap();
261 let reqwest_resp = Response::from(response);
262 assert!(is_accepted_202(&reqwest_resp));
263 }
264
265 #[test]
266 fn is_accepted_202_rejects_200() {
267 let response = HttpResponse::builder().status(200).body("").unwrap();
268 let reqwest_resp = Response::from(response);
269 assert!(!is_accepted_202(&reqwest_resp));
270 }
271
272 #[tokio::test]
273 async fn deliver_succeeds_on_first_try() {
274 use axum::Router;
275 use axum::http::StatusCode;
276 use axum::routing::post;
277 use tokio::net::TcpListener;
278
279 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
280 let addr = listener.local_addr().unwrap();
281
282 let app = Router::new().route("/", post(|| async { StatusCode::OK }));
283 tokio::spawn(async move {
284 axum::serve(listener, app).await.unwrap();
285 });
286
287 let config = RetryConfig::default();
288 let client = config.build_client();
289 let url = format!("http://{}", addr);
290
291 deliver_with_retry(
292 &config,
293 || client.post(&url).body("{}"),
294 is_success_2xx,
295 "test",
296 &url,
297 )
298 .await;
299 }
300
301 #[tokio::test]
302 async fn deliver_retries_on_server_error() {
303 use axum::Router;
304 use axum::http::StatusCode;
305 use axum::routing::post;
306 use std::sync::Arc;
307 use std::sync::atomic::{AtomicU32, Ordering};
308 use tokio::net::TcpListener;
309
310 let call_count = Arc::new(AtomicU32::new(0));
311 let count = call_count.clone();
312
313 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
314 let addr = listener.local_addr().unwrap();
315
316 let app = Router::new().route(
317 "/",
318 post(move || {
319 let count = count.clone();
320 async move {
321 count.fetch_add(1, Ordering::SeqCst);
322 StatusCode::INTERNAL_SERVER_ERROR
323 }
324 }),
325 );
326 tokio::spawn(async move {
327 axum::serve(listener, app).await.unwrap();
328 });
329
330 let config = RetryConfig::new(3, Duration::from_secs(5), Duration::from_millis(10));
331 let client = config.build_client();
332 let url = format!("http://{}", addr);
333
334 deliver_with_retry(
335 &config,
336 || client.post(&url).body("{}"),
337 is_success_2xx,
338 "test",
339 &url,
340 )
341 .await;
342
343 assert_eq!(call_count.load(Ordering::SeqCst), 3);
344 }
345}