1use crate::error::{Error, Result};
2
3pub async fn migrate(conn: &libsql::Connection, dir: &str) -> Result<()> {
20 conn.execute(
22 "CREATE TABLE IF NOT EXISTS _migrations (
23 name TEXT PRIMARY KEY,
24 checksum TEXT NOT NULL,
25 applied_at TEXT NOT NULL DEFAULT (datetime('now'))
26 )",
27 (),
28 )
29 .await
30 .map_err(Error::from)?;
31
32 let dir_owned = dir.to_string();
34 let files = tokio::task::spawn_blocking(move || {
35 let dir_path = std::path::Path::new(&dir_owned);
36 if !dir_path.exists() {
37 return Ok(Vec::new());
38 }
39
40 let mut entries: Vec<std::fs::DirEntry> = std::fs::read_dir(dir_path)
41 .map_err(|e| {
42 Error::internal(format!("failed to read migrations directory: {dir_owned}"))
43 .chain(e)
44 })?
45 .filter_map(|entry| entry.ok())
46 .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "sql"))
47 .collect();
48 entries.sort_by_key(|e| e.file_name());
49
50 let mut result: Vec<(String, String)> = Vec::with_capacity(entries.len());
51 for entry in entries {
52 let name = entry.file_name().to_string_lossy().to_string();
53 let sql = std::fs::read_to_string(entry.path()).map_err(|e| {
54 Error::internal(format!("failed to read migration file: {name}")).chain(e)
55 })?;
56 result.push((name, sql));
57 }
58 Ok(result)
59 })
60 .await
61 .map_err(|e| Error::internal("migration task panicked").chain(e))?
62 as Result<Vec<(String, String)>>;
63
64 let files = files?;
65 if files.is_empty() {
66 return Ok(());
67 }
68
69 for (name, sql) in &files {
70 let checksum = fnv1a_hex(sql.as_bytes());
71
72 let mut rows = conn
74 .query(
75 "SELECT checksum FROM _migrations WHERE name = ?1",
76 libsql::params![name.clone()],
77 )
78 .await
79 .map_err(Error::from)?;
80
81 if let Some(row) = rows.next().await.map_err(Error::from)? {
82 let existing: String = row.get(0).map_err(Error::from)?;
83 if existing != checksum {
84 return Err(Error::internal(format!(
85 "migration '{name}' checksum mismatch — file was modified after applying (expected {existing}, got {checksum})"
86 )));
87 }
88 continue; }
90
91 conn.execute("BEGIN", ()).await.map_err(Error::from)?;
93
94 if let Err(e) = async {
95 conn.execute_batch(sql).await.map_err(|e| {
96 Error::internal(format!("failed to apply migration '{name}'")).chain(e)
97 })?;
98
99 conn.execute(
100 "INSERT INTO _migrations (name, checksum) VALUES (?1, ?2)",
101 libsql::params![name.clone(), checksum],
102 )
103 .await
104 .map_err(Error::from)?;
105
106 conn.execute("COMMIT", ()).await.map_err(Error::from)?;
107 Ok::<(), Error>(())
108 }
109 .await
110 {
111 if let Err(rb_err) = conn.execute("ROLLBACK", ()).await {
112 tracing::error!(error = %rb_err, "rollback failed after migration error");
113 }
114 return Err(e);
115 }
116 }
117
118 Ok(())
119}
120
121fn fnv1a_hex(data: &[u8]) -> String {
123 let mut hash: u64 = 0xcbf29ce484222325;
124 for &byte in data {
125 hash ^= byte as u64;
126 hash = hash.wrapping_mul(0x100000001b3);
127 }
128 format!("{:016x}", hash)
129}