Skip to main content

heldar_kernel/routes/
webhooks.rs

1//! Webhook subscription management + the event-type taxonomy.
2//!
3//! Subscriptions are the generic external-delivery surface that supersedes the single-URL alerting
4//! webhook: the background engine ([`crate::services::webhooks`]) delivers events to each enabled
5//! subscription with HMAC signing and at-least-once retry. Listings + the delivery log are readable by
6//! any authenticated principal (`can_view`); create/update/delete/test are gated by manager+
7//! (`can_manage_registry`) and written to the immutable audit log. The signing `secret` is MASKED on
8//! read (surfaced only as `has_secret`) and never echoed back.
9
10use axum::extract::{Path, Query, State};
11use axum::http::StatusCode;
12use axum::routing::{get, post};
13use axum::{Json, Router};
14use chrono::Utc;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use sqlx::types::Json as SqlxJson;
18use uuid::Uuid;
19
20use crate::auth::{self, Principal};
21use crate::error::{AppError, AppResult};
22use crate::models::{
23    WebhookDelivery, WebhookSubscription, WebhookSubscriptionCreate, WebhookSubscriptionUpdate,
24    WebhookSubscriptionView,
25};
26use crate::services::webhooks;
27use crate::state::AppState;
28
29pub fn router() -> Router<AppState> {
30    Router::new()
31        .route("/api/v1/webhooks", get(list).post(create))
32        .route(
33            "/api/v1/webhooks/{id}",
34            axum::routing::patch(update).delete(delete),
35        )
36        .route("/api/v1/webhooks/{id}/test", post(test))
37        .route("/api/v1/webhooks/{id}/deliveries", get(list_deliveries))
38        .route("/api/v1/events/types", get(event_types))
39}
40
/// Severity names accepted for a subscription's `min_severity`.
const VALID_SEVERITIES: &[&str] = &["info", "warning", "critical"];

/// Returns `true` when `s` is exactly one of [`VALID_SEVERITIES`] (case-sensitive, no trimming).
fn valid_severity(s: &str) -> bool {
    VALID_SEVERITIES.iter().any(|&level| level == s)
}
46
47/// Validate + normalize a webhook url. Only http(s) targets are accepted (no `file://`, etc.).
48fn validate_url(url: &str) -> AppResult<String> {
49    let url = url.trim();
50    if url.is_empty() {
51        return Err(AppError::BadRequest("`url` is required".into()));
52    }
53    if !(url.starts_with("http://") || url.starts_with("https://")) {
54        return Err(AppError::BadRequest("`url` must be an http(s) URL".into()));
55    }
56    Ok(url.to_string())
57}
58
59/// Validate + normalize an event-type filter. `None`/empty = all types (`["*"]`); otherwise each entry
60/// must be a non-empty token (deduped, trimmed).
61fn normalize_event_types(types: Option<Vec<String>>) -> AppResult<Vec<String>> {
62    let Some(types) = types else {
63        return Ok(vec!["*".to_string()]);
64    };
65    let mut out: Vec<String> = Vec::with_capacity(types.len());
66    for t in types {
67        let t = t.trim().to_string();
68        if t.is_empty() {
69            return Err(AppError::BadRequest(
70                "`event_types` entries must be non-empty".into(),
71            ));
72        }
73        if !out.contains(&t) {
74            out.push(t);
75        }
76    }
77    if out.is_empty() {
78        out.push("*".to_string());
79    }
80    Ok(out)
81}
82
83async fn load_subscription(pool: &sqlx::SqlitePool, id: &str) -> AppResult<WebhookSubscription> {
84    sqlx::query_as::<_, WebhookSubscription>("SELECT * FROM webhook_subscriptions WHERE id = ?")
85        .bind(id)
86        .fetch_optional(pool)
87        .await?
88        .ok_or_else(|| AppError::NotFound(format!("webhook subscription {id} not found")))
89}
90
91async fn list(
92    State(st): State<AppState>,
93    principal: Principal,
94) -> AppResult<Json<Vec<WebhookSubscriptionView>>> {
95    principal.require(principal.can_view(), "view webhook subscriptions")?;
96    let rows = sqlx::query_as::<_, WebhookSubscription>(
97        "SELECT * FROM webhook_subscriptions ORDER BY created_at ASC",
98    )
99    .fetch_all(&st.pool)
100    .await?;
101    Ok(Json(
102        rows.into_iter()
103            .map(WebhookSubscriptionView::from)
104            .collect(),
105    ))
106}
107
108async fn create(
109    State(st): State<AppState>,
110    principal: Principal,
111    Json(body): Json<WebhookSubscriptionCreate>,
112) -> AppResult<(StatusCode, Json<WebhookSubscriptionView>)> {
113    principal.require(
114        principal.can_manage_registry(),
115        "create webhook subscriptions",
116    )?;
117    let name = body.name.trim();
118    if name.is_empty() {
119        return Err(AppError::BadRequest("`name` is required".into()));
120    }
121    let url = validate_url(&body.url)?;
122    let min_severity = body.min_severity.unwrap_or_else(|| "info".into());
123    if !valid_severity(&min_severity) {
124        return Err(AppError::BadRequest(
125            "`min_severity` must be info|warning|critical".into(),
126        ));
127    }
128    let event_types = normalize_event_types(body.event_types)?;
129    let secret = body
130        .secret
131        .map(|s| s.trim().to_string())
132        .filter(|s| !s.is_empty());
133    let enabled = body.enabled.unwrap_or(true);
134    let id = format!("whs_{}", Uuid::new_v4().simple());
135    let now = Utc::now();
136
137    sqlx::query(
138        "INSERT INTO webhook_subscriptions
139           (id, name, url, event_types, min_severity, secret, enabled, cursor_at, created_at, updated_at)
140         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
141    )
142    .bind(&id)
143    .bind(name)
144    .bind(&url)
145    .bind(SqlxJson(&event_types))
146    .bind(&min_severity)
147    .bind(secret.as_deref())
148    .bind(enabled)
149    // cursor_at = now: deliver events from creation forward, never replay the backlog.
150    .bind(now)
151    .bind(now)
152    .bind(now)
153    .execute(&st.pool)
154    .await?;
155
156    let sub = load_subscription(&st.pool, &id).await?;
157    auth::audit(
158        &st.pool,
159        &principal,
160        "create_webhook",
161        "webhook",
162        &id,
163        json!({
164            "name": name,
165            "event_types": &event_types,
166            "min_severity": &min_severity,
167            "has_secret": secret.is_some(),
168            "enabled": enabled,
169        }),
170    )
171    .await;
172    Ok((
173        StatusCode::CREATED,
174        Json(WebhookSubscriptionView::from(sub)),
175    ))
176}
177
178async fn update(
179    State(st): State<AppState>,
180    Path(id): Path<String>,
181    principal: Principal,
182    Json(body): Json<WebhookSubscriptionUpdate>,
183) -> AppResult<Json<WebhookSubscriptionView>> {
184    principal.require(
185        principal.can_manage_registry(),
186        "update webhook subscriptions",
187    )?;
188    let cur = load_subscription(&st.pool, &id).await?;
189
190    let name = match body.name {
191        Some(n) => {
192            let n = n.trim().to_string();
193            if n.is_empty() {
194                return Err(AppError::BadRequest("`name` must not be empty".into()));
195            }
196            n
197        }
198        None => cur.name,
199    };
200    let url = match body.url {
201        Some(u) => validate_url(&u)?,
202        None => cur.url,
203    };
204    let min_severity = match body.min_severity {
205        Some(s) => {
206            if !valid_severity(&s) {
207                return Err(AppError::BadRequest(
208                    "`min_severity` must be info|warning|critical".into(),
209                ));
210            }
211            s
212        }
213        None => cur.min_severity,
214    };
215    let event_types = match body.event_types {
216        Some(t) => normalize_event_types(Some(t))?,
217        None => cur.event_types.0,
218    };
219    // Three-state secret: omitted = keep, null = clear, value = set (empty value also clears).
220    let secret: Option<String> = match body.secret {
221        None => cur.secret,
222        Some(None) => None,
223        Some(Some(s)) => {
224            let s = s.trim().to_string();
225            if s.is_empty() {
226                None
227            } else {
228                Some(s)
229            }
230        }
231    };
232    let enabled = body.enabled.unwrap_or(cur.enabled);
233
234    sqlx::query(
235        "UPDATE webhook_subscriptions
236            SET name = ?, url = ?, event_types = ?, min_severity = ?, secret = ?, enabled = ?, updated_at = ?
237          WHERE id = ?",
238    )
239    .bind(&name)
240    .bind(&url)
241    .bind(SqlxJson(&event_types))
242    .bind(&min_severity)
243    .bind(secret.as_deref())
244    .bind(enabled)
245    .bind(Utc::now())
246    .bind(&id)
247    .execute(&st.pool)
248    .await?;
249
250    let sub = load_subscription(&st.pool, &id).await?;
251    auth::audit(
252        &st.pool,
253        &principal,
254        "update_webhook",
255        "webhook",
256        &id,
257        json!({
258            "name": &name,
259            "event_types": &event_types,
260            "min_severity": &min_severity,
261            "has_secret": secret.is_some(),
262            "enabled": enabled,
263        }),
264    )
265    .await;
266    Ok(Json(WebhookSubscriptionView::from(sub)))
267}
268
269async fn delete(
270    State(st): State<AppState>,
271    Path(id): Path<String>,
272    principal: Principal,
273) -> AppResult<StatusCode> {
274    principal.require(
275        principal.can_manage_registry(),
276        "delete webhook subscriptions",
277    )?;
278    let res = sqlx::query("DELETE FROM webhook_subscriptions WHERE id = ?")
279        .bind(&id)
280        .execute(&st.pool)
281        .await?;
282    if res.rows_affected() == 0 {
283        return Err(AppError::NotFound(format!(
284            "webhook subscription {id} not found"
285        )));
286    }
287    auth::audit(
288        &st.pool,
289        &principal,
290        "delete_webhook",
291        "webhook",
292        &id,
293        json!({}),
294    )
295    .await;
296    Ok(StatusCode::NO_CONTENT)
297}
298
299/// Result of POST /api/v1/webhooks/{id}/test — one synthetic signed delivery to the subscription's url.
300#[derive(Debug, Serialize)]
301struct WebhookTestResult {
302    ok: bool,
303    status: Option<u16>,
304    error: Option<String>,
305}
306
307async fn test(
308    State(st): State<AppState>,
309    Path(id): Path<String>,
310    principal: Principal,
311) -> AppResult<Json<WebhookTestResult>> {
312    principal.require(
313        principal.can_manage_registry(),
314        "test webhook subscriptions",
315    )?;
316    let sub = load_subscription(&st.pool, &id).await?;
317
318    let delivery_id = format!("whd_{}", Uuid::new_v4().simple());
319    let body = json!({
320        "id": &delivery_id,
321        "camera_id": serde_json::Value::Null,
322        "site_id": st.cfg.site_id.clone(),
323        "event_type": "test",
324        "severity": "info",
325        "timestamp": Utc::now(),
326        "payload": { "message": "Heldar webhook test" },
327    });
328    let res = webhooks::send_event(
329        &st.http,
330        &sub.url,
331        &delivery_id,
332        "test",
333        sub.secret.as_deref(),
334        &body,
335    )
336    .await;
337
338    // Record the synthetic delivery (event_id NULL, so it never counts toward real-event retry bounds).
339    webhooks::record_delivery(
340        &st.pool,
341        &delivery_id,
342        &sub.id,
343        None,
344        Some("test"),
345        res.ok,
346        1,
347        res.status.map(i64::from),
348        res.error.as_deref(),
349    )
350    .await;
351
352    auth::audit(
353        &st.pool,
354        &principal,
355        "test_webhook",
356        "webhook",
357        &id,
358        json!({ "ok": res.ok, "status": res.status }),
359    )
360    .await;
361    Ok(Json(WebhookTestResult {
362        ok: res.ok,
363        status: res.status,
364        error: res.error,
365    }))
366}
367
368#[derive(Debug, Deserialize)]
369struct DeliveriesQuery {
370    limit: Option<i64>,
371}
372
373async fn list_deliveries(
374    State(st): State<AppState>,
375    Path(id): Path<String>,
376    principal: Principal,
377    Query(q): Query<DeliveriesQuery>,
378) -> AppResult<Json<Vec<WebhookDelivery>>> {
379    principal.require(principal.can_view(), "view webhook deliveries")?;
380    let _ = load_subscription(&st.pool, &id).await?;
381    let limit = q.limit.unwrap_or(100).clamp(1, 1000);
382    let rows = sqlx::query_as::<_, WebhookDelivery>(
383        "SELECT * FROM webhook_deliveries WHERE subscription_id = ? ORDER BY created_at DESC LIMIT ?",
384    )
385    .bind(&id)
386    .bind(limit)
387    .fetch_all(&st.pool)
388    .await?;
389    Ok(Json(rows))
390}
391
392/// One known event type plus a one-line description (the built-in taxonomy).
393#[derive(Debug, Serialize)]
394struct EventTypeInfo {
395    event_type: &'static str,
396    description: &'static str,
397}
398
399/// The built-in event-type taxonomy emitted via `repo::log_event` across the kernel + bundled apps.
400/// Apps and AI workers may additionally emit their own custom `event_type` strings (the AI ingest path
401/// passes a worker-supplied type straight through), so this list is descriptive, not exhaustive.
402async fn event_types(
403    State(st): State<AppState>,
404    principal: Principal,
405) -> AppResult<Json<Vec<serde_json::Value>>> {
406    principal.require(principal.can_view(), "view event types")?;
407    let types = vec![
408        EventTypeInfo {
409            event_type: "camera_offline",
410            description: "A camera's recorder lost its RTSP connection (camera went offline).",
411        },
412        EventTypeInfo {
413            event_type: "recorder_error",
414            description: "A camera's recorder process errored or its segments went stale.",
415        },
416        EventTypeInfo {
417            event_type: "recording_gap",
418            description: "A hole was detected between consecutive recorded segments.",
419        },
420        EventTypeInfo {
421            event_type: "sampler_offline",
422            description: "An AI frame sampler for a camera went offline.",
423        },
424        EventTypeInfo {
425            event_type: "retention_delete",
426            description: "Old segments were pruned by the retention sweeper (by age).",
427        },
428        EventTypeInfo {
429            event_type: "disk_pressure",
430            description:
431                "Recording storage is under pressure (per-camera quota, size cap, or free-space floor).",
432        },
433        EventTypeInfo {
434            event_type: "disk_smart_warning",
435            description: "A SMART self-assessment reported a disk health warning.",
436        },
437        EventTypeInfo {
438            event_type: "raid_degraded",
439            description: "A Linux md/RAID array reported a degraded or down member.",
440        },
441        EventTypeInfo {
442            event_type: "zone_enter",
443            description: "A tracked detection entered a configured zone.",
444        },
445        EventTypeInfo {
446            event_type: "zone_exit",
447            description: "A tracked detection left a configured zone.",
448        },
449        EventTypeInfo {
450            event_type: "zone_dwell",
451            description: "A tracked detection dwelled inside a zone past its dwell threshold.",
452        },
453        EventTypeInfo {
454            event_type: "entry_matched",
455            description: "Access control: an entry matched the registry and was authorized.",
456        },
457        EventTypeInfo {
458            event_type: "entry_exception",
459            description: "Access control: an entry needs operator review (unmatched/low-confidence).",
460        },
461        EventTypeInfo {
462            event_type: "entry_unmatched",
463            description: "Access control: an entry did not match any registry record.",
464        },
465        EventTypeInfo {
466            event_type: "entry_blocked",
467            description: "Access control: an entry matched a watchlist/blocklist and was denied.",
468        },
469    ];
470    // Start with the built-in taxonomy, then UNION in event types actually observed in the events table
471    // (plugin/app-emitted types like `wasm.*`, which the static list can't know), so they appear in the
472    // webhook subscription picker. Capped + best-effort: a query failure just returns the static set.
473    let known: std::collections::HashSet<&str> = types.iter().map(|t| t.event_type).collect();
474    let mut out: Vec<serde_json::Value> = types
475        .iter()
476        .map(|t| json!({ "event_type": t.event_type, "description": t.description }))
477        .collect();
478    if let Ok(rows) = sqlx::query_scalar::<_, String>(
479        "SELECT DISTINCT event_type FROM events ORDER BY event_type LIMIT 500",
480    )
481    .fetch_all(&st.pool)
482    .await
483    {
484        for ty in rows.into_iter().filter(|t| !known.contains(t.as_str())) {
485            out.push(json!({ "event_type": ty, "description": "Observed at runtime (plugin/app-emitted)." }));
486        }
487    }
488    Ok(Json(out))
489}