Skip to main content

atomr_persistence_sql/
schema.rs

1//! Idempotent schema bootstrap.
2//!
3//! `ensure_schema` is safe to call on startup in dev / test; in prod it is
4//! guarded by the `auto_migrate` config flag (no-op when disabled).
5
6use atomr_persistence::JournalError;
7use sqlx::AnyPool;
8
9use crate::config::{SqlConfig, SqlDialect};
10
11/// Install sqlx runtime drivers for every enabled feature. Idempotent.
12pub(crate) fn init_drivers() {
13    sqlx::any::install_default_drivers();
14}
15
16/// Apply the migration DDL for the configured dialect. Statements are split
17/// on `;` so a single embedded SQL file can bootstrap every required table.
18pub async fn ensure_schema(pool: &AnyPool, cfg: &SqlConfig) -> Result<(), JournalError> {
19    if !cfg.auto_migrate {
20        return Ok(());
21    }
22    let ddl = crate::dialect::migration_for(cfg.dialect);
23    for stmt in split_statements(ddl) {
24        sqlx::query(&stmt).execute(pool).await.map_err(JournalError::backend)?;
25    }
26    // FR-9 / FR-8: additive WORM hash-chain + bitemporal columns (002). The
27    // ALTER statements are tolerant of an already-migrated schema per dialect
28    // (IF NOT EXISTS / COL_LENGTH guards); on SQLite, re-adding a column errors,
29    // so we ignore "duplicate column" failures to keep bootstrap idempotent.
30    let worm_ddl = crate::dialect::worm_migration_for(cfg.dialect);
31    for stmt in split_statements(worm_ddl) {
32        if let Err(e) = sqlx::query(&stmt).execute(pool).await {
33            let msg = e.to_string().to_ascii_lowercase();
34            let already_applied = msg.contains("duplicate column")
35                || msg.contains("already exists")
36                || msg.contains("duplicate")
37                || msg.contains("exists");
38            if !already_applied {
39                return Err(JournalError::backend(e));
40            }
41        }
42    }
43    Ok(())
44}
45
46/// Install the dialect's WORM `deny_update_delete` enforcement DDL.
47/// Called by [`crate::journal::SqlJournal::with_worm`] when the toggle is on.
48/// SQLite installs `BEFORE UPDATE/DELETE` abort triggers; other dialects are
49/// documented for out-of-band operator action (see each `002_worm.sql`).
50pub(crate) async fn install_worm_triggers(pool: &AnyPool, cfg: &SqlConfig) -> Result<(), JournalError> {
51    let ddl = crate::dialect::worm_deny_trigger_for(cfg.dialect);
52    // Trigger bodies contain inner `;`, so they are delimited by `@@`, not `;`.
53    for stmt in ddl.split("@@").map(str::trim).filter(|s| !s.is_empty()) {
54        sqlx::query(stmt).execute(pool).await.map_err(JournalError::backend)?;
55    }
56    Ok(())
57}
58
59fn split_statements(ddl: &str) -> Vec<String> {
60    let stripped: String =
61        ddl.lines().map(|l| l.split("--").next().unwrap_or("")).collect::<Vec<_>>().join("\n");
62    stripped.split(';').map(str::trim).filter(|s| !s.is_empty()).map(|s| s.to_string()).collect()
63}
64
65/// Drop every journal/snapshot row for tests that want a clean slate.
66/// Requires the schema to already exist.
67#[allow(dead_code)]
68pub async fn truncate(pool: &AnyPool, _dialect: SqlDialect) -> Result<(), JournalError> {
69    for table in ["event_tags", "event_journal", "snapshot_store"] {
70        sqlx::query(&format!("DELETE FROM {table}")).execute(pool).await.map_err(JournalError::backend)?;
71    }
72    Ok(())
73}
74
75#[cfg(test)]
76mod tests {
77    use super::*;
78
79    #[test]
80    fn splitter_skips_blank_and_comments() {
81        let sql = "-- hello\nCREATE TABLE a (id INT);\n\nCREATE TABLE b (id INT);";
82        let out = split_statements(sql);
83        assert_eq!(out.len(), 2);
84        assert!(out[0].starts_with("CREATE TABLE a"));
85    }
86}