// heldar_kernel/services/notifier.rs

//! Alert notifier: delivers new warning/critical events to a configured webhook (POST JSON).
//! The delivery cursor is persisted (it survives restarts, so events generated during downtime
//! are still delivered); retryable failures (5xx / 429 / network) do not advance the cursor.
//! `main` only supervises this task when a webhook is configured, so the no-webhook path returns
//! once instead of triggering a respawn storm.

use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use serde_json::json;
use sqlx::SqlitePool;
use tokio::time::MissedTickBehavior;

use crate::config::Config;
use crate::models::Event;

/// Maximum number of events handled per `deliver_batch` call; a shorter batch means the
/// backlog has been drained.
const BATCH: i64 = 100;
/// `app_state` key under which the delivery cursor (an RFC 3339 timestamp string) is stored.
const CURSOR_KEY: &str = "notifier_cursor";

20pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
21    let Some(url) = cfg.alert_webhook_url.clone() else {
22        tracing::info!("notifier: no HELDAR_ALERT_WEBHOOK_URL set; alerting disabled");
23        return;
24    };
25    let client = match reqwest::Client::builder()
26        .timeout(Duration::from_secs(10))
27        .build()
28    {
29        Ok(c) => c,
30        Err(e) => {
31            tracing::error!(error = %e, "notifier: failed to build http client");
32            return;
33        }
34    };
35    tracing::info!(%url, "notifier: alerting enabled (warning/critical events)");
36
37    // Resume from the persisted cursor; the first ever run starts at "now" (no history replay).
38    let mut cursor = match load_cursor(&pool).await {
39        Some(c) => c,
40        None => {
41            let now = Utc::now();
42            let _ = save_cursor(&pool, now).await;
43            now
44        }
45    };
46
47    let mut tick = tokio::time::interval(Duration::from_secs(cfg.notifier_interval_s.max(5)));
48    loop {
49        tick.tick().await;
50        // Drain until a batch comes back not-full (backlog cleared) or a failure stops progress.
51        loop {
52            match deliver_batch(&pool, &client, &url, cursor).await {
53                Ok(Some((latest, n))) => {
54                    cursor = latest;
55                    let _ = save_cursor(&pool, cursor).await;
56                    if n < BATCH as usize {
57                        break;
58                    }
59                }
60                Ok(None) => break,
61                Err(e) => {
62                    tracing::error!(error = %e, "notifier: delivery cycle failed");
63                    break;
64                }
65            }
66        }
67    }
68}
69
70async fn load_cursor(pool: &SqlitePool) -> Option<DateTime<Utc>> {
71    let v: Option<String> = sqlx::query_scalar("SELECT value FROM app_state WHERE key = ?")
72        .bind(CURSOR_KEY)
73        .fetch_optional(pool)
74        .await
75        .ok()
76        .flatten();
77    v.and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
78        .map(|d| d.with_timezone(&Utc))
79}
80
81async fn save_cursor(pool: &SqlitePool, cursor: DateTime<Utc>) -> sqlx::Result<()> {
82    sqlx::query(
83        "INSERT INTO app_state (key, value, updated_at) VALUES (?, ?, ?)
84         ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at",
85    )
86    .bind(CURSOR_KEY)
87    .bind(cursor.to_rfc3339())
88    .bind(Utc::now())
89    .execute(pool)
90    .await?;
91    Ok(())
92}
93
94/// Deliver one batch of events newer than `cursor`. Returns `Some((new_cursor, delivered))` after
95/// any progress, or `None` if there is nothing to deliver or delivery stopped at a retryable
96/// failure before any event was delivered (cursor must not advance past the failing event).
97async fn deliver_batch(
98    pool: &SqlitePool,
99    client: &reqwest::Client,
100    url: &str,
101    cursor: DateTime<Utc>,
102) -> anyhow::Result<Option<(DateTime<Utc>, usize)>> {
103    let events = sqlx::query_as::<_, Event>(
104        "SELECT * FROM events
105         WHERE severity IN ('warning', 'critical') AND created_at > ?
106         ORDER BY created_at ASC LIMIT ?",
107    )
108    .bind(cursor)
109    .bind(BATCH)
110    .fetch_all(pool)
111    .await?;
112    if events.is_empty() {
113        return Ok(None);
114    }
115
116    let mut latest: Option<DateTime<Utc>> = None;
117    let mut delivered = 0usize;
118    for ev in events {
119        let body = json!({
120            "source": "heldar-core",
121            "event_id": ev.id,
122            "event_type": ev.event_type,
123            "severity": ev.severity,
124            "camera_id": ev.camera_id,
125            "timestamp": ev.timestamp,
126            "payload": ev.payload.0,
127        });
128        match client.post(url).json(&body).send().await {
129            Ok(resp) if resp.status().is_success() => {}
130            Ok(resp) => {
131                let code = resp.status();
132                if code.is_server_error() || code.as_u16() == 429 {
133                    // Retryable: stop without advancing past this event.
134                    tracing::warn!(status = %code, event = %ev.event_type, "notifier: retryable webhook failure; will retry next cycle");
135                    return Ok(latest.map(|l| (l, delivered)));
136                }
137                // Other 4xx: the event won't ever be accepted; log and skip past it.
138                tracing::warn!(status = %code, event = %ev.event_type, "notifier: webhook rejected event; skipping");
139            }
140            Err(e) => {
141                tracing::warn!(error = %e, "notifier: webhook post failed; will retry next cycle");
142                return Ok(latest.map(|l| (l, delivered)));
143            }
144        }
145        latest = Some(ev.created_at);
146        delivered += 1;
147    }
148    Ok(latest.map(|l| (l, delivered)))
149}