Skip to main content

heldar_kernel/services/
indexer.rs

1//! Timeline indexer: periodically scans each camera's recordings directory, turning closed
2//! segment files into rows in the `segments` table (the timeline index) and detecting gaps.
3
4use std::path::Path;
5use std::sync::Arc;
6use std::time::{Duration, SystemTime};
7
8use chrono::{DateTime, Utc};
9use serde_json::json;
10use sqlx::SqlitePool;
11use uuid::Uuid;
12
13use crate::config::Config;
14use crate::models::Camera;
15use crate::repo;
16use crate::util;
17
/// Minimum age, in seconds since last modification, before a recording file is indexed.
///
/// A file untouched for at least this long is treated as closed (not mid-write); younger files
/// are left for a later scan.
const SETTLE_SECS: u64 = 5;
20
21pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
22    let mut tick = tokio::time::interval(Duration::from_secs(cfg.indexer_interval_s.max(2)));
23    loop {
24        tick.tick().await;
25        if let Err(e) = scan_once(&pool, &cfg).await {
26            tracing::error!(error = %e, "indexer: scan failed");
27        }
28    }
29}
30
31async fn scan_once(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
32    let cams: Vec<Camera> = sqlx::query_as::<_, Camera>("SELECT * FROM cameras")
33        .fetch_all(pool)
34        .await?;
35    for cam in cams {
36        let dir = cfg.camera_recordings_dir(&cam.id);
37        if !dir.exists() {
38            continue;
39        }
40        if let Err(e) = index_camera_dir(pool, cfg, &cam.id, &dir).await {
41            tracing::error!(camera_id = %cam.id, error = %e, "indexer: dir scan failed");
42        }
43    }
44    Ok(())
45}
46
47async fn index_camera_dir(
48    pool: &SqlitePool,
49    cfg: &Config,
50    camera_id: &str,
51    dir: &Path,
52) -> anyhow::Result<()> {
53    let mut entries = tokio::fs::read_dir(dir).await?;
54    let mut files: Vec<(String, std::path::PathBuf, SystemTime, u64)> = Vec::new();
55    while let Some(ent) = entries.next_entry().await? {
56        let path = ent.path();
57        if path.extension().and_then(|e| e.to_str()) != Some("mp4") {
58            continue;
59        }
60        let Ok(meta) = ent.metadata().await else {
61            continue;
62        };
63        if !meta.is_file() {
64            continue;
65        }
66        let name = ent.file_name().to_string_lossy().to_string();
67        let mtime = meta.modified().unwrap_or(SystemTime::UNIX_EPOCH);
68        files.push((name, path, mtime, meta.len()));
69    }
70    files.sort_by(|a, b| a.0.cmp(&b.0));
71
72    let now = SystemTime::now();
73    for (name, path, mtime, size) in files {
74        // Skip files still being written (recently modified).
75        if let Ok(age) = now.duration_since(mtime) {
76            if age < Duration::from_secs(SETTLE_SECS) {
77                continue;
78            }
79        }
80        let path_str = path.to_string_lossy().to_string();
81        let already: Option<(String,)> = sqlx::query_as("SELECT id FROM segments WHERE path = ?")
82            .bind(&path_str)
83            .fetch_optional(pool)
84            .await?;
85        if already.is_some() {
86            continue;
87        }
88        let Some(start) = util::parse_segment_time(&name) else {
89            tracing::warn!(%camera_id, file = %name, "indexer: unparseable filename, skipping");
90            continue;
91        };
92        let probe = match util::ffprobe_file(&cfg.ffprobe_bin, &path).await {
93            Ok(p) => p,
94            Err(e) => {
95                tracing::debug!(%camera_id, file = %name, error = %e, "indexer: probe failed (retry later)");
96                continue;
97            }
98        };
99        if !probe.duration_s.is_finite() || probe.duration_s <= 0.05 || size == 0 {
100            continue; // empty/just-rotated stub, or a bogus (NaN/inf) probed duration
101        }
102        let end = start + chrono::Duration::milliseconds((probe.duration_s * 1000.0) as i64);
103        let bitrate_kbps = if probe.duration_s > 0.0 {
104            Some((size as f64 * 8.0) / probe.duration_s / 1000.0)
105        } else {
106            None
107        };
108
109        let prev_end: Option<(DateTime<Utc>,)> = sqlx::query_as(
110            "SELECT end_time FROM segments WHERE camera_id = ? ORDER BY end_time DESC LIMIT 1",
111        )
112        .bind(camera_id)
113        .fetch_optional(pool)
114        .await?;
115
116        let id = format!("seg_{}", Uuid::new_v4().simple());
117        sqlx::query(
118            "INSERT INTO segments
119               (id, camera_id, path, start_time, end_time, duration_s, codec, width, height,
120                size_bytes, container, locked, incident_id, created_at)
121             VALUES (?,?,?,?,?,?,?,?,?,?, 'mp4', 0, NULL, ?)",
122        )
123        .bind(&id)
124        .bind(camera_id)
125        .bind(&path_str)
126        .bind(start)
127        .bind(end)
128        .bind(probe.duration_s)
129        .bind(&probe.codec)
130        .bind(probe.width)
131        .bind(probe.height)
132        .bind(size as i64)
133        .bind(Utc::now())
134        .execute(pool)
135        .await?;
136
137        let _ = repo::record_segment_indexed(pool, camera_id, end, bitrate_kbps, probe.fps).await;
138
139        if let Some((pe,)) = prev_end {
140            // Second-resolution segment filenames can make the previous segment's end overlap this
141            // one's start. Clamp any prior segment that overlaps this start so segments never overlap
142            // in time (A.end <= B.start) — keeps playback/timeline coverage unambiguous.
143            if pe > start {
144                let _ = sqlx::query(
145                    "UPDATE segments SET end_time = ? WHERE camera_id = ? AND end_time > ? AND start_time < ?",
146                )
147                .bind(start)
148                .bind(camera_id)
149                .bind(start)
150                .bind(start)
151                .execute(pool)
152                .await;
153            }
154            let gap = (start - pe).num_seconds();
155            if gap > 3 {
156                let _ = repo::log_event(
157                    pool,
158                    Some(camera_id),
159                    "recording_gap",
160                    "warning",
161                    json!({ "gap_seconds": gap, "prev_end": pe, "next_start": start }),
162                )
163                .await;
164                // Persist the gap for ANR edge re-fill (ignore-on-conflict by camera_id + start).
165                let _ = repo::upsert_recording_gap(pool, camera_id, pe, start, gap).await;
166            }
167        }
168        tracing::debug!(%camera_id, file = %name, dur = probe.duration_s, "indexer: indexed segment");
169    }
170    Ok(())
171}