stakpak_api/local/migrations/
mod.rs1use libsql::Connection;
9use std::future::Future;
10use std::pin::Pin;
11
12mod v001_initial_schema;
13mod v002_nullable_columns;
14
15pub type MigrationFn =
17 fn(&Connection) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + '_>>;
18
19pub struct Migration {
21 pub version: u32,
23 pub description: &'static str,
25 pub apply: MigrationFn,
27 pub rollback: MigrationFn,
29}
30
31pub fn all_migrations() -> Vec<Migration> {
33 vec![
34 v001_initial_schema::migration(),
35 v002_nullable_columns::migration(),
36 ]
37}
38
39pub async fn apply_all(conn: &Connection) -> Result<Vec<u32>, String> {
41 init_migrations_table(conn).await?;
42
43 let applied = get_applied_versions(conn).await?;
44 let mut newly_applied = Vec::new();
45
46 for migration in all_migrations() {
47 if applied.contains(&migration.version) {
48 continue;
49 }
50
51 apply_migration(conn, &migration).await?;
52 newly_applied.push(migration.version);
53 }
54
55 Ok(newly_applied)
56}
57
58pub async fn rollback_last(conn: &Connection) -> Result<Option<u32>, String> {
60 let applied = get_applied_versions(conn).await?;
61
62 if let Some(&last_version) = applied.last() {
63 let migrations = all_migrations();
64 if let Some(migration) = migrations.iter().find(|m| m.version == last_version) {
65 rollback_migration(conn, migration).await?;
66 return Ok(Some(last_version));
67 }
68 }
69
70 Ok(None)
71}
72
73pub async fn rollback_to(conn: &Connection, target_version: u32) -> Result<Vec<u32>, String> {
75 let applied = get_applied_versions(conn).await?;
76 let migrations = all_migrations();
77 let mut rolled_back = Vec::new();
78
79 for &version in applied.iter().rev() {
80 if version <= target_version {
81 break;
82 }
83
84 if let Some(migration) = migrations.iter().find(|m| m.version == version) {
85 rollback_migration(conn, migration).await?;
86 rolled_back.push(version);
87 }
88 }
89
90 Ok(rolled_back)
91}
92
93pub async fn current_version(conn: &Connection) -> Result<u32, String> {
95 let applied = get_applied_versions(conn).await?;
96 Ok(applied.last().copied().unwrap_or(0))
97}
98
99pub async fn get_applied_versions(conn: &Connection) -> Result<Vec<u32>, String> {
101 let mut rows = conn
102 .query(
103 "SELECT name FROM sqlite_master WHERE type='table' AND name='_migrations'",
104 (),
105 )
106 .await
107 .map_err(|e| e.to_string())?;
108
109 if rows.next().await.map_err(|e| e.to_string())?.is_none() {
110 return Ok(Vec::new());
111 }
112 drop(rows);
113
114 let mut applied: Vec<u32> = Vec::new();
115 let mut rows = conn
116 .query("SELECT version FROM _migrations ORDER BY version", ())
117 .await
118 .map_err(|e| e.to_string())?;
119
120 while let Ok(Some(row)) = rows.next().await {
121 if let Ok(version) = row.get::<u32>(0) {
122 applied.push(version);
123 }
124 }
125
126 Ok(applied)
127}
128
129pub async fn status(conn: &Connection) -> Result<MigrationStatus, String> {
131 let applied = get_applied_versions(conn).await?;
132 let all = all_migrations();
133
134 let pending: Vec<u32> = all
135 .iter()
136 .filter(|m| !applied.contains(&m.version))
137 .map(|m| m.version)
138 .collect();
139
140 Ok(MigrationStatus { applied, pending })
141}
142
143pub struct MigrationStatus {
144 pub applied: Vec<u32>,
145 pub pending: Vec<u32>,
146}
147
148async fn init_migrations_table(conn: &Connection) -> Result<(), String> {
153 conn.execute(
154 "CREATE TABLE IF NOT EXISTS _migrations (
155 version INTEGER PRIMARY KEY,
156 description TEXT NOT NULL,
157 applied_at TEXT NOT NULL
158 )",
159 (),
160 )
161 .await
162 .map_err(|e| e.to_string())?;
163 Ok(())
164}
165
166async fn apply_migration(conn: &Connection, migration: &Migration) -> Result<(), String> {
167 conn.execute("PRAGMA foreign_keys=OFF", ())
168 .await
169 .map_err(|e| e.to_string())?;
170
171 (migration.apply)(conn).await?;
172
173 conn.execute(
174 "INSERT INTO _migrations (version, description, applied_at) VALUES (?, ?, datetime('now'))",
175 (migration.version, migration.description),
176 )
177 .await
178 .map_err(|e| e.to_string())?;
179
180 conn.execute("PRAGMA foreign_keys=ON", ())
181 .await
182 .map_err(|e| e.to_string())?;
183
184 Ok(())
185}
186
187async fn rollback_migration(conn: &Connection, migration: &Migration) -> Result<(), String> {
188 conn.execute("PRAGMA foreign_keys=OFF", ())
189 .await
190 .map_err(|e| e.to_string())?;
191
192 (migration.rollback)(conn).await?;
193
194 conn.execute(
195 "DELETE FROM _migrations WHERE version = ?",
196 [migration.version],
197 )
198 .await
199 .map_err(|e| e.to_string())?;
200
201 conn.execute("PRAGMA foreign_keys=ON", ())
202 .await
203 .map_err(|e| e.to_string())?;
204
205 Ok(())
206}
207
208pub async fn run_migrations(conn: &Connection) -> Result<(), String> {
214 apply_all(conn).await?;
215 Ok(())
216}