Skip to main content

heldar_kernel/services/
snapshot_scheduler.rs

1//! Scheduled interval snapshots. On each tick the scheduler fires any due `snapshot_schedule`
2//! (enabled, and either never fired or its interval has elapsed since `last_fired_at`): it grabs a
3//! live JPEG via the snapshot service, writes it to `snapshots_dir/{camera_id}/{taken_at}.jpg`,
4//! records a `snapshots` row, and advances `last_fired_at`. Captured frames are pruned by the
5//! retention sweeper past HELDAR_SNAPSHOT_RETENTION_HOURS. Spawned from `main` (supervised) only
6//! when HELDAR_SNAPSHOT_SCHEDULER_ENABLED.
7
8use std::time::Duration;
9
10use chrono::{DateTime, Utc};
11use uuid::Uuid;
12
13use crate::services::snapshot;
14use crate::state::AppState;
15
16pub async fn run(state: AppState) {
17    let interval_s = state.cfg.snapshot_scheduler_interval_s.max(5);
18    let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
19    loop {
20        tick.tick().await;
21        if let Err(e) = sweep(&state).await {
22            tracing::error!(error = %e, "snapshot_scheduler: tick failed");
23        }
24    }
25}
26
27/// Fire every due schedule once. Due-ness is computed in Rust (robust against SQLite text-time
28/// quirks): a schedule is due when it has never fired, or `last_fired_at + interval_seconds <= now`.
29async fn sweep(state: &AppState) -> anyhow::Result<()> {
30    let now = Utc::now();
31    let schedules: Vec<crate::models::SnapshotSchedule> =
32        sqlx::query_as::<_, crate::models::SnapshotSchedule>(
33            "SELECT * FROM snapshot_schedules WHERE enabled = 1",
34        )
35        .fetch_all(&state.pool)
36        .await?;
37
38    for sched in schedules {
39        let interval = sched.interval_seconds.max(1);
40        let due = match sched.last_fired_at {
41            None => true,
42            Some(last) => last + chrono::Duration::seconds(interval) <= now,
43        };
44        if !due {
45            continue;
46        }
47        // One camera failing to capture must not stop the others; log and move on.
48        if let Err(e) = fire(state, &sched).await {
49            tracing::warn!(
50                schedule = %sched.id,
51                camera = %sched.camera_id,
52                error = %e,
53                "snapshot_scheduler: capture failed"
54            );
55        }
56    }
57    Ok(())
58}
59
60/// Capture one frame for a schedule, persist the file + row, and stamp `last_fired_at`.
61async fn fire(state: &AppState, sched: &crate::models::SnapshotSchedule) -> anyhow::Result<()> {
62    let taken_at: DateTime<Utc> = Utc::now();
63    let bytes = snapshot::snapshot_live_raw(state, &sched.camera_id).await?;
64    let size_bytes = bytes.len() as i64;
65
66    let dir = state.cfg.snapshots_dir.join(&sched.camera_id);
67    tokio::fs::create_dir_all(&dir).await?;
68    // Compact, sortable, URL-safe filename derived from the capture time (no colons/offset chars).
69    let fname = format!("{}.jpg", taken_at.format("%Y%m%dT%H%M%S%3fZ"));
70    let path = dir.join(&fname);
71    tokio::fs::write(&path, &bytes).await?;
72    let path_str = path.to_string_lossy().to_string();
73
74    let id = format!("snap_{}", Uuid::new_v4().simple());
75    let now = Utc::now();
76    sqlx::query(
77        "INSERT INTO snapshots (id, camera_id, schedule_id, path, taken_at, size_bytes, created_at)
78         VALUES (?, ?, ?, ?, ?, ?, ?)",
79    )
80    .bind(&id)
81    .bind(&sched.camera_id)
82    .bind(&sched.id)
83    .bind(&path_str)
84    .bind(taken_at)
85    .bind(size_bytes)
86    .bind(now)
87    .execute(&state.pool)
88    .await?;
89
90    sqlx::query("UPDATE snapshot_schedules SET last_fired_at = ?, updated_at = ? WHERE id = ?")
91        .bind(taken_at)
92        .bind(now)
93        .bind(&sched.id)
94        .execute(&state.pool)
95        .await?;
96
97    tracing::debug!(camera = %sched.camera_id, path = %path_str, "snapshot_scheduler: captured snapshot");
98    Ok(())
99}