elif_orm/migrations/
runner.rs

1//! Migration Runner - Executes migrations against the database
2//!
3//! Handles the actual execution of migrations, tracking applied migrations,
4//! and managing migration batches.
5
6use sqlx::{PgPool, Row};
7use std::collections::HashSet;
8
9use super::definitions::{Migration, MigrationRecord, MigrationRunResult};
10use super::manager::MigrationManager;
11use crate::error::{OrmError, OrmResult};
12
13/// Migration runner that executes migrations against a database
14pub struct MigrationRunner {
15    manager: MigrationManager,
16    pool: PgPool,
17}
18
19impl MigrationRunner {
20    /// Create a new migration runner
21    pub fn new(manager: MigrationManager, pool: PgPool) -> Self {
22        Self { manager, pool }
23    }
24
25    /// Create a new migration runner from database URL
26    pub async fn from_url(manager: MigrationManager, database_url: &str) -> OrmResult<Self> {
27        let pool = PgPool::connect(database_url)
28            .await
29            .map_err(|e| OrmError::Migration(format!("Failed to connect to database: {}", e)))?;
30
31        Ok(Self::new(manager, pool))
32    }
33
34    /// Get the database pool
35    pub fn pool(&self) -> &PgPool {
36        &self.pool
37    }
38
39    /// Get the migration manager
40    pub fn manager(&self) -> &MigrationManager {
41        &self.manager
42    }
43
44    /// Run all pending migrations
45    pub async fn run_migrations(&self) -> OrmResult<MigrationRunResult> {
46        let start_time = std::time::Instant::now();
47
48        // Ensure migrations table exists
49        self.ensure_migrations_table().await?;
50
51        // Load all migrations from files
52        let all_migrations = self.manager.load_migrations().await?;
53
54        // Get applied migrations from database
55        let applied_migrations = self.get_applied_migrations().await?;
56        let applied_ids: HashSet<String> = applied_migrations.into_iter().map(|m| m.id).collect();
57
58        // Filter pending migrations
59        let pending_migrations: Vec<_> = all_migrations
60            .into_iter()
61            .filter(|m| !applied_ids.contains(&m.id))
62            .collect();
63
64        if pending_migrations.is_empty() {
65            return Ok(MigrationRunResult {
66                applied_count: 0,
67                applied_migrations: Vec::new(),
68                skipped_count: applied_ids.len(),
69                execution_time_ms: start_time.elapsed().as_millis(),
70            });
71        }
72
73        // Get next batch number
74        let next_batch = self.get_next_batch_number().await?;
75
76        // Apply pending migrations
77        let mut applied_migration_ids = Vec::new();
78
79        for migration in &pending_migrations {
80            println!("Applying migration: {} - {}", migration.id, migration.name);
81
82            // Begin transaction for this migration
83            let mut transaction =
84                self.pool.begin().await.map_err(|e| {
85                    OrmError::Migration(format!("Failed to start transaction: {}", e))
86                })?;
87
88            // Execute migration SQL
89            if !migration.up_sql.trim().is_empty() {
90                for statement in self.manager.split_sql_statements(&migration.up_sql)? {
91                    if !statement.trim().is_empty() {
92                        sqlx::query(&statement)
93                            .execute(&mut *transaction)
94                            .await
95                            .map_err(|e| {
96                                OrmError::Migration(format!(
97                                    "Failed to execute migration {}: {}",
98                                    migration.id, e
99                                ))
100                            })?;
101                    }
102                }
103            }
104
105            // Record migration as applied
106            let (record_sql, params) = self.record_migration_sql(&migration.id, next_batch);
107            let mut query = sqlx::query(&record_sql);
108            for param in params {
109                query = query.bind(param);
110            }
111            query
112                .execute(&mut *transaction)
113                .await
114                .map_err(|e| OrmError::Migration(format!("Failed to record migration: {}", e)))?;
115
116            // Commit transaction
117            transaction
118                .commit()
119                .await
120                .map_err(|e| OrmError::Migration(format!("Failed to commit migration: {}", e)))?;
121
122            applied_migration_ids.push(migration.id.clone());
123        }
124
125        Ok(MigrationRunResult {
126            applied_count: applied_migration_ids.len(),
127            applied_migrations: applied_migration_ids,
128            skipped_count: applied_ids.len(),
129            execution_time_ms: start_time.elapsed().as_millis(),
130        })
131    }
132
133    /// Run a specific migration by ID
134    pub async fn run_migration(&self, migration_id: &str) -> OrmResult<()> {
135        // Load the specific migration
136        let migrations = self.manager.load_migrations().await?;
137        let migration = migrations
138            .iter()
139            .find(|m| m.id == migration_id)
140            .ok_or_else(|| OrmError::Migration(format!("Migration {} not found", migration_id)))?;
141
142        // Check if already applied
143        if self.is_migration_applied(migration_id).await? {
144            return Err(OrmError::Migration(format!(
145                "Migration {} is already applied",
146                migration_id
147            )));
148        }
149
150        // Get next batch number
151        let next_batch = self.get_next_batch_number().await?;
152
153        // Apply the migration
154        self.apply_migration(migration, next_batch).await?;
155
156        Ok(())
157    }
158
159    /// Apply a single migration
160    async fn apply_migration(&self, migration: &Migration, batch: i32) -> OrmResult<()> {
161        let mut transaction = self
162            .pool
163            .begin()
164            .await
165            .map_err(|e| OrmError::Migration(format!("Failed to start transaction: {}", e)))?;
166
167        // Execute migration SQL
168        if !migration.up_sql.trim().is_empty() {
169            for statement in self.manager.split_sql_statements(&migration.up_sql)? {
170                if !statement.trim().is_empty() {
171                    sqlx::query(&statement)
172                        .execute(&mut *transaction)
173                        .await
174                        .map_err(|e| {
175                            OrmError::Migration(format!(
176                                "Failed to execute migration {}: {}",
177                                migration.id, e
178                            ))
179                        })?;
180                }
181            }
182        }
183
184        // Record migration as applied
185        let (record_sql, params) = self.record_migration_sql(&migration.id, batch);
186        let mut query = sqlx::query(&record_sql);
187        for param in params {
188            query = query.bind(param);
189        }
190        query
191            .execute(&mut *transaction)
192            .await
193            .map_err(|e| OrmError::Migration(format!("Failed to record migration: {}", e)))?;
194
195        // Commit transaction
196        transaction
197            .commit()
198            .await
199            .map_err(|e| OrmError::Migration(format!("Failed to commit migration: {}", e)))?;
200
201        Ok(())
202    }
203
204    /// Ensure migrations table exists
205    async fn ensure_migrations_table(&self) -> OrmResult<()> {
206        let sql = self.create_migrations_table_sql();
207        sqlx::query(&sql).execute(&self.pool).await.map_err(|e| {
208            OrmError::Migration(format!("Failed to create migrations table: {}", e))
209        })?;
210        Ok(())
211    }
212
213    /// Get applied migrations from database
214    async fn get_applied_migrations(&self) -> OrmResult<Vec<MigrationRecord>> {
215        let sql = self.get_applied_migrations_sql();
216        let rows = sqlx::query(&sql).fetch_all(&self.pool).await.map_err(|e| {
217            OrmError::Migration(format!("Failed to query applied migrations: {}", e))
218        })?;
219
220        let mut records = Vec::new();
221        for row in rows {
222            let id: String = row
223                .try_get("id")
224                .map_err(|e| OrmError::Migration(format!("Failed to get migration id: {}", e)))?;
225            let applied_at: chrono::DateTime<chrono::Utc> = row
226                .try_get("applied_at")
227                .map_err(|e| OrmError::Migration(format!("Failed to get applied_at: {}", e)))?;
228            let batch: i32 = row
229                .try_get("batch")
230                .map_err(|e| OrmError::Migration(format!("Failed to get batch: {}", e)))?;
231
232            records.push(MigrationRecord {
233                id,
234                applied_at,
235                batch,
236            });
237        }
238
239        Ok(records)
240    }
241
242    /// Check if a specific migration has been applied
243    async fn is_migration_applied(&self, migration_id: &str) -> OrmResult<bool> {
244        let (sql, params) = self.check_migration_sql(migration_id);
245        let mut query = sqlx::query(&sql);
246        for param in params {
247            query = query.bind(param);
248        }
249
250        let result = query
251            .fetch_optional(&self.pool)
252            .await
253            .map_err(|e| OrmError::Migration(format!("Failed to check migration status: {}", e)))?;
254
255        Ok(result.is_some())
256    }
257
258    /// Get the next batch number
259    async fn get_next_batch_number(&self) -> OrmResult<i32> {
260        let sql = self.get_latest_batch_sql();
261        let row = sqlx::query(&sql)
262            .fetch_one(&self.pool)
263            .await
264            .map_err(|e| OrmError::Migration(format!("Failed to get latest batch: {}", e)))?;
265
266        let latest_batch: i32 = row.try_get(0).unwrap_or(0);
267        Ok(latest_batch + 1)
268    }
269
270    /// SQL to create the migrations tracking table
271    fn create_migrations_table_sql(&self) -> String {
272        format!(
273            "CREATE TABLE IF NOT EXISTS {} (\n    \
274                id VARCHAR(255) PRIMARY KEY,\n    \
275                applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n    \
276                batch INTEGER NOT NULL\n\
277            );",
278            self.manager.config().migrations_table
279        )
280    }
281
282    /// SQL to check if a migration has been applied
283    fn check_migration_sql(&self, migration_id: &str) -> (String, Vec<String>) {
284        (
285            format!(
286                "SELECT id FROM {} WHERE id = $1",
287                self.manager.config().migrations_table
288            ),
289            vec![migration_id.to_string()],
290        )
291    }
292
293    /// SQL to record a migration as applied
294    fn record_migration_sql(&self, migration_id: &str, batch: i32) -> (String, Vec<String>) {
295        (
296            format!(
297                "INSERT INTO {} (id, applied_at, batch) VALUES ($1, $2::timestamp, $3::integer)",
298                self.manager.config().migrations_table
299            ),
300            vec![
301                migration_id.to_string(),
302                chrono::Utc::now().to_rfc3339(),
303                batch.to_string(),
304            ],
305        )
306    }
307
308    /// SQL to get the latest batch number
309    fn get_latest_batch_sql(&self) -> String {
310        format!(
311            "SELECT COALESCE(MAX(batch), 0) FROM {}",
312            self.manager.config().migrations_table
313        )
314    }
315
316    /// SQL to get applied migrations
317    fn get_applied_migrations_sql(&self) -> String {
318        format!(
319            "SELECT id, applied_at, batch FROM {} ORDER BY batch DESC, applied_at DESC",
320            self.manager.config().migrations_table
321        )
322    }
323
324    /// Get migration status for all migrations (applied and pending)
325    pub async fn get_migration_status(&self) -> OrmResult<Vec<(Migration, bool)>> {
326        // Load all migrations from files
327        let all_migrations = self.manager.load_migrations().await?;
328
329        // Get applied migrations from database
330        let applied_migrations = self.get_applied_migrations().await?;
331        let applied_ids: HashSet<String> = applied_migrations.into_iter().map(|m| m.id).collect();
332
333        // Map migrations to their status
334        let mut status_list = Vec::new();
335        for migration in all_migrations {
336            let is_applied = applied_ids.contains(&migration.id);
337            status_list.push((migration, is_applied));
338        }
339
340        Ok(status_list)
341    }
342}