heldar_kernel/services/
notifier.rs1use std::sync::Arc;
8use std::time::Duration;
9
10use chrono::{DateTime, Utc};
11use serde_json::json;
12use sqlx::SqlitePool;
13
14use crate::config::Config;
15use crate::models::Event;
16
17const CURSOR_KEY: &str = "notifier_cursor";
18const BATCH: i64 = 100;
19
20pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
21 let Some(url) = cfg.alert_webhook_url.clone() else {
22 tracing::info!("notifier: no HELDAR_ALERT_WEBHOOK_URL set; alerting disabled");
23 return;
24 };
25 let client = match reqwest::Client::builder()
26 .timeout(Duration::from_secs(10))
27 .build()
28 {
29 Ok(c) => c,
30 Err(e) => {
31 tracing::error!(error = %e, "notifier: failed to build http client");
32 return;
33 }
34 };
35 tracing::info!(%url, "notifier: alerting enabled (warning/critical events)");
36
37 let mut cursor = match load_cursor(&pool).await {
39 Some(c) => c,
40 None => {
41 let now = Utc::now();
42 let _ = save_cursor(&pool, now).await;
43 now
44 }
45 };
46
47 let mut tick = tokio::time::interval(Duration::from_secs(cfg.notifier_interval_s.max(5)));
48 loop {
49 tick.tick().await;
50 loop {
52 match deliver_batch(&pool, &client, &url, cursor).await {
53 Ok(Some((latest, n))) => {
54 cursor = latest;
55 let _ = save_cursor(&pool, cursor).await;
56 if n < BATCH as usize {
57 break;
58 }
59 }
60 Ok(None) => break,
61 Err(e) => {
62 tracing::error!(error = %e, "notifier: delivery cycle failed");
63 break;
64 }
65 }
66 }
67 }
68}
69
70async fn load_cursor(pool: &SqlitePool) -> Option<DateTime<Utc>> {
71 let v: Option<String> = sqlx::query_scalar("SELECT value FROM app_state WHERE key = ?")
72 .bind(CURSOR_KEY)
73 .fetch_optional(pool)
74 .await
75 .ok()
76 .flatten();
77 v.and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
78 .map(|d| d.with_timezone(&Utc))
79}
80
81async fn save_cursor(pool: &SqlitePool, cursor: DateTime<Utc>) -> sqlx::Result<()> {
82 sqlx::query(
83 "INSERT INTO app_state (key, value, updated_at) VALUES (?, ?, ?)
84 ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at",
85 )
86 .bind(CURSOR_KEY)
87 .bind(cursor.to_rfc3339())
88 .bind(Utc::now())
89 .execute(pool)
90 .await?;
91 Ok(())
92}
93
94async fn deliver_batch(
98 pool: &SqlitePool,
99 client: &reqwest::Client,
100 url: &str,
101 cursor: DateTime<Utc>,
102) -> anyhow::Result<Option<(DateTime<Utc>, usize)>> {
103 let events = sqlx::query_as::<_, Event>(
104 "SELECT * FROM events
105 WHERE severity IN ('warning', 'critical') AND created_at > ?
106 ORDER BY created_at ASC LIMIT ?",
107 )
108 .bind(cursor)
109 .bind(BATCH)
110 .fetch_all(pool)
111 .await?;
112 if events.is_empty() {
113 return Ok(None);
114 }
115
116 let mut latest: Option<DateTime<Utc>> = None;
117 let mut delivered = 0usize;
118 for ev in events {
119 let body = json!({
120 "source": "heldar-core",
121 "event_id": ev.id,
122 "event_type": ev.event_type,
123 "severity": ev.severity,
124 "camera_id": ev.camera_id,
125 "timestamp": ev.timestamp,
126 "payload": ev.payload.0,
127 });
128 match client.post(url).json(&body).send().await {
129 Ok(resp) if resp.status().is_success() => {}
130 Ok(resp) => {
131 let code = resp.status();
132 if code.is_server_error() || code.as_u16() == 429 {
133 tracing::warn!(status = %code, event = %ev.event_type, "notifier: retryable webhook failure; will retry next cycle");
135 return Ok(latest.map(|l| (l, delivered)));
136 }
137 tracing::warn!(status = %code, event = %ev.event_type, "notifier: webhook rejected event; skipping");
139 }
140 Err(e) => {
141 tracing::warn!(error = %e, "notifier: webhook post failed; will retry next cycle");
142 return Ok(latest.map(|l| (l, delivered)));
143 }
144 }
145 latest = Some(ev.created_at);
146 delivered += 1;
147 }
148 Ok(latest.map(|l| (l, delivered)))
149}