1use chrono::{DateTime, Utc};
4use serde_json::Value;
5use sqlx::types::Json;
6use sqlx::SqlitePool;
7use uuid::Uuid;
8
9pub async fn set_state(
11 pool: &SqlitePool,
12 camera_id: &str,
13 state: &str,
14 last_error: Option<&str>,
15) -> sqlx::Result<()> {
16 let now = Utc::now();
17 sqlx::query(
18 "INSERT INTO camera_status (camera_id, state, last_error, updated_at)
19 VALUES (?, ?, ?, ?)
20 ON CONFLICT(camera_id) DO UPDATE SET
21 state = excluded.state,
22 last_error = excluded.last_error,
23 updated_at = excluded.updated_at",
24 )
25 .bind(camera_id)
26 .bind(state)
27 .bind(last_error)
28 .bind(now)
29 .execute(pool)
30 .await?;
31 Ok(())
32}
33
34pub async fn set_running(
36 pool: &SqlitePool,
37 camera_id: &str,
38 state: &str,
39 pid: Option<i64>,
40) -> sqlx::Result<()> {
41 let now = Utc::now();
42 sqlx::query(
43 "INSERT INTO camera_status (camera_id, state, recorder_pid, last_started_at, last_error, updated_at)
44 VALUES (?, ?, ?, ?, NULL, ?)
45 ON CONFLICT(camera_id) DO UPDATE SET
46 state = excluded.state,
47 recorder_pid = excluded.recorder_pid,
48 last_started_at = excluded.last_started_at,
49 last_error = NULL,
50 updated_at = excluded.updated_at",
51 )
52 .bind(camera_id)
53 .bind(state)
54 .bind(pid)
55 .bind(now)
56 .bind(now)
57 .execute(pool)
58 .await?;
59 Ok(())
60}
61
62pub async fn bump_reconnect(
64 pool: &SqlitePool,
65 camera_id: &str,
66 last_error: &str,
67) -> sqlx::Result<()> {
68 let now = Utc::now();
69 let err = last_error.chars().rev().take(800).collect::<String>();
70 let err: String = err.chars().rev().collect();
71 sqlx::query(
72 "INSERT INTO camera_status (camera_id, state, reconnect_count, last_error, recorder_pid, updated_at)
73 VALUES (?, 'offline', 1, ?, NULL, ?)
74 ON CONFLICT(camera_id) DO UPDATE SET
75 state = 'offline',
76 reconnect_count = camera_status.reconnect_count + 1,
77 last_error = excluded.last_error,
78 recorder_pid = NULL,
79 updated_at = excluded.updated_at",
80 )
81 .bind(camera_id)
82 .bind(err)
83 .bind(now)
84 .execute(pool)
85 .await?;
86 Ok(())
87}
88
89pub async fn record_segment_indexed(
91 pool: &SqlitePool,
92 camera_id: &str,
93 last_segment_at: DateTime<Utc>,
94 bitrate_kbps: Option<f64>,
95 fps_observed: Option<f64>,
96) -> sqlx::Result<()> {
97 let now = Utc::now();
98 sqlx::query(
99 "INSERT INTO camera_status
100 (camera_id, state, last_segment_at, segments_written, bitrate_kbps, fps_observed, updated_at)
101 VALUES (?, 'recording', ?, 1, ?, ?, ?)
102 ON CONFLICT(camera_id) DO UPDATE SET
103 state = 'recording',
104 last_segment_at = excluded.last_segment_at,
105 segments_written = camera_status.segments_written + 1,
106 bitrate_kbps = excluded.bitrate_kbps,
107 fps_observed = excluded.fps_observed,
108 last_error = NULL,
109 updated_at = excluded.updated_at",
110 )
111 .bind(camera_id)
112 .bind(last_segment_at)
113 .bind(bitrate_kbps)
114 .bind(fps_observed)
115 .bind(now)
116 .execute(pool)
117 .await?;
118 Ok(())
119}
120
121pub async fn log_event(
123 pool: &SqlitePool,
124 camera_id: Option<&str>,
125 event_type: &str,
126 severity: &str,
127 payload: Value,
128) -> sqlx::Result<()> {
129 let now = Utc::now();
130 sqlx::query(
131 "INSERT INTO events (id, camera_id, site_id, event_type, severity, timestamp, payload, created_at)
132 VALUES (?, ?, NULL, ?, ?, ?, ?, ?)",
133 )
134 .bind(Uuid::new_v4().to_string())
135 .bind(camera_id)
136 .bind(event_type)
137 .bind(severity)
138 .bind(now)
139 .bind(Json(payload))
140 .bind(now)
141 .execute(pool)
142 .await?;
143 Ok(())
144}
145
146pub async fn set_segments_locked(pool: &SqlitePool, ids: &[String], locked: bool) {
152 if ids.is_empty() {
153 return;
154 }
155 let placeholders = vec!["?"; ids.len()].join(",");
156 let sql = format!("UPDATE segments SET locked = ? WHERE id IN ({placeholders})");
157 let mut q = sqlx::query(&sql).bind(i64::from(locked));
158 for id in ids {
159 q = q.bind(id);
160 }
161 if let Err(e) = q.execute(pool).await {
162 tracing::warn!(error = %e, locked, "failed to toggle segment read-lock");
163 }
164}