mixtape_tools/sqlite/migration/
run.rs1use crate::prelude::*;
4use crate::sqlite::error::SqliteToolError;
5use crate::sqlite::manager::DATABASE_MANAGER;
6use chrono::Utc;
7
8use super::{compute_checksum, ensure_migrations_table, MIGRATIONS_TABLE};
9
10#[derive(Debug, Deserialize, JsonSchema)]
12pub struct RunMigrationsInput {
13 #[serde(default)]
15 pub db_path: Option<String>,
16}
17
/// Stateless tool that applies the migrations still marked as pending
/// (`applied_at IS NULL`) in the migrations table, in ascending version order.
pub struct RunMigrationsTool;
23
24impl Tool for RunMigrationsTool {
25 type Input = RunMigrationsInput;
26
27 fn name(&self) -> &str {
28 "sqlite_run_migrations"
29 }
30
31 fn description(&self) -> &str {
32 "Apply all pending schema migrations in version order. Each migration runs in a \
33 transaction. If a migration fails, it is rolled back and no further migrations run."
34 }
35
36 async fn execute(&self, input: Self::Input) -> Result<ToolResult, ToolError> {
37 let result = tokio::task::spawn_blocking(move || -> Result<_, SqliteToolError> {
38 let conn = DATABASE_MANAGER.get(input.db_path.as_deref())?;
39 let mut conn = conn.lock().unwrap();
40
41 ensure_migrations_table(&conn)?;
43
44 let mut stmt = conn.prepare(&format!(
46 "SELECT version, name, sql, checksum FROM {MIGRATIONS_TABLE} \
47 WHERE applied_at IS NULL ORDER BY version ASC"
48 ))?;
49
50 let pending: Vec<(String, String, String, String)> = stmt
51 .query_map([], |row| {
52 Ok((
53 row.get::<_, String>(0)?,
54 row.get::<_, String>(1)?,
55 row.get::<_, String>(2)?,
56 row.get::<_, String>(3)?,
57 ))
58 })?
59 .collect::<Result<Vec<_>, _>>()?;
60
61 drop(stmt);
62
63 if pending.is_empty() {
64 return Ok((0, Vec::new()));
65 }
66
67 let mut applied = Vec::new();
68
69 for (version, name, sql, stored_checksum) in pending {
70 let actual_checksum = compute_checksum(&sql);
72 if actual_checksum != stored_checksum {
73 return Err(SqliteToolError::MigrationChecksumMismatch {
74 version,
75 expected: stored_checksum,
76 actual: actual_checksum,
77 });
78 }
79
80 let tx = conn.transaction()?;
82
83 tx.execute_batch(&sql)?;
85
86 let applied_at = Utc::now().to_rfc3339();
88 tx.execute(
89 &format!("UPDATE {MIGRATIONS_TABLE} SET applied_at = ?1 WHERE version = ?2"),
90 rusqlite::params![applied_at, version],
91 )?;
92
93 tx.commit()?;
94
95 applied.push(serde_json::json!({
96 "version": version,
97 "name": name,
98 "applied_at": applied_at
99 }));
100 }
101
102 Ok((applied.len(), applied))
103 })
104 .await
105 .map_err(|e| ToolError::Custom(format!("Task join error: {e}")))?;
106
107 match result {
108 Ok((count, applied)) => Ok(ToolResult::Json(serde_json::json!({
109 "status": "success",
110 "migrations_applied": count,
111 "applied": applied,
112 "message": if count == 0 {
113 "No pending migrations to apply".to_string()
114 } else {
115 format!("{} migration(s) applied successfully", count)
116 }
117 }))),
118 Err(e) => Err(e.into()),
119 }
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sqlite::migration::add::AddMigrationInput;
    use crate::sqlite::migration::AddMigrationTool;
    use crate::sqlite::test_utils::{unwrap_json, TestDatabase};

    /// Registers a migration on `db` through `AddMigrationTool`, panicking if
    /// the tool reports an error.
    async fn register_migration(db: &TestDatabase, name: &str, sql: &str) {
        AddMigrationTool
            .execute(AddMigrationInput {
                name: name.to_string(),
                sql: sql.to_string(),
                db_path: Some(db.key()),
            })
            .await
            .unwrap();
    }

    /// Runs `RunMigrationsTool` against `db` and returns its successful result.
    async fn run_pending(db: &TestDatabase) -> ToolResult {
        RunMigrationsTool
            .execute(RunMigrationsInput {
                db_path: Some(db.key()),
            })
            .await
            .unwrap()
    }

    /// With nothing registered, the tool succeeds and applies zero migrations.
    #[tokio::test]
    async fn test_run_migrations_empty() {
        let db = TestDatabase::new().await;

        let json = unwrap_json(run_pending(&db).await);

        assert_eq!(json["status"], "success");
        assert_eq!(json["migrations_applied"], 0);
    }

    /// A single registered migration is applied and its table exists afterwards.
    #[tokio::test]
    async fn test_run_single_migration() {
        let db = TestDatabase::new().await;

        register_migration(
            &db,
            "create users table",
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
        )
        .await;

        let json = unwrap_json(run_pending(&db).await);

        assert_eq!(json["status"], "success");
        assert_eq!(json["migrations_applied"], 1);

        let rows =
            db.query("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='users'");
        assert_eq!(rows[0][0], 1);
    }

    /// A second run finds nothing left to apply.
    #[tokio::test]
    async fn test_run_migrations_idempotent() {
        let db = TestDatabase::new().await;

        register_migration(
            &db,
            "create users table",
            "CREATE TABLE users (id INTEGER PRIMARY KEY);",
        )
        .await;

        let first = unwrap_json(run_pending(&db).await);
        assert_eq!(first["migrations_applied"], 1);

        let second = unwrap_json(run_pending(&db).await);
        assert_eq!(second["migrations_applied"], 0);
    }

    /// Two migrations where the second depends on the first are both applied.
    #[tokio::test]
    async fn test_run_multiple_migrations_in_order() {
        let db = TestDatabase::new().await;

        register_migration(
            &db,
            "create users table",
            "CREATE TABLE users (id INTEGER PRIMARY KEY);",
        )
        .await;

        // Pause between registrations.
        // NOTE(review): presumably versions are time-derived and need to differ — confirm.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        register_migration(
            &db,
            "add email to users",
            "ALTER TABLE users ADD COLUMN email TEXT;",
        )
        .await;

        let json = unwrap_json(run_pending(&db).await);

        assert_eq!(json["status"], "success");
        assert_eq!(json["migrations_applied"], 2);

        // The ALTER only succeeds if the CREATE ran first.
        let rows = db.query("SELECT COUNT(*) FROM pragma_table_info('users') WHERE name='email'");
        assert_eq!(rows[0][0], 1);
    }
}