1use std::sync::Arc;
19use std::time::Duration;
20
21use chrono::{DateTime, Utc};
22use hmac::{Hmac, Mac};
23use serde_json::{json, Value};
24use sha2::Sha256;
25use sqlx::SqlitePool;
26use uuid::Uuid;
27
28use crate::config::Config;
29use crate::models::{Event, WebhookSubscription};
30
31type HmacSha256 = Hmac<Sha256>;
32
33const BATCH: i64 = 100;
35const MAX_ATTEMPTS: i64 = 5;
38
39pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
40 let client = match reqwest::Client::builder()
43 .timeout(Duration::from_secs(10))
44 .build()
45 {
46 Ok(c) => c,
47 Err(e) => {
48 tracing::error!(error = %e, "webhooks: failed to build http client; idling");
49 std::future::pending::<reqwest::Client>().await
50 }
51 };
52
53 if let Err(e) = migrate_legacy_alerting(&pool, &cfg).await {
56 tracing::warn!(error = %e, "webhooks: legacy alerting migration failed");
57 }
58
59 let mut tick = tokio::time::interval(Duration::from_secs(cfg.notifier_interval_s.max(5)));
60 loop {
61 tick.tick().await;
62 let subs = match load_enabled(&pool).await {
63 Ok(s) => s,
64 Err(e) => {
65 tracing::error!(error = %e, "webhooks: failed to load subscriptions");
66 continue;
67 }
68 };
69 if subs.is_empty() {
70 continue;
72 }
73 for sub in subs {
74 if let Err(e) = deliver_subscription(&pool, &client, &sub).await {
75 tracing::error!(error = %e, subscription = %sub.id, "webhooks: delivery cycle failed");
76 }
77 }
78 }
79}
80
81async fn load_enabled(pool: &SqlitePool) -> sqlx::Result<Vec<WebhookSubscription>> {
83 sqlx::query_as::<_, WebhookSubscription>(
84 "SELECT * FROM webhook_subscriptions WHERE enabled = 1 ORDER BY created_at ASC",
85 )
86 .fetch_all(pool)
87 .await
88}
89
90async fn save_cursor(pool: &SqlitePool, sub_id: &str, cursor: DateTime<Utc>) -> sqlx::Result<()> {
92 sqlx::query("UPDATE webhook_subscriptions SET cursor_at = ? WHERE id = ?")
93 .bind(cursor)
94 .bind(sub_id)
95 .execute(pool)
96 .await?;
97 Ok(())
98}
99
100async fn deliver_subscription(
103 pool: &SqlitePool,
104 client: &reqwest::Client,
105 sub: &WebhookSubscription,
106) -> anyhow::Result<()> {
107 let Some(mut cursor) = sub.cursor_at else {
110 save_cursor(pool, &sub.id, Utc::now()).await?;
111 return Ok(());
112 };
113
114 loop {
115 let events = fetch_events(pool, cursor, &sub.min_severity).await?;
116 if events.is_empty() {
117 break;
118 }
119 let n = events.len();
120 let mut advanced = false;
121 for ev in events {
122 if !matches_event_type(&sub.event_types.0, &ev.event_type) {
125 cursor = ev.created_at;
126 advanced = true;
127 continue;
128 }
129 match try_deliver(pool, client, sub, &ev).await {
130 DeliverOutcome::Advance => {
131 cursor = ev.created_at;
132 advanced = true;
133 }
134 DeliverOutcome::Retry => {
135 if advanced {
138 save_cursor(pool, &sub.id, cursor).await?;
139 }
140 return Ok(());
141 }
142 }
143 }
144 if advanced {
145 save_cursor(pool, &sub.id, cursor).await?;
146 }
147 if n < BATCH as usize {
148 break;
149 }
150 }
151 Ok(())
152}
153
154async fn fetch_events(
156 pool: &SqlitePool,
157 cursor: DateTime<Utc>,
158 min_severity: &str,
159) -> sqlx::Result<Vec<Event>> {
160 let sql = format!(
161 "SELECT * FROM events
162 WHERE {} AND created_at > ?
163 ORDER BY created_at ASC LIMIT ?",
164 min_severity_sql(min_severity),
165 );
166 sqlx::query_as::<_, Event>(&sql)
167 .bind(cursor)
168 .bind(BATCH)
169 .fetch_all(pool)
170 .await
171}
172
173enum DeliverOutcome {
174 Advance,
176 Retry,
178}
179
180async fn try_deliver(
182 pool: &SqlitePool,
183 client: &reqwest::Client,
184 sub: &WebhookSubscription,
185 ev: &Event,
186) -> DeliverOutcome {
187 let prior_failures: i64 = sqlx::query_scalar(
188 "SELECT COUNT(*) FROM webhook_deliveries
189 WHERE subscription_id = ? AND event_id = ? AND status = 'failed'",
190 )
191 .bind(&sub.id)
192 .bind(&ev.id)
193 .fetch_one(pool)
194 .await
195 .unwrap_or(0);
196 let attempt = prior_failures + 1;
197
198 let delivery_id = format!("whd_{}", Uuid::new_v4().simple());
199 let body = event_body(ev);
200 let res = send_event(
201 client,
202 &sub.url,
203 &delivery_id,
204 &ev.event_type,
205 sub.secret.as_deref(),
206 &body,
207 )
208 .await;
209
210 record_delivery(
211 pool,
212 &delivery_id,
213 &sub.id,
214 Some(&ev.id),
215 Some(&ev.event_type),
216 res.ok,
217 attempt,
218 res.status.map(i64::from),
219 res.error.as_deref(),
220 )
221 .await;
222
223 if res.ok {
224 DeliverOutcome::Advance
225 } else if attempt >= MAX_ATTEMPTS {
226 tracing::warn!(
227 subscription = %sub.id,
228 event = %ev.id,
229 attempts = attempt,
230 "webhooks: giving up on event after max attempts; advancing cursor past it"
231 );
232 DeliverOutcome::Advance
233 } else {
234 tracing::warn!(
235 subscription = %sub.id,
236 event = %ev.id,
237 attempt,
238 error = res.error.as_deref().unwrap_or("non-2xx"),
239 "webhooks: delivery failed; will retry next cycle"
240 );
241 DeliverOutcome::Retry
242 }
243}
244
245pub fn event_body(ev: &Event) -> Value {
247 json!({
248 "id": ev.id,
249 "camera_id": ev.camera_id,
250 "site_id": ev.site_id,
251 "event_type": ev.event_type,
252 "severity": ev.severity,
253 "timestamp": ev.timestamp,
254 "payload": ev.payload.0,
255 })
256}
257
258pub struct SendResult {
261 pub ok: bool,
262 pub status: Option<u16>,
263 pub error: Option<String>,
264}
265
266pub async fn send_event(
270 client: &reqwest::Client,
271 url: &str,
272 delivery_id: &str,
273 event_type: &str,
274 secret: Option<&str>,
275 body: &Value,
276) -> SendResult {
277 let raw = serde_json::to_string(body).unwrap_or_else(|_| "{}".to_string());
278 let mut req = client
279 .post(url)
280 .header(reqwest::header::CONTENT_TYPE, "application/json")
281 .header("X-Heldar-Event", event_type)
282 .header("X-Heldar-Delivery", delivery_id)
283 .header("X-Heldar-Timestamp", Utc::now().timestamp().to_string())
284 .body(raw.clone());
285 if let Some(secret) = secret.filter(|s| !s.is_empty()) {
286 req = req.header("X-Heldar-Signature", sign(secret, raw.as_bytes()));
287 }
288 match req.send().await {
289 Ok(resp) => {
290 let status = resp.status();
291 SendResult {
292 ok: status.is_success(),
293 status: Some(status.as_u16()),
294 error: if status.is_success() {
295 None
296 } else {
297 Some(format!("webhook returned HTTP {}", status.as_u16()))
298 },
299 }
300 }
301 Err(e) => SendResult {
302 ok: false,
303 status: None,
304 error: Some(e.to_string()),
305 },
306 }
307}
308
309#[allow(clippy::too_many_arguments)]
311pub async fn record_delivery(
312 pool: &SqlitePool,
313 id: &str,
314 subscription_id: &str,
315 event_id: Option<&str>,
316 event_type: Option<&str>,
317 delivered: bool,
318 attempts: i64,
319 response_code: Option<i64>,
320 error: Option<&str>,
321) {
322 let now = Utc::now();
323 let delivered_at = if delivered { Some(now) } else { None };
324 let status = if delivered { "delivered" } else { "failed" };
325 let res = sqlx::query(
326 "INSERT INTO webhook_deliveries
327 (id, subscription_id, event_id, event_type, status, attempts, response_code, error, created_at, delivered_at)
328 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
329 )
330 .bind(id)
331 .bind(subscription_id)
332 .bind(event_id)
333 .bind(event_type)
334 .bind(status)
335 .bind(attempts)
336 .bind(response_code)
337 .bind(error)
338 .bind(now)
339 .bind(delivered_at)
340 .execute(pool)
341 .await;
342 if let Err(e) = res {
343 tracing::error!(error = %e, subscription = %subscription_id, "webhooks: failed to record delivery");
344 }
345}
346
347fn sign(secret: &str, body: &[u8]) -> String {
349 let mut mac =
350 HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts a key of any length");
351 mac.update(body);
352 format!(
353 "sha256={}",
354 crate::auth::hex_encode(&mac.finalize().into_bytes())
355 )
356}
357
358pub fn matches_event_type(filter: &[String], event_type: &str) -> bool {
361 filter.iter().any(|t| t == "*") || filter.iter().any(|t| t == event_type)
362}
363
364fn min_severity_sql(min_severity: &str) -> &'static str {
367 match min_severity {
368 "critical" => "severity = 'critical'",
369 "warning" => "severity IN ('warning', 'critical')",
370 _ => "1 = 1",
371 }
372}
373
374async fn app_state(pool: &SqlitePool, key: &str) -> Option<String> {
377 sqlx::query_scalar::<_, String>("SELECT value FROM app_state WHERE key = ?")
378 .bind(key)
379 .fetch_optional(pool)
380 .await
381 .ok()
382 .flatten()
383}
384
385async fn migrate_legacy_alerting(pool: &SqlitePool, cfg: &Config) -> sqlx::Result<()> {
390 let stored = app_state(pool, "alert_webhook_url")
394 .await
395 .map(|s| s.trim().to_string())
396 .filter(|s| !s.is_empty());
397 let Some(url) = stored.or_else(|| {
398 cfg.alert_webhook_url
399 .as_deref()
400 .map(str::trim)
401 .filter(|s| !s.is_empty())
402 .map(str::to_string)
403 }) else {
404 return Ok(());
405 };
406 let enabled = !matches!(
407 app_state(pool, "alert_enabled").await.as_deref(),
408 Some("false")
409 );
410 let min_severity = match app_state(pool, "alert_min_severity").await.as_deref() {
411 Some("critical") => "critical",
412 _ => "warning",
413 };
414
415 let exists: Option<i64> =
416 sqlx::query_scalar("SELECT 1 FROM webhook_subscriptions WHERE url = ? LIMIT 1")
417 .bind(&url)
418 .fetch_optional(pool)
419 .await?;
420 if exists.is_some() {
421 return Ok(());
422 }
423 let now = Utc::now();
424 let id = format!("whs_{}", Uuid::new_v4().simple());
425 sqlx::query(
426 "INSERT INTO webhook_subscriptions
427 (id, name, url, event_types, min_severity, secret, enabled, cursor_at, created_at, updated_at)
428 VALUES (?, 'Default alerts', ?, '[\"*\"]', ?, NULL, ?, ?, ?, ?)",
429 )
430 .bind(&id)
431 .bind(&url)
432 .bind(min_severity)
433 .bind(i64::from(enabled))
434 .bind(now)
435 .bind(now)
436 .bind(now)
437 .execute(pool)
438 .await?;
439 tracing::info!(
440 masked = crate::models::mask_webhook_url(&url),
441 enabled,
442 "webhooks: migrated legacy alerting webhook into a 'Default alerts' subscription"
443 );
444 Ok(())
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450
451 #[test]
452 fn wildcard_matches_everything() {
453 let star = vec!["*".to_string()];
454 assert!(matches_event_type(&star, "zone_enter"));
455 assert!(matches_event_type(&star, "anything_at_all"));
456 }
457
458 #[test]
459 fn explicit_set_is_exact_membership() {
460 let set = vec!["zone_enter".to_string(), "disk_pressure".to_string()];
461 assert!(matches_event_type(&set, "zone_enter"));
462 assert!(matches_event_type(&set, "disk_pressure"));
463 assert!(!matches_event_type(&set, "zone_exit"));
464 assert!(!matches_event_type(&[], "zone_enter"));
465 }
466
467 #[test]
468 fn severity_floor_thresholds() {
469 assert_eq!(min_severity_sql("critical"), "severity = 'critical'");
470 assert_eq!(
471 min_severity_sql("warning"),
472 "severity IN ('warning', 'critical')"
473 );
474 assert_eq!(min_severity_sql("info"), "1 = 1");
476 assert_eq!(min_severity_sql("whatever"), "1 = 1");
477 }
478
479 #[test]
480 fn signature_is_stable_prefixed_hmac_sha256() {
481 let sig = sign("key", b"The quick brown fox jumps over the lazy dog");
483 assert_eq!(
484 sig,
485 "sha256=f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8"
486 );
487 assert_eq!(sign("s", b"body"), sign("s", b"body"));
489 assert_ne!(sign("s1", b"body"), sign("s2", b"body"));
490 }
491}