lib_migrations_core/
runner.rs

1use crate::error::{Error, Result};
2use crate::phase::Phase;
3use crate::store::MigrationStore;
4use crate::Migration;
5
6/// Status of a migration
7#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct MigrationStatus {
9    pub version: u64,
10    pub name: String,
11    pub phase: Phase,
12    pub applied: bool,
13    pub applied_at: Option<u64>,
14}
15
16/// A planned migration for dry-run output
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct DryRunPlan {
19    pub version: u64,
20    pub name: String,
21    pub phase: Phase,
22    pub can_rollback: bool,
23}
24
25/// Result of a dry-run
26#[derive(Debug, Clone)]
27pub struct DryRunResult {
28    pub pending: Vec<DryRunPlan>,
29    pub total: usize,
30}
31
32impl DryRunResult {
33    pub fn is_empty(&self) -> bool {
34        self.pending.is_empty()
35    }
36}
37
38/// Runs migrations against a context using a store for tracking.
39///
40/// Generic parameters:
41/// - `Ctx`: The context passed to migrations (e.g., database connection)
42/// - `S`: The store implementation for tracking applied migrations
43pub struct MigrationRunner<'a, Ctx, S: MigrationStore> {
44    store: S,
45    migrations: Vec<Box<dyn Migration<Ctx> + 'a>>,
46}
47
48impl<'a, Ctx, S: MigrationStore> MigrationRunner<'a, Ctx, S> {
49    pub fn new(store: S) -> Self {
50        Self {
51            store,
52            migrations: Vec::new(),
53        }
54    }
55
56    /// Consume the runner and return the store
57    pub fn into_store(self) -> S {
58        self.store
59    }
60
61    /// Get a reference to the store
62    pub fn store(&self) -> &S {
63        &self.store
64    }
65
66    /// Get a mutable reference to the store
67    pub fn store_mut(&mut self) -> &mut S {
68        &mut self.store
69    }
70
71    /// Add a migration
72    pub fn add<M: Migration<Ctx> + 'a>(mut self, migration: M) -> Self {
73        self.migrations.push(Box::new(migration));
74        self
75    }
76
77    /// Add multiple migrations
78    pub fn add_all<I, M>(mut self, migrations: I) -> Self
79    where
80        I: IntoIterator<Item = M>,
81        M: Migration<Ctx> + 'a,
82    {
83        for m in migrations {
84            self.migrations.push(Box::new(m));
85        }
86        self
87    }
88
89    /// Initialize store and validate migrations
90    pub fn init(&mut self) -> Result<()> {
91        self.store.init()?;
92        self.validate()?;
93        Ok(())
94    }
95
96    /// Validate migration versions are sequential (1, 2, 3, ...)
97    fn validate(&self) -> Result<()> {
98        let mut versions: Vec<u64> = self.migrations.iter().map(|m| m.version()).collect();
99        versions.sort();
100        versions.dedup();
101
102        if versions.len() != self.migrations.len() {
103            return Err(Error::InvalidOrder("Duplicate migration versions".into()));
104        }
105
106        for (i, &version) in versions.iter().enumerate() {
107            let expected = (i + 1) as u64;
108            if version != expected {
109                return Err(Error::InvalidOrder(format!(
110                    "Expected version {}, found {}. Versions must be sequential.",
111                    expected, version
112                )));
113            }
114        }
115
116        Ok(())
117    }
118
119    /// Get current schema version
120    pub fn current_version(&self) -> Result<u64> {
121        self.store.current_version()
122    }
123
124    /// Get status of all migrations
125    pub fn status(&self) -> Result<Vec<MigrationStatus>> {
126        let applied = self.store.applied()?;
127        let applied_map: std::collections::HashMap<u64, u64> =
128            applied.into_iter().map(|r| (r.version, r.applied_at)).collect();
129
130        let mut statuses: Vec<MigrationStatus> = self
131            .migrations
132            .iter()
133            .map(|m| {
134                let version = m.version();
135                let applied_at = applied_map.get(&version).copied();
136                MigrationStatus {
137                    version,
138                    name: m.name().to_string(),
139                    phase: m.phase(),
140                    applied: applied_at.is_some(),
141                    applied_at,
142                }
143            })
144            .collect();
145
146        statuses.sort_by_key(|s| s.version);
147        Ok(statuses)
148    }
149
150    /// Get pending (unapplied) migrations
151    pub fn pending(&self) -> Result<Vec<&dyn Migration<Ctx>>> {
152        let current = self.store.current_version()?;
153        let mut pending: Vec<&dyn Migration<Ctx>> = self
154            .migrations
155            .iter()
156            .filter(|m| m.version() > current)
157            .map(|m| m.as_ref())
158            .collect();
159
160        pending.sort_by_key(|m| m.version());
161        Ok(pending)
162    }
163
164    /// Get pending migrations for a specific phase
165    pub fn pending_phase(&self, phase: Phase) -> Result<Vec<&dyn Migration<Ctx>>> {
166        let current = self.store.current_version()?;
167        let mut pending: Vec<&dyn Migration<Ctx>> = self
168            .migrations
169            .iter()
170            .filter(|m| m.version() > current && m.phase() == phase)
171            .map(|m| m.as_ref())
172            .collect();
173
174        pending.sort_by_key(|m| m.version());
175        Ok(pending)
176    }
177
178    /// Run all pending migrations
179    pub fn migrate(&mut self, ctx: &mut Ctx) -> Result<usize> {
180        let pending: Vec<u64> = self.pending()?.iter().map(|m| m.version()).collect();
181        let count = pending.len();
182
183        for version in pending {
184            self.apply_version(ctx, version)?;
185        }
186
187        Ok(count)
188    }
189
190    /// Run pending migrations for a specific phase only
191    pub fn migrate_phase(&mut self, ctx: &mut Ctx, phase: Phase) -> Result<usize> {
192        let pending: Vec<u64> = self
193            .pending_phase(phase)?
194            .iter()
195            .map(|m| m.version())
196            .collect();
197        let count = pending.len();
198
199        for version in pending {
200            self.apply_version(ctx, version)?;
201        }
202
203        Ok(count)
204    }
205
206    /// Dry-run: show what would be applied without actually applying
207    pub fn dry_run(&self) -> Result<DryRunResult> {
208        let pending = self.pending()?;
209
210        let plans: Vec<DryRunPlan> = pending
211            .iter()
212            .map(|m| DryRunPlan {
213                version: m.version(),
214                name: m.name().to_string(),
215                phase: m.phase(),
216                can_rollback: m.can_rollback(),
217            })
218            .collect();
219
220        Ok(DryRunResult {
221            total: plans.len(),
222            pending: plans,
223        })
224    }
225
226    /// Dry-run for a specific phase
227    pub fn dry_run_phase(&self, phase: Phase) -> Result<DryRunResult> {
228        let pending = self.pending_phase(phase)?;
229
230        let plans: Vec<DryRunPlan> = pending
231            .iter()
232            .map(|m| DryRunPlan {
233                version: m.version(),
234                name: m.name().to_string(),
235                phase: m.phase(),
236                can_rollback: m.can_rollback(),
237            })
238            .collect();
239
240        Ok(DryRunResult {
241            total: plans.len(),
242            pending: plans,
243        })
244    }
245
246    /// Migrate to a specific version (up or down)
247    pub fn migrate_to(&mut self, ctx: &mut Ctx, target: u64) -> Result<usize> {
248        let current = self.store.current_version()?;
249        let mut count = 0;
250
251        if target > current {
252            // Migrate up
253            let to_apply: Vec<u64> = self
254                .migrations
255                .iter()
256                .filter(|m| m.version() > current && m.version() <= target)
257                .map(|m| m.version())
258                .collect();
259
260            for version in to_apply {
261                self.apply_version(ctx, version)?;
262                count += 1;
263            }
264        } else if target < current {
265            // Migrate down
266            let mut to_rollback: Vec<u64> = self
267                .migrations
268                .iter()
269                .filter(|m| m.version() > target && m.version() <= current)
270                .map(|m| m.version())
271                .collect();
272
273            to_rollback.sort_by(|a, b| b.cmp(a)); // Descending
274
275            for version in to_rollback {
276                self.rollback_version(ctx, version)?;
277                count += 1;
278            }
279        }
280
281        Ok(count)
282    }
283
284    /// Apply a specific migration by version
285    fn apply_version(&mut self, ctx: &mut Ctx, version: u64) -> Result<()> {
286        let migration = self
287            .migrations
288            .iter()
289            .find(|m| m.version() == version)
290            .ok_or(Error::NotFound(version))?;
291
292        let name = migration.name().to_string();
293
294        migration.apply(ctx).map_err(|e| Error::MigrationFailed {
295            version,
296            message: e.to_string(),
297        })?;
298
299        self.store.mark_applied(version, &name)?;
300        Ok(())
301    }
302
303    /// Rollback a specific migration by version
304    fn rollback_version(&mut self, ctx: &mut Ctx, version: u64) -> Result<()> {
305        let migration = self
306            .migrations
307            .iter()
308            .find(|m| m.version() == version)
309            .ok_or(Error::NotFound(version))?;
310
311        if !migration.can_rollback() {
312            return Err(Error::RollbackNotSupported(version));
313        }
314
315        migration.rollback(ctx).map_err(|e| Error::MigrationFailed {
316            version,
317            message: e.to_string(),
318        })?;
319
320        self.store.mark_rolled_back(version)?;
321        Ok(())
322    }
323}
324
325#[cfg(test)]
326mod tests {
327    use super::*;
328    use crate::store::MemoryStore;
329    use crate::FnMigration;
330
331    #[test]
332    fn test_runner_migrate() {
333        let store = MemoryStore::new();
334        let mut runner = MigrationRunner::new(store)
335            .add(FnMigration::new(1, "first", |ctx: &mut Vec<i32>| {
336                ctx.push(1);
337                Ok(())
338            }))
339            .add(FnMigration::new(2, "second", |ctx: &mut Vec<i32>| {
340                ctx.push(2);
341                Ok(())
342            }));
343
344        runner.init().unwrap();
345
346        let mut ctx = vec![];
347        let count = runner.migrate(&mut ctx).unwrap();
348
349        assert_eq!(count, 2);
350        assert_eq!(ctx, vec![1, 2]);
351        assert_eq!(runner.current_version().unwrap(), 2);
352    }
353
354    #[test]
355    fn test_runner_migrate_phase() {
356        let store = MemoryStore::new();
357        let mut runner = MigrationRunner::new(store)
358            .add(FnMigration::new(1, "pre_1", |ctx: &mut Vec<&str>| {
359                ctx.push("pre_1");
360                Ok(())
361            }))
362            .add(
363                FnMigration::new(2, "post_1", |ctx: &mut Vec<&str>| {
364                    ctx.push("post_1");
365                    Ok(())
366                })
367                .phase(Phase::PostDeploy),
368            )
369            .add(FnMigration::new(3, "pre_2", |ctx: &mut Vec<&str>| {
370                ctx.push("pre_2");
371                Ok(())
372            }));
373
374        runner.init().unwrap();
375
376        let mut ctx = vec![];
377
378        // Run only pre-deploy
379        let count = runner.migrate_phase(&mut ctx, Phase::PreDeploy).unwrap();
380        assert_eq!(count, 2); // pre_1 and pre_2
381        assert_eq!(ctx, vec!["pre_1", "pre_2"]);
382        assert_eq!(runner.current_version().unwrap(), 3);
383
384        // post_1 was skipped, so pending post-deploy is empty
385        // (because version 2 is already past current_version check)
386        let pending_post = runner.pending_phase(Phase::PostDeploy).unwrap();
387        assert_eq!(pending_post.len(), 0);
388    }
389
390    #[test]
391    fn test_runner_dry_run() {
392        let store = MemoryStore::new();
393        let mut runner = MigrationRunner::new(store)
394            .add(FnMigration::new(1, "first", |_: &mut ()| Ok(())))
395            .add(
396                FnMigration::new(2, "second", |_: &mut ()| Ok(()))
397                    .phase(Phase::PostDeploy)
398                    .with_rollback(|_: &mut ()| Ok(())),
399            );
400
401        runner.init().unwrap();
402
403        let result = runner.dry_run().unwrap();
404        assert_eq!(result.total, 2);
405        assert_eq!(result.pending[0].version, 1);
406        assert_eq!(result.pending[0].phase, Phase::PreDeploy);
407        assert!(!result.pending[0].can_rollback);
408        assert_eq!(result.pending[1].version, 2);
409        assert_eq!(result.pending[1].phase, Phase::PostDeploy);
410        assert!(result.pending[1].can_rollback);
411    }
412
413    #[test]
414    fn test_runner_dry_run_phase() {
415        let store = MemoryStore::new();
416        let mut runner = MigrationRunner::new(store)
417            .add(FnMigration::new(1, "pre_1", |_: &mut ()| Ok(())))
418            .add(
419                FnMigration::new(2, "post_1", |_: &mut ()| Ok(())).phase(Phase::PostDeploy),
420            )
421            .add(FnMigration::new(3, "pre_2", |_: &mut ()| Ok(())));
422
423        runner.init().unwrap();
424
425        let pre_result = runner.dry_run_phase(Phase::PreDeploy).unwrap();
426        assert_eq!(pre_result.total, 2);
427        assert_eq!(pre_result.pending[0].name, "pre_1");
428        assert_eq!(pre_result.pending[1].name, "pre_2");
429
430        let post_result = runner.dry_run_phase(Phase::PostDeploy).unwrap();
431        assert_eq!(post_result.total, 1);
432        assert_eq!(post_result.pending[0].name, "post_1");
433    }
434
435    #[test]
436    fn test_runner_migrate_to() {
437        let store = MemoryStore::new();
438        let mut runner = MigrationRunner::new(store)
439            .add(
440                FnMigration::new(1, "first", |ctx: &mut Vec<i32>| {
441                    ctx.push(1);
442                    Ok(())
443                })
444                .with_rollback(|ctx: &mut Vec<i32>| {
445                    ctx.retain(|&x| x != 1);
446                    Ok(())
447                }),
448            )
449            .add(
450                FnMigration::new(2, "second", |ctx: &mut Vec<i32>| {
451                    ctx.push(2);
452                    Ok(())
453                })
454                .with_rollback(|ctx: &mut Vec<i32>| {
455                    ctx.retain(|&x| x != 2);
456                    Ok(())
457                }),
458            );
459
460        runner.init().unwrap();
461
462        let mut ctx = vec![];
463
464        // Migrate to version 2
465        runner.migrate_to(&mut ctx, 2).unwrap();
466        assert_eq!(ctx, vec![1, 2]);
467        assert_eq!(runner.current_version().unwrap(), 2);
468
469        // Rollback to version 1
470        runner.migrate_to(&mut ctx, 1).unwrap();
471        assert_eq!(ctx, vec![1]);
472        assert_eq!(runner.current_version().unwrap(), 1);
473
474        // Rollback to version 0
475        runner.migrate_to(&mut ctx, 0).unwrap();
476        assert!(ctx.is_empty());
477        assert_eq!(runner.current_version().unwrap(), 0);
478    }
479
480    #[test]
481    fn test_runner_status() {
482        let store = MemoryStore::new();
483        let mut runner = MigrationRunner::new(store)
484            .add(FnMigration::new(1, "first", |_: &mut ()| Ok(())))
485            .add(
486                FnMigration::new(2, "second", |_: &mut ()| Ok(())).phase(Phase::PostDeploy),
487            );
488
489        runner.init().unwrap();
490
491        let status = runner.status().unwrap();
492        assert_eq!(status.len(), 2);
493        assert!(!status[0].applied);
494        assert_eq!(status[0].phase, Phase::PreDeploy);
495        assert!(!status[1].applied);
496        assert_eq!(status[1].phase, Phase::PostDeploy);
497
498        runner.migrate(&mut ()).unwrap();
499
500        let status = runner.status().unwrap();
501        assert!(status[0].applied);
502        assert!(status[1].applied);
503    }
504
505    #[test]
506    fn test_runner_validates_order() {
507        let store = MemoryStore::new();
508        let mut runner = MigrationRunner::new(store)
509            .add(FnMigration::new(1, "first", |_: &mut ()| Ok(())))
510            .add(FnMigration::new(3, "third", |_: &mut ()| Ok(()))); // Skip 2
511
512        let result = runner.init();
513        assert!(matches!(result, Err(Error::InvalidOrder(_))));
514    }
515
516    #[test]
517    fn test_runner_validates_duplicates() {
518        let store = MemoryStore::new();
519        let mut runner = MigrationRunner::new(store)
520            .add(FnMigration::new(1, "first", |_: &mut ()| Ok(())))
521            .add(FnMigration::new(1, "duplicate", |_: &mut ()| Ok(()))); // Duplicate
522
523        let result = runner.init();
524        assert!(matches!(result, Err(Error::InvalidOrder(_))));
525    }
526}