ironflow_engine/notify/
webhook.rs1use std::time::Duration;
4
5use tokio::time::sleep;
6use tracing::{error, info, warn};
7
8use super::{Event, EventSubscriber, SubscriberFuture};
9
/// Number of delivery attempts made for a single event before giving up.
const MAX_RETRIES: u32 = 3;

/// Pause after the first failed attempt; the pause doubles after each further failure.
const BASE_BACKOFF: Duration = Duration::from_millis(500);

/// Timeout configured on the HTTP client used for webhook requests.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
18
19pub struct WebhookSubscriber {
40 url: String,
41 client: reqwest::Client,
42}
43
44impl WebhookSubscriber {
45 pub fn new(url: &str) -> Self {
60 let client = reqwest::Client::builder()
61 .timeout(DEFAULT_TIMEOUT)
62 .build()
63 .expect("failed to build HTTP client");
64 Self {
65 url: url.to_string(),
66 client,
67 }
68 }
69
70 pub fn url(&self) -> &str {
72 &self.url
73 }
74
75 async fn deliver(&self, event: &Event) {
77 for attempt in 0..MAX_RETRIES {
78 let result = self.client.post(&self.url).json(event).send().await;
79
80 match result {
81 Ok(resp) if resp.status().is_success() => {
82 info!(
83 url = %self.url,
84 event_type = %event.event_type(),
85 "webhook delivered"
86 );
87 return;
88 }
89 Ok(resp) => {
90 let status = resp.status();
91 self.log_retry_or_fail(attempt, event.event_type(), &format!("HTTP {status}"));
92 }
93 Err(err) => {
94 self.log_retry_or_fail(attempt, event.event_type(), &err.to_string());
95 }
96 }
97
98 if attempt + 1 < MAX_RETRIES {
99 let delay = BASE_BACKOFF * 2u32.pow(attempt);
100 sleep(delay).await;
101 }
102 }
103 }
104
105 fn log_retry_or_fail(&self, attempt: u32, event_type: &str, err_msg: &str) {
106 let remaining = MAX_RETRIES - attempt - 1;
107 if remaining > 0 {
108 warn!(
109 url = %self.url,
110 event_type,
111 attempt = attempt + 1,
112 remaining,
113 error = %err_msg,
114 "webhook delivery failed, retrying"
115 );
116 } else {
117 error!(
118 url = %self.url,
119 event_type,
120 error = %err_msg,
121 "webhook delivery failed after all retries"
122 );
123 }
124 }
125}
126
127impl EventSubscriber for WebhookSubscriber {
128 fn name(&self) -> &str {
129 "webhook"
130 }
131
132 fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
133 Box::pin(async move {
134 self.deliver(event).await;
135 })
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// The accessor returns exactly the URL passed to the constructor.
    #[test]
    fn url_accessor() {
        let endpoint = "https://example.com/hook";
        let subscriber = WebhookSubscriber::new(endpoint);
        assert_eq!(subscriber.url(), endpoint);
    }

    /// The subscriber reports the fixed name `"webhook"`.
    #[test]
    fn name_is_webhook() {
        let subscriber = WebhookSubscriber::new("https://example.com");
        assert_eq!(subscriber.name(), "webhook");
    }
}