Skip to main content

mixtape_tools/sqlite/migration/
run.rs

1//! Run migrations tool
2
3use crate::prelude::*;
4use crate::sqlite::error::SqliteToolError;
5use crate::sqlite::manager::DATABASE_MANAGER;
6use chrono::Utc;
7
8use super::{compute_checksum, ensure_migrations_table, MIGRATIONS_TABLE};
9
10/// Input for running pending migrations
11#[derive(Debug, Deserialize, JsonSchema)]
12pub struct RunMigrationsInput {
13    /// Database to run migrations on (uses default if not specified)
14    #[serde(default)]
15    pub db_path: Option<String>,
16}
17
18/// Runs all pending migrations in version order
19///
20/// Each migration is executed within a transaction. If a migration fails,
21/// it is rolled back and subsequent migrations are not attempted.
22pub struct RunMigrationsTool;
23
24impl Tool for RunMigrationsTool {
25    type Input = RunMigrationsInput;
26
27    fn name(&self) -> &str {
28        "sqlite_run_migrations"
29    }
30
31    fn description(&self) -> &str {
32        "Apply all pending schema migrations in version order. Each migration runs in a \
33         transaction. If a migration fails, it is rolled back and no further migrations run."
34    }
35
36    async fn execute(&self, input: Self::Input) -> Result<ToolResult, ToolError> {
37        let result = tokio::task::spawn_blocking(move || -> Result<_, SqliteToolError> {
38            let conn = DATABASE_MANAGER.get(input.db_path.as_deref())?;
39            let mut conn = conn.lock().unwrap();
40
41            // Ensure migrations table exists
42            ensure_migrations_table(&conn)?;
43
44            // Get pending migrations ordered by version
45            let mut stmt = conn.prepare(&format!(
46                "SELECT version, name, sql, checksum FROM {MIGRATIONS_TABLE} \
47                 WHERE applied_at IS NULL ORDER BY version ASC"
48            ))?;
49
50            let pending: Vec<(String, String, String, String)> = stmt
51                .query_map([], |row| {
52                    Ok((
53                        row.get::<_, String>(0)?,
54                        row.get::<_, String>(1)?,
55                        row.get::<_, String>(2)?,
56                        row.get::<_, String>(3)?,
57                    ))
58                })?
59                .collect::<Result<Vec<_>, _>>()?;
60
61            drop(stmt);
62
63            if pending.is_empty() {
64                return Ok((0, Vec::new()));
65            }
66
67            let mut applied = Vec::new();
68
69            for (version, name, sql, stored_checksum) in pending {
70                // Verify checksum
71                let actual_checksum = compute_checksum(&sql);
72                if actual_checksum != stored_checksum {
73                    return Err(SqliteToolError::MigrationChecksumMismatch {
74                        version,
75                        expected: stored_checksum,
76                        actual: actual_checksum,
77                    });
78                }
79
80                // Execute migration in a transaction
81                let tx = conn.transaction()?;
82
83                // Execute the migration SQL
84                tx.execute_batch(&sql)?;
85
86                // Mark as applied
87                let applied_at = Utc::now().to_rfc3339();
88                tx.execute(
89                    &format!("UPDATE {MIGRATIONS_TABLE} SET applied_at = ?1 WHERE version = ?2"),
90                    rusqlite::params![applied_at, version],
91                )?;
92
93                tx.commit()?;
94
95                applied.push(serde_json::json!({
96                    "version": version,
97                    "name": name,
98                    "applied_at": applied_at
99                }));
100            }
101
102            Ok((applied.len(), applied))
103        })
104        .await
105        .map_err(|e| ToolError::Custom(format!("Task join error: {e}")))?;
106
107        match result {
108            Ok((count, applied)) => Ok(ToolResult::Json(serde_json::json!({
109                "status": "success",
110                "migrations_applied": count,
111                "applied": applied,
112                "message": if count == 0 {
113                    "No pending migrations to apply".to_string()
114                } else {
115                    format!("{} migration(s) applied successfully", count)
116                }
117            }))),
118            Err(e) => Err(e.into()),
119        }
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126    use crate::sqlite::migration::add::AddMigrationInput;
127    use crate::sqlite::migration::AddMigrationTool;
128    use crate::sqlite::test_utils::{unwrap_json, TestDatabase};
129
130    #[tokio::test]
131    async fn test_run_migrations_empty() {
132        let db = TestDatabase::new().await;
133
134        let tool = RunMigrationsTool;
135        let input = RunMigrationsInput {
136            db_path: Some(db.key()),
137        };
138
139        let result = tool.execute(input).await.unwrap();
140        let json = unwrap_json(result);
141
142        assert_eq!(json["status"], "success");
143        assert_eq!(json["migrations_applied"], 0);
144    }
145
146    #[tokio::test]
147    async fn test_run_single_migration() {
148        let db = TestDatabase::new().await;
149
150        // Add a migration
151        let add_tool = AddMigrationTool;
152        add_tool
153            .execute(AddMigrationInput {
154                name: "create users table".to_string(),
155                sql: "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);".to_string(),
156                db_path: Some(db.key()),
157            })
158            .await
159            .unwrap();
160
161        // Run migrations
162        let run_tool = RunMigrationsTool;
163        let result = run_tool
164            .execute(RunMigrationsInput {
165                db_path: Some(db.key()),
166            })
167            .await
168            .unwrap();
169
170        let json = unwrap_json(result);
171
172        assert_eq!(json["status"], "success");
173        assert_eq!(json["migrations_applied"], 1);
174
175        // Verify table was created
176        let rows =
177            db.query("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='users'");
178        assert_eq!(rows[0][0], 1);
179    }
180
181    #[tokio::test]
182    async fn test_run_migrations_idempotent() {
183        let db = TestDatabase::new().await;
184
185        // Add a migration
186        let add_tool = AddMigrationTool;
187        add_tool
188            .execute(AddMigrationInput {
189                name: "create users table".to_string(),
190                sql: "CREATE TABLE users (id INTEGER PRIMARY KEY);".to_string(),
191                db_path: Some(db.key()),
192            })
193            .await
194            .unwrap();
195
196        // Run migrations twice
197        let run_tool = RunMigrationsTool;
198
199        let result1 = run_tool
200            .execute(RunMigrationsInput {
201                db_path: Some(db.key()),
202            })
203            .await
204            .unwrap();
205        let json1 = unwrap_json(result1);
206        assert_eq!(json1["migrations_applied"], 1);
207
208        // Second run should apply 0 migrations
209        let result2 = run_tool
210            .execute(RunMigrationsInput {
211                db_path: Some(db.key()),
212            })
213            .await
214            .unwrap();
215        let json2 = unwrap_json(result2);
216        assert_eq!(json2["migrations_applied"], 0);
217    }
218
219    #[tokio::test]
220    async fn test_run_multiple_migrations_in_order() {
221        let db = TestDatabase::new().await;
222        let add_tool = AddMigrationTool;
223
224        // Add first migration
225        add_tool
226            .execute(AddMigrationInput {
227                name: "create users table".to_string(),
228                sql: "CREATE TABLE users (id INTEGER PRIMARY KEY);".to_string(),
229                db_path: Some(db.key()),
230            })
231            .await
232            .unwrap();
233
234        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
235
236        // Add second migration (depends on first)
237        add_tool
238            .execute(AddMigrationInput {
239                name: "add email to users".to_string(),
240                sql: "ALTER TABLE users ADD COLUMN email TEXT;".to_string(),
241                db_path: Some(db.key()),
242            })
243            .await
244            .unwrap();
245
246        // Run all migrations
247        let run_tool = RunMigrationsTool;
248        let result = run_tool
249            .execute(RunMigrationsInput {
250                db_path: Some(db.key()),
251            })
252            .await
253            .unwrap();
254
255        let json = unwrap_json(result);
256
257        assert_eq!(json["status"], "success");
258        assert_eq!(json["migrations_applied"], 2);
259
260        // Verify both changes applied - check email column exists
261        let rows = db.query("SELECT COUNT(*) FROM pragma_table_info('users') WHERE name='email'");
262        assert_eq!(rows[0][0], 1);
263    }
264}