Skip to main content

prax_migrate/
engine.rs

1//! Migration engine implementation.
2
3use std::path::PathBuf;
4use std::time::Instant;
5
6use crate::diff::{SchemaDiff, SchemaDiffer};
7use crate::error::{MigrateResult, MigrationError};
8use crate::file::{MigrationFile, MigrationFileManager};
9use crate::history::{MigrationHistoryRepository, MigrationRecord};
10use crate::resolution::{Resolution, ResolutionConfig};
11use crate::sql::{MigrationSql, PostgresSqlGenerator};
12
/// Settings that tell the migration engine where its files live and how strict to be.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Directory that holds the migration files.
    pub migrations_dir: PathBuf,
    /// Location of the resolutions file.
    pub resolutions_file: PathBuf,
    /// When `true`, run in dry-run mode.
    pub dry_run: bool,
    /// When `true`, data-losing changes (dropping tables/columns) are allowed.
    pub allow_data_loss: bool,
    /// When `true`, an unresolved checksum mismatch is treated as a failure.
    pub fail_on_checksum_mismatch: bool,
    /// When `true`, baseline migrations are applied automatically.
    pub auto_baseline: bool,
}

impl Default for MigrationConfig {
    /// Stock configuration: migrations under `./migrations`, resolutions in
    /// `./migrations/resolutions.toml`, checksum mismatches fail, every other switch is off.
    fn default() -> Self {
        let migrations_dir = PathBuf::from("./migrations");
        let resolutions_file = PathBuf::from("./migrations/resolutions.toml");

        Self {
            migrations_dir,
            resolutions_file,
            fail_on_checksum_mismatch: true,
            dry_run: false,
            allow_data_loss: false,
            auto_baseline: false,
        }
    }
}

impl MigrationConfig {
    /// Start from the default configuration; chain the setters below to customise it.
    pub fn new() -> Self {
        Default::default()
    }

    /// Replace the migrations directory.
    pub fn migrations_dir(self, dir: impl Into<PathBuf>) -> Self {
        Self {
            migrations_dir: dir.into(),
            ..self
        }
    }

    /// Replace the path of the resolutions file.
    pub fn resolutions_file(self, path: impl Into<PathBuf>) -> Self {
        Self {
            resolutions_file: path.into(),
            ..self
        }
    }

    /// Turn dry-run mode on or off.
    pub fn dry_run(self, dry_run: bool) -> Self {
        Self { dry_run, ..self }
    }

    /// Permit or forbid operations that lose data.
    pub fn allow_data_loss(self, allow: bool) -> Self {
        Self {
            allow_data_loss: allow,
            ..self
        }
    }

    /// Choose whether checksum mismatches are treated as failures.
    pub fn fail_on_checksum_mismatch(self, fail: bool) -> Self {
        Self {
            fail_on_checksum_mismatch: fail,
            ..self
        }
    }

    /// Turn automatic baseline application on or off.
    pub fn auto_baseline(self, auto: bool) -> Self {
        Self {
            auto_baseline: auto,
            ..self
        }
    }
}
85
/// Outcome of a migration run.
#[derive(Debug)]
pub struct MigrationResult {
    /// How many migrations were applied.
    pub applied_count: usize,
    /// Total duration, in milliseconds.
    pub duration_ms: i64,
    /// IDs of the applied migrations.
    pub applied_migrations: Vec<String>,
    /// IDs of migrations that were baselined (marked as applied without running).
    pub baselined_migrations: Vec<String>,
    /// IDs of migrations that were skipped.
    pub skipped_migrations: Vec<String>,
    /// Warnings collected during the run.
    pub warnings: Vec<String>,
}

impl MigrationResult {
    /// Number of migrations processed: `applied_count` plus the baselined ones.
    pub fn total_processed(&self) -> usize {
        let baselined = self.baselined_migrations.len();
        self.applied_count + baselined
    }

    /// `true` when at least one migration was applied or baselined.
    pub fn has_changes(&self) -> bool {
        self.applied_count != 0 || !self.baselined_migrations.is_empty()
    }

    /// One-line, human-readable summary.
    ///
    /// Lists the non-zero applied / baselined / skipped counts followed by the duration,
    /// or `"No migrations applied"` when all three are zero.
    pub fn summary(&self) -> String {
        // Each fragment exists only when its count is non-zero.
        let applied =
            (self.applied_count > 0).then(|| format!("{} applied", self.applied_count));
        let baselined = (!self.baselined_migrations.is_empty())
            .then(|| format!("{} baselined", self.baselined_migrations.len()));
        let skipped = (!self.skipped_migrations.is_empty())
            .then(|| format!("{} skipped", self.skipped_migrations.len()));

        let parts: Vec<String> = applied
            .into_iter()
            .chain(baselined)
            .chain(skipped)
            .collect();

        if parts.is_empty() {
            return "No migrations applied".to_string();
        }
        format!("{} in {}ms", parts.join(", "), self.duration_ms)
    }
}
137
138/// Result of a migration plan.
139#[derive(Debug)]
140pub struct MigrationPlan {
141    /// Pending migrations to apply.
142    pub pending: Vec<MigrationFile>,
143    /// Migrations that will be skipped (via resolutions).
144    pub skipped: Vec<String>,
145    /// Migrations that will be baselined (marked as applied without running).
146    pub baselines: Vec<String>,
147    /// Checksum mismatches that are resolved.
148    pub resolved_checksums: Vec<ChecksumResolution>,
149    /// Checksum mismatches that are NOT resolved.
150    pub unresolved_checksums: Vec<ChecksumMismatch>,
151    /// Schema diff for new migrations.
152    pub diff: Option<SchemaDiff>,
153    /// Generated SQL.
154    pub sql: Option<MigrationSql>,
155    /// Warnings.
156    pub warnings: Vec<String>,
157}
158
/// A checksum mismatch that a resolution has accepted.
#[derive(Debug, Clone)]
pub struct ChecksumResolution {
    /// ID of the migration concerned.
    pub migration_id: String,
    /// Checksum recorded in the history (the expected value).
    pub expected: String,
    /// Checksum of the migration file as it is now (the actual value).
    pub actual: String,
    /// Why the change was accepted.
    pub reason: String,
}
171
/// A checksum mismatch that no resolution has accepted.
#[derive(Debug, Clone)]
pub struct ChecksumMismatch {
    /// ID of the migration concerned.
    pub migration_id: String,
    /// Checksum recorded in the history (the expected value).
    pub expected: String,
    /// Checksum of the migration file as it is now (the actual value).
    pub actual: String,
}
182
183impl MigrationPlan {
184    /// Create an empty migration plan.
185    pub fn empty() -> Self {
186        Self {
187            pending: Vec::new(),
188            skipped: Vec::new(),
189            baselines: Vec::new(),
190            resolved_checksums: Vec::new(),
191            unresolved_checksums: Vec::new(),
192            diff: None,
193            sql: None,
194            warnings: Vec::new(),
195        }
196    }
197
198    /// Check if there's anything to migrate.
199    pub fn is_empty(&self) -> bool {
200        self.pending.is_empty()
201            && self.baselines.is_empty()
202            && self.diff.as_ref().is_none_or(|d| d.is_empty())
203    }
204
205    /// Check if there are blocking issues.
206    pub fn has_blocking_issues(&self) -> bool {
207        !self.unresolved_checksums.is_empty()
208    }
209
210    /// Get a summary of the plan.
211    pub fn summary(&self) -> String {
212        let mut parts = Vec::new();
213
214        if !self.pending.is_empty() {
215            parts.push(format!("{} pending migrations", self.pending.len()));
216        }
217
218        if !self.skipped.is_empty() {
219            parts.push(format!("{} skipped", self.skipped.len()));
220        }
221
222        if !self.baselines.is_empty() {
223            parts.push(format!("{} baselines", self.baselines.len()));
224        }
225
226        if !self.resolved_checksums.is_empty() {
227            parts.push(format!(
228                "{} resolved checksums",
229                self.resolved_checksums.len()
230            ));
231        }
232
233        if !self.unresolved_checksums.is_empty() {
234            parts.push(format!(
235                "{} UNRESOLVED checksums",
236                self.unresolved_checksums.len()
237            ));
238        }
239
240        if let Some(diff) = &self.diff {
241            parts.push(diff.summary());
242        }
243
244        if parts.is_empty() {
245            "No changes to apply".to_string()
246        } else {
247            parts.join("; ")
248        }
249    }
250}
251
252/// The main migration engine.
253pub struct MigrationEngine<H: MigrationHistoryRepository> {
254    config: MigrationConfig,
255    history: H,
256    file_manager: MigrationFileManager,
257    sql_generator: PostgresSqlGenerator,
258    resolutions: ResolutionConfig,
259}
260
261impl<H: MigrationHistoryRepository> MigrationEngine<H> {
262    /// Create a new migration engine.
263    pub fn new(config: MigrationConfig, history: H) -> Self {
264        let file_manager = MigrationFileManager::new(&config.migrations_dir);
265        Self {
266            config,
267            history,
268            file_manager,
269            sql_generator: PostgresSqlGenerator,
270            resolutions: ResolutionConfig::new(),
271        }
272    }
273
274    /// Create a new migration engine with resolutions.
275    pub fn with_resolutions(
276        config: MigrationConfig,
277        history: H,
278        resolutions: ResolutionConfig,
279    ) -> Self {
280        let file_manager = MigrationFileManager::new(&config.migrations_dir);
281        Self {
282            config,
283            history,
284            file_manager,
285            sql_generator: PostgresSqlGenerator,
286            resolutions,
287        }
288    }
289
290    /// Load resolutions from the configured file.
291    pub async fn load_resolutions(&mut self) -> MigrateResult<()> {
292        self.resolutions = ResolutionConfig::load(&self.config.resolutions_file).await?;
293        Ok(())
294    }
295
296    /// Save resolutions to the configured file.
297    pub async fn save_resolutions(&self) -> MigrateResult<()> {
298        self.resolutions.save(&self.config.resolutions_file).await
299    }
300
301    /// Add a resolution and save.
302    pub async fn add_resolution(&mut self, resolution: Resolution) -> MigrateResult<()> {
303        self.resolutions.add(resolution);
304        self.save_resolutions().await
305    }
306
307    /// Get the current resolutions.
308    pub fn resolutions(&self) -> &ResolutionConfig {
309        &self.resolutions
310    }
311
312    /// Get mutable resolutions.
313    pub fn resolutions_mut(&mut self) -> &mut ResolutionConfig {
314        &mut self.resolutions
315    }
316
317    /// Initialize the migration system.
318    pub async fn initialize(&mut self) -> MigrateResult<()> {
319        // Create migrations directory
320        self.file_manager.ensure_dir().await?;
321
322        // Initialize history table
323        self.history.initialize().await?;
324
325        // Load resolutions
326        self.load_resolutions().await?;
327
328        Ok(())
329    }
330
331    /// Plan migrations based on current schema vs database.
332    pub async fn plan(&self, current_schema: &prax_schema::Schema) -> MigrateResult<MigrationPlan> {
333        let mut plan = MigrationPlan::empty();
334
335        // Get applied migrations
336        let applied = self.history.get_applied().await?;
337        let applied_ids: std::collections::HashSet<_> =
338            applied.iter().map(|r| r.id.as_str()).collect();
339
340        // Get file migrations
341        let files = self.file_manager.list_migrations().await?;
342
343        // Find pending migrations
344        for file in files {
345            // Check if this migration should be skipped
346            if self.resolutions.should_skip(&file.id) {
347                plan.skipped.push(file.id.clone());
348                continue;
349            }
350
351            // Check if this is a baseline migration
352            if self.resolutions.is_baseline(&file.id) && !applied_ids.contains(file.id.as_str()) {
353                plan.baselines.push(file.id.clone());
354                continue;
355            }
356
357            // Check for renamed migrations
358            let effective_id = self
359                .resolutions
360                .get_renamed(&file.id)
361                .map(String::from)
362                .unwrap_or_else(|| file.id.clone());
363
364            if !applied_ids.contains(effective_id.as_str()) {
365                plan.pending.push(file);
366            } else if let Some(record) = applied.iter().find(|r| r.id == effective_id) {
367                // Check for checksum mismatch
368                if record.checksum != file.checksum {
369                    if self
370                        .resolutions
371                        .accepts_checksum(&file.id, &record.checksum, &file.checksum)
372                    {
373                        // Checksum change is resolved
374                        if let Some(resolution) = self.resolutions.get(&file.id) {
375                            plan.resolved_checksums.push(ChecksumResolution {
376                                migration_id: file.id.clone(),
377                                expected: record.checksum.clone(),
378                                actual: file.checksum.clone(),
379                                reason: resolution.reason.clone(),
380                            });
381                        }
382                    } else {
383                        // Unresolved checksum mismatch
384                        plan.unresolved_checksums.push(ChecksumMismatch {
385                            migration_id: file.id.clone(),
386                            expected: record.checksum.clone(),
387                            actual: file.checksum.clone(),
388                        });
389
390                        if self.config.fail_on_checksum_mismatch {
391                            plan.warnings.push(format!(
392                                "Migration '{}' has been modified since it was applied. \
393                                 Add a resolution to accept this change: \
394                                 prax migrate resolve checksum {} {} {}",
395                                file.id, file.id, record.checksum, file.checksum
396                            ));
397                        }
398                    }
399                }
400            }
401        }
402
403        // Generate diff for schema changes
404        let differ = SchemaDiffer::new(current_schema.clone());
405        let diff = differ.diff()?;
406
407        if !diff.is_empty() {
408            // Check for data loss
409            if !self.config.allow_data_loss {
410                if !diff.drop_models.is_empty() {
411                    plan.warnings.push(format!(
412                        "Would drop {} tables: {}. Set allow_data_loss=true to proceed.",
413                        diff.drop_models.len(),
414                        diff.drop_models.join(", ")
415                    ));
416                }
417
418                for alter in &diff.alter_models {
419                    if !alter.drop_fields.is_empty() {
420                        plan.warnings.push(format!(
421                            "Would drop columns in '{}': {}. Set allow_data_loss=true to proceed.",
422                            alter.name,
423                            alter.drop_fields.join(", ")
424                        ));
425                    }
426                }
427            }
428
429            let sql = self.sql_generator.generate(&diff);
430            plan.diff = Some(diff);
431            plan.sql = Some(sql);
432        }
433
434        Ok(plan)
435    }
436
437    /// Apply pending migrations.
438    pub async fn migrate(&self) -> MigrateResult<MigrationResult> {
439        let mut result = MigrationResult {
440            applied_count: 0,
441            duration_ms: 0,
442            applied_migrations: Vec::new(),
443            baselined_migrations: Vec::new(),
444            skipped_migrations: Vec::new(),
445            warnings: Vec::new(),
446        };
447
448        let start = Instant::now();
449
450        // Acquire lock
451        let _lock = self.history.acquire_lock().await?;
452
453        // Get pending migrations
454        let applied = self.history.get_applied().await?;
455        let applied_ids: std::collections::HashSet<_> =
456            applied.iter().map(|r| r.id.as_str()).collect();
457
458        let files = self.file_manager.list_migrations().await?;
459
460        for file in files {
461            // Check if this migration should be skipped
462            if self.resolutions.should_skip(&file.id) {
463                result.skipped_migrations.push(file.id.clone());
464                continue;
465            }
466
467            // Check for renamed migrations
468            let effective_id = self
469                .resolutions
470                .get_renamed(&file.id)
471                .map(String::from)
472                .unwrap_or_else(|| file.id.clone());
473
474            if applied_ids.contains(effective_id.as_str()) {
475                // Check for unresolved checksum mismatch
476                if let Some(record) = applied.iter().find(|r| r.id == effective_id)
477                    && record.checksum != file.checksum
478                    && !self.resolutions.accepts_checksum(
479                        &file.id,
480                        &record.checksum,
481                        &file.checksum,
482                    )
483                    && self.config.fail_on_checksum_mismatch
484                {
485                    return Err(MigrationError::ChecksumMismatch {
486                        id: file.id.clone(),
487                        expected: record.checksum.clone(),
488                        actual: file.checksum.clone(),
489                    });
490                }
491                continue;
492            }
493
494            // Check if this is a baseline migration
495            if self.resolutions.is_baseline(&file.id) {
496                if self.config.dry_run {
497                    result
498                        .warnings
499                        .push(format!("[DRY RUN] Would baseline: {}", file.id));
500                } else {
501                    // Record as applied without running
502                    self.history
503                        .record_applied(&file.id, &file.checksum, 0)
504                        .await?;
505                    result.baselined_migrations.push(file.id.clone());
506                }
507                continue;
508            }
509
510            if self.config.dry_run {
511                result.applied_migrations.push(file.id.clone());
512                result
513                    .warnings
514                    .push(format!("[DRY RUN] Would apply: {}", file.id));
515                continue;
516            }
517
518            // Apply migration
519            let migration_start = Instant::now();
520            self.apply_migration(&file).await?;
521            let duration_ms = migration_start.elapsed().as_millis() as i64;
522
523            // Record in history
524            self.history
525                .record_applied(&file.id, &file.checksum, duration_ms)
526                .await?;
527
528            result.applied_migrations.push(file.id);
529            result.applied_count += 1;
530        }
531
532        result.duration_ms = start.elapsed().as_millis() as i64;
533        Ok(result)
534    }
535
536    /// Apply a single migration.
537    async fn apply_migration(&self, _migration: &MigrationFile) -> MigrateResult<()> {
538        // This would execute the SQL through the query engine
539        // For now, we just validate the structure
540        Ok(())
541    }
542
543    /// Rollback the last migration.
544    pub async fn rollback(&self) -> MigrateResult<Option<String>> {
545        if self.config.dry_run {
546            if let Some(last) = self.history.get_last_applied().await? {
547                return Ok(Some(format!("[DRY RUN] Would rollback: {}", last.id)));
548            }
549            return Ok(None);
550        }
551
552        let _lock = self.history.acquire_lock().await?;
553
554        let last = self.history.get_last_applied().await?;
555        if let Some(record) = last {
556            // Find the migration file
557            let files = self.file_manager.list_migrations().await?;
558            let migration = files.into_iter().find(|f| f.id == record.id);
559
560            if let Some(m) = migration {
561                if m.down_sql.is_empty() {
562                    return Err(MigrationError::InvalidMigration(format!(
563                        "Migration '{}' has no down migration",
564                        m.id
565                    )));
566                }
567
568                // Execute down migration
569                self.rollback_migration(&m).await?;
570
571                // Update history
572                self.history.record_rollback(&m.id).await?;
573
574                return Ok(Some(m.id));
575            }
576        }
577
578        Ok(None)
579    }
580
581    /// Rollback a single migration.
582    async fn rollback_migration(&self, _migration: &MigrationFile) -> MigrateResult<()> {
583        // This would execute the down SQL through the query engine
584        Ok(())
585    }
586
587    /// Create a new migration file from schema changes.
588    pub async fn create_migration(
589        &self,
590        name: &str,
591        schema: &prax_schema::Schema,
592    ) -> MigrateResult<PathBuf> {
593        // Generate diff
594        let differ = SchemaDiffer::new(schema.clone());
595        let diff = differ.diff()?;
596
597        if diff.is_empty() {
598            return Err(MigrationError::NoChanges);
599        }
600
601        // Generate SQL
602        let sql = self.sql_generator.generate(&diff);
603
604        // Create migration file
605        let id = self.file_manager.generate_id();
606        let migration = MigrationFile::new(id, name, sql);
607
608        // Write to disk
609        let path = self.file_manager.write_migration(&migration).await?;
610
611        Ok(path)
612    }
613
614    /// Get migration status.
615    pub async fn status(&self) -> MigrateResult<MigrationStatus> {
616        let applied = self.history.get_applied().await?;
617        let files = self.file_manager.list_migrations().await?;
618
619        let applied_ids: std::collections::HashSet<_> =
620            applied.iter().map(|r| r.id.as_str()).collect();
621
622        let pending: Vec<_> = files
623            .iter()
624            .filter(|f| !applied_ids.contains(f.id.as_str()))
625            .map(|f| f.id.clone())
626            .collect();
627
628        let total_applied = applied.len();
629        let total_pending = pending.len();
630
631        Ok(MigrationStatus {
632            applied,
633            pending,
634            total_applied,
635            total_pending,
636        })
637    }
638}
639
640/// Migration status information.
641#[derive(Debug)]
642pub struct MigrationStatus {
643    /// Applied migrations.
644    pub applied: Vec<MigrationRecord>,
645    /// Pending migration IDs.
646    pub pending: Vec<String>,
647    /// Total number of applied migrations.
648    pub total_applied: usize,
649    /// Total number of pending migrations.
650    pub total_pending: usize,
651}
652
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration points at `./migrations` and is strict about checksums.
    #[test]
    fn test_config_default() {
        let defaults = MigrationConfig::default();

        assert_eq!(defaults.migrations_dir, PathBuf::from("./migrations"));
        assert!(!defaults.dry_run);
        assert!(!defaults.allow_data_loss);
        assert!(defaults.fail_on_checksum_mismatch);
    }

    /// Each builder setter overrides exactly the field it is named after.
    #[test]
    fn test_config_builder() {
        let custom = MigrationConfig::new()
            .migrations_dir("./custom_migrations")
            .resolutions_file("./custom/resolutions.toml")
            .dry_run(true)
            .allow_data_loss(true)
            .fail_on_checksum_mismatch(false);

        let expected_dir = PathBuf::from("./custom_migrations");
        let expected_resolutions = PathBuf::from("./custom/resolutions.toml");

        assert_eq!(custom.migrations_dir, expected_dir);
        assert_eq!(custom.resolutions_file, expected_resolutions);
        assert!(custom.dry_run);
        assert!(custom.allow_data_loss);
        assert!(!custom.fail_on_checksum_mismatch);
    }

    /// A freshly created plan has nothing to do and nothing blocking.
    #[test]
    fn test_migration_plan_empty() {
        let fresh = MigrationPlan::empty();

        assert!(fresh.is_empty());
        assert!(!fresh.has_blocking_issues());
        assert_eq!(fresh.summary(), "No changes to apply");
    }

    /// One pending file makes the plan non-empty and shows up in the summary.
    #[test]
    fn test_migration_plan_with_pending() {
        let pending_file = MigrationFile {
            path: PathBuf::from("migrations/test"),
            id: "test".to_string(),
            name: "test".to_string(),
            up_sql: "SELECT 1".to_string(),
            down_sql: String::new(),
            checksum: "abc".to_string(),
        };

        let mut plan = MigrationPlan::empty();
        plan.pending.push(pending_file);

        assert!(!plan.is_empty());
        assert!(plan.summary().contains("1 pending"));
    }

    /// An unresolved checksum mismatch blocks the plan and is called out in the summary.
    #[test]
    fn test_migration_plan_with_unresolved_checksum() {
        let mismatch = ChecksumMismatch {
            migration_id: "test".to_string(),
            expected: "abc".to_string(),
            actual: "xyz".to_string(),
        };

        let mut plan = MigrationPlan::empty();
        plan.unresolved_checksums.push(mismatch);

        assert!(plan.has_blocking_issues());
        assert!(plan.summary().contains("UNRESOLVED"));
    }

    /// Counts and summary fragments reflect applied, baselined and skipped migrations.
    #[test]
    fn test_migration_result_summary() {
        let outcome = MigrationResult {
            applied_count: 3,
            duration_ms: 150,
            applied_migrations: vec!["m1".into(), "m2".into(), "m3".into()],
            baselined_migrations: vec!["b1".into()],
            skipped_migrations: vec!["s1".into(), "s2".into()],
            warnings: Vec::new(),
        };

        assert_eq!(outcome.total_processed(), 4);
        assert!(outcome.has_changes());

        let text = outcome.summary();
        assert!(text.contains("3 applied"));
        assert!(text.contains("1 baselined"));
        assert!(text.contains("2 skipped"));
    }
}