1use std::path::PathBuf;
4use std::time::Instant;
5
6use crate::diff::{SchemaDiff, SchemaDiffer};
7use crate::error::{MigrateResult, MigrationError};
8use crate::file::{MigrationFile, MigrationFileManager};
9use crate::history::{MigrationHistoryRepository, MigrationRecord};
10use crate::resolution::{Resolution, ResolutionConfig};
11use crate::sql::{MigrationSql, PostgresSqlGenerator};
12
/// Settings that control how the migration engine behaves.
///
/// Start from [`MigrationConfig::new`] (identical to `Default`) and refine the
/// value with the chainable setters; each setter consumes the config and
/// returns the updated copy.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Directory handed to the migration file manager.
    pub migrations_dir: PathBuf,
    /// File the resolution config is loaded from and saved to.
    pub resolutions_file: PathBuf,
    /// When `true`, `migrate` and `rollback` only report what they would do.
    pub dry_run: bool,
    /// When `false`, planning adds warnings for dropped tables and columns.
    pub allow_data_loss: bool,
    /// When `true`, `migrate` fails on an unaccepted checksum change of an
    /// already-applied migration.
    pub fail_on_checksum_mismatch: bool,
    /// Stored flag. NOTE(review): nothing in this file reads it — confirm
    /// where it is consumed.
    pub auto_baseline: bool,
}

impl Default for MigrationConfig {
    /// Conservative defaults: `./migrations`, no dry run, no data loss,
    /// checksum mismatches are fatal, no automatic baselining.
    fn default() -> Self {
        MigrationConfig {
            migrations_dir: "./migrations".into(),
            resolutions_file: "./migrations/resolutions.toml".into(),
            dry_run: false,
            allow_data_loss: false,
            fail_on_checksum_mismatch: true,
            auto_baseline: false,
        }
    }
}

impl MigrationConfig {
    /// Returns the default configuration.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the directory that holds the migration files.
    pub fn migrations_dir(self, dir: impl Into<PathBuf>) -> Self {
        Self {
            migrations_dir: dir.into(),
            ..self
        }
    }

    /// Sets the path of the resolutions file.
    pub fn resolutions_file(self, path: impl Into<PathBuf>) -> Self {
        Self {
            resolutions_file: path.into(),
            ..self
        }
    }

    /// Enables or disables dry-run mode.
    pub fn dry_run(self, dry_run: bool) -> Self {
        Self { dry_run, ..self }
    }

    /// Allows or forbids changes that may lose data.
    pub fn allow_data_loss(self, allow: bool) -> Self {
        Self {
            allow_data_loss: allow,
            ..self
        }
    }

    /// Chooses whether an unaccepted checksum mismatch is fatal.
    pub fn fail_on_checksum_mismatch(self, fail: bool) -> Self {
        Self {
            fail_on_checksum_mismatch: fail,
            ..self
        }
    }

    /// Sets the `auto_baseline` flag.
    pub fn auto_baseline(self, auto: bool) -> Self {
        Self {
            auto_baseline: auto,
            ..self
        }
    }
}
85
/// Outcome of a `migrate` run.
#[derive(Debug)]
pub struct MigrationResult {
    /// Number of migrations that were really applied (stays 0 in dry-run mode).
    pub applied_count: usize,
    /// Wall-clock duration of the whole run, in milliseconds.
    pub duration_ms: i64,
    /// Ids of applied migrations; in dry-run mode, ids that would be applied.
    pub applied_migrations: Vec<String>,
    /// Ids recorded as baselines.
    pub baselined_migrations: Vec<String>,
    /// Ids skipped because of a resolution.
    pub skipped_migrations: Vec<String>,
    /// Informational messages such as dry-run notices.
    pub warnings: Vec<String>,
}

impl MigrationResult {
    /// Number of migrations that changed the history: applied plus baselined.
    pub fn total_processed(&self) -> usize {
        self.baselined_migrations.len() + self.applied_count
    }

    /// Returns `true` when at least one migration was applied or baselined.
    pub fn has_changes(&self) -> bool {
        !(self.applied_count == 0 && self.baselined_migrations.is_empty())
    }

    /// One-line, human-readable summary of the run.
    ///
    /// Zero counts are omitted; when every count is zero the text is
    /// `"No migrations applied"` and the duration is not shown.
    pub fn summary(&self) -> String {
        let counts = [
            (self.applied_count, "applied"),
            (self.baselined_migrations.len(), "baselined"),
            (self.skipped_migrations.len(), "skipped"),
        ];

        let parts: Vec<String> = counts
            .iter()
            .filter(|(count, _)| *count > 0)
            .map(|(count, label)| format!("{} {}", count, label))
            .collect();

        if parts.is_empty() {
            return "No migrations applied".to_string();
        }

        format!("{} in {}ms", parts.join(", "), self.duration_ms)
    }
}
137
138#[derive(Debug)]
140pub struct MigrationPlan {
141 pub pending: Vec<MigrationFile>,
143 pub skipped: Vec<String>,
145 pub baselines: Vec<String>,
147 pub resolved_checksums: Vec<ChecksumResolution>,
149 pub unresolved_checksums: Vec<ChecksumMismatch>,
151 pub diff: Option<SchemaDiff>,
153 pub sql: Option<MigrationSql>,
155 pub warnings: Vec<String>,
157}
158
/// A checksum difference on an applied migration that a resolution accepts.
#[derive(Debug, Clone)]
pub struct ChecksumResolution {
    /// Id of the migration file whose checksum changed.
    pub migration_id: String,
    /// Checksum stored in the applied history.
    pub expected: String,
    /// Checksum of the migration file as it is now.
    pub actual: String,
    /// Reason copied from the accepting resolution.
    pub reason: String,
}
171
/// A checksum difference on an applied migration that no resolution accepts.
#[derive(Debug, Clone)]
pub struct ChecksumMismatch {
    /// Id of the migration file whose checksum changed.
    pub migration_id: String,
    /// Checksum stored in the applied history.
    pub expected: String,
    /// Checksum of the migration file as it is now.
    pub actual: String,
}
182
183impl MigrationPlan {
184 pub fn empty() -> Self {
186 Self {
187 pending: Vec::new(),
188 skipped: Vec::new(),
189 baselines: Vec::new(),
190 resolved_checksums: Vec::new(),
191 unresolved_checksums: Vec::new(),
192 diff: None,
193 sql: None,
194 warnings: Vec::new(),
195 }
196 }
197
198 pub fn is_empty(&self) -> bool {
200 self.pending.is_empty()
201 && self.baselines.is_empty()
202 && self.diff.as_ref().is_none_or(|d| d.is_empty())
203 }
204
205 pub fn has_blocking_issues(&self) -> bool {
207 !self.unresolved_checksums.is_empty()
208 }
209
210 pub fn summary(&self) -> String {
212 let mut parts = Vec::new();
213
214 if !self.pending.is_empty() {
215 parts.push(format!("{} pending migrations", self.pending.len()));
216 }
217
218 if !self.skipped.is_empty() {
219 parts.push(format!("{} skipped", self.skipped.len()));
220 }
221
222 if !self.baselines.is_empty() {
223 parts.push(format!("{} baselines", self.baselines.len()));
224 }
225
226 if !self.resolved_checksums.is_empty() {
227 parts.push(format!(
228 "{} resolved checksums",
229 self.resolved_checksums.len()
230 ));
231 }
232
233 if !self.unresolved_checksums.is_empty() {
234 parts.push(format!(
235 "{} UNRESOLVED checksums",
236 self.unresolved_checksums.len()
237 ));
238 }
239
240 if let Some(diff) = &self.diff {
241 parts.push(diff.summary());
242 }
243
244 if parts.is_empty() {
245 "No changes to apply".to_string()
246 } else {
247 parts.join("; ")
248 }
249 }
250}
251
252pub struct MigrationEngine<H: MigrationHistoryRepository> {
254 config: MigrationConfig,
255 history: H,
256 file_manager: MigrationFileManager,
257 sql_generator: PostgresSqlGenerator,
258 resolutions: ResolutionConfig,
259}
260
261impl<H: MigrationHistoryRepository> MigrationEngine<H> {
262 pub fn new(config: MigrationConfig, history: H) -> Self {
264 let file_manager = MigrationFileManager::new(&config.migrations_dir);
265 Self {
266 config,
267 history,
268 file_manager,
269 sql_generator: PostgresSqlGenerator,
270 resolutions: ResolutionConfig::new(),
271 }
272 }
273
274 pub fn with_resolutions(
276 config: MigrationConfig,
277 history: H,
278 resolutions: ResolutionConfig,
279 ) -> Self {
280 let file_manager = MigrationFileManager::new(&config.migrations_dir);
281 Self {
282 config,
283 history,
284 file_manager,
285 sql_generator: PostgresSqlGenerator,
286 resolutions,
287 }
288 }
289
290 pub async fn load_resolutions(&mut self) -> MigrateResult<()> {
292 self.resolutions = ResolutionConfig::load(&self.config.resolutions_file).await?;
293 Ok(())
294 }
295
296 pub async fn save_resolutions(&self) -> MigrateResult<()> {
298 self.resolutions.save(&self.config.resolutions_file).await
299 }
300
301 pub async fn add_resolution(&mut self, resolution: Resolution) -> MigrateResult<()> {
303 self.resolutions.add(resolution);
304 self.save_resolutions().await
305 }
306
307 pub fn resolutions(&self) -> &ResolutionConfig {
309 &self.resolutions
310 }
311
312 pub fn resolutions_mut(&mut self) -> &mut ResolutionConfig {
314 &mut self.resolutions
315 }
316
317 pub async fn initialize(&mut self) -> MigrateResult<()> {
319 self.file_manager.ensure_dir().await?;
321
322 self.history.initialize().await?;
324
325 self.load_resolutions().await?;
327
328 Ok(())
329 }
330
331 pub async fn plan(&self, current_schema: &prax_schema::Schema) -> MigrateResult<MigrationPlan> {
333 let mut plan = MigrationPlan::empty();
334
335 let applied = self.history.get_applied().await?;
337 let applied_ids: std::collections::HashSet<_> =
338 applied.iter().map(|r| r.id.as_str()).collect();
339
340 let files = self.file_manager.list_migrations().await?;
342
343 for file in files {
345 if self.resolutions.should_skip(&file.id) {
347 plan.skipped.push(file.id.clone());
348 continue;
349 }
350
351 if self.resolutions.is_baseline(&file.id) && !applied_ids.contains(file.id.as_str()) {
353 plan.baselines.push(file.id.clone());
354 continue;
355 }
356
357 let effective_id = self
359 .resolutions
360 .get_renamed(&file.id)
361 .map(String::from)
362 .unwrap_or_else(|| file.id.clone());
363
364 if !applied_ids.contains(effective_id.as_str()) {
365 plan.pending.push(file);
366 } else if let Some(record) = applied.iter().find(|r| r.id == effective_id) {
367 if record.checksum != file.checksum {
369 if self
370 .resolutions
371 .accepts_checksum(&file.id, &record.checksum, &file.checksum)
372 {
373 if let Some(resolution) = self.resolutions.get(&file.id) {
375 plan.resolved_checksums.push(ChecksumResolution {
376 migration_id: file.id.clone(),
377 expected: record.checksum.clone(),
378 actual: file.checksum.clone(),
379 reason: resolution.reason.clone(),
380 });
381 }
382 } else {
383 plan.unresolved_checksums.push(ChecksumMismatch {
385 migration_id: file.id.clone(),
386 expected: record.checksum.clone(),
387 actual: file.checksum.clone(),
388 });
389
390 if self.config.fail_on_checksum_mismatch {
391 plan.warnings.push(format!(
392 "Migration '{}' has been modified since it was applied. \
393 Add a resolution to accept this change: \
394 prax migrate resolve checksum {} {} {}",
395 file.id, file.id, record.checksum, file.checksum
396 ));
397 }
398 }
399 }
400 }
401 }
402
403 let differ = SchemaDiffer::new(current_schema.clone());
405 let diff = differ.diff()?;
406
407 if !diff.is_empty() {
408 if !self.config.allow_data_loss {
410 if !diff.drop_models.is_empty() {
411 plan.warnings.push(format!(
412 "Would drop {} tables: {}. Set allow_data_loss=true to proceed.",
413 diff.drop_models.len(),
414 diff.drop_models.join(", ")
415 ));
416 }
417
418 for alter in &diff.alter_models {
419 if !alter.drop_fields.is_empty() {
420 plan.warnings.push(format!(
421 "Would drop columns in '{}': {}. Set allow_data_loss=true to proceed.",
422 alter.name,
423 alter.drop_fields.join(", ")
424 ));
425 }
426 }
427 }
428
429 let sql = self.sql_generator.generate(&diff);
430 plan.diff = Some(diff);
431 plan.sql = Some(sql);
432 }
433
434 Ok(plan)
435 }
436
437 pub async fn migrate(&self) -> MigrateResult<MigrationResult> {
439 let mut result = MigrationResult {
440 applied_count: 0,
441 duration_ms: 0,
442 applied_migrations: Vec::new(),
443 baselined_migrations: Vec::new(),
444 skipped_migrations: Vec::new(),
445 warnings: Vec::new(),
446 };
447
448 let start = Instant::now();
449
450 let _lock = self.history.acquire_lock().await?;
452
453 let applied = self.history.get_applied().await?;
455 let applied_ids: std::collections::HashSet<_> =
456 applied.iter().map(|r| r.id.as_str()).collect();
457
458 let files = self.file_manager.list_migrations().await?;
459
460 for file in files {
461 if self.resolutions.should_skip(&file.id) {
463 result.skipped_migrations.push(file.id.clone());
464 continue;
465 }
466
467 let effective_id = self
469 .resolutions
470 .get_renamed(&file.id)
471 .map(String::from)
472 .unwrap_or_else(|| file.id.clone());
473
474 if applied_ids.contains(effective_id.as_str()) {
475 if let Some(record) = applied.iter().find(|r| r.id == effective_id)
477 && record.checksum != file.checksum
478 && !self.resolutions.accepts_checksum(
479 &file.id,
480 &record.checksum,
481 &file.checksum,
482 )
483 && self.config.fail_on_checksum_mismatch
484 {
485 return Err(MigrationError::ChecksumMismatch {
486 id: file.id.clone(),
487 expected: record.checksum.clone(),
488 actual: file.checksum.clone(),
489 });
490 }
491 continue;
492 }
493
494 if self.resolutions.is_baseline(&file.id) {
496 if self.config.dry_run {
497 result
498 .warnings
499 .push(format!("[DRY RUN] Would baseline: {}", file.id));
500 } else {
501 self.history
503 .record_applied(&file.id, &file.checksum, 0)
504 .await?;
505 result.baselined_migrations.push(file.id.clone());
506 }
507 continue;
508 }
509
510 if self.config.dry_run {
511 result.applied_migrations.push(file.id.clone());
512 result
513 .warnings
514 .push(format!("[DRY RUN] Would apply: {}", file.id));
515 continue;
516 }
517
518 let migration_start = Instant::now();
520 self.apply_migration(&file).await?;
521 let duration_ms = migration_start.elapsed().as_millis() as i64;
522
523 self.history
525 .record_applied(&file.id, &file.checksum, duration_ms)
526 .await?;
527
528 result.applied_migrations.push(file.id);
529 result.applied_count += 1;
530 }
531
532 result.duration_ms = start.elapsed().as_millis() as i64;
533 Ok(result)
534 }
535
536 async fn apply_migration(&self, _migration: &MigrationFile) -> MigrateResult<()> {
538 Ok(())
541 }
542
543 pub async fn rollback(&self) -> MigrateResult<Option<String>> {
545 if self.config.dry_run {
546 if let Some(last) = self.history.get_last_applied().await? {
547 return Ok(Some(format!("[DRY RUN] Would rollback: {}", last.id)));
548 }
549 return Ok(None);
550 }
551
552 let _lock = self.history.acquire_lock().await?;
553
554 let last = self.history.get_last_applied().await?;
555 if let Some(record) = last {
556 let files = self.file_manager.list_migrations().await?;
558 let migration = files.into_iter().find(|f| f.id == record.id);
559
560 if let Some(m) = migration {
561 if m.down_sql.is_empty() {
562 return Err(MigrationError::InvalidMigration(format!(
563 "Migration '{}' has no down migration",
564 m.id
565 )));
566 }
567
568 self.rollback_migration(&m).await?;
570
571 self.history.record_rollback(&m.id).await?;
573
574 return Ok(Some(m.id));
575 }
576 }
577
578 Ok(None)
579 }
580
581 async fn rollback_migration(&self, _migration: &MigrationFile) -> MigrateResult<()> {
583 Ok(())
585 }
586
587 pub async fn create_migration(
589 &self,
590 name: &str,
591 schema: &prax_schema::Schema,
592 ) -> MigrateResult<PathBuf> {
593 let differ = SchemaDiffer::new(schema.clone());
595 let diff = differ.diff()?;
596
597 if diff.is_empty() {
598 return Err(MigrationError::NoChanges);
599 }
600
601 let sql = self.sql_generator.generate(&diff);
603
604 let id = self.file_manager.generate_id();
606 let migration = MigrationFile::new(id, name, sql);
607
608 let path = self.file_manager.write_migration(&migration).await?;
610
611 Ok(path)
612 }
613
614 pub async fn status(&self) -> MigrateResult<MigrationStatus> {
616 let applied = self.history.get_applied().await?;
617 let files = self.file_manager.list_migrations().await?;
618
619 let applied_ids: std::collections::HashSet<_> =
620 applied.iter().map(|r| r.id.as_str()).collect();
621
622 let pending: Vec<_> = files
623 .iter()
624 .filter(|f| !applied_ids.contains(f.id.as_str()))
625 .map(|f| f.id.clone())
626 .collect();
627
628 let total_applied = applied.len();
629 let total_pending = pending.len();
630
631 Ok(MigrationStatus {
632 applied,
633 pending,
634 total_applied,
635 total_pending,
636 })
637 }
638}
639
640#[derive(Debug)]
642pub struct MigrationStatus {
643 pub applied: Vec<MigrationRecord>,
645 pub pending: Vec<String>,
647 pub total_applied: usize,
649 pub total_pending: usize,
651}
652
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default is pinned, including the two the builder test overrides.
    #[test]
    fn test_config_default() {
        let config = MigrationConfig::default();
        assert_eq!(config.migrations_dir, PathBuf::from("./migrations"));
        assert_eq!(
            config.resolutions_file,
            PathBuf::from("./migrations/resolutions.toml")
        );
        assert!(!config.dry_run);
        assert!(!config.allow_data_loss);
        assert!(config.fail_on_checksum_mismatch);
        assert!(!config.auto_baseline);
    }

    /// Each builder method sets exactly its own field.
    #[test]
    fn test_config_builder() {
        let config = MigrationConfig::new()
            .migrations_dir("./custom_migrations")
            .resolutions_file("./custom/resolutions.toml")
            .dry_run(true)
            .allow_data_loss(true)
            .fail_on_checksum_mismatch(false)
            .auto_baseline(true);

        assert_eq!(config.migrations_dir, PathBuf::from("./custom_migrations"));
        assert_eq!(
            config.resolutions_file,
            PathBuf::from("./custom/resolutions.toml")
        );
        assert!(config.dry_run);
        assert!(config.allow_data_loss);
        assert!(!config.fail_on_checksum_mismatch);
        assert!(config.auto_baseline);
    }

    #[test]
    fn test_migration_plan_empty() {
        let plan = MigrationPlan::empty();

        assert!(plan.is_empty());
        assert!(!plan.has_blocking_issues());
        assert_eq!(plan.summary(), "No changes to apply");
    }

    #[test]
    fn test_migration_plan_with_pending() {
        let mut plan = MigrationPlan::empty();
        plan.pending.push(MigrationFile {
            path: PathBuf::from("migrations/test"),
            id: "test".to_string(),
            name: "test".to_string(),
            up_sql: "SELECT 1".to_string(),
            down_sql: String::new(),
            checksum: "abc".to_string(),
        });

        assert!(!plan.is_empty());
        assert!(plan.summary().contains("1 pending"));
    }

    #[test]
    fn test_migration_plan_with_unresolved_checksum() {
        let mut plan = MigrationPlan::empty();
        plan.unresolved_checksums.push(ChecksumMismatch {
            migration_id: "test".to_string(),
            expected: "abc".to_string(),
            actual: "xyz".to_string(),
        });

        assert!(plan.has_blocking_issues());
        assert!(plan.summary().contains("UNRESOLVED"));
    }

    #[test]
    fn test_migration_result_summary() {
        let result = MigrationResult {
            applied_count: 3,
            duration_ms: 150,
            applied_migrations: vec!["m1".into(), "m2".into(), "m3".into()],
            baselined_migrations: vec!["b1".into()],
            skipped_migrations: vec!["s1".into(), "s2".into()],
            warnings: Vec::new(),
        };

        assert_eq!(result.total_processed(), 4);
        assert!(result.has_changes());
        assert!(result.summary().contains("3 applied"));
        assert!(result.summary().contains("1 baselined"));
        assert!(result.summary().contains("2 skipped"));
    }

    /// A run that touched nothing reports no changes and omits the duration.
    #[test]
    fn test_migration_result_without_changes() {
        let result = MigrationResult {
            applied_count: 0,
            duration_ms: 0,
            applied_migrations: Vec::new(),
            baselined_migrations: Vec::new(),
            skipped_migrations: Vec::new(),
            warnings: Vec::new(),
        };

        assert_eq!(result.total_processed(), 0);
        assert!(!result.has_changes());
        assert_eq!(result.summary(), "No migrations applied");
    }
}