1use chrono::{DateTime, Utc};
4use serde_json::Value;
5use sqlx::types::Json;
6use sqlx::SqlitePool;
7use uuid::Uuid;
8
9pub async fn set_state(
11 pool: &SqlitePool,
12 camera_id: &str,
13 state: &str,
14 last_error: Option<&str>,
15) -> sqlx::Result<()> {
16 let now = Utc::now();
17 sqlx::query(
18 "INSERT INTO camera_status (camera_id, state, last_error, updated_at)
19 VALUES (?, ?, ?, ?)
20 ON CONFLICT(camera_id) DO UPDATE SET
21 state = excluded.state,
22 last_error = excluded.last_error,
23 updated_at = excluded.updated_at",
24 )
25 .bind(camera_id)
26 .bind(state)
27 .bind(last_error)
28 .bind(now)
29 .execute(pool)
30 .await?;
31 Ok(())
32}
33
34pub async fn set_running(
36 pool: &SqlitePool,
37 camera_id: &str,
38 state: &str,
39 pid: Option<i64>,
40) -> sqlx::Result<()> {
41 let now = Utc::now();
42 sqlx::query(
43 "INSERT INTO camera_status (camera_id, state, recorder_pid, last_started_at, last_error, updated_at)
44 VALUES (?, ?, ?, ?, NULL, ?)
45 ON CONFLICT(camera_id) DO UPDATE SET
46 state = excluded.state,
47 recorder_pid = excluded.recorder_pid,
48 last_started_at = excluded.last_started_at,
49 last_error = NULL,
50 updated_at = excluded.updated_at",
51 )
52 .bind(camera_id)
53 .bind(state)
54 .bind(pid)
55 .bind(now)
56 .bind(now)
57 .execute(pool)
58 .await?;
59 Ok(())
60}
61
62pub async fn bump_reconnect(
64 pool: &SqlitePool,
65 camera_id: &str,
66 last_error: &str,
67) -> sqlx::Result<()> {
68 let now = Utc::now();
69 let err = last_error.chars().rev().take(800).collect::<String>();
70 let err: String = err.chars().rev().collect();
71 sqlx::query(
72 "INSERT INTO camera_status (camera_id, state, reconnect_count, last_error, recorder_pid, updated_at)
73 VALUES (?, 'offline', 1, ?, NULL, ?)
74 ON CONFLICT(camera_id) DO UPDATE SET
75 state = 'offline',
76 reconnect_count = camera_status.reconnect_count + 1,
77 last_error = excluded.last_error,
78 recorder_pid = NULL,
79 updated_at = excluded.updated_at",
80 )
81 .bind(camera_id)
82 .bind(err)
83 .bind(now)
84 .execute(pool)
85 .await?;
86 Ok(())
87}
88
89pub async fn record_segment_indexed(
91 pool: &SqlitePool,
92 camera_id: &str,
93 last_segment_at: DateTime<Utc>,
94 bitrate_kbps: Option<f64>,
95 fps_observed: Option<f64>,
96) -> sqlx::Result<()> {
97 let now = Utc::now();
98 sqlx::query(
99 "INSERT INTO camera_status
100 (camera_id, state, last_segment_at, segments_written, bitrate_kbps, fps_observed, updated_at)
101 VALUES (?, 'recording', ?, 1, ?, ?, ?)
102 ON CONFLICT(camera_id) DO UPDATE SET
103 state = 'recording',
104 last_segment_at = excluded.last_segment_at,
105 segments_written = camera_status.segments_written + 1,
106 bitrate_kbps = excluded.bitrate_kbps,
107 fps_observed = excluded.fps_observed,
108 last_error = NULL,
109 updated_at = excluded.updated_at",
110 )
111 .bind(camera_id)
112 .bind(last_segment_at)
113 .bind(bitrate_kbps)
114 .bind(fps_observed)
115 .bind(now)
116 .execute(pool)
117 .await?;
118 Ok(())
119}
120
121pub async fn upsert_recording_gap(
125 pool: &SqlitePool,
126 camera_id: &str,
127 gap_start: DateTime<Utc>,
128 gap_end: DateTime<Utc>,
129 gap_seconds: i64,
130) -> sqlx::Result<()> {
131 let id = format!("gap_{}", Uuid::new_v4().simple());
132 sqlx::query(
133 "INSERT INTO recording_gaps
134 (id, camera_id, gap_start, gap_end, gap_seconds, fill_state, fill_attempts, created_at)
135 VALUES (?, ?, ?, ?, ?, 'pending', 0, ?)
136 ON CONFLICT(camera_id, gap_start) DO NOTHING",
137 )
138 .bind(id)
139 .bind(camera_id)
140 .bind(gap_start)
141 .bind(gap_end)
142 .bind(gap_seconds)
143 .bind(Utc::now())
144 .execute(pool)
145 .await?;
146 Ok(())
147}
148
149pub async fn log_event(
151 pool: &SqlitePool,
152 camera_id: Option<&str>,
153 event_type: &str,
154 severity: &str,
155 payload: Value,
156) -> sqlx::Result<()> {
157 let now = Utc::now();
158 sqlx::query(
159 "INSERT INTO events (id, camera_id, site_id, event_type, severity, timestamp, payload, created_at)
160 VALUES (?, ?, NULL, ?, ?, ?, ?, ?)",
161 )
162 .bind(Uuid::new_v4().to_string())
163 .bind(camera_id)
164 .bind(event_type)
165 .bind(severity)
166 .bind(now)
167 .bind(Json(payload))
168 .bind(now)
169 .execute(pool)
170 .await?;
171 Ok(())
172}
173
174pub async fn set_segments_locked(pool: &SqlitePool, ids: &[String], locked: bool) {
180 if ids.is_empty() {
181 return;
182 }
183 let placeholders = vec!["?"; ids.len()].join(",");
184 let sql = format!("UPDATE segments SET locked = ? WHERE id IN ({placeholders})");
185 let mut q = sqlx::query(&sql).bind(i64::from(locked));
186 for id in ids {
187 q = q.bind(id);
188 }
189 if let Err(e) = q.execute(pool).await {
190 tracing::warn!(error = %e, locked, "failed to toggle segment read-lock");
191 }
192}
193
194pub async fn set_evidence_locked(
199 pool: &SqlitePool,
200 segment_id: &str,
201 locked: bool,
202 incident_id: Option<&str>,
203) -> sqlx::Result<u64> {
204 let res = sqlx::query(
205 "UPDATE segments SET evidence_locked = ?, incident_id = COALESCE(?, incident_id) WHERE id = ?",
206 )
207 .bind(i64::from(locked))
208 .bind(incident_id)
209 .bind(segment_id)
210 .execute(pool)
211 .await?;
212 Ok(res.rows_affected())
213}