Skip to main content

ironflow_engine/notify/
webhook.rs

1//! [`WebhookSubscriber`] -- POSTs events as JSON to a URL.
2//!
3//! When a signing secret is configured, each outbound request includes an
4//! `X-Signature-256` header containing the HMAC-SHA256 digest of the JSON
5//! body, prefixed with `sha256=` (the same convention used by GitHub and
6//! GitLab). Receivers can verify the signature to ensure the payload was
7//! not tampered with in transit.
8
9use hmac::{Hmac, Mac};
10use reqwest::Client;
11use sha2::Sha256;
12
13use super::retry::{RetryConfig, deliver_with_retry, is_success_2xx};
14use super::{Event, EventSubscriber, SubscriberFuture};
15
16type HmacSha256 = Hmac<Sha256>;
17
18/// Header name used for outbound HMAC-SHA256 signatures.
19const SIGNATURE_HEADER: &str = "X-Signature-256";
20
21/// Subscriber that POSTs the event as JSON to a webhook URL.
22///
23/// Retries failed deliveries with exponential backoff (up to 3 attempts,
24/// 5 s timeout per attempt). The HTTP client is created once and reused.
25///
26/// When a signing secret is provided via [`WebhookSubscriber::with_signing_secret`],
27/// every outbound request includes an `X-Signature-256` header whose value
28/// is `sha256=<hex-encoded HMAC-SHA256>`, computed over the raw JSON body.
29/// This lets the receiver verify payload authenticity.
30///
31/// Event type filtering is handled by the
32/// [`EventPublisher`](super::EventPublisher) at subscription time -- this
33/// subscriber receives only events that already passed the filter.
34///
35/// # Examples
36///
37/// ```no_run
38/// use ironflow_engine::notify::{Event, EventPublisher, WebhookSubscriber};
39///
40/// let mut publisher = EventPublisher::new();
41///
42/// // Without signature
43/// publisher.subscribe(
44///     WebhookSubscriber::new("https://hooks.example.com/events"),
45///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
46/// );
47///
48/// // With HMAC-SHA256 signature
49/// publisher.subscribe(
50///     WebhookSubscriber::with_signing_secret(
51///         "https://hooks.example.com/signed",
52///         "my-webhook-secret",
53///     ),
54///     Event::ALL,
55/// );
56/// ```
57pub struct WebhookSubscriber {
58    url: String,
59    signing_secret: Option<String>,
60    client: Client,
61    retry_config: RetryConfig,
62}
63
64impl WebhookSubscriber {
65    /// Create a new webhook subscriber targeting the given URL.
66    ///
67    /// Uses the default [`RetryConfig`] (3 retries, 5 s timeout, 500 ms
68    /// base backoff). No outbound signature is added.
69    ///
70    /// # Panics
71    ///
72    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// use ironflow_engine::notify::WebhookSubscriber;
78    ///
79    /// let subscriber = WebhookSubscriber::new("https://example.com/hook");
80    /// assert_eq!(subscriber.url(), "https://example.com/hook");
81    /// assert!(subscriber.signing_secret().is_none());
82    /// ```
83    pub fn new(url: &str) -> Self {
84        Self::with_retry_config(url, RetryConfig::default())
85    }
86
87    /// Create a webhook subscriber with a custom retry configuration.
88    ///
89    /// No outbound signature is added.
90    ///
91    /// # Panics
92    ///
93    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
94    ///
95    /// # Examples
96    ///
97    /// ```
98    /// use ironflow_engine::notify::{RetryConfig, WebhookSubscriber};
99    ///
100    /// let config = RetryConfig::new(
101    ///     5,
102    ///     std::time::Duration::from_secs(10),
103    ///     std::time::Duration::from_secs(1),
104    /// );
105    /// let subscriber = WebhookSubscriber::with_retry_config("https://example.com/hook", config);
106    /// ```
107    pub fn with_retry_config(url: &str, retry_config: RetryConfig) -> Self {
108        Self::build(url, None, retry_config)
109    }
110
111    /// Create a webhook subscriber that signs outbound payloads with HMAC-SHA256.
112    ///
113    /// Each request will include an `X-Signature-256` header containing
114    /// `sha256=<hex-encoded digest>`, computed over the raw JSON body.
115    ///
116    /// Uses the default [`RetryConfig`].
117    ///
118    /// # Panics
119    ///
120    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
121    ///
122    /// # Examples
123    ///
124    /// ```
125    /// use ironflow_engine::notify::WebhookSubscriber;
126    ///
127    /// let subscriber = WebhookSubscriber::with_signing_secret(
128    ///     "https://example.com/hook",
129    ///     "my-secret",
130    /// );
131    /// assert!(subscriber.signing_secret().is_some());
132    /// ```
133    pub fn with_signing_secret(url: &str, secret: &str) -> Self {
134        Self::with_signing_secret_and_retry(url, secret, RetryConfig::default())
135    }
136
137    /// Create a webhook subscriber with HMAC-SHA256 signing and custom retry config.
138    ///
139    /// # Panics
140    ///
141    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
142    ///
143    /// # Examples
144    ///
145    /// ```
146    /// use ironflow_engine::notify::{RetryConfig, WebhookSubscriber};
147    ///
148    /// let config = RetryConfig::new(
149    ///     5,
150    ///     std::time::Duration::from_secs(10),
151    ///     std::time::Duration::from_secs(1),
152    /// );
153    /// let subscriber = WebhookSubscriber::with_signing_secret_and_retry(
154    ///     "https://example.com/hook",
155    ///     "my-secret",
156    ///     config,
157    /// );
158    /// ```
159    pub fn with_signing_secret_and_retry(
160        url: &str,
161        secret: &str,
162        retry_config: RetryConfig,
163    ) -> Self {
164        Self::build(url, Some(secret), retry_config)
165    }
166
167    fn build(url: &str, signing_secret: Option<&str>, retry_config: RetryConfig) -> Self {
168        let client = retry_config.build_client();
169        Self {
170            url: url.to_string(),
171            signing_secret: signing_secret.map(|s| s.to_string()),
172            client,
173            retry_config,
174        }
175    }
176
177    /// Returns the target URL.
178    pub fn url(&self) -> &str {
179        &self.url
180    }
181
182    /// Returns the signing secret, if configured.
183    pub fn signing_secret(&self) -> Option<&str> {
184        self.signing_secret.as_deref()
185    }
186
187    /// Compute the `sha256=<hex>` signature string for a given body.
188    fn compute_signature(secret: &str, body: &[u8]) -> String {
189        let mut mac =
190            HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts any key size");
191        mac.update(body);
192        format!("sha256={}", hex::encode(mac.finalize().into_bytes()))
193    }
194}
195
196impl EventSubscriber for WebhookSubscriber {
197    fn name(&self) -> &str {
198        "webhook"
199    }
200
201    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
202        Box::pin(async move {
203            let body = serde_json::to_vec(event).expect("Event is always serializable");
204            let signature = self
205                .signing_secret
206                .as_deref()
207                .map(|secret| Self::compute_signature(secret, &body));
208
209            deliver_with_retry(
210                &self.retry_config,
211                || {
212                    let mut req = self
213                        .client
214                        .post(&self.url)
215                        .header("Content-Type", "application/json")
216                        .body(body.clone());
217                    if let Some(sig) = &signature {
218                        req = req.header(SIGNATURE_HEADER, sig.as_str());
219                    }
220                    req
221                },
222                is_success_2xx,
223                "webhook",
224                &self.url,
225            )
226            .await;
227        })
228    }
229}
230
231#[cfg(test)]
232mod tests {
233    use std::sync::Arc;
234    use std::time::Duration;
235
236    use axum::Router;
237    use axum::body::Bytes;
238    use axum::http::{HeaderMap, StatusCode};
239    use axum::routing::post;
240    use chrono::Utc;
241    use hmac::{Hmac, Mac};
242    use ironflow_store::models::RunStatus;
243    use rust_decimal::Decimal;
244    use sha2::Sha256;
245    use tokio::net::TcpListener;
246    use tokio::sync::Mutex;
247    use uuid::Uuid;
248
249    use super::*;
250
251    type HmacSha256 = Hmac<Sha256>;
252    type CapturedRequest = Arc<Mutex<Option<(HeaderMap, Vec<u8>)>>>;
253
254    fn compute_expected_hmac(secret: &[u8], body: &[u8]) -> String {
255        let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC key rejected");
256        mac.update(body);
257        format!("sha256={}", hex::encode(mac.finalize().into_bytes()))
258    }
259
260    #[test]
261    fn url_accessor() {
262        let sub = WebhookSubscriber::new("https://example.com/hook");
263        assert_eq!(sub.url(), "https://example.com/hook");
264    }
265
266    #[test]
267    fn name_is_webhook() {
268        let sub = WebhookSubscriber::new("https://example.com");
269        assert_eq!(sub.name(), "webhook");
270    }
271
272    #[test]
273    fn no_signing_secret_by_default() {
274        let sub = WebhookSubscriber::new("https://example.com/hook");
275        assert!(sub.signing_secret().is_none());
276    }
277
278    #[test]
279    fn with_signing_secret_stores_secret() {
280        let sub = WebhookSubscriber::with_signing_secret("https://example.com/hook", "my-secret");
281        assert_eq!(sub.signing_secret(), Some("my-secret"));
282    }
283
284    #[test]
285    fn with_signing_secret_and_retry_stores_secret() {
286        let config = RetryConfig::new(5, Duration::from_secs(10), Duration::from_secs(1));
287        let sub = WebhookSubscriber::with_signing_secret_and_retry(
288            "https://example.com/hook",
289            "my-secret",
290            config,
291        );
292        assert_eq!(sub.signing_secret(), Some("my-secret"));
293        assert_eq!(sub.url(), "https://example.com/hook");
294    }
295
296    #[test]
297    fn compute_signature_matches_hmac_sha256() {
298        let secret = "test-secret";
299        let body = b"{\"type\":\"run_created\"}";
300        let sig = WebhookSubscriber::compute_signature(secret, body);
301        let expected = compute_expected_hmac(secret.as_bytes(), body);
302        assert_eq!(sig, expected);
303    }
304
305    #[test]
306    fn compute_signature_empty_body() {
307        let secret = "test-secret";
308        let body = b"";
309        let sig = WebhookSubscriber::compute_signature(secret, body);
310        let expected = compute_expected_hmac(secret.as_bytes(), body);
311        assert_eq!(sig, expected);
312    }
313
314    #[test]
315    fn compute_signature_has_sha256_prefix() {
316        let sig = WebhookSubscriber::compute_signature("secret", b"body");
317        assert!(sig.starts_with("sha256="));
318        assert_eq!(sig.len(), 7 + 64); // "sha256=" + 64 hex chars
319    }
320
321    #[test]
322    fn compute_signature_rfc4231_test_vector() {
323        // RFC 4231 Test Case 2: Key = "Jefe", Data = "what do ya want for nothing?"
324        let key = "Jefe";
325        let data = b"what do ya want for nothing?";
326        let expected = "5bdcc146bf60754e6a042426089575c75a003f089d2739839dec58b964ec3843";
327
328        let sig = WebhookSubscriber::compute_signature(key, data);
329        assert_eq!(sig, format!("sha256={}", expected));
330    }
331
332    #[tokio::test]
333    async fn unsigned_webhook_does_not_send_signature_header() {
334        let received_headers: Arc<Mutex<Option<HeaderMap>>> = Arc::new(Mutex::new(None));
335        let captured = received_headers.clone();
336
337        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
338        let addr = listener.local_addr().unwrap();
339
340        let app = Router::new().route(
341            "/",
342            post(move |headers: HeaderMap, _body: Bytes| {
343                let captured = captured.clone();
344                async move {
345                    *captured.lock().await = Some(headers);
346                    StatusCode::OK
347                }
348            }),
349        );
350        tokio::spawn(async move {
351            axum::serve(listener, app).await.unwrap();
352        });
353
354        let sub = WebhookSubscriber::new(&format!("http://{}", addr));
355        let event = Event::RunCreated {
356            run_id: Uuid::now_v7(),
357            workflow_name: "deploy".to_string(),
358            at: Utc::now(),
359        };
360
361        sub.handle(&event).await;
362
363        let headers = received_headers.lock().await;
364        let headers = headers.as_ref().expect("request was received");
365        assert!(headers.get("X-Signature-256").is_none());
366    }
367
368    #[tokio::test]
369    async fn signed_webhook_sends_valid_signature_header() {
370        let secret = "webhook-secret-42";
371
372        let received: CapturedRequest = Arc::new(Mutex::new(None));
373        let captured = received.clone();
374
375        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
376        let addr = listener.local_addr().unwrap();
377
378        let app = Router::new().route(
379            "/",
380            post(move |headers: HeaderMap, body: Bytes| {
381                let captured = captured.clone();
382                async move {
383                    *captured.lock().await = Some((headers, body.to_vec()));
384                    StatusCode::OK
385                }
386            }),
387        );
388        tokio::spawn(async move {
389            axum::serve(listener, app).await.unwrap();
390        });
391
392        let sub = WebhookSubscriber::with_signing_secret(&format!("http://{}", addr), secret);
393        let event = Event::RunStatusChanged {
394            run_id: Uuid::now_v7(),
395            workflow_name: "deploy".to_string(),
396            from: RunStatus::Pending,
397            to: RunStatus::Running,
398            error: None,
399            cost_usd: Decimal::ZERO,
400            duration_ms: 0,
401            at: Utc::now(),
402        };
403
404        sub.handle(&event).await;
405
406        let guard = received.lock().await;
407        let (headers, body) = guard.as_ref().expect("request was received");
408
409        let sig_header = headers
410            .get("X-Signature-256")
411            .expect("X-Signature-256 header must be present")
412            .to_str()
413            .unwrap();
414
415        assert!(sig_header.starts_with("sha256="));
416
417        // Verify the signature is correct
418        let expected = compute_expected_hmac(secret.as_bytes(), body);
419        assert_eq!(sig_header, expected);
420    }
421
422    #[test]
423    fn different_secrets_produce_different_signatures() {
424        let body = b"{\"type\":\"run_created\"}";
425        let sig_a = WebhookSubscriber::compute_signature("secret-A", body);
426        let sig_b = WebhookSubscriber::compute_signature("secret-B", body);
427        assert_ne!(sig_a, sig_b);
428    }
429
430    #[test]
431    fn wrong_secret_does_not_match() {
432        let body = b"{\"type\":\"run_created\"}";
433        let sig = WebhookSubscriber::compute_signature("correct-secret", body);
434        let wrong = compute_expected_hmac(b"wrong-secret", body);
435        assert_ne!(sig, wrong);
436    }
437}