Skip to main content

heldar_kernel/
repo.rs

1//! Small shared data-access helpers used by background services and routes.
2
3use chrono::{DateTime, Utc};
4use serde_json::Value;
5use sqlx::types::Json;
6use sqlx::SqlitePool;
7use uuid::Uuid;
8
9/// Upsert the camera status row, setting `state` and `last_error` (does not touch counters).
10pub async fn set_state(
11    pool: &SqlitePool,
12    camera_id: &str,
13    state: &str,
14    last_error: Option<&str>,
15) -> sqlx::Result<()> {
16    let now = Utc::now();
17    sqlx::query(
18        "INSERT INTO camera_status (camera_id, state, last_error, updated_at)
19         VALUES (?, ?, ?, ?)
20         ON CONFLICT(camera_id) DO UPDATE SET
21            state = excluded.state,
22            last_error = excluded.last_error,
23            updated_at = excluded.updated_at",
24    )
25    .bind(camera_id)
26    .bind(state)
27    .bind(last_error)
28    .bind(now)
29    .execute(pool)
30    .await?;
31    Ok(())
32}
33
34/// Mark the recorder process started: set state, pid, and last_started_at.
35pub async fn set_running(
36    pool: &SqlitePool,
37    camera_id: &str,
38    state: &str,
39    pid: Option<i64>,
40) -> sqlx::Result<()> {
41    let now = Utc::now();
42    sqlx::query(
43        "INSERT INTO camera_status (camera_id, state, recorder_pid, last_started_at, last_error, updated_at)
44         VALUES (?, ?, ?, ?, NULL, ?)
45         ON CONFLICT(camera_id) DO UPDATE SET
46            state = excluded.state,
47            recorder_pid = excluded.recorder_pid,
48            last_started_at = excluded.last_started_at,
49            last_error = NULL,
50            updated_at = excluded.updated_at",
51    )
52    .bind(camera_id)
53    .bind(state)
54    .bind(pid)
55    .bind(now)
56    .bind(now)
57    .execute(pool)
58    .await?;
59    Ok(())
60}
61
62/// Increment the reconnect counter, clear pid, and record the last error.
63pub async fn bump_reconnect(
64    pool: &SqlitePool,
65    camera_id: &str,
66    last_error: &str,
67) -> sqlx::Result<()> {
68    let now = Utc::now();
69    let err = last_error.chars().rev().take(800).collect::<String>();
70    let err: String = err.chars().rev().collect();
71    sqlx::query(
72        "INSERT INTO camera_status (camera_id, state, reconnect_count, last_error, recorder_pid, updated_at)
73         VALUES (?, 'offline', 1, ?, NULL, ?)
74         ON CONFLICT(camera_id) DO UPDATE SET
75            state = 'offline',
76            reconnect_count = camera_status.reconnect_count + 1,
77            last_error = excluded.last_error,
78            recorder_pid = NULL,
79            updated_at = excluded.updated_at",
80    )
81    .bind(camera_id)
82    .bind(err)
83    .bind(now)
84    .execute(pool)
85    .await?;
86    Ok(())
87}
88
89/// Record that a new segment was indexed: bump count, set last_segment_at and observed bitrate.
90pub async fn record_segment_indexed(
91    pool: &SqlitePool,
92    camera_id: &str,
93    last_segment_at: DateTime<Utc>,
94    bitrate_kbps: Option<f64>,
95    fps_observed: Option<f64>,
96) -> sqlx::Result<()> {
97    let now = Utc::now();
98    sqlx::query(
99        "INSERT INTO camera_status
100           (camera_id, state, last_segment_at, segments_written, bitrate_kbps, fps_observed, updated_at)
101         VALUES (?, 'recording', ?, 1, ?, ?, ?)
102         ON CONFLICT(camera_id) DO UPDATE SET
103            state = 'recording',
104            last_segment_at = excluded.last_segment_at,
105            segments_written = camera_status.segments_written + 1,
106            bitrate_kbps = excluded.bitrate_kbps,
107            fps_observed = excluded.fps_observed,
108            last_error = NULL,
109            updated_at = excluded.updated_at",
110    )
111    .bind(camera_id)
112    .bind(last_segment_at)
113    .bind(bitrate_kbps)
114    .bind(fps_observed)
115    .bind(now)
116    .execute(pool)
117    .await?;
118    Ok(())
119}
120
121/// Insert an event into the event log.
122pub async fn log_event(
123    pool: &SqlitePool,
124    camera_id: Option<&str>,
125    event_type: &str,
126    severity: &str,
127    payload: Value,
128) -> sqlx::Result<()> {
129    let now = Utc::now();
130    sqlx::query(
131        "INSERT INTO events (id, camera_id, site_id, event_type, severity, timestamp, payload, created_at)
132         VALUES (?, ?, NULL, ?, ?, ?, ?, ?)",
133    )
134    .bind(Uuid::new_v4().to_string())
135    .bind(camera_id)
136    .bind(event_type)
137    .bind(severity)
138    .bind(now)
139    .bind(Json(payload))
140    .bind(now)
141    .execute(pool)
142    .await?;
143    Ok(())
144}
145
146/// Toggle a transient read-lock on a set of segments so the retention sweeper (which only deletes
147/// `locked = 0`) won't remove them while clip/snapshot ffmpeg is reading them — closing the TOCTOU
148/// between selecting segments and ffmpeg opening their files. Best-effort: a failure is logged, not
149/// fatal (the read still proceeds). Locks are cleared at startup ([`crate::db::clear_segment_read_locks`])
150/// so a crash mid-read cannot pin segments forever.
151pub async fn set_segments_locked(pool: &SqlitePool, ids: &[String], locked: bool) {
152    if ids.is_empty() {
153        return;
154    }
155    let placeholders = vec!["?"; ids.len()].join(",");
156    let sql = format!("UPDATE segments SET locked = ? WHERE id IN ({placeholders})");
157    let mut q = sqlx::query(&sql).bind(i64::from(locked));
158    for id in ids {
159        q = q.bind(id);
160    }
161    if let Err(e) = q.execute(pool).await {
162        tracing::warn!(error = %e, locked, "failed to toggle segment read-lock");
163    }
164}