1use std::sync::Arc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use hmac::{Hmac, Mac};
22use serde_json::{json, Value};
23use sha2::Sha256;
24use sqlx::SqlitePool;
25use uuid::Uuid;
26
27use crate::config::Config;
28use crate::models::{Event, WebhookSubscription};
29
/// HMAC instantiated with SHA-256; `sign` uses it to compute the request signature.
type HmacSha256 = Hmac<Sha256>;

/// Page size for `fetch_events` (bound to its `LIMIT`). `deliver_subscription`
/// stops paging once a fetch returns fewer rows than this.
const BATCH: i64 = 100;
/// Attempt number at which `try_deliver` stops retrying a failing event and
/// lets the cursor advance past it.
const MAX_ATTEMPTS: i64 = 5;
37
38pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
39 let client = match reqwest::Client::builder()
42 .timeout(Duration::from_secs(10))
43 .build()
44 {
45 Ok(c) => c,
46 Err(e) => {
47 tracing::error!(error = %e, "webhooks: failed to build http client; idling");
48 std::future::pending::<reqwest::Client>().await
49 }
50 };
51
52 let mut tick = tokio::time::interval(Duration::from_secs(cfg.notifier_interval_s.max(5)));
53 loop {
54 tick.tick().await;
55 let subs = match load_enabled(&pool).await {
56 Ok(s) => s,
57 Err(e) => {
58 tracing::error!(error = %e, "webhooks: failed to load subscriptions");
59 continue;
60 }
61 };
62 if subs.is_empty() {
63 continue;
65 }
66 for sub in subs {
67 if let Err(e) = deliver_subscription(&pool, &client, &sub).await {
68 tracing::error!(error = %e, subscription = %sub.id, "webhooks: delivery cycle failed");
69 }
70 }
71 }
72}
73
74async fn load_enabled(pool: &SqlitePool) -> sqlx::Result<Vec<WebhookSubscription>> {
76 sqlx::query_as::<_, WebhookSubscription>(
77 "SELECT * FROM webhook_subscriptions WHERE enabled = 1 ORDER BY created_at ASC",
78 )
79 .fetch_all(pool)
80 .await
81}
82
83async fn save_cursor(pool: &SqlitePool, sub_id: &str, cursor: DateTime<Utc>) -> sqlx::Result<()> {
85 sqlx::query("UPDATE webhook_subscriptions SET cursor_at = ? WHERE id = ?")
86 .bind(cursor)
87 .bind(sub_id)
88 .execute(pool)
89 .await?;
90 Ok(())
91}
92
93async fn deliver_subscription(
96 pool: &SqlitePool,
97 client: &reqwest::Client,
98 sub: &WebhookSubscription,
99) -> anyhow::Result<()> {
100 let Some(mut cursor) = sub.cursor_at else {
103 save_cursor(pool, &sub.id, Utc::now()).await?;
104 return Ok(());
105 };
106
107 loop {
108 let events = fetch_events(pool, cursor, &sub.min_severity).await?;
109 if events.is_empty() {
110 break;
111 }
112 let n = events.len();
113 let mut advanced = false;
114 for ev in events {
115 if !matches_event_type(&sub.event_types.0, &ev.event_type) {
118 cursor = ev.created_at;
119 advanced = true;
120 continue;
121 }
122 match try_deliver(pool, client, sub, &ev).await {
123 DeliverOutcome::Advance => {
124 cursor = ev.created_at;
125 advanced = true;
126 }
127 DeliverOutcome::Retry => {
128 if advanced {
131 save_cursor(pool, &sub.id, cursor).await?;
132 }
133 return Ok(());
134 }
135 }
136 }
137 if advanced {
138 save_cursor(pool, &sub.id, cursor).await?;
139 }
140 if n < BATCH as usize {
141 break;
142 }
143 }
144 Ok(())
145}
146
147async fn fetch_events(
149 pool: &SqlitePool,
150 cursor: DateTime<Utc>,
151 min_severity: &str,
152) -> sqlx::Result<Vec<Event>> {
153 let sql = format!(
154 "SELECT * FROM events
155 WHERE {} AND created_at > ?
156 ORDER BY created_at ASC LIMIT ?",
157 min_severity_sql(min_severity),
158 );
159 sqlx::query_as::<_, Event>(&sql)
160 .bind(cursor)
161 .bind(BATCH)
162 .fetch_all(pool)
163 .await
164}
165
/// What the delivery loop should do with its cursor after handling one event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeliverOutcome {
    /// Move the cursor past the event: it was delivered, or it was given up on
    /// after the maximum number of attempts.
    Advance,
    /// Leave the cursor where it is and stop the cycle so the event is
    /// attempted again later.
    Retry,
}
172
173async fn try_deliver(
175 pool: &SqlitePool,
176 client: &reqwest::Client,
177 sub: &WebhookSubscription,
178 ev: &Event,
179) -> DeliverOutcome {
180 let prior_failures: i64 = sqlx::query_scalar(
181 "SELECT COUNT(*) FROM webhook_deliveries
182 WHERE subscription_id = ? AND event_id = ? AND status = 'failed'",
183 )
184 .bind(&sub.id)
185 .bind(&ev.id)
186 .fetch_one(pool)
187 .await
188 .unwrap_or(0);
189 let attempt = prior_failures + 1;
190
191 let delivery_id = format!("whd_{}", Uuid::new_v4().simple());
192 let body = event_body(ev);
193 let res = send_event(
194 client,
195 &sub.url,
196 &delivery_id,
197 &ev.event_type,
198 sub.secret.as_deref(),
199 &body,
200 )
201 .await;
202
203 record_delivery(
204 pool,
205 &delivery_id,
206 &sub.id,
207 Some(&ev.id),
208 Some(&ev.event_type),
209 res.ok,
210 attempt,
211 res.status.map(i64::from),
212 res.error.as_deref(),
213 )
214 .await;
215
216 if res.ok {
217 DeliverOutcome::Advance
218 } else if attempt >= MAX_ATTEMPTS {
219 tracing::warn!(
220 subscription = %sub.id,
221 event = %ev.id,
222 attempts = attempt,
223 "webhooks: giving up on event after max attempts; advancing cursor past it"
224 );
225 DeliverOutcome::Advance
226 } else {
227 tracing::warn!(
228 subscription = %sub.id,
229 event = %ev.id,
230 attempt,
231 error = res.error.as_deref().unwrap_or("non-2xx"),
232 "webhooks: delivery failed; will retry next cycle"
233 );
234 DeliverOutcome::Retry
235 }
236}
237
238pub fn event_body(ev: &Event) -> Value {
240 json!({
241 "id": ev.id,
242 "camera_id": ev.camera_id,
243 "site_id": ev.site_id,
244 "event_type": ev.event_type,
245 "severity": ev.severity,
246 "timestamp": ev.timestamp,
247 "payload": ev.payload.0,
248 })
249}
250
/// Result of a single webhook HTTP attempt, as returned by [`send_event`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SendResult {
    /// `true` only when a response was received and its status was a success
    /// (2xx) code.
    pub ok: bool,
    /// HTTP status code of the response; `None` when the request itself failed
    /// and no response was received.
    pub status: Option<u16>,
    /// Description of the failure (non-success status or transport error);
    /// `None` on success.
    pub error: Option<String>,
}
258
259pub async fn send_event(
263 client: &reqwest::Client,
264 url: &str,
265 delivery_id: &str,
266 event_type: &str,
267 secret: Option<&str>,
268 body: &Value,
269) -> SendResult {
270 let raw = serde_json::to_string(body).unwrap_or_else(|_| "{}".to_string());
271 let mut req = client
272 .post(url)
273 .header(reqwest::header::CONTENT_TYPE, "application/json")
274 .header("X-Heldar-Event", event_type)
275 .header("X-Heldar-Delivery", delivery_id)
276 .header("X-Heldar-Timestamp", Utc::now().timestamp().to_string())
277 .body(raw.clone());
278 if let Some(secret) = secret.filter(|s| !s.is_empty()) {
279 req = req.header("X-Heldar-Signature", sign(secret, raw.as_bytes()));
280 }
281 match req.send().await {
282 Ok(resp) => {
283 let status = resp.status();
284 SendResult {
285 ok: status.is_success(),
286 status: Some(status.as_u16()),
287 error: if status.is_success() {
288 None
289 } else {
290 Some(format!("webhook returned HTTP {}", status.as_u16()))
291 },
292 }
293 }
294 Err(e) => SendResult {
295 ok: false,
296 status: None,
297 error: Some(e.to_string()),
298 },
299 }
300}
301
302#[allow(clippy::too_many_arguments)]
304pub async fn record_delivery(
305 pool: &SqlitePool,
306 id: &str,
307 subscription_id: &str,
308 event_id: Option<&str>,
309 event_type: Option<&str>,
310 delivered: bool,
311 attempts: i64,
312 response_code: Option<i64>,
313 error: Option<&str>,
314) {
315 let now = Utc::now();
316 let delivered_at = if delivered { Some(now) } else { None };
317 let status = if delivered { "delivered" } else { "failed" };
318 let res = sqlx::query(
319 "INSERT INTO webhook_deliveries
320 (id, subscription_id, event_id, event_type, status, attempts, response_code, error, created_at, delivered_at)
321 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
322 )
323 .bind(id)
324 .bind(subscription_id)
325 .bind(event_id)
326 .bind(event_type)
327 .bind(status)
328 .bind(attempts)
329 .bind(response_code)
330 .bind(error)
331 .bind(now)
332 .bind(delivered_at)
333 .execute(pool)
334 .await;
335 if let Err(e) = res {
336 tracing::error!(error = %e, subscription = %subscription_id, "webhooks: failed to record delivery");
337 }
338}
339
340fn sign(secret: &str, body: &[u8]) -> String {
342 let mut mac =
343 HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC accepts a key of any length");
344 mac.update(body);
345 format!(
346 "sha256={}",
347 crate::auth::hex_encode(&mac.finalize().into_bytes())
348 )
349}
350
/// Returns `true` when `filter` admits `event_type`: either the filter
/// contains the wildcard entry `"*"`, or it contains `event_type` exactly.
/// An empty filter matches nothing.
pub fn matches_event_type(filter: &[String], event_type: &str) -> bool {
    filter
        .iter()
        .map(String::as_str)
        .any(|entry| entry == "*" || entry == event_type)
}
356
/// Maps a subscription's minimum severity to the SQL predicate used to filter
/// events.
///
/// `"critical"` selects only critical events, `"warning"` selects warning and
/// critical events, and any other value yields the always-true predicate
/// `1 = 1`. Only static strings are returned.
fn min_severity_sql(min_severity: &str) -> &'static str {
    // Recognised floors and their predicates; anything else is unfiltered.
    const FLOORS: [(&str, &str); 2] = [
        ("critical", "severity = 'critical'"),
        ("warning", "severity IN ('warning', 'critical')"),
    ];

    FLOORS
        .iter()
        .find(|(floor, _)| *floor == min_severity)
        .map_or("1 = 1", |(_, clause)| *clause)
}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// A filter holding only `"*"` admits any event type.
    #[test]
    fn wildcard_matches_everything() {
        let filter = ["*".to_string()];
        for event_type in ["zone_enter", "anything_at_all"] {
            assert!(matches_event_type(&filter, event_type));
        }
    }

    /// Without a wildcard, only listed types match; an empty filter matches nothing.
    #[test]
    fn explicit_set_is_exact_membership() {
        let filter = ["zone_enter".to_string(), "disk_pressure".to_string()];
        let cases = [
            ("zone_enter", true),
            ("disk_pressure", true),
            ("zone_exit", false),
        ];
        for (event_type, expected) in cases {
            assert_eq!(matches_event_type(&filter, event_type), expected);
        }
        assert!(!matches_event_type(&[], "zone_enter"));
    }

    /// Each severity floor maps to its predicate; unknown floors are unfiltered.
    #[test]
    fn severity_floor_thresholds() {
        let cases = [
            ("critical", "severity = 'critical'"),
            ("warning", "severity IN ('warning', 'critical')"),
            ("info", "1 = 1"),
            ("whatever", "1 = 1"),
        ];
        for (floor, clause) in cases {
            assert_eq!(min_severity_sql(floor), clause);
        }
    }

    /// The signature is a prefixed hex HMAC-SHA256 that is deterministic and key-dependent.
    #[test]
    fn signature_is_stable_prefixed_hmac_sha256() {
        // Known-answer check against a fixed key and message.
        let expected = "sha256=f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8";
        let actual = sign("key", b"The quick brown fox jumps over the lazy dog");
        assert_eq!(actual, expected);

        let first = sign("s", b"body");
        let second = sign("s", b"body");
        assert_eq!(first, second);

        assert_ne!(sign("s1", b"body"), sign("s2", b"body"));
    }
}