Skip to main content

heldar_kernel/services/
webhooks.rs

//! Webhook delivery engine — the SINGLE deliverer of generic events to external systems, superseding
//! the old single-URL alert notifier (whose stored `app_state` settings are folded into a "Default
//! alerts" subscription by the one-time legacy migration below).
//!
//! Each enabled [`WebhookSubscription`] is an independent at-least-once deliverer: it keeps its own
//! persisted `cursor_at` (an `events.created_at`, mirroring the old notifier cursor), an event-type +
//! severity filter, and an optional HMAC-SHA256 secret. Every tick we load the enabled subscriptions
//! and, for each, deliver the events newer than its cursor that pass the filter — POSTing the JSON
//! envelope with `X-Heldar-Event` / `X-Heldar-Delivery` / `X-Heldar-Timestamp` headers and, when a
//! secret is set, `X-Heldar-Signature: sha256=<hex HMAC-SHA256(secret, raw_body)>`. Each attempt is
//! recorded in `webhook_deliveries`; a retryable failure keeps the cursor (retried next cycle) until
//! the per-event attempts in that ledger reach [`MAX_ATTEMPTS`], after which the event is given up on
//! and the cursor advances so one bad endpoint cannot wedge the queue forever.
//!
//! `run()` NEVER returns: with no enabled subscriptions it idles the cycle. The supervisor in `main`
//! therefore spawns it unconditionally and never tight-loops respawning it.
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use chrono::{DateTime, Utc};
22use hmac::{Hmac, Mac};
23use serde_json::{json, Value};
24use sha2::Sha256;
25use sqlx::SqlitePool;
26use uuid::Uuid;
27
28use crate::config::Config;
29use crate::models::{Event, WebhookSubscription};
30
31type HmacSha256 = Hmac<Sha256>;
32
/// Maximum number of events one `fetch_events` call pulls for a subscription (the same batch size
/// the old notifier used).
const BATCH: i64 = 100;

/// Failed-attempt ceiling per (subscription, event): the attempt that brings the ledger's failure
/// count to this value is the last one, after which the event is abandoned and the cursor moves past
/// it, so an unreachable endpoint cannot stall its subscription's cursor indefinitely.
const MAX_ATTEMPTS: i64 = 5;
38
39pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
40    // Built once, outside the loop, and reused across cycles. On the (practically impossible) build
41    // failure, park forever rather than return — returning would have the supervisor respawn us.
42    let client = match reqwest::Client::builder()
43        .timeout(Duration::from_secs(10))
44        .build()
45    {
46        Ok(c) => c,
47        Err(e) => {
48            tracing::error!(error = %e, "webhooks: failed to build http client; idling");
49            std::future::pending::<reqwest::Client>().await
50        }
51    };
52
53    // One-time: fold the legacy single-URL alerting webhook into a "Default alerts" subscription so it
54    // keeps delivering under the new model. Best-effort — a failure just logs and retries next boot.
55    if let Err(e) = migrate_legacy_alerting(&pool, &cfg).await {
56        tracing::warn!(error = %e, "webhooks: legacy alerting migration failed");
57    }
58
59    let mut tick = tokio::time::interval(Duration::from_secs(cfg.notifier_interval_s.max(5)));
60    loop {
61        tick.tick().await;
62        let subs = match load_enabled(&pool).await {
63            Ok(s) => s,
64            Err(e) => {
65                tracing::error!(error = %e, "webhooks: failed to load subscriptions");
66                continue;
67            }
68        };
69        if subs.is_empty() {
70            // No enabled subscriptions: idle quietly this cycle (never return — see the module note).
71            continue;
72        }
73        for sub in subs {
74            if let Err(e) = deliver_subscription(&pool, &client, &sub).await {
75                tracing::error!(error = %e, subscription = %sub.id, "webhooks: delivery cycle failed");
76            }
77        }
78    }
79}
80
81/// Load enabled subscriptions, oldest first (stable delivery order across cycles).
82async fn load_enabled(pool: &SqlitePool) -> sqlx::Result<Vec<WebhookSubscription>> {
83    sqlx::query_as::<_, WebhookSubscription>(
84        "SELECT * FROM webhook_subscriptions WHERE enabled = 1 ORDER BY created_at ASC",
85    )
86    .fetch_all(pool)
87    .await
88}
89
90/// Persist the advanced delivery cursor for one subscription.
91async fn save_cursor(pool: &SqlitePool, sub_id: &str, cursor: DateTime<Utc>) -> sqlx::Result<()> {
92    sqlx::query("UPDATE webhook_subscriptions SET cursor_at = ? WHERE id = ?")
93        .bind(cursor)
94        .bind(sub_id)
95        .execute(pool)
96        .await?;
97    Ok(())
98}
99
100/// Deliver every pending, matching event for one subscription, draining in batches until caught up or
101/// stopped at a retryable failure.
102async fn deliver_subscription(
103    pool: &SqlitePool,
104    client: &reqwest::Client,
105    sub: &WebhookSubscription,
106) -> anyhow::Result<()> {
107    // First time we see this subscription (cursor NULL): start at "now", no backlog replay — mirrors
108    // the old notifier so a subscription added LATER does not replay the full event history.
109    let Some(mut cursor) = sub.cursor_at else {
110        save_cursor(pool, &sub.id, Utc::now()).await?;
111        return Ok(());
112    };
113
114    loop {
115        let events = fetch_events(pool, cursor, &sub.min_severity).await?;
116        if events.is_empty() {
117            break;
118        }
119        let n = events.len();
120        let mut advanced = false;
121        for ev in events {
122            // Event-type filter: ["*"] matches all, otherwise exact membership. A non-match is not a
123            // delivery (nothing recorded) — just step the cursor past it.
124            if !matches_event_type(&sub.event_types.0, &ev.event_type) {
125                cursor = ev.created_at;
126                advanced = true;
127                continue;
128            }
129            match try_deliver(pool, client, sub, &ev).await {
130                DeliverOutcome::Advance => {
131                    cursor = ev.created_at;
132                    advanced = true;
133                }
134                DeliverOutcome::Retry => {
135                    // Retryable failure under the attempt bound: keep the cursor on this event (retry
136                    // next cycle) but persist progress made on earlier events in this batch.
137                    if advanced {
138                        save_cursor(pool, &sub.id, cursor).await?;
139                    }
140                    return Ok(());
141                }
142            }
143        }
144        if advanced {
145            save_cursor(pool, &sub.id, cursor).await?;
146        }
147        if n < BATCH as usize {
148            break;
149        }
150    }
151    Ok(())
152}
153
154/// Fetch the next batch of events newer than `cursor` that pass the severity floor, oldest first.
155async fn fetch_events(
156    pool: &SqlitePool,
157    cursor: DateTime<Utc>,
158    min_severity: &str,
159) -> sqlx::Result<Vec<Event>> {
160    let sql = format!(
161        "SELECT * FROM events
162         WHERE {} AND created_at > ?
163         ORDER BY created_at ASC LIMIT ?",
164        min_severity_sql(min_severity),
165    );
166    sqlx::query_as::<_, Event>(&sql)
167        .bind(cursor)
168        .bind(BATCH)
169        .fetch_all(pool)
170        .await
171}
172
/// What the delivery loop should do with a subscription's cursor after one delivery attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeliverOutcome {
    /// Move the cursor past this event (delivered, or given up on after the attempt bound).
    Advance,
    /// Retryable failure under the attempt bound: stop this subscription's cycle, keep the cursor.
    Retry,
}
179
180/// Attempt to deliver one event, recording the attempt in the `webhook_deliveries` ledger.
181async fn try_deliver(
182    pool: &SqlitePool,
183    client: &reqwest::Client,
184    sub: &WebhookSubscription,
185    ev: &Event,
186) -> DeliverOutcome {
187    let prior_failures: i64 = sqlx::query_scalar(
188        "SELECT COUNT(*) FROM webhook_deliveries
189          WHERE subscription_id = ? AND event_id = ? AND status = 'failed'",
190    )
191    .bind(&sub.id)
192    .bind(&ev.id)
193    .fetch_one(pool)
194    .await
195    .unwrap_or(0);
196    let attempt = prior_failures + 1;
197
198    let delivery_id = format!("whd_{}", Uuid::new_v4().simple());
199    let body = event_body(ev);
200    let res = send_event(
201        client,
202        &sub.url,
203        &delivery_id,
204        &ev.event_type,
205        sub.secret.as_deref(),
206        &body,
207    )
208    .await;
209
210    record_delivery(
211        pool,
212        &delivery_id,
213        &sub.id,
214        Some(&ev.id),
215        Some(&ev.event_type),
216        res.ok,
217        attempt,
218        res.status.map(i64::from),
219        res.error.as_deref(),
220    )
221    .await;
222
223    if res.ok {
224        DeliverOutcome::Advance
225    } else if attempt >= MAX_ATTEMPTS {
226        tracing::warn!(
227            subscription = %sub.id,
228            event = %ev.id,
229            attempts = attempt,
230            "webhooks: giving up on event after max attempts; advancing cursor past it"
231        );
232        DeliverOutcome::Advance
233    } else {
234        tracing::warn!(
235            subscription = %sub.id,
236            event = %ev.id,
237            attempt,
238            error = res.error.as_deref().unwrap_or("non-2xx"),
239            "webhooks: delivery failed; will retry next cycle"
240        );
241        DeliverOutcome::Retry
242    }
243}
244
245/// The JSON envelope POSTed for an event (the body that is HMAC-signed verbatim).
246pub fn event_body(ev: &Event) -> Value {
247    json!({
248        "id": ev.id,
249        "camera_id": ev.camera_id,
250        "site_id": ev.site_id,
251        "event_type": ev.event_type,
252        "severity": ev.severity,
253        "timestamp": ev.timestamp,
254        "payload": ev.payload.0,
255    })
256}
257
/// Outcome of a single signed POST, in the shape written to the delivery ledger.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SendResult {
    /// Whether the attempt counts as delivered (a 2xx response).
    pub ok: bool,
    /// HTTP status code, if a response came back at all (`None` on transport failure).
    pub status: Option<u16>,
    /// Human-readable failure reason for the ledger; `None` on success.
    pub error: Option<String>,
}
265
266/// POST a signed webhook body. The body is serialized ONCE and both signed and sent verbatim so the
267/// `X-Heldar-Signature` always covers the exact bytes the receiver gets. Used by the delivery loop and
268/// by the synthetic `/test` route.
269pub async fn send_event(
270    client: &reqwest::Client,
271    url: &str,
272    delivery_id: &str,
273    event_type: &str,
274    secret: Option<&str>,
275    body: &Value,
276) -> SendResult {
277    let raw = serde_json::to_string(body).unwrap_or_else(|_| "{}".to_string());
278    let mut req = client
279        .post(url)
280        .header(reqwest::header::CONTENT_TYPE, "application/json")
281        .header("X-Heldar-Event", event_type)
282        .header("X-Heldar-Delivery", delivery_id)
283        .header("X-Heldar-Timestamp", Utc::now().timestamp().to_string())
284        .body(raw.clone());
285    if let Some(secret) = secret.filter(|s| !s.is_empty()) {
286        req = req.header("X-Heldar-Signature", sign(secret, raw.as_bytes()));
287    }
288    match req.send().await {
289        Ok(resp) => {
290            let status = resp.status();
291            SendResult {
292                ok: status.is_success(),
293                status: Some(status.as_u16()),
294                error: if status.is_success() {
295                    None
296                } else {
297                    Some(format!("webhook returned HTTP {}", status.as_u16()))
298                },
299            }
300        }
301        Err(e) => SendResult {
302            ok: false,
303            status: None,
304            error: Some(e.to_string()),
305        },
306    }
307}
308
309/// Insert one row into the `webhook_deliveries` ledger. Best-effort: a failure is logged, not fatal.
310#[allow(clippy::too_many_arguments)]
311pub async fn record_delivery(
312    pool: &SqlitePool,
313    id: &str,
314    subscription_id: &str,
315    event_id: Option<&str>,
316    event_type: Option<&str>,
317    delivered: bool,
318    attempts: i64,
319    response_code: Option<i64>,
320    error: Option<&str>,
321) {
322    let now = Utc::now();
323    let delivered_at = if delivered { Some(now) } else { None };
324    let status = if delivered { "delivered" } else { "failed" };
325    let res = sqlx::query(
326        "INSERT INTO webhook_deliveries
327           (id, subscription_id, event_id, event_type, status, attempts, response_code, error, created_at, delivered_at)
328         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
329    )
330    .bind(id)
331    .bind(subscription_id)
332    .bind(event_id)
333    .bind(event_type)
334    .bind(status)
335    .bind(attempts)
336    .bind(response_code)
337    .bind(error)
338    .bind(now)
339    .bind(delivered_at)
340    .execute(pool)
341    .await;
342    if let Err(e) = res {
343        tracing::error!(error = %e, subscription = %subscription_id, "webhooks: failed to record delivery");
344    }
345}
346
347/// `sha256=<hex>` where `<hex>` is HMAC-SHA256(secret, body). HMAC accepts any key length.
348fn sign(secret: &str, body: &[u8]) -> String {
349    let mut mac =
350        HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts a key of any length");
351    mac.update(body);
352    format!(
353        "sha256={}",
354        crate::auth::hex_encode(&mac.finalize().into_bytes())
355    )
356}
357
/// Report whether a subscription's event-type filter selects `event_type`. A `"*"` entry anywhere in
/// the filter selects every type; otherwise the type must appear verbatim (case-sensitive). An empty
/// filter selects nothing.
pub fn matches_event_type(filter: &[String], event_type: &str) -> bool {
    filter
        .iter()
        .map(String::as_str)
        .any(|wanted| wanted == "*" || wanted == event_type)
}
363
/// SQL predicate selecting the severities at or above `min_severity`. Every possible result is a
/// static literal (never derived from user input), so it is safe to splice into a query. `info`, or
/// any unrecognised value, admits all severities.
fn min_severity_sql(min_severity: &str) -> &'static str {
    const CRITICAL_ONLY: &str = "severity = 'critical'";
    const WARNING_AND_UP: &str = "severity IN ('warning', 'critical')";
    const EVERYTHING: &str = "1 = 1";

    if min_severity == "critical" {
        CRITICAL_ONLY
    } else if min_severity == "warning" {
        WARNING_AND_UP
    } else {
        EVERYTHING
    }
}
373
374/// Read one `app_state` value (the legacy single-URL alerting settings were persisted here, keyed by
375/// `alert_webhook_url` / `alert_enabled` / `alert_min_severity`).
376async fn app_state(pool: &SqlitePool, key: &str) -> Option<String> {
377    sqlx::query_scalar::<_, String>("SELECT value FROM app_state WHERE key = ?")
378        .bind(key)
379        .fetch_optional(pool)
380        .await
381        .ok()
382        .flatten()
383}
384
385/// Fold the legacy single-URL alerting webhook (app_state / `HELDAR_ALERT_WEBHOOK_URL`) into a
386/// "Default alerts" subscription, once, so it keeps delivering under the subscription model. No-op when
387/// no legacy webhook is configured or a subscription already targets that url. Starts the new cursor at
388/// "now" so the upgrade does not replay historical events.
389async fn migrate_legacy_alerting(pool: &SqlitePool, cfg: &Config) -> sqlx::Result<()> {
390    // Resolve the legacy settings the old `services::alerting` module used to own: the stored webhook
391    // (app_state `alert_webhook_url`), falling back to the `HELDAR_ALERT_WEBHOOK_URL` env value, plus
392    // the stored enabled flag (default true) and severity floor (default `warning`).
393    let stored = app_state(pool, "alert_webhook_url")
394        .await
395        .map(|s| s.trim().to_string())
396        .filter(|s| !s.is_empty());
397    let Some(url) = stored.or_else(|| {
398        cfg.alert_webhook_url
399            .as_deref()
400            .map(str::trim)
401            .filter(|s| !s.is_empty())
402            .map(str::to_string)
403    }) else {
404        return Ok(());
405    };
406    let enabled = !matches!(
407        app_state(pool, "alert_enabled").await.as_deref(),
408        Some("false")
409    );
410    let min_severity = match app_state(pool, "alert_min_severity").await.as_deref() {
411        Some("critical") => "critical",
412        _ => "warning",
413    };
414
415    let exists: Option<i64> =
416        sqlx::query_scalar("SELECT 1 FROM webhook_subscriptions WHERE url = ? LIMIT 1")
417            .bind(&url)
418            .fetch_optional(pool)
419            .await?;
420    if exists.is_some() {
421        return Ok(());
422    }
423    let now = Utc::now();
424    let id = format!("whs_{}", Uuid::new_v4().simple());
425    sqlx::query(
426        "INSERT INTO webhook_subscriptions
427           (id, name, url, event_types, min_severity, secret, enabled, cursor_at, created_at, updated_at)
428         VALUES (?, 'Default alerts', ?, '[\"*\"]', ?, NULL, ?, ?, ?, ?)",
429    )
430    .bind(&id)
431    .bind(&url)
432    .bind(min_severity)
433    .bind(i64::from(enabled))
434    .bind(now)
435    .bind(now)
436    .bind(now)
437    .execute(pool)
438    .await?;
439    tracing::info!(
440        masked = crate::models::mask_webhook_url(&url),
441        enabled,
442        "webhooks: migrated legacy alerting webhook into a 'Default alerts' subscription"
443    );
444    Ok(())
445}
446
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn wildcard_matches_everything() {
        let star = vec!["*".to_string()];
        assert!(matches_event_type(&star, "zone_enter"));
        assert!(matches_event_type(&star, "anything_at_all"));
        // A wildcard anywhere in the list still selects every type.
        let mixed = vec!["zone_exit".to_string(), "*".to_string()];
        assert!(matches_event_type(&mixed, "disk_pressure"));
    }

    #[test]
    fn explicit_set_is_exact_membership() {
        let set = vec!["zone_enter".to_string(), "disk_pressure".to_string()];
        assert!(matches_event_type(&set, "zone_enter"));
        assert!(matches_event_type(&set, "disk_pressure"));
        assert!(!matches_event_type(&set, "zone_exit"));
        // Membership is exact, so case differences do not match.
        assert!(!matches_event_type(&set, "Zone_Enter"));
        assert!(!matches_event_type(&[], "zone_enter"));
    }

    #[test]
    fn severity_floor_thresholds() {
        assert_eq!(min_severity_sql("critical"), "severity = 'critical'");
        assert_eq!(
            min_severity_sql("warning"),
            "severity IN ('warning', 'critical')"
        );
        // info (and any unknown value) admits all severities.
        assert_eq!(min_severity_sql("info"), "1 = 1");
        assert_eq!(min_severity_sql("whatever"), "1 = 1");
    }

    #[test]
    fn signature_is_stable_prefixed_hmac_sha256() {
        // Known-answer: HMAC-SHA256(key="key", msg="The quick brown fox jumps over the lazy dog").
        let sig = sign("key", b"The quick brown fox jumps over the lazy dog");
        assert_eq!(
            sig,
            "sha256=f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8"
        );
        // Stable + key-sensitive.
        assert_eq!(sign("s", b"body"), sign("s", b"body"));
        assert_ne!(sign("s1", b"body"), sign("s2", b"body"));
    }

    #[test]
    fn signature_accepts_empty_key() {
        // Known-answer: HMAC-SHA256(key="", msg=""). Pins the "any key length" invariant that the
        // `expect` in `sign` relies on.
        assert_eq!(
            sign("", b""),
            "sha256=b613679a0814d9ec772f95d778c35fc5ff1697c493715653c6c712144292c5ad"
        );
    }
}