Skip to main content

heldar_kernel/services/
webhooks.rs

//! Webhook delivery engine — the SINGLE deliverer of generic events to external systems, superseding
//! the old single-URL alert notifier.
//!
//! Each enabled [`WebhookSubscription`] is an independent at-least-once deliverer: it keeps its own
//! persisted `cursor_at` (an `events.created_at`, mirroring the old notifier cursor), an event-type +
//! severity filter, and an optional HMAC-SHA256 secret. Every tick we load the enabled subscriptions
//! and, for each, deliver the events newer than its cursor that pass the filter — POSTing the JSON
//! envelope with `X-Heldar-Event` / `X-Heldar-Delivery` / `X-Heldar-Timestamp` headers and, when a
//! secret is set, `X-Heldar-Signature: sha256=<hex HMAC-SHA256(secret, raw_body)>` (the signature
//! covers the body only). Each attempt is recorded in `webhook_deliveries`. A failed attempt
//! (transport error or non-2xx response) does not move the cursor past the event, so it is retried
//! next cycle — until the failed attempts recorded for that event reach [`MAX_ATTEMPTS`]. The event
//! is then given up on and the cursor advances, so one bad endpoint cannot wedge the queue forever.
//!
//! `run()` NEVER returns: with no enabled subscriptions it simply waits for the next tick. The
//! supervisor in `main` therefore spawns it unconditionally and never tight-loops respawning it.
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use hmac::{Hmac, Mac};
22use serde_json::{json, Value};
23use sha2::Sha256;
24use sqlx::SqlitePool;
25use uuid::Uuid;
26
27use crate::config::Config;
28use crate::models::{Event, WebhookSubscription};
29
30type HmacSha256 = Hmac<Sha256>;
31
/// Maximum number of events fetched per query while draining one subscription (the same batch
/// size the old notifier used).
const BATCH: i64 = 100;
/// Number of recorded failed attempts after which an event is abandoned. The subscription's cursor
/// then moves past it, so a dead endpoint cannot hold that cursor back forever.
const MAX_ATTEMPTS: i64 = 5;
37
38pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
39    // Built once, outside the loop, and reused across cycles. On the (practically impossible) build
40    // failure, park forever rather than return — returning would have the supervisor respawn us.
41    let client = match reqwest::Client::builder()
42        .timeout(Duration::from_secs(10))
43        .build()
44    {
45        Ok(c) => c,
46        Err(e) => {
47            tracing::error!(error = %e, "webhooks: failed to build http client; idling");
48            std::future::pending::<reqwest::Client>().await
49        }
50    };
51
52    let mut tick = tokio::time::interval(Duration::from_secs(cfg.notifier_interval_s.max(5)));
53    loop {
54        tick.tick().await;
55        let subs = match load_enabled(&pool).await {
56            Ok(s) => s,
57            Err(e) => {
58                tracing::error!(error = %e, "webhooks: failed to load subscriptions");
59                continue;
60            }
61        };
62        if subs.is_empty() {
63            // No enabled subscriptions: idle quietly this cycle (never return — see the module note).
64            continue;
65        }
66        for sub in subs {
67            if let Err(e) = deliver_subscription(&pool, &client, &sub).await {
68                tracing::error!(error = %e, subscription = %sub.id, "webhooks: delivery cycle failed");
69            }
70        }
71    }
72}
73
74/// Load enabled subscriptions, oldest first (stable delivery order across cycles).
75async fn load_enabled(pool: &SqlitePool) -> sqlx::Result<Vec<WebhookSubscription>> {
76    sqlx::query_as::<_, WebhookSubscription>(
77        "SELECT * FROM webhook_subscriptions WHERE enabled = 1 ORDER BY created_at ASC",
78    )
79    .fetch_all(pool)
80    .await
81}
82
83/// Persist the advanced delivery cursor for one subscription.
84async fn save_cursor(pool: &SqlitePool, sub_id: &str, cursor: DateTime<Utc>) -> sqlx::Result<()> {
85    sqlx::query("UPDATE webhook_subscriptions SET cursor_at = ? WHERE id = ?")
86        .bind(cursor)
87        .bind(sub_id)
88        .execute(pool)
89        .await?;
90    Ok(())
91}
92
93/// Deliver every pending, matching event for one subscription, draining in batches until caught up or
94/// stopped at a retryable failure.
95async fn deliver_subscription(
96    pool: &SqlitePool,
97    client: &reqwest::Client,
98    sub: &WebhookSubscription,
99) -> anyhow::Result<()> {
100    // First time we see this subscription (cursor NULL): start at "now", no backlog replay — mirrors
101    // the old notifier so a subscription added LATER does not replay the full event history.
102    let Some(mut cursor) = sub.cursor_at else {
103        save_cursor(pool, &sub.id, Utc::now()).await?;
104        return Ok(());
105    };
106
107    loop {
108        let events = fetch_events(pool, cursor, &sub.min_severity).await?;
109        if events.is_empty() {
110            break;
111        }
112        let n = events.len();
113        let mut advanced = false;
114        for ev in events {
115            // Event-type filter: ["*"] matches all, otherwise exact membership. A non-match is not a
116            // delivery (nothing recorded) — just step the cursor past it.
117            if !matches_event_type(&sub.event_types.0, &ev.event_type) {
118                cursor = ev.created_at;
119                advanced = true;
120                continue;
121            }
122            match try_deliver(pool, client, sub, &ev).await {
123                DeliverOutcome::Advance => {
124                    cursor = ev.created_at;
125                    advanced = true;
126                }
127                DeliverOutcome::Retry => {
128                    // Retryable failure under the attempt bound: keep the cursor on this event (retry
129                    // next cycle) but persist progress made on earlier events in this batch.
130                    if advanced {
131                        save_cursor(pool, &sub.id, cursor).await?;
132                    }
133                    return Ok(());
134                }
135            }
136        }
137        if advanced {
138            save_cursor(pool, &sub.id, cursor).await?;
139        }
140        if n < BATCH as usize {
141            break;
142        }
143    }
144    Ok(())
145}
146
147/// Fetch the next batch of events newer than `cursor` that pass the severity floor, oldest first.
148async fn fetch_events(
149    pool: &SqlitePool,
150    cursor: DateTime<Utc>,
151    min_severity: &str,
152) -> sqlx::Result<Vec<Event>> {
153    let sql = format!(
154        "SELECT * FROM events
155         WHERE {} AND created_at > ?
156         ORDER BY created_at ASC LIMIT ?",
157        min_severity_sql(min_severity),
158    );
159    sqlx::query_as::<_, Event>(&sql)
160        .bind(cursor)
161        .bind(BATCH)
162        .fetch_all(pool)
163        .await
164}
165
/// What the per-subscription loop should do after one delivery attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "the outcome decides whether the cursor may move past the event"]
enum DeliverOutcome {
    /// Move the cursor past this event (delivered, or given up on after the attempt bound).
    Advance,
    /// Retryable failure under the attempt bound: stop this subscription's cycle, keep the cursor.
    Retry,
}
172
173/// Attempt to deliver one event, recording the attempt in the `webhook_deliveries` ledger.
174async fn try_deliver(
175    pool: &SqlitePool,
176    client: &reqwest::Client,
177    sub: &WebhookSubscription,
178    ev: &Event,
179) -> DeliverOutcome {
180    let prior_failures: i64 = sqlx::query_scalar(
181        "SELECT COUNT(*) FROM webhook_deliveries
182          WHERE subscription_id = ? AND event_id = ? AND status = 'failed'",
183    )
184    .bind(&sub.id)
185    .bind(&ev.id)
186    .fetch_one(pool)
187    .await
188    .unwrap_or(0);
189    let attempt = prior_failures + 1;
190
191    let delivery_id = format!("whd_{}", Uuid::new_v4().simple());
192    let body = event_body(ev);
193    let res = send_event(
194        client,
195        &sub.url,
196        &delivery_id,
197        &ev.event_type,
198        sub.secret.as_deref(),
199        &body,
200    )
201    .await;
202
203    record_delivery(
204        pool,
205        &delivery_id,
206        &sub.id,
207        Some(&ev.id),
208        Some(&ev.event_type),
209        res.ok,
210        attempt,
211        res.status.map(i64::from),
212        res.error.as_deref(),
213    )
214    .await;
215
216    if res.ok {
217        DeliverOutcome::Advance
218    } else if attempt >= MAX_ATTEMPTS {
219        tracing::warn!(
220            subscription = %sub.id,
221            event = %ev.id,
222            attempts = attempt,
223            "webhooks: giving up on event after max attempts; advancing cursor past it"
224        );
225        DeliverOutcome::Advance
226    } else {
227        tracing::warn!(
228            subscription = %sub.id,
229            event = %ev.id,
230            attempt,
231            error = res.error.as_deref().unwrap_or("non-2xx"),
232            "webhooks: delivery failed; will retry next cycle"
233        );
234        DeliverOutcome::Retry
235    }
236}
237
238/// The JSON envelope POSTed for an event (the body that is HMAC-signed verbatim).
239pub fn event_body(ev: &Event) -> Value {
240    json!({
241        "id": ev.id,
242        "camera_id": ev.camera_id,
243        "site_id": ev.site_id,
244        "event_type": ev.event_type,
245        "severity": ev.severity,
246        "timestamp": ev.timestamp,
247        "payload": ev.payload.0,
248    })
249}
250
/// Outcome of one signed webhook POST, in the shape recorded in the delivery ledger.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SendResult {
    /// `true` iff a response arrived with a 2xx status.
    pub ok: bool,
    /// HTTP status code, when a response came back at all (`None` on transport errors).
    pub status: Option<u16>,
    /// Failure description for the ledger; `None` on success.
    pub error: Option<String>,
}
258
259/// POST a signed webhook body. The body is serialized ONCE and both signed and sent verbatim so the
260/// `X-Heldar-Signature` always covers the exact bytes the receiver gets. Used by the delivery loop and
261/// by the synthetic `/test` route.
262pub async fn send_event(
263    client: &reqwest::Client,
264    url: &str,
265    delivery_id: &str,
266    event_type: &str,
267    secret: Option<&str>,
268    body: &Value,
269) -> SendResult {
270    let raw = serde_json::to_string(body).unwrap_or_else(|_| "{}".to_string());
271    let mut req = client
272        .post(url)
273        .header(reqwest::header::CONTENT_TYPE, "application/json")
274        .header("X-Heldar-Event", event_type)
275        .header("X-Heldar-Delivery", delivery_id)
276        .header("X-Heldar-Timestamp", Utc::now().timestamp().to_string())
277        .body(raw.clone());
278    if let Some(secret) = secret.filter(|s| !s.is_empty()) {
279        req = req.header("X-Heldar-Signature", sign(secret, raw.as_bytes()));
280    }
281    match req.send().await {
282        Ok(resp) => {
283            let status = resp.status();
284            SendResult {
285                ok: status.is_success(),
286                status: Some(status.as_u16()),
287                error: if status.is_success() {
288                    None
289                } else {
290                    Some(format!("webhook returned HTTP {}", status.as_u16()))
291                },
292            }
293        }
294        Err(e) => SendResult {
295            ok: false,
296            status: None,
297            error: Some(e.to_string()),
298        },
299    }
300}
301
302/// Insert one row into the `webhook_deliveries` ledger. Best-effort: a failure is logged, not fatal.
303#[allow(clippy::too_many_arguments)]
304pub async fn record_delivery(
305    pool: &SqlitePool,
306    id: &str,
307    subscription_id: &str,
308    event_id: Option<&str>,
309    event_type: Option<&str>,
310    delivered: bool,
311    attempts: i64,
312    response_code: Option<i64>,
313    error: Option<&str>,
314) {
315    let now = Utc::now();
316    let delivered_at = if delivered { Some(now) } else { None };
317    let status = if delivered { "delivered" } else { "failed" };
318    let res = sqlx::query(
319        "INSERT INTO webhook_deliveries
320           (id, subscription_id, event_id, event_type, status, attempts, response_code, error, created_at, delivered_at)
321         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
322    )
323    .bind(id)
324    .bind(subscription_id)
325    .bind(event_id)
326    .bind(event_type)
327    .bind(status)
328    .bind(attempts)
329    .bind(response_code)
330    .bind(error)
331    .bind(now)
332    .bind(delivered_at)
333    .execute(pool)
334    .await;
335    if let Err(e) = res {
336        tracing::error!(error = %e, subscription = %subscription_id, "webhooks: failed to record delivery");
337    }
338}
339
340/// `sha256=<hex>` where `<hex>` is HMAC-SHA256(secret, body). HMAC accepts any key length.
341fn sign(secret: &str, body: &[u8]) -> String {
342    let mut mac =
343        HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts a key of any length");
344    mac.update(body);
345    format!(
346        "sha256={}",
347        crate::auth::hex_encode(&mac.finalize().into_bytes())
348    )
349}
350
/// Report whether a subscription filter selects `event_type`.
///
/// A `"*"` entry anywhere in the filter selects everything; otherwise the type must appear in the
/// filter exactly. An empty filter selects nothing.
pub fn matches_event_type(filter: &[String], event_type: &str) -> bool {
    filter
        .iter()
        .any(|entry| entry == "*" || entry == event_type)
}
356
/// Map a severity floor to a SQL predicate that admits that severity and everything above it.
///
/// Only fixed literals are returned (never caller text), so the result is safe to splice into a
/// query. `info`, and any value not in the table, yields an always-true predicate.
fn min_severity_sql(min_severity: &str) -> &'static str {
    const FLOORS: [(&str, &str); 2] = [
        ("critical", "severity = 'critical'"),
        ("warning", "severity IN ('warning', 'critical')"),
    ];
    FLOORS
        .iter()
        .find(|(name, _)| *name == min_severity)
        .map_or("1 = 1", |&(_, predicate)| predicate)
}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned filter list from string literals.
    fn owned(types: &[&str]) -> Vec<String> {
        types.iter().map(|t| t.to_string()).collect()
    }

    #[test]
    fn wildcard_matches_everything() {
        let star = owned(&["*"]);
        for &event_type in &["zone_enter", "anything_at_all"] {
            assert!(matches_event_type(&star, event_type));
        }
    }

    #[test]
    fn explicit_set_is_exact_membership() {
        let set = owned(&["zone_enter", "disk_pressure"]);
        assert!(matches_event_type(&set, "zone_enter"));
        assert!(matches_event_type(&set, "disk_pressure"));
        assert!(!matches_event_type(&set, "zone_exit"));
        assert!(!matches_event_type(&[], "zone_enter"));
    }

    #[test]
    fn severity_floor_thresholds() {
        let cases = [
            ("critical", "severity = 'critical'"),
            ("warning", "severity IN ('warning', 'critical')"),
            // info (and any unknown value) admits all severities.
            ("info", "1 = 1"),
            ("whatever", "1 = 1"),
        ];
        for &(floor, expected) in &cases {
            assert_eq!(min_severity_sql(floor), expected, "floor {:?}", floor);
        }
    }

    #[test]
    fn signature_is_stable_prefixed_hmac_sha256() {
        // Known answer: HMAC-SHA256(key="key", msg="The quick brown fox jumps over the lazy dog").
        let expected = "sha256=f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8";
        assert_eq!(
            sign("key", b"The quick brown fox jumps over the lazy dog"),
            expected
        );
        // Deterministic for the same inputs, and sensitive to the key.
        assert_eq!(sign("s", b"body"), sign("s", b"body"));
        assert_ne!(sign("s1", b"body"), sign("s2", b"body"));
    }
}