Skip to main content

modo/db/
migrate.rs

1use crate::error::{Error, Result};
2
3/// Run SQL migrations from a directory against a connection.
4///
5/// Reads `*.sql` files sorted by filename, tracks applied migrations
6/// in a `_migrations` table with checksum verification. Each migration
7/// is applied inside a transaction so the schema change and the
8/// `_migrations` record are committed atomically.
9///
10/// Already-applied migrations are skipped. If a file's checksum differs
11/// from the recorded checksum, an error is returned (the file was modified
12/// after being applied).
13///
14/// # Errors
15///
16/// Returns an error if the migrations directory cannot be read, a
17/// migration file cannot be parsed, a checksum mismatch is detected,
18/// or a migration statement fails.
19pub async fn migrate(conn: &libsql::Connection, dir: &str) -> Result<()> {
20    // Create tracking table
21    conn.execute(
22        "CREATE TABLE IF NOT EXISTS _migrations (
23            name TEXT PRIMARY KEY,
24            checksum TEXT NOT NULL,
25            applied_at TEXT NOT NULL DEFAULT (datetime('now'))
26        )",
27        (),
28    )
29    .await
30    .map_err(Error::from)?;
31
32    // Read and sort migration files on a blocking thread
33    let dir_owned = dir.to_string();
34    let files = tokio::task::spawn_blocking(move || {
35        let dir_path = std::path::Path::new(&dir_owned);
36        if !dir_path.exists() {
37            return Ok(Vec::new());
38        }
39
40        let mut entries: Vec<std::fs::DirEntry> = std::fs::read_dir(dir_path)
41            .map_err(|e| {
42                Error::internal(format!("failed to read migrations directory: {dir_owned}"))
43                    .chain(e)
44            })?
45            .filter_map(|entry| entry.ok())
46            .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "sql"))
47            .collect();
48        entries.sort_by_key(|e| e.file_name());
49
50        let mut result: Vec<(String, String)> = Vec::with_capacity(entries.len());
51        for entry in entries {
52            let name = entry.file_name().to_string_lossy().to_string();
53            let sql = std::fs::read_to_string(entry.path()).map_err(|e| {
54                Error::internal(format!("failed to read migration file: {name}")).chain(e)
55            })?;
56            result.push((name, sql));
57        }
58        Ok(result)
59    })
60    .await
61    .map_err(|e| Error::internal("migration task panicked").chain(e))?
62        as Result<Vec<(String, String)>>;
63
64    let files = files?;
65    if files.is_empty() {
66        return Ok(());
67    }
68
69    for (name, sql) in &files {
70        let checksum = fnv1a_hex(sql.as_bytes());
71
72        // Check if already applied
73        let mut rows = conn
74            .query(
75                "SELECT checksum FROM _migrations WHERE name = ?1",
76                libsql::params![name.clone()],
77            )
78            .await
79            .map_err(Error::from)?;
80
81        if let Some(row) = rows.next().await.map_err(Error::from)? {
82            let existing: String = row.get(0).map_err(Error::from)?;
83            if existing != checksum {
84                return Err(Error::internal(format!(
85                    "migration '{name}' checksum mismatch — file was modified after applying (expected {existing}, got {checksum})"
86                )));
87            }
88            continue; // Already applied
89        }
90
91        // Apply migration inside a transaction
92        conn.execute("BEGIN", ()).await.map_err(Error::from)?;
93
94        if let Err(e) = async {
95            conn.execute_batch(sql).await.map_err(|e| {
96                Error::internal(format!("failed to apply migration '{name}'")).chain(e)
97            })?;
98
99            conn.execute(
100                "INSERT INTO _migrations (name, checksum) VALUES (?1, ?2)",
101                libsql::params![name.clone(), checksum],
102            )
103            .await
104            .map_err(Error::from)?;
105
106            conn.execute("COMMIT", ()).await.map_err(Error::from)?;
107            Ok::<(), Error>(())
108        }
109        .await
110        {
111            if let Err(rb_err) = conn.execute("ROLLBACK", ()).await {
112                tracing::error!(error = %rb_err, "rollback failed after migration error");
113            }
114            return Err(e);
115        }
116    }
117
118    Ok(())
119}
120
121/// FNV-1a hash, deterministic and stable across Rust versions.
122fn fnv1a_hex(data: &[u8]) -> String {
123    let mut hash: u64 = 0xcbf29ce484222325;
124    for &byte in data {
125        hash ^= byte as u64;
126        hash = hash.wrapping_mul(0x100000001b3);
127    }
128    format!("{:016x}", hash)
129}