Skip to main content

stakpak_api/local/migrations/
mod.rs

1//! Database migrations
2//!
3//! Each migration has a unique version number and is applied in order.
4//! Applied migrations are tracked in the `_migrations` table.
5//!
6//! Migrations support both `apply` and `rollback` operations via async functions.
7
8use libsql::Connection;
9use std::future::Future;
10use std::pin::Pin;
11
12mod v001_initial_schema;
13mod v002_nullable_columns;
14
15/// Async migration function type
16pub type MigrationFn =
17    fn(&Connection) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + '_>>;
18
19/// A database migration with apply and rollback functions
20pub struct Migration {
21    /// Unique version number (must be sequential)
22    pub version: u32,
23    /// Human-readable description
24    pub description: &'static str,
25    /// Function to apply the migration
26    pub apply: MigrationFn,
27    /// Function to rollback the migration
28    pub rollback: MigrationFn,
29}
30
31/// All migrations in order
32pub fn all_migrations() -> Vec<Migration> {
33    vec![
34        v001_initial_schema::migration(),
35        v002_nullable_columns::migration(),
36    ]
37}
38
39/// Apply all pending migrations
40pub async fn apply_all(conn: &Connection) -> Result<Vec<u32>, String> {
41    init_migrations_table(conn).await?;
42
43    let applied = get_applied_versions(conn).await?;
44    let mut newly_applied = Vec::new();
45
46    for migration in all_migrations() {
47        if applied.contains(&migration.version) {
48            continue;
49        }
50
51        apply_migration(conn, &migration).await?;
52        newly_applied.push(migration.version);
53    }
54
55    Ok(newly_applied)
56}
57
58/// Rollback the last applied migration
59pub async fn rollback_last(conn: &Connection) -> Result<Option<u32>, String> {
60    let applied = get_applied_versions(conn).await?;
61
62    if let Some(&last_version) = applied.last() {
63        let migrations = all_migrations();
64        if let Some(migration) = migrations.iter().find(|m| m.version == last_version) {
65            rollback_migration(conn, migration).await?;
66            return Ok(Some(last_version));
67        }
68    }
69
70    Ok(None)
71}
72
73/// Rollback to a specific version (keeps that version, removes newer ones)
74pub async fn rollback_to(conn: &Connection, target_version: u32) -> Result<Vec<u32>, String> {
75    let applied = get_applied_versions(conn).await?;
76    let migrations = all_migrations();
77    let mut rolled_back = Vec::new();
78
79    for &version in applied.iter().rev() {
80        if version <= target_version {
81            break;
82        }
83
84        if let Some(migration) = migrations.iter().find(|m| m.version == version) {
85            rollback_migration(conn, migration).await?;
86            rolled_back.push(version);
87        }
88    }
89
90    Ok(rolled_back)
91}
92
93/// Get current migration version (0 if none applied)
94pub async fn current_version(conn: &Connection) -> Result<u32, String> {
95    let applied = get_applied_versions(conn).await?;
96    Ok(applied.last().copied().unwrap_or(0))
97}
98
99/// Get list of applied migration versions
100pub async fn get_applied_versions(conn: &Connection) -> Result<Vec<u32>, String> {
101    let mut rows = conn
102        .query(
103            "SELECT name FROM sqlite_master WHERE type='table' AND name='_migrations'",
104            (),
105        )
106        .await
107        .map_err(|e| e.to_string())?;
108
109    if rows.next().await.map_err(|e| e.to_string())?.is_none() {
110        return Ok(Vec::new());
111    }
112    drop(rows);
113
114    let mut applied: Vec<u32> = Vec::new();
115    let mut rows = conn
116        .query("SELECT version FROM _migrations ORDER BY version", ())
117        .await
118        .map_err(|e| e.to_string())?;
119
120    while let Ok(Some(row)) = rows.next().await {
121        if let Ok(version) = row.get::<u32>(0) {
122            applied.push(version);
123        }
124    }
125
126    Ok(applied)
127}
128
129/// Get migration status
130pub async fn status(conn: &Connection) -> Result<MigrationStatus, String> {
131    let applied = get_applied_versions(conn).await?;
132    let all = all_migrations();
133
134    let pending: Vec<u32> = all
135        .iter()
136        .filter(|m| !applied.contains(&m.version))
137        .map(|m| m.version)
138        .collect();
139
140    Ok(MigrationStatus { applied, pending })
141}
142
143pub struct MigrationStatus {
144    pub applied: Vec<u32>,
145    pub pending: Vec<u32>,
146}
147
148// ============================================================================
149// Internal
150// ============================================================================
151
152async fn init_migrations_table(conn: &Connection) -> Result<(), String> {
153    conn.execute(
154        "CREATE TABLE IF NOT EXISTS _migrations (
155            version INTEGER PRIMARY KEY,
156            description TEXT NOT NULL,
157            applied_at TEXT NOT NULL
158        )",
159        (),
160    )
161    .await
162    .map_err(|e| e.to_string())?;
163    Ok(())
164}
165
166async fn apply_migration(conn: &Connection, migration: &Migration) -> Result<(), String> {
167    conn.execute("PRAGMA foreign_keys=OFF", ())
168        .await
169        .map_err(|e| e.to_string())?;
170
171    (migration.apply)(conn).await?;
172
173    conn.execute(
174        "INSERT INTO _migrations (version, description, applied_at) VALUES (?, ?, datetime('now'))",
175        (migration.version, migration.description),
176    )
177    .await
178    .map_err(|e| e.to_string())?;
179
180    conn.execute("PRAGMA foreign_keys=ON", ())
181        .await
182        .map_err(|e| e.to_string())?;
183
184    Ok(())
185}
186
187async fn rollback_migration(conn: &Connection, migration: &Migration) -> Result<(), String> {
188    conn.execute("PRAGMA foreign_keys=OFF", ())
189        .await
190        .map_err(|e| e.to_string())?;
191
192    (migration.rollback)(conn).await?;
193
194    conn.execute(
195        "DELETE FROM _migrations WHERE version = ?",
196        [migration.version],
197    )
198    .await
199    .map_err(|e| e.to_string())?;
200
201    conn.execute("PRAGMA foreign_keys=ON", ())
202        .await
203        .map_err(|e| e.to_string())?;
204
205    Ok(())
206}
207
208// ============================================================================
209// Public API for storage.rs
210// ============================================================================
211
212/// Run all pending migrations
213pub async fn run_migrations(conn: &Connection) -> Result<(), String> {
214    apply_all(conn).await?;
215    Ok(())
216}