Skip to main content

heldar_kernel/
repo.rs

1//! Small shared data-access helpers used by background services and routes.
2
3use chrono::{DateTime, Utc};
4use serde_json::Value;
5use sqlx::types::Json;
6use sqlx::SqlitePool;
7use uuid::Uuid;
8
9/// Upsert the camera status row, setting `state` and `last_error` (does not touch counters).
10pub async fn set_state(
11    pool: &SqlitePool,
12    camera_id: &str,
13    state: &str,
14    last_error: Option<&str>,
15) -> sqlx::Result<()> {
16    let now = Utc::now();
17    sqlx::query(
18        "INSERT INTO camera_status (camera_id, state, last_error, updated_at)
19         VALUES (?, ?, ?, ?)
20         ON CONFLICT(camera_id) DO UPDATE SET
21            state = excluded.state,
22            last_error = excluded.last_error,
23            updated_at = excluded.updated_at",
24    )
25    .bind(camera_id)
26    .bind(state)
27    .bind(last_error)
28    .bind(now)
29    .execute(pool)
30    .await?;
31    Ok(())
32}
33
34/// Mark the recorder process started: set state, pid, and last_started_at.
35pub async fn set_running(
36    pool: &SqlitePool,
37    camera_id: &str,
38    state: &str,
39    pid: Option<i64>,
40) -> sqlx::Result<()> {
41    let now = Utc::now();
42    sqlx::query(
43        "INSERT INTO camera_status (camera_id, state, recorder_pid, last_started_at, last_error, updated_at)
44         VALUES (?, ?, ?, ?, NULL, ?)
45         ON CONFLICT(camera_id) DO UPDATE SET
46            state = excluded.state,
47            recorder_pid = excluded.recorder_pid,
48            last_started_at = excluded.last_started_at,
49            last_error = NULL,
50            updated_at = excluded.updated_at",
51    )
52    .bind(camera_id)
53    .bind(state)
54    .bind(pid)
55    .bind(now)
56    .bind(now)
57    .execute(pool)
58    .await?;
59    Ok(())
60}
61
62/// Increment the reconnect counter, clear pid, and record the last error.
63pub async fn bump_reconnect(
64    pool: &SqlitePool,
65    camera_id: &str,
66    last_error: &str,
67) -> sqlx::Result<()> {
68    let now = Utc::now();
69    let err = last_error.chars().rev().take(800).collect::<String>();
70    let err: String = err.chars().rev().collect();
71    sqlx::query(
72        "INSERT INTO camera_status (camera_id, state, reconnect_count, last_error, recorder_pid, updated_at)
73         VALUES (?, 'offline', 1, ?, NULL, ?)
74         ON CONFLICT(camera_id) DO UPDATE SET
75            state = 'offline',
76            reconnect_count = camera_status.reconnect_count + 1,
77            last_error = excluded.last_error,
78            recorder_pid = NULL,
79            updated_at = excluded.updated_at",
80    )
81    .bind(camera_id)
82    .bind(err)
83    .bind(now)
84    .execute(pool)
85    .await?;
86    Ok(())
87}
88
89/// Record that a new segment was indexed: bump count, set last_segment_at and observed bitrate.
90pub async fn record_segment_indexed(
91    pool: &SqlitePool,
92    camera_id: &str,
93    last_segment_at: DateTime<Utc>,
94    bitrate_kbps: Option<f64>,
95    fps_observed: Option<f64>,
96) -> sqlx::Result<()> {
97    let now = Utc::now();
98    sqlx::query(
99        "INSERT INTO camera_status
100           (camera_id, state, last_segment_at, segments_written, bitrate_kbps, fps_observed, updated_at)
101         VALUES (?, 'recording', ?, 1, ?, ?, ?)
102         ON CONFLICT(camera_id) DO UPDATE SET
103            state = 'recording',
104            last_segment_at = excluded.last_segment_at,
105            segments_written = camera_status.segments_written + 1,
106            bitrate_kbps = excluded.bitrate_kbps,
107            fps_observed = excluded.fps_observed,
108            last_error = NULL,
109            updated_at = excluded.updated_at",
110    )
111    .bind(camera_id)
112    .bind(last_segment_at)
113    .bind(bitrate_kbps)
114    .bind(fps_observed)
115    .bind(now)
116    .execute(pool)
117    .await?;
118    Ok(())
119}
120
121/// Record a detected recording gap (a hole > 3s between consecutive segments) for ANR re-fill.
122/// Ignore-on-conflict by `(camera_id, gap_start)` so re-scans never duplicate a gap. Best-effort:
123/// a failure is the caller's to log, not fatal to indexing.
124pub async fn upsert_recording_gap(
125    pool: &SqlitePool,
126    camera_id: &str,
127    gap_start: DateTime<Utc>,
128    gap_end: DateTime<Utc>,
129    gap_seconds: i64,
130) -> sqlx::Result<()> {
131    let id = format!("gap_{}", Uuid::new_v4().simple());
132    sqlx::query(
133        "INSERT INTO recording_gaps
134           (id, camera_id, gap_start, gap_end, gap_seconds, fill_state, fill_attempts, created_at)
135         VALUES (?, ?, ?, ?, ?, 'pending', 0, ?)
136         ON CONFLICT(camera_id, gap_start) DO NOTHING",
137    )
138    .bind(id)
139    .bind(camera_id)
140    .bind(gap_start)
141    .bind(gap_end)
142    .bind(gap_seconds)
143    .bind(Utc::now())
144    .execute(pool)
145    .await?;
146    Ok(())
147}
148
149/// Insert an event into the event log.
150pub async fn log_event(
151    pool: &SqlitePool,
152    camera_id: Option<&str>,
153    event_type: &str,
154    severity: &str,
155    payload: Value,
156) -> sqlx::Result<()> {
157    let now = Utc::now();
158    sqlx::query(
159        "INSERT INTO events (id, camera_id, site_id, event_type, severity, timestamp, payload, created_at)
160         VALUES (?, ?, NULL, ?, ?, ?, ?, ?)",
161    )
162    .bind(Uuid::new_v4().to_string())
163    .bind(camera_id)
164    .bind(event_type)
165    .bind(severity)
166    .bind(now)
167    .bind(Json(payload))
168    .bind(now)
169    .execute(pool)
170    .await?;
171    Ok(())
172}
173
174/// Toggle a transient read-lock on a set of segments so the retention sweeper (which only deletes
175/// `locked = 0`) won't remove them while clip/snapshot ffmpeg is reading them — closing the TOCTOU
176/// between selecting segments and ffmpeg opening their files. Best-effort: a failure is logged, not
177/// fatal (the read still proceeds). Locks are cleared at startup ([`crate::db::clear_segment_read_locks`])
178/// so a crash mid-read cannot pin segments forever.
179pub async fn set_segments_locked(pool: &SqlitePool, ids: &[String], locked: bool) {
180    if ids.is_empty() {
181        return;
182    }
183    let placeholders = vec!["?"; ids.len()].join(",");
184    let sql = format!("UPDATE segments SET locked = ? WHERE id IN ({placeholders})");
185    let mut q = sqlx::query(&sql).bind(i64::from(locked));
186    for id in ids {
187        q = q.bind(id);
188    }
189    if let Err(e) = q.execute(pool).await {
190        tracing::warn!(error = %e, locked, "failed to toggle segment read-lock");
191    }
192}
193
194/// Set or clear the DURABLE evidence lock on a single segment (distinct from the transient `locked`
195/// read-lock). When `incident_id` is supplied it is recorded; `COALESCE` preserves any existing tag
196/// when `incident_id` is `None` (so unlocking — or locking without a tag — never erases the case
197/// the segment was already attached to). Returns the number of rows affected (0 ⇒ no such segment).
198pub async fn set_evidence_locked(
199    pool: &SqlitePool,
200    segment_id: &str,
201    locked: bool,
202    incident_id: Option<&str>,
203) -> sqlx::Result<u64> {
204    let res = sqlx::query(
205        "UPDATE segments SET evidence_locked = ?, incident_id = COALESCE(?, incident_id) WHERE id = ?",
206    )
207    .bind(i64::from(locked))
208    .bind(incident_id)
209    .bind(segment_id)
210    .execute(pool)
211    .await?;
212    Ok(res.rows_affected())
213}