1use std::str::FromStr;
2use std::time::Duration;
3
4use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
5use sqlx::SqlitePool;
6
7use crate::config::Config;
8
9pub async fn init_pool(cfg: &Config) -> anyhow::Result<SqlitePool> {
11 if !cfg.database_url.starts_with("sqlite") {
12 anyhow::bail!(
13 "Stage 0 supports sqlite only; got `{}`. Postgres is planned via SQLx.",
14 cfg.database_url
15 );
16 }
17
18 let opts = SqliteConnectOptions::from_str(&cfg.database_url)?
19 .create_if_missing(true)
20 .journal_mode(SqliteJournalMode::Wal)
21 .synchronous(SqliteSynchronous::Normal)
22 .busy_timeout(Duration::from_secs(15))
23 .foreign_keys(true);
24
25 let pool = SqlitePoolOptions::new()
26 .max_connections(cfg.db_max_connections)
27 .acquire_timeout(Duration::from_secs(20))
28 .connect_with(opts)
29 .await?;
30
31 Ok(pool)
32}
33
34pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
36 sqlx::migrate!("./migrations").run(pool).await?;
37 Ok(())
38}
39
40pub async fn clear_segment_read_locks(pool: &SqlitePool) -> anyhow::Result<()> {
46 let n = sqlx::query("UPDATE segments SET locked = 0 WHERE locked <> 0")
47 .execute(pool)
48 .await?
49 .rows_affected();
50 if n > 0 {
51 tracing::info!(cleared = n, "startup: cleared stale segment read-locks");
52 }
53 Ok(())
54}
55
#[cfg(test)]
mod tests {
    use super::*;

    /// Stress test: 64 tasks insert one `cameras` row each through a pool of
    /// 8 connections on a 4-thread runtime. Every insert must succeed, i.e.
    /// contention between writers must be absorbed by WAL + `busy_timeout`
    /// rather than surfacing as an error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_writers_serialize_without_busy_errors() {
        // Fresh scratch directory keyed by the process id; wipe leftovers
        // from an earlier run first.
        let dir = std::env::temp_dir().join(format!("heldar-walstress-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();

        let mut cfg = Config::from_env();
        cfg.database_url = format!("sqlite://{}", dir.join("t.db").display());
        cfg.db_max_connections = 8;
        let pool = init_pool(&cfg).await.unwrap();
        run_migrations(&pool).await.unwrap();

        let handles: Vec<_> = (0..64)
            .map(|i| {
                let p = pool.clone();
                tokio::spawn(async move {
                    let now = chrono::Utc::now();
                    sqlx::query(
                        "INSERT INTO cameras (id, name, retention_hours, storage_quota_bytes, created_at, updated_at)
                         VALUES (?, ?, 168, NULL, ?, ?)",
                    )
                    .bind(format!("cam{i}"))
                    .bind(format!("cam{i}"))
                    .bind(now)
                    .bind(now)
                    .execute(&p)
                    .await
                })
            })
            .collect();

        // Keep the text of each failure so a red run says *why* it failed,
        // not just how many writers did.
        let mut failures = Vec::new();
        for h in handles {
            if let Err(e) = h.await.unwrap() {
                failures.push(e.to_string());
            }
        }

        // Close every connection before deleting the files they hold open;
        // otherwise the cleanup can silently fail and leave the directory
        // behind.
        pool.close().await;
        let _ = std::fs::remove_dir_all(&dir);

        let errors = failures.len();
        assert_eq!(
            errors, 0,
            "concurrent writers must not surface SQLITE_BUSY under WAL + busy_timeout ({errors} failed): {failures:?}"
        );
    }
}