Skip to main content

ironflow_engine/notify/
webhook.rs

1//! [`WebhookSubscriber`] -- POSTs events as JSON to a URL.
2
3use std::time::Duration;
4
5use tokio::time::sleep;
6use tracing::{error, info, warn};
7
8use super::{Event, EventSubscriber, SubscriberFuture};
9
10/// Default timeout for outbound HTTP calls.
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
12
13/// Maximum number of retry attempts for failed deliveries.
14const MAX_RETRIES: u32 = 3;
15
16/// Base delay for exponential backoff (doubled each retry).
17const BASE_BACKOFF: Duration = Duration::from_millis(500);
18
19/// Subscriber that POSTs the event as JSON to a webhook URL.
20///
21/// Retries failed deliveries with exponential backoff (up to 3 attempts,
22/// 5 s timeout per attempt). The HTTP client is created once and reused.
23///
24/// Event type filtering is handled by the
25/// [`EventPublisher`](super::EventPublisher) at subscription time -- this
26/// subscriber receives only events that already passed the filter.
27///
28/// # Examples
29///
30/// ```no_run
31/// use ironflow_engine::notify::{Event, EventPublisher, WebhookSubscriber};
32///
33/// let mut publisher = EventPublisher::new();
34/// publisher.subscribe(
35///     WebhookSubscriber::new("https://hooks.example.com/events"),
36///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
37/// );
38/// ```
39pub struct WebhookSubscriber {
40    url: String,
41    client: reqwest::Client,
42}
43
44impl WebhookSubscriber {
45    /// Create a new webhook subscriber targeting the given URL.
46    ///
47    /// # Panics
48    ///
49    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
50    ///
51    /// # Examples
52    ///
53    /// ```
54    /// use ironflow_engine::notify::WebhookSubscriber;
55    ///
56    /// let subscriber = WebhookSubscriber::new("https://example.com/hook");
57    /// assert_eq!(subscriber.url(), "https://example.com/hook");
58    /// ```
59    pub fn new(url: &str) -> Self {
60        let client = reqwest::Client::builder()
61            .timeout(DEFAULT_TIMEOUT)
62            .build()
63            .expect("failed to build HTTP client");
64        Self {
65            url: url.to_string(),
66            client,
67        }
68    }
69
70    /// Returns the target URL.
71    pub fn url(&self) -> &str {
72        &self.url
73    }
74
75    /// Deliver with retry + exponential backoff.
76    async fn deliver(&self, event: &Event) {
77        for attempt in 0..MAX_RETRIES {
78            let result = self.client.post(&self.url).json(event).send().await;
79
80            match result {
81                Ok(resp) if resp.status().is_success() => {
82                    info!(
83                        url = %self.url,
84                        event_type = %event.event_type(),
85                        "webhook delivered"
86                    );
87                    return;
88                }
89                Ok(resp) => {
90                    let status = resp.status();
91                    self.log_retry_or_fail(attempt, event.event_type(), &format!("HTTP {status}"));
92                }
93                Err(err) => {
94                    self.log_retry_or_fail(attempt, event.event_type(), &err.to_string());
95                }
96            }
97
98            if attempt + 1 < MAX_RETRIES {
99                let delay = BASE_BACKOFF * 2u32.pow(attempt);
100                sleep(delay).await;
101            }
102        }
103    }
104
105    fn log_retry_or_fail(&self, attempt: u32, event_type: &str, err_msg: &str) {
106        let remaining = MAX_RETRIES - attempt - 1;
107        if remaining > 0 {
108            warn!(
109                url = %self.url,
110                event_type,
111                attempt = attempt + 1,
112                remaining,
113                error = %err_msg,
114                "webhook delivery failed, retrying"
115            );
116        } else {
117            error!(
118                url = %self.url,
119                event_type,
120                error = %err_msg,
121                "webhook delivery failed after all retries"
122            );
123        }
124    }
125}
126
127impl EventSubscriber for WebhookSubscriber {
128    fn name(&self) -> &str {
129        "webhook"
130    }
131
132    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
133        Box::pin(async move {
134            self.deliver(event).await;
135        })
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142
143    #[test]
144    fn url_accessor() {
145        let sub = WebhookSubscriber::new("https://example.com/hook");
146        assert_eq!(sub.url(), "https://example.com/hook");
147    }
148
149    #[test]
150    fn name_is_webhook() {
151        let sub = WebhookSubscriber::new("https://example.com");
152        assert_eq!(sub.name(), "webhook");
153    }
154}