heldar_kernel/services/
indexer.rs1use std::path::Path;
5use std::sync::Arc;
6use std::time::{Duration, SystemTime};
7
8use chrono::{DateTime, Utc};
9use serde_json::json;
10use sqlx::SqlitePool;
11use uuid::Uuid;
12
13use crate::config::Config;
14use crate::models::Camera;
15use crate::repo;
16use crate::util;
17
/// Minimum age of a file, in seconds since its last modification, before the indexer
/// will consider it. Files modified more recently than this are skipped for the current
/// pass (presumably because the recorder may still be writing them — TODO confirm).
const SETTLE_SECS: u64 = 5;
20
21pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
22 let mut tick = tokio::time::interval(Duration::from_secs(cfg.indexer_interval_s.max(2)));
23 loop {
24 tick.tick().await;
25 if let Err(e) = scan_once(&pool, &cfg).await {
26 tracing::error!(error = %e, "indexer: scan failed");
27 }
28 }
29}
30
31async fn scan_once(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
32 let cams: Vec<Camera> = sqlx::query_as::<_, Camera>("SELECT * FROM cameras")
33 .fetch_all(pool)
34 .await?;
35 for cam in cams {
36 let dir = cfg.camera_recordings_dir(&cam.id);
37 if !dir.exists() {
38 continue;
39 }
40 if let Err(e) = index_camera_dir(pool, cfg, &cam.id, &dir).await {
41 tracing::error!(camera_id = %cam.id, error = %e, "indexer: dir scan failed");
42 }
43 }
44 Ok(())
45}
46
47async fn index_camera_dir(
48 pool: &SqlitePool,
49 cfg: &Config,
50 camera_id: &str,
51 dir: &Path,
52) -> anyhow::Result<()> {
53 let mut entries = tokio::fs::read_dir(dir).await?;
54 let mut files: Vec<(String, std::path::PathBuf, SystemTime, u64)> = Vec::new();
55 while let Some(ent) = entries.next_entry().await? {
56 let path = ent.path();
57 if path.extension().and_then(|e| e.to_str()) != Some("mp4") {
58 continue;
59 }
60 let Ok(meta) = ent.metadata().await else {
61 continue;
62 };
63 if !meta.is_file() {
64 continue;
65 }
66 let name = ent.file_name().to_string_lossy().to_string();
67 let mtime = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
68 files.push((name, path, mtime, meta.len()));
69 }
70 files.sort_by(|a, b| a.0.cmp(&b.0));
71
72 let now = SystemTime::now();
73 for (name, path, mtime, size) in files {
74 if let Ok(age) = now.duration_since(mtime) {
76 if age < Duration::from_secs(SETTLE_SECS) {
77 continue;
78 }
79 }
80 let path_str = path.to_string_lossy().to_string();
81 let already: Option<(String,)> = sqlx::query_as("SELECT id FROM segments WHERE path = ?")
82 .bind(&path_str)
83 .fetch_optional(pool)
84 .await?;
85 if already.is_some() {
86 continue;
87 }
88 let Some(start) = util::parse_segment_time(&name) else {
89 tracing::warn!(%camera_id, file = %name, "indexer: unparseable filename, skipping");
90 continue;
91 };
92 let probe = match util::ffprobe_file(&cfg.ffprobe_bin, &path).await {
93 Ok(p) => p,
94 Err(e) => {
95 tracing::debug!(%camera_id, file = %name, error = %e, "indexer: probe failed (retry later)");
96 continue;
97 }
98 };
99 if !probe.duration_s.is_finite() || probe.duration_s <= 0.05 || size == 0 {
100 continue; }
102 let end = start + chrono::Duration::milliseconds((probe.duration_s * 1000.0) as i64);
103 let bitrate_kbps = if probe.duration_s > 0.0 {
104 Some((size as f64 * 8.0) / probe.duration_s / 1000.0)
105 } else {
106 None
107 };
108
109 let prev_end: Option<(DateTime<Utc>,)> = sqlx::query_as(
110 "SELECT end_time FROM segments WHERE camera_id = ? ORDER BY end_time DESC LIMIT 1",
111 )
112 .bind(camera_id)
113 .fetch_optional(pool)
114 .await?;
115
116 let id = format!("seg_{}", Uuid::new_v4().simple());
117 sqlx::query(
118 "INSERT INTO segments
119 (id, camera_id, path, start_time, end_time, duration_s, codec, width, height,
120 size_bytes, container, locked, incident_id, created_at)
121 VALUES (?,?,?,?,?,?,?,?,?,?, 'mp4', 0, NULL, ?)",
122 )
123 .bind(&id)
124 .bind(camera_id)
125 .bind(&path_str)
126 .bind(start)
127 .bind(end)
128 .bind(probe.duration_s)
129 .bind(&probe.codec)
130 .bind(probe.width)
131 .bind(probe.height)
132 .bind(size as i64)
133 .bind(Utc::now())
134 .execute(pool)
135 .await?;
136
137 let _ = repo::record_segment_indexed(pool, camera_id, end, bitrate_kbps, probe.fps).await;
138
139 if let Some((pe,)) = prev_end {
140 let gap = (start - pe).num_seconds();
141 if gap > 3 {
142 let _ = repo::log_event(
143 pool,
144 Some(camera_id),
145 "recording_gap",
146 "warning",
147 json!({ "gap_seconds": gap, "prev_end": pe, "next_start": start }),
148 )
149 .await;
150 let _ = repo::upsert_recording_gap(pool, camera_id, pe, start, gap).await;
152 }
153 }
154 tracing::debug!(%camera_id, file = %name, dur = probe.duration_s, "indexer: indexed segment");
155 }
156 Ok(())
157}