1use axum::extract::{Path, Query, State};
11use axum::http::StatusCode;
12use axum::routing::{get, post};
13use axum::{Json, Router};
14use chrono::Utc;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use sqlx::types::Json as SqlxJson;
18use uuid::Uuid;
19
20use crate::auth::{self, Principal};
21use crate::error::{AppError, AppResult};
22use crate::models::{
23 WebhookDelivery, WebhookSubscription, WebhookSubscriptionCreate, WebhookSubscriptionUpdate,
24 WebhookSubscriptionView,
25};
26use crate::services::webhooks;
27use crate::state::AppState;
28
29pub fn router() -> Router<AppState> {
30 Router::new()
31 .route("/api/v1/webhooks", get(list).post(create))
32 .route(
33 "/api/v1/webhooks/{id}",
34 axum::routing::patch(update).delete(delete),
35 )
36 .route("/api/v1/webhooks/{id}/test", post(test))
37 .route("/api/v1/webhooks/{id}/deliveries", get(list_deliveries))
38 .route("/api/v1/events/types", get(event_types))
39}
40
/// Severity levels accepted for a subscription's `min_severity`.
const VALID_SEVERITIES: &[&str] = &["info", "warning", "critical"];

/// Returns `true` when `s` is exactly one of [`VALID_SEVERITIES`].
///
/// The comparison is case-sensitive and performs no trimming.
fn valid_severity(s: &str) -> bool {
    VALID_SEVERITIES.iter().any(|&level| level == s)
}
46
47fn validate_url(url: &str) -> AppResult<String> {
49 let url = url.trim();
50 if url.is_empty() {
51 return Err(AppError::BadRequest("`url` is required".into()));
52 }
53 if !(url.starts_with("http://") || url.starts_with("https://")) {
54 return Err(AppError::BadRequest("`url` must be an http(s) URL".into()));
55 }
56 Ok(url.to_string())
57}
58
59fn normalize_event_types(types: Option<Vec<String>>) -> AppResult<Vec<String>> {
62 let Some(types) = types else {
63 return Ok(vec!["*".to_string()]);
64 };
65 let mut out: Vec<String> = Vec::with_capacity(types.len());
66 for t in types {
67 let t = t.trim().to_string();
68 if t.is_empty() {
69 return Err(AppError::BadRequest(
70 "`event_types` entries must be non-empty".into(),
71 ));
72 }
73 if !out.contains(&t) {
74 out.push(t);
75 }
76 }
77 if out.is_empty() {
78 out.push("*".to_string());
79 }
80 Ok(out)
81}
82
83async fn load_subscription(pool: &sqlx::SqlitePool, id: &str) -> AppResult<WebhookSubscription> {
84 sqlx::query_as::<_, WebhookSubscription>("SELECT * FROM webhook_subscriptions WHERE id = ?")
85 .bind(id)
86 .fetch_optional(pool)
87 .await?
88 .ok_or_else(|| AppError::NotFound(format!("webhook subscription {id} not found")))
89}
90
91async fn list(
92 State(st): State<AppState>,
93 principal: Principal,
94) -> AppResult<Json<Vec<WebhookSubscriptionView>>> {
95 principal.require(principal.can_view(), "view webhook subscriptions")?;
96 let rows = sqlx::query_as::<_, WebhookSubscription>(
97 "SELECT * FROM webhook_subscriptions ORDER BY created_at ASC",
98 )
99 .fetch_all(&st.pool)
100 .await?;
101 Ok(Json(
102 rows.into_iter()
103 .map(WebhookSubscriptionView::from)
104 .collect(),
105 ))
106}
107
108async fn create(
109 State(st): State<AppState>,
110 principal: Principal,
111 Json(body): Json<WebhookSubscriptionCreate>,
112) -> AppResult<(StatusCode, Json<WebhookSubscriptionView>)> {
113 principal.require(
114 principal.can_manage_registry(),
115 "create webhook subscriptions",
116 )?;
117 let name = body.name.trim();
118 if name.is_empty() {
119 return Err(AppError::BadRequest("`name` is required".into()));
120 }
121 let url = validate_url(&body.url)?;
122 let min_severity = body.min_severity.unwrap_or_else(|| "info".into());
123 if !valid_severity(&min_severity) {
124 return Err(AppError::BadRequest(
125 "`min_severity` must be info|warning|critical".into(),
126 ));
127 }
128 let event_types = normalize_event_types(body.event_types)?;
129 let secret = body
130 .secret
131 .map(|s| s.trim().to_string())
132 .filter(|s| !s.is_empty());
133 let enabled = body.enabled.unwrap_or(true);
134 let id = format!("whs_{}", Uuid::new_v4().simple());
135 let now = Utc::now();
136
137 sqlx::query(
138 "INSERT INTO webhook_subscriptions
139 (id, name, url, event_types, min_severity, secret, enabled, cursor_at, created_at, updated_at)
140 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
141 )
142 .bind(&id)
143 .bind(name)
144 .bind(&url)
145 .bind(SqlxJson(&event_types))
146 .bind(&min_severity)
147 .bind(secret.as_deref())
148 .bind(enabled)
149 .bind(now)
151 .bind(now)
152 .bind(now)
153 .execute(&st.pool)
154 .await?;
155
156 let sub = load_subscription(&st.pool, &id).await?;
157 auth::audit(
158 &st.pool,
159 &principal,
160 "create_webhook",
161 "webhook",
162 &id,
163 json!({
164 "name": name,
165 "event_types": &event_types,
166 "min_severity": &min_severity,
167 "has_secret": secret.is_some(),
168 "enabled": enabled,
169 }),
170 )
171 .await;
172 Ok((
173 StatusCode::CREATED,
174 Json(WebhookSubscriptionView::from(sub)),
175 ))
176}
177
178async fn update(
179 State(st): State<AppState>,
180 Path(id): Path<String>,
181 principal: Principal,
182 Json(body): Json<WebhookSubscriptionUpdate>,
183) -> AppResult<Json<WebhookSubscriptionView>> {
184 principal.require(
185 principal.can_manage_registry(),
186 "update webhook subscriptions",
187 )?;
188 let cur = load_subscription(&st.pool, &id).await?;
189
190 let name = match body.name {
191 Some(n) => {
192 let n = n.trim().to_string();
193 if n.is_empty() {
194 return Err(AppError::BadRequest("`name` must not be empty".into()));
195 }
196 n
197 }
198 None => cur.name,
199 };
200 let url = match body.url {
201 Some(u) => validate_url(&u)?,
202 None => cur.url,
203 };
204 let min_severity = match body.min_severity {
205 Some(s) => {
206 if !valid_severity(&s) {
207 return Err(AppError::BadRequest(
208 "`min_severity` must be info|warning|critical".into(),
209 ));
210 }
211 s
212 }
213 None => cur.min_severity,
214 };
215 let event_types = match body.event_types {
216 Some(t) => normalize_event_types(Some(t))?,
217 None => cur.event_types.0,
218 };
219 let secret: Option<String> = match body.secret {
221 None => cur.secret,
222 Some(None) => None,
223 Some(Some(s)) => {
224 let s = s.trim().to_string();
225 if s.is_empty() {
226 None
227 } else {
228 Some(s)
229 }
230 }
231 };
232 let enabled = body.enabled.unwrap_or(cur.enabled);
233
234 sqlx::query(
235 "UPDATE webhook_subscriptions
236 SET name = ?, url = ?, event_types = ?, min_severity = ?, secret = ?, enabled = ?, updated_at = ?
237 WHERE id = ?",
238 )
239 .bind(&name)
240 .bind(&url)
241 .bind(SqlxJson(&event_types))
242 .bind(&min_severity)
243 .bind(secret.as_deref())
244 .bind(enabled)
245 .bind(Utc::now())
246 .bind(&id)
247 .execute(&st.pool)
248 .await?;
249
250 let sub = load_subscription(&st.pool, &id).await?;
251 auth::audit(
252 &st.pool,
253 &principal,
254 "update_webhook",
255 "webhook",
256 &id,
257 json!({
258 "name": &name,
259 "event_types": &event_types,
260 "min_severity": &min_severity,
261 "has_secret": secret.is_some(),
262 "enabled": enabled,
263 }),
264 )
265 .await;
266 Ok(Json(WebhookSubscriptionView::from(sub)))
267}
268
269async fn delete(
270 State(st): State<AppState>,
271 Path(id): Path<String>,
272 principal: Principal,
273) -> AppResult<StatusCode> {
274 principal.require(
275 principal.can_manage_registry(),
276 "delete webhook subscriptions",
277 )?;
278 let res = sqlx::query("DELETE FROM webhook_subscriptions WHERE id = ?")
279 .bind(&id)
280 .execute(&st.pool)
281 .await?;
282 if res.rows_affected() == 0 {
283 return Err(AppError::NotFound(format!(
284 "webhook subscription {id} not found"
285 )));
286 }
287 auth::audit(
288 &st.pool,
289 &principal,
290 "delete_webhook",
291 "webhook",
292 &id,
293 json!({}),
294 )
295 .await;
296 Ok(StatusCode::NO_CONTENT)
297}
298
299#[derive(Debug, Serialize)]
301struct WebhookTestResult {
302 ok: bool,
303 status: Option<u16>,
304 error: Option<String>,
305}
306
307async fn test(
308 State(st): State<AppState>,
309 Path(id): Path<String>,
310 principal: Principal,
311) -> AppResult<Json<WebhookTestResult>> {
312 principal.require(
313 principal.can_manage_registry(),
314 "test webhook subscriptions",
315 )?;
316 let sub = load_subscription(&st.pool, &id).await?;
317
318 let delivery_id = format!("whd_{}", Uuid::new_v4().simple());
319 let body = json!({
320 "id": &delivery_id,
321 "camera_id": serde_json::Value::Null,
322 "site_id": st.cfg.site_id.clone(),
323 "event_type": "test",
324 "severity": "info",
325 "timestamp": Utc::now(),
326 "payload": { "message": "Heldar webhook test" },
327 });
328 let res = webhooks::send_event(
329 &st.http,
330 &sub.url,
331 &delivery_id,
332 "test",
333 sub.secret.as_deref(),
334 &body,
335 )
336 .await;
337
338 webhooks::record_delivery(
340 &st.pool,
341 &delivery_id,
342 &sub.id,
343 None,
344 Some("test"),
345 res.ok,
346 1,
347 res.status.map(i64::from),
348 res.error.as_deref(),
349 )
350 .await;
351
352 auth::audit(
353 &st.pool,
354 &principal,
355 "test_webhook",
356 "webhook",
357 &id,
358 json!({ "ok": res.ok, "status": res.status }),
359 )
360 .await;
361 Ok(Json(WebhookTestResult {
362 ok: res.ok,
363 status: res.status,
364 error: res.error,
365 }))
366}
367
368#[derive(Debug, Deserialize)]
369struct DeliveriesQuery {
370 limit: Option<i64>,
371}
372
373async fn list_deliveries(
374 State(st): State<AppState>,
375 Path(id): Path<String>,
376 principal: Principal,
377 Query(q): Query<DeliveriesQuery>,
378) -> AppResult<Json<Vec<WebhookDelivery>>> {
379 principal.require(principal.can_view(), "view webhook deliveries")?;
380 let _ = load_subscription(&st.pool, &id).await?;
381 let limit = q.limit.unwrap_or(100).clamp(1, 1000);
382 let rows = sqlx::query_as::<_, WebhookDelivery>(
383 "SELECT * FROM webhook_deliveries WHERE subscription_id = ? ORDER BY created_at DESC LIMIT ?",
384 )
385 .bind(&id)
386 .bind(limit)
387 .fetch_all(&st.pool)
388 .await?;
389 Ok(Json(rows))
390}
391
392#[derive(Debug, Serialize)]
394struct EventTypeInfo {
395 event_type: &'static str,
396 description: &'static str,
397}
398
399async fn event_types(
403 State(st): State<AppState>,
404 principal: Principal,
405) -> AppResult<Json<Vec<serde_json::Value>>> {
406 principal.require(principal.can_view(), "view event types")?;
407 let types = vec![
408 EventTypeInfo {
409 event_type: "camera_offline",
410 description: "A camera's recorder lost its RTSP connection (camera went offline).",
411 },
412 EventTypeInfo {
413 event_type: "recorder_error",
414 description: "A camera's recorder process errored or its segments went stale.",
415 },
416 EventTypeInfo {
417 event_type: "recording_gap",
418 description: "A hole was detected between consecutive recorded segments.",
419 },
420 EventTypeInfo {
421 event_type: "sampler_offline",
422 description: "An AI frame sampler for a camera went offline.",
423 },
424 EventTypeInfo {
425 event_type: "retention_delete",
426 description: "Old segments were pruned by the retention sweeper (by age).",
427 },
428 EventTypeInfo {
429 event_type: "disk_pressure",
430 description:
431 "Recording storage is under pressure (per-camera quota, size cap, or free-space floor).",
432 },
433 EventTypeInfo {
434 event_type: "disk_smart_warning",
435 description: "A SMART self-assessment reported a disk health warning.",
436 },
437 EventTypeInfo {
438 event_type: "raid_degraded",
439 description: "A Linux md/RAID array reported a degraded or down member.",
440 },
441 EventTypeInfo {
442 event_type: "zone_enter",
443 description: "A tracked detection entered a configured zone.",
444 },
445 EventTypeInfo {
446 event_type: "zone_exit",
447 description: "A tracked detection left a configured zone.",
448 },
449 EventTypeInfo {
450 event_type: "zone_dwell",
451 description: "A tracked detection dwelled inside a zone past its dwell threshold.",
452 },
453 EventTypeInfo {
454 event_type: "entry_matched",
455 description: "Access control: an entry matched the registry and was authorized.",
456 },
457 EventTypeInfo {
458 event_type: "entry_exception",
459 description: "Access control: an entry needs operator review (unmatched/low-confidence).",
460 },
461 EventTypeInfo {
462 event_type: "entry_unmatched",
463 description: "Access control: an entry did not match any registry record.",
464 },
465 EventTypeInfo {
466 event_type: "entry_blocked",
467 description: "Access control: an entry matched a watchlist/blocklist and was denied.",
468 },
469 ];
470 let known: std::collections::HashSet<&str> = types.iter().map(|t| t.event_type).collect();
474 let mut out: Vec<serde_json::Value> = types
475 .iter()
476 .map(|t| json!({ "event_type": t.event_type, "description": t.description }))
477 .collect();
478 if let Ok(rows) = sqlx::query_scalar::<_, String>(
479 "SELECT DISTINCT event_type FROM events ORDER BY event_type LIMIT 500",
480 )
481 .fetch_all(&st.pool)
482 .await
483 {
484 for ty in rows.into_iter().filter(|t| !known.contains(t.as_str())) {
485 out.push(json!({ "event_type": ty, "description": "Observed at runtime (plugin/app-emitted)." }));
486 }
487 }
488 Ok(Json(out))
489}