heldar_kernel/services/
snapshot_scheduler.rs1use std::time::Duration;
9
10use chrono::{DateTime, Utc};
11use uuid::Uuid;
12
13use crate::services::snapshot;
14use crate::state::AppState;
15
16pub async fn run(state: AppState) {
17 let interval_s = state.cfg.snapshot_scheduler_interval_s.max(5);
18 let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
19 loop {
20 tick.tick().await;
21 if let Err(e) = sweep(&state).await {
22 tracing::error!(error = %e, "snapshot_scheduler: tick failed");
23 }
24 }
25}
26
27async fn sweep(state: &AppState) -> anyhow::Result<()> {
30 let now = Utc::now();
31 let schedules: Vec<crate::models::SnapshotSchedule> =
32 sqlx::query_as::<_, crate::models::SnapshotSchedule>(
33 "SELECT * FROM snapshot_schedules WHERE enabled = 1",
34 )
35 .fetch_all(&state.pool)
36 .await?;
37
38 for sched in schedules {
39 let interval = sched.interval_seconds.max(1);
40 let due = match sched.last_fired_at {
41 None => true,
42 Some(last) => last + chrono::Duration::seconds(interval) <= now,
43 };
44 if !due {
45 continue;
46 }
47 if let Err(e) = fire(state, &sched).await {
49 tracing::warn!(
50 schedule = %sched.id,
51 camera = %sched.camera_id,
52 error = %e,
53 "snapshot_scheduler: capture failed"
54 );
55 }
56 }
57 Ok(())
58}
59
60async fn fire(state: &AppState, sched: &crate::models::SnapshotSchedule) -> anyhow::Result<()> {
62 let taken_at: DateTime<Utc> = Utc::now();
63 let bytes = snapshot::snapshot_live_raw(state, &sched.camera_id).await?;
64 let size_bytes = bytes.len() as i64;
65
66 let dir = state.cfg.snapshots_dir.join(&sched.camera_id);
67 tokio::fs::create_dir_all(&dir).await?;
68 let fname = format!("{}.jpg", taken_at.format("%Y%m%dT%H%M%S%3fZ"));
70 let path = dir.join(&fname);
71 tokio::fs::write(&path, &bytes).await?;
72 let path_str = path.to_string_lossy().to_string();
73
74 let id = format!("snap_{}", Uuid::new_v4().simple());
75 let now = Utc::now();
76 sqlx::query(
77 "INSERT INTO snapshots (id, camera_id, schedule_id, path, taken_at, size_bytes, created_at)
78 VALUES (?, ?, ?, ?, ?, ?, ?)",
79 )
80 .bind(&id)
81 .bind(&sched.camera_id)
82 .bind(&sched.id)
83 .bind(&path_str)
84 .bind(taken_at)
85 .bind(size_bytes)
86 .bind(now)
87 .execute(&state.pool)
88 .await?;
89
90 sqlx::query("UPDATE snapshot_schedules SET last_fired_at = ?, updated_at = ? WHERE id = ?")
91 .bind(taken_at)
92 .bind(now)
93 .bind(&sched.id)
94 .execute(&state.pool)
95 .await?;
96
97 tracing::debug!(camera = %sched.camera_id, path = %path_str, "snapshot_scheduler: captured snapshot");
98 Ok(())
99}