Skip to main content

heldar_kernel/
db.rs

1use std::str::FromStr;
2use std::time::Duration;
3
4use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
5use sqlx::SqlitePool;
6
7use crate::config::Config;
8
9/// Open the SQLite pool with WAL + sane concurrency settings, creating the file if needed.
10pub async fn init_pool(cfg: &Config) -> anyhow::Result<SqlitePool> {
11    if !cfg.database_url.starts_with("sqlite") {
12        anyhow::bail!(
13            "Stage 0 supports sqlite only; got `{}`. Postgres is planned via SQLx.",
14            cfg.database_url
15        );
16    }
17
18    let opts = SqliteConnectOptions::from_str(&cfg.database_url)?
19        .create_if_missing(true)
20        .journal_mode(SqliteJournalMode::Wal)
21        .synchronous(SqliteSynchronous::Normal)
22        .busy_timeout(Duration::from_secs(15))
23        .foreign_keys(true);
24
25    let pool = SqlitePoolOptions::new()
26        .max_connections(cfg.db_max_connections)
27        .acquire_timeout(Duration::from_secs(20))
28        .connect_with(opts)
29        .await?;
30
31    Ok(pool)
32}
33
34/// Apply embedded migrations from `./migrations`.
35pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
36    sqlx::migrate!("./migrations").run(pool).await?;
37    Ok(())
38}
39
40/// Clear any transient segment read-locks left over from a crash. clip/snapshot export set
41/// `segments.locked = 1` while ffmpeg reads a segment and release it afterwards; if the process died
42/// mid-read those segments would stay locked (and never be pruned by retention). Clearing at startup
43/// makes the read-lock crash-safe. NOTE: this means `locked` is reserved for transient read-locks —
44/// a future durable evidence-hold must use a separate column, not this one.
45pub async fn clear_segment_read_locks(pool: &SqlitePool) -> anyhow::Result<()> {
46    let n = sqlx::query("UPDATE segments SET locked = 0 WHERE locked <> 0")
47        .execute(pool)
48        .await?
49        .rows_affected();
50    if n > 0 {
51        tracing::info!(cleared = n, "startup: cleared stale segment read-locks");
52    }
53    Ok(())
54}
55
#[cfg(test)]
mod tests {
    use super::*;

    /// First-principles concurrency invariant: under heavy concurrent writers on the real production
    /// pool config (WAL + busy_timeout), a normal write must WAIT (serialize) rather than surface
    /// SQLITE_BUSY as an error. If this ever fails, the busy_timeout is too low (and the 503 mapping
    /// in error.rs is the user-facing safety net).
    ///
    /// On failure the assertion prints each writer's error message, so the cause (SQLITE_BUSY,
    /// schema mismatch, pool timeout, ...) is visible without re-running under a debugger.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_writers_serialize_without_busy_errors() {
        // Per-process scratch directory; wipe leftovers from an earlier aborted run first.
        let dir = std::env::temp_dir().join(format!("heldar-walstress-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let mut cfg = Config::from_env();
        cfg.database_url = format!("sqlite://{}", dir.join("t.db").display());
        cfg.db_max_connections = 8;
        let pool = init_pool(&cfg).await.unwrap();
        run_migrations(&pool).await.unwrap();

        // 64 concurrent writers contend for the single WAL writer slot.
        let mut handles = Vec::new();
        for i in 0..64 {
            let p = pool.clone();
            handles.push(tokio::spawn(async move {
                let now = chrono::Utc::now();
                sqlx::query(
                    "INSERT INTO cameras (id, name, retention_hours, storage_quota_bytes, created_at, updated_at)
                     VALUES (?, ?, 168, NULL, ?, ?)",
                )
                .bind(format!("cam{i}"))
                .bind(format!("cam{i}"))
                .bind(now)
                .bind(now)
                .execute(&p)
                .await
            }));
        }

        // Keep the error text instead of just counting, so a failure is diagnosable.
        let mut failures: Vec<String> = Vec::new();
        for h in handles {
            if let Err(e) = h.await.expect("writer task panicked") {
                failures.push(e.to_string());
            }
        }

        // Close every connection before deleting the directory: open SQLite handles keep the
        // db/-wal/-shm files in use, which makes the (ignored) removal fail on some platforms.
        pool.close().await;
        let _ = std::fs::remove_dir_all(&dir);

        assert!(
            failures.is_empty(),
            "concurrent writers must not surface SQLITE_BUSY under WAL + busy_timeout ({} failed): {:?}",
            failures.len(),
            failures
        );
    }
}