1use std::path::PathBuf;
4use std::time::Instant;
5
6use crate::diff::{SchemaDiff, SchemaDiffer};
7use crate::error::{MigrateResult, MigrationError};
8use crate::file::{MigrationFile, MigrationFileManager};
9use crate::history::{MigrationHistoryRepository, MigrationRecord};
10use crate::resolution::{Resolution, ResolutionConfig};
11use crate::sql::{MigrationSql, PostgresSqlGenerator};
12
/// Settings that control how a `MigrationEngine` locates and processes migrations.
///
/// Start from [`MigrationConfig::new`] (identical to `Default::default()`) and
/// refine it with the chainable setters; each setter consumes the config and
/// returns the updated value.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Directory handed to the migration file manager.
    pub migrations_dir: PathBuf,
    /// File the engine loads resolutions from and saves them to.
    pub resolutions_file: PathBuf,
    /// When `true`, `migrate` and `rollback` only report what they would do.
    pub dry_run: bool,
    /// When `false`, planning adds warnings for tables and columns that the
    /// schema diff would drop.
    pub allow_data_loss: bool,
    /// When `true`, `migrate` fails on a checksum mismatch that no resolution
    /// accepts, and `plan` adds a warning describing how to resolve it.
    pub fail_on_checksum_mismatch: bool,
    /// Not read anywhere in this file.
    /// NOTE(review): presumably opts in to automatic baselining — confirm against callers.
    pub auto_baseline: bool,
}

impl Default for MigrationConfig {
    /// Defaults: `./migrations`, `./migrations/resolutions.toml`, no dry run,
    /// no data loss allowed, checksum mismatches are fatal, no auto-baseline.
    fn default() -> Self {
        let migrations_dir = PathBuf::from("./migrations");
        let resolutions_file = PathBuf::from("./migrations/resolutions.toml");

        Self {
            migrations_dir,
            resolutions_file,
            dry_run: false,
            allow_data_loss: false,
            fail_on_checksum_mismatch: true,
            auto_baseline: false,
        }
    }
}

impl MigrationConfig {
    /// Returns the default configuration.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the directory that contains the migration files.
    pub fn migrations_dir(self, dir: impl Into<PathBuf>) -> Self {
        Self {
            migrations_dir: dir.into(),
            ..self
        }
    }

    /// Sets the path of the resolutions file.
    pub fn resolutions_file(self, path: impl Into<PathBuf>) -> Self {
        Self {
            resolutions_file: path.into(),
            ..self
        }
    }

    /// Enables or disables dry-run mode.
    pub fn dry_run(self, dry_run: bool) -> Self {
        Self { dry_run, ..self }
    }

    /// Sets whether destructive schema changes are allowed without warnings.
    pub fn allow_data_loss(self, allow: bool) -> Self {
        Self {
            allow_data_loss: allow,
            ..self
        }
    }

    /// Sets whether an unaccepted checksum mismatch is treated as fatal.
    pub fn fail_on_checksum_mismatch(self, fail: bool) -> Self {
        Self {
            fail_on_checksum_mismatch: fail,
            ..self
        }
    }

    /// Sets the `auto_baseline` flag.
    pub fn auto_baseline(self, auto: bool) -> Self {
        Self {
            auto_baseline: auto,
            ..self
        }
    }
}
85
/// Outcome of a `MigrationEngine::migrate` run.
#[derive(Debug)]
pub struct MigrationResult {
    /// Number of migrations that were actually applied.
    pub applied_count: usize,
    /// Wall-clock duration of the whole run, in milliseconds.
    pub duration_ms: i64,
    /// Ids of applied migrations (in dry-run mode: ids that would be applied).
    pub applied_migrations: Vec<String>,
    /// Ids recorded in the history as baselines without being applied.
    pub baselined_migrations: Vec<String>,
    /// Ids skipped because of a skip resolution.
    pub skipped_migrations: Vec<String>,
    /// Informational messages collected during the run.
    pub warnings: Vec<String>,
}

impl MigrationResult {
    /// Number of migrations that were applied or baselined.
    pub fn total_processed(&self) -> usize {
        let baselined = self.baselined_migrations.len();
        self.applied_count + baselined
    }

    /// Returns `true` when at least one migration was applied or baselined.
    pub fn has_changes(&self) -> bool {
        !(self.applied_count == 0 && self.baselined_migrations.is_empty())
    }

    /// One-line, human-readable summary of the run.
    ///
    /// Lists every non-zero counter followed by the duration, or
    /// `"No migrations applied"` when all counters are zero.
    pub fn summary(&self) -> String {
        let counters = [
            (self.applied_count, "applied"),
            (self.baselined_migrations.len(), "baselined"),
            (self.skipped_migrations.len(), "skipped"),
        ];

        let parts: Vec<String> = counters
            .iter()
            .filter(|(count, _)| *count > 0)
            .map(|(count, label)| format!("{} {}", count, label))
            .collect();

        if parts.is_empty() {
            return "No migrations applied".to_string();
        }

        format!("{} in {}ms", parts.join(", "), self.duration_ms)
    }
}
137
138#[derive(Debug)]
140pub struct MigrationPlan {
141 pub pending: Vec<MigrationFile>,
143 pub skipped: Vec<String>,
145 pub baselines: Vec<String>,
147 pub resolved_checksums: Vec<ChecksumResolution>,
149 pub unresolved_checksums: Vec<ChecksumMismatch>,
151 pub diff: Option<SchemaDiff>,
153 pub sql: Option<MigrationSql>,
155 pub warnings: Vec<String>,
157}
158
/// A checksum change on an already-applied migration that a resolution accepts.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly, for example
/// in `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChecksumResolution {
    /// Id of the migration file whose contents changed.
    pub migration_id: String,
    /// Checksum stored in the migration history.
    pub expected: String,
    /// Checksum of the migration file as it is now.
    pub actual: String,
    /// Reason taken from the accepting resolution.
    pub reason: String,
}
171
/// A checksum change on an already-applied migration that no resolution accepts.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly, for example
/// in `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChecksumMismatch {
    /// Id of the migration file whose contents changed.
    pub migration_id: String,
    /// Checksum stored in the migration history.
    pub expected: String,
    /// Checksum of the migration file as it is now.
    pub actual: String,
}
182
183impl MigrationPlan {
184 pub fn empty() -> Self {
186 Self {
187 pending: Vec::new(),
188 skipped: Vec::new(),
189 baselines: Vec::new(),
190 resolved_checksums: Vec::new(),
191 unresolved_checksums: Vec::new(),
192 diff: None,
193 sql: None,
194 warnings: Vec::new(),
195 }
196 }
197
198 pub fn is_empty(&self) -> bool {
200 self.pending.is_empty()
201 && self.baselines.is_empty()
202 && self.diff.as_ref().is_none_or(|d| d.is_empty())
203 }
204
205 pub fn has_blocking_issues(&self) -> bool {
207 !self.unresolved_checksums.is_empty()
208 }
209
210 pub fn summary(&self) -> String {
212 let mut parts = Vec::new();
213
214 if !self.pending.is_empty() {
215 parts.push(format!("{} pending migrations", self.pending.len()));
216 }
217
218 if !self.skipped.is_empty() {
219 parts.push(format!("{} skipped", self.skipped.len()));
220 }
221
222 if !self.baselines.is_empty() {
223 parts.push(format!("{} baselines", self.baselines.len()));
224 }
225
226 if !self.resolved_checksums.is_empty() {
227 parts.push(format!(
228 "{} resolved checksums",
229 self.resolved_checksums.len()
230 ));
231 }
232
233 if !self.unresolved_checksums.is_empty() {
234 parts.push(format!(
235 "{} UNRESOLVED checksums",
236 self.unresolved_checksums.len()
237 ));
238 }
239
240 if let Some(diff) = &self.diff {
241 parts.push(diff.summary());
242 }
243
244 if parts.is_empty() {
245 "No changes to apply".to_string()
246 } else {
247 parts.join("; ")
248 }
249 }
250}
251
252pub struct MigrationEngine<H: MigrationHistoryRepository> {
254 config: MigrationConfig,
255 history: H,
256 file_manager: MigrationFileManager,
257 sql_generator: PostgresSqlGenerator,
258 resolutions: ResolutionConfig,
259}
260
261impl<H: MigrationHistoryRepository> MigrationEngine<H> {
262 pub fn new(config: MigrationConfig, history: H) -> Self {
264 let file_manager = MigrationFileManager::new(&config.migrations_dir);
265 Self {
266 config,
267 history,
268 file_manager,
269 sql_generator: PostgresSqlGenerator,
270 resolutions: ResolutionConfig::new(),
271 }
272 }
273
274 pub fn with_resolutions(
276 config: MigrationConfig,
277 history: H,
278 resolutions: ResolutionConfig,
279 ) -> Self {
280 let file_manager = MigrationFileManager::new(&config.migrations_dir);
281 Self {
282 config,
283 history,
284 file_manager,
285 sql_generator: PostgresSqlGenerator,
286 resolutions,
287 }
288 }
289
290 pub async fn load_resolutions(&mut self) -> MigrateResult<()> {
292 self.resolutions = ResolutionConfig::load(&self.config.resolutions_file).await?;
293 Ok(())
294 }
295
296 pub async fn save_resolutions(&self) -> MigrateResult<()> {
298 self.resolutions.save(&self.config.resolutions_file).await
299 }
300
301 pub async fn add_resolution(&mut self, resolution: Resolution) -> MigrateResult<()> {
303 self.resolutions.add(resolution);
304 self.save_resolutions().await
305 }
306
307 pub fn resolutions(&self) -> &ResolutionConfig {
309 &self.resolutions
310 }
311
312 pub fn resolutions_mut(&mut self) -> &mut ResolutionConfig {
314 &mut self.resolutions
315 }
316
317 pub async fn initialize(&mut self) -> MigrateResult<()> {
319 self.file_manager.ensure_dir().await?;
321
322 self.history.initialize().await?;
324
325 self.load_resolutions().await?;
327
328 Ok(())
329 }
330
331 pub async fn plan(&self, current_schema: &prax_schema::Schema) -> MigrateResult<MigrationPlan> {
333 let mut plan = MigrationPlan::empty();
334
335 let applied = self.history.get_applied().await?;
337 let applied_ids: std::collections::HashSet<_> =
338 applied.iter().map(|r| r.id.as_str()).collect();
339
340 let files = self.file_manager.list_migrations().await?;
342
343 for file in files {
345 if self.resolutions.should_skip(&file.id) {
347 plan.skipped.push(file.id.clone());
348 continue;
349 }
350
351 if self.resolutions.is_baseline(&file.id) && !applied_ids.contains(file.id.as_str()) {
353 plan.baselines.push(file.id.clone());
354 continue;
355 }
356
357 let effective_id = self
359 .resolutions
360 .get_renamed(&file.id)
361 .map(String::from)
362 .unwrap_or_else(|| file.id.clone());
363
364 if !applied_ids.contains(effective_id.as_str()) {
365 plan.pending.push(file);
366 } else if let Some(record) = applied.iter().find(|r| r.id == effective_id) {
367 if record.checksum != file.checksum {
369 if self
370 .resolutions
371 .accepts_checksum(&file.id, &record.checksum, &file.checksum)
372 {
373 if let Some(resolution) = self.resolutions.get(&file.id) {
375 plan.resolved_checksums.push(ChecksumResolution {
376 migration_id: file.id.clone(),
377 expected: record.checksum.clone(),
378 actual: file.checksum.clone(),
379 reason: resolution.reason.clone(),
380 });
381 }
382 } else {
383 plan.unresolved_checksums.push(ChecksumMismatch {
385 migration_id: file.id.clone(),
386 expected: record.checksum.clone(),
387 actual: file.checksum.clone(),
388 });
389
390 if self.config.fail_on_checksum_mismatch {
391 plan.warnings.push(format!(
392 "Migration '{}' has been modified since it was applied. \
393 Add a resolution to accept this change: \
394 prax migrate resolve checksum {} {} {}",
395 file.id, file.id, record.checksum, file.checksum
396 ));
397 }
398 }
399 }
400 }
401 }
402
403 let differ = SchemaDiffer::new(current_schema.clone());
405 let diff = differ.diff()?;
406
407 if !diff.is_empty() {
408 if !self.config.allow_data_loss {
410 if !diff.drop_models.is_empty() {
411 plan.warnings.push(format!(
412 "Would drop {} tables: {}. Set allow_data_loss=true to proceed.",
413 diff.drop_models.len(),
414 diff.drop_models.join(", ")
415 ));
416 }
417
418 for alter in &diff.alter_models {
419 if !alter.drop_fields.is_empty() {
420 plan.warnings.push(format!(
421 "Would drop columns in '{}': {}. Set allow_data_loss=true to proceed.",
422 alter.name,
423 alter.drop_fields.join(", ")
424 ));
425 }
426 }
427 }
428
429 let sql = self.sql_generator.generate(&diff);
430 plan.diff = Some(diff);
431 plan.sql = Some(sql);
432 }
433
434 Ok(plan)
435 }
436
437 pub async fn migrate(&self) -> MigrateResult<MigrationResult> {
439 let mut result = MigrationResult {
440 applied_count: 0,
441 duration_ms: 0,
442 applied_migrations: Vec::new(),
443 baselined_migrations: Vec::new(),
444 skipped_migrations: Vec::new(),
445 warnings: Vec::new(),
446 };
447
448 let start = Instant::now();
449
450 let _lock = self.history.acquire_lock().await?;
452
453 let applied = self.history.get_applied().await?;
455 let applied_ids: std::collections::HashSet<_> =
456 applied.iter().map(|r| r.id.as_str()).collect();
457
458 let files = self.file_manager.list_migrations().await?;
459
460 for file in files {
461 if self.resolutions.should_skip(&file.id) {
463 result.skipped_migrations.push(file.id.clone());
464 continue;
465 }
466
467 let effective_id = self
469 .resolutions
470 .get_renamed(&file.id)
471 .map(String::from)
472 .unwrap_or_else(|| file.id.clone());
473
474 if applied_ids.contains(effective_id.as_str()) {
475 if let Some(record) = applied.iter().find(|r| r.id == effective_id) {
477 if record.checksum != file.checksum
478 && !self.resolutions.accepts_checksum(
479 &file.id,
480 &record.checksum,
481 &file.checksum,
482 )
483 && self.config.fail_on_checksum_mismatch
484 {
485 return Err(MigrationError::ChecksumMismatch {
486 id: file.id.clone(),
487 expected: record.checksum.clone(),
488 actual: file.checksum.clone(),
489 });
490 }
491 }
492 continue;
493 }
494
495 if self.resolutions.is_baseline(&file.id) {
497 if self.config.dry_run {
498 result
499 .warnings
500 .push(format!("[DRY RUN] Would baseline: {}", file.id));
501 } else {
502 self.history
504 .record_applied(&file.id, &file.checksum, 0)
505 .await?;
506 result.baselined_migrations.push(file.id.clone());
507 }
508 continue;
509 }
510
511 if self.config.dry_run {
512 result.applied_migrations.push(file.id.clone());
513 result
514 .warnings
515 .push(format!("[DRY RUN] Would apply: {}", file.id));
516 continue;
517 }
518
519 let migration_start = Instant::now();
521 self.apply_migration(&file).await?;
522 let duration_ms = migration_start.elapsed().as_millis() as i64;
523
524 self.history
526 .record_applied(&file.id, &file.checksum, duration_ms)
527 .await?;
528
529 result.applied_migrations.push(file.id);
530 result.applied_count += 1;
531 }
532
533 result.duration_ms = start.elapsed().as_millis() as i64;
534 Ok(result)
535 }
536
537 async fn apply_migration(&self, _migration: &MigrationFile) -> MigrateResult<()> {
539 Ok(())
542 }
543
544 pub async fn rollback(&self) -> MigrateResult<Option<String>> {
546 if self.config.dry_run {
547 if let Some(last) = self.history.get_last_applied().await? {
548 return Ok(Some(format!("[DRY RUN] Would rollback: {}", last.id)));
549 }
550 return Ok(None);
551 }
552
553 let _lock = self.history.acquire_lock().await?;
554
555 let last = self.history.get_last_applied().await?;
556 if let Some(record) = last {
557 let files = self.file_manager.list_migrations().await?;
559 let migration = files.into_iter().find(|f| f.id == record.id);
560
561 if let Some(m) = migration {
562 if m.down_sql.is_empty() {
563 return Err(MigrationError::InvalidMigration(format!(
564 "Migration '{}' has no down migration",
565 m.id
566 )));
567 }
568
569 self.rollback_migration(&m).await?;
571
572 self.history.record_rollback(&m.id).await?;
574
575 return Ok(Some(m.id));
576 }
577 }
578
579 Ok(None)
580 }
581
582 async fn rollback_migration(&self, _migration: &MigrationFile) -> MigrateResult<()> {
584 Ok(())
586 }
587
588 pub async fn create_migration(
590 &self,
591 name: &str,
592 schema: &prax_schema::Schema,
593 ) -> MigrateResult<PathBuf> {
594 let differ = SchemaDiffer::new(schema.clone());
596 let diff = differ.diff()?;
597
598 if diff.is_empty() {
599 return Err(MigrationError::NoChanges);
600 }
601
602 let sql = self.sql_generator.generate(&diff);
604
605 let id = self.file_manager.generate_id();
607 let migration = MigrationFile::new(id, name, sql);
608
609 let path = self.file_manager.write_migration(&migration).await?;
611
612 Ok(path)
613 }
614
615 pub async fn status(&self) -> MigrateResult<MigrationStatus> {
617 let applied = self.history.get_applied().await?;
618 let files = self.file_manager.list_migrations().await?;
619
620 let applied_ids: std::collections::HashSet<_> =
621 applied.iter().map(|r| r.id.as_str()).collect();
622
623 let pending: Vec<_> = files
624 .iter()
625 .filter(|f| !applied_ids.contains(f.id.as_str()))
626 .map(|f| f.id.clone())
627 .collect();
628
629 let total_applied = applied.len();
630 let total_pending = pending.len();
631
632 Ok(MigrationStatus {
633 applied,
634 pending,
635 total_applied,
636 total_pending,
637 })
638 }
639}
640
641#[derive(Debug)]
643pub struct MigrationStatus {
644 pub applied: Vec<MigrationRecord>,
646 pub pending: Vec<String>,
648 pub total_applied: usize,
650 pub total_pending: usize,
652}
653
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default is pinned, including the resolutions file and
    /// `auto_baseline`.
    #[test]
    fn test_config_default() {
        let config = MigrationConfig::default();
        assert_eq!(config.migrations_dir, PathBuf::from("./migrations"));
        assert_eq!(
            config.resolutions_file,
            PathBuf::from("./migrations/resolutions.toml")
        );
        assert!(!config.dry_run);
        assert!(!config.allow_data_loss);
        assert!(config.fail_on_checksum_mismatch);
        assert!(!config.auto_baseline);
    }

    /// Each builder method, including `auto_baseline`, sets its own field.
    #[test]
    fn test_config_builder() {
        let config = MigrationConfig::new()
            .migrations_dir("./custom_migrations")
            .resolutions_file("./custom/resolutions.toml")
            .dry_run(true)
            .allow_data_loss(true)
            .fail_on_checksum_mismatch(false)
            .auto_baseline(true);

        assert_eq!(config.migrations_dir, PathBuf::from("./custom_migrations"));
        assert_eq!(
            config.resolutions_file,
            PathBuf::from("./custom/resolutions.toml")
        );
        assert!(config.dry_run);
        assert!(config.allow_data_loss);
        assert!(!config.fail_on_checksum_mismatch);
        assert!(config.auto_baseline);
    }

    #[test]
    fn test_migration_plan_empty() {
        let plan = MigrationPlan::empty();

        assert!(plan.is_empty());
        assert!(!plan.has_blocking_issues());
        assert_eq!(plan.summary(), "No changes to apply");
    }

    #[test]
    fn test_migration_plan_with_pending() {
        let mut plan = MigrationPlan::empty();
        plan.pending.push(MigrationFile {
            path: PathBuf::from("migrations/test"),
            id: "test".to_string(),
            name: "test".to_string(),
            up_sql: "SELECT 1".to_string(),
            down_sql: String::new(),
            checksum: "abc".to_string(),
        });

        assert!(!plan.is_empty());
        assert!(plan.summary().contains("1 pending"));
    }

    #[test]
    fn test_migration_plan_with_unresolved_checksum() {
        let mut plan = MigrationPlan::empty();
        plan.unresolved_checksums.push(ChecksumMismatch {
            migration_id: "test".to_string(),
            expected: "abc".to_string(),
            actual: "xyz".to_string(),
        });

        assert!(plan.has_blocking_issues());
        assert!(plan.summary().contains("UNRESOLVED"));
    }

    #[test]
    fn test_migration_result_summary() {
        let result = MigrationResult {
            applied_count: 3,
            duration_ms: 150,
            applied_migrations: vec!["m1".into(), "m2".into(), "m3".into()],
            baselined_migrations: vec!["b1".into()],
            skipped_migrations: vec!["s1".into(), "s2".into()],
            warnings: Vec::new(),
        };

        assert_eq!(result.total_processed(), 4);
        assert!(result.has_changes());
        assert!(result.summary().contains("3 applied"));
        assert!(result.summary().contains("1 baselined"));
        assert!(result.summary().contains("2 skipped"));
        // Pin the exact text, including separators and the duration suffix.
        assert_eq!(
            result.summary(),
            "3 applied, 1 baselined, 2 skipped in 150ms"
        );
    }

    /// A run that touched nothing reports the fixed "nothing applied" text.
    #[test]
    fn test_migration_result_summary_without_changes() {
        let result = MigrationResult {
            applied_count: 0,
            duration_ms: 0,
            applied_migrations: Vec::new(),
            baselined_migrations: Vec::new(),
            skipped_migrations: Vec::new(),
            warnings: Vec::new(),
        };

        assert_eq!(result.total_processed(), 0);
        assert!(!result.has_changes());
        assert_eq!(result.summary(), "No migrations applied");
    }
}