1use crate::error::{Error, Result};
2use crate::phase::Phase;
3use crate::store::MigrationStore;
4use crate::Migration;
5
6#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct MigrationStatus {
9 pub version: u64,
10 pub name: String,
11 pub phase: Phase,
12 pub applied: bool,
13 pub applied_at: Option<u64>,
14}
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct DryRunPlan {
19 pub version: u64,
20 pub name: String,
21 pub phase: Phase,
22 pub can_rollback: bool,
23}
24
25#[derive(Debug, Clone)]
27pub struct DryRunResult {
28 pub pending: Vec<DryRunPlan>,
29 pub total: usize,
30}
31
32impl DryRunResult {
33 pub fn is_empty(&self) -> bool {
34 self.pending.is_empty()
35 }
36}
37
38pub struct MigrationRunner<'a, Ctx, S: MigrationStore> {
44 store: S,
45 migrations: Vec<Box<dyn Migration<Ctx> + 'a>>,
46}
47
48impl<'a, Ctx, S: MigrationStore> MigrationRunner<'a, Ctx, S> {
49 pub fn new(store: S) -> Self {
50 Self {
51 store,
52 migrations: Vec::new(),
53 }
54 }
55
56 pub fn into_store(self) -> S {
58 self.store
59 }
60
61 pub fn store(&self) -> &S {
63 &self.store
64 }
65
66 pub fn store_mut(&mut self) -> &mut S {
68 &mut self.store
69 }
70
71 pub fn add<M: Migration<Ctx> + 'a>(mut self, migration: M) -> Self {
73 self.migrations.push(Box::new(migration));
74 self
75 }
76
77 pub fn add_all<I, M>(mut self, migrations: I) -> Self
79 where
80 I: IntoIterator<Item = M>,
81 M: Migration<Ctx> + 'a,
82 {
83 for m in migrations {
84 self.migrations.push(Box::new(m));
85 }
86 self
87 }
88
89 pub fn init(&mut self) -> Result<()> {
91 self.store.init()?;
92 self.validate()?;
93 Ok(())
94 }
95
96 fn validate(&self) -> Result<()> {
98 let mut versions: Vec<u64> = self.migrations.iter().map(|m| m.version()).collect();
99 versions.sort();
100 versions.dedup();
101
102 if versions.len() != self.migrations.len() {
103 return Err(Error::InvalidOrder("Duplicate migration versions".into()));
104 }
105
106 for (i, &version) in versions.iter().enumerate() {
107 let expected = (i + 1) as u64;
108 if version != expected {
109 return Err(Error::InvalidOrder(format!(
110 "Expected version {}, found {}. Versions must be sequential.",
111 expected, version
112 )));
113 }
114 }
115
116 Ok(())
117 }
118
119 pub fn current_version(&self) -> Result<u64> {
121 self.store.current_version()
122 }
123
124 pub fn status(&self) -> Result<Vec<MigrationStatus>> {
126 let applied = self.store.applied()?;
127 let applied_map: std::collections::HashMap<u64, u64> =
128 applied.into_iter().map(|r| (r.version, r.applied_at)).collect();
129
130 let mut statuses: Vec<MigrationStatus> = self
131 .migrations
132 .iter()
133 .map(|m| {
134 let version = m.version();
135 let applied_at = applied_map.get(&version).copied();
136 MigrationStatus {
137 version,
138 name: m.name().to_string(),
139 phase: m.phase(),
140 applied: applied_at.is_some(),
141 applied_at,
142 }
143 })
144 .collect();
145
146 statuses.sort_by_key(|s| s.version);
147 Ok(statuses)
148 }
149
150 pub fn pending(&self) -> Result<Vec<&dyn Migration<Ctx>>> {
152 let current = self.store.current_version()?;
153 let mut pending: Vec<&dyn Migration<Ctx>> = self
154 .migrations
155 .iter()
156 .filter(|m| m.version() > current)
157 .map(|m| m.as_ref())
158 .collect();
159
160 pending.sort_by_key(|m| m.version());
161 Ok(pending)
162 }
163
164 pub fn pending_phase(&self, phase: Phase) -> Result<Vec<&dyn Migration<Ctx>>> {
166 let current = self.store.current_version()?;
167 let mut pending: Vec<&dyn Migration<Ctx>> = self
168 .migrations
169 .iter()
170 .filter(|m| m.version() > current && m.phase() == phase)
171 .map(|m| m.as_ref())
172 .collect();
173
174 pending.sort_by_key(|m| m.version());
175 Ok(pending)
176 }
177
178 pub fn migrate(&mut self, ctx: &mut Ctx) -> Result<usize> {
180 let pending: Vec<u64> = self.pending()?.iter().map(|m| m.version()).collect();
181 let count = pending.len();
182
183 for version in pending {
184 self.apply_version(ctx, version)?;
185 }
186
187 Ok(count)
188 }
189
190 pub fn migrate_phase(&mut self, ctx: &mut Ctx, phase: Phase) -> Result<usize> {
192 let pending: Vec<u64> = self
193 .pending_phase(phase)?
194 .iter()
195 .map(|m| m.version())
196 .collect();
197 let count = pending.len();
198
199 for version in pending {
200 self.apply_version(ctx, version)?;
201 }
202
203 Ok(count)
204 }
205
206 pub fn dry_run(&self) -> Result<DryRunResult> {
208 let pending = self.pending()?;
209
210 let plans: Vec<DryRunPlan> = pending
211 .iter()
212 .map(|m| DryRunPlan {
213 version: m.version(),
214 name: m.name().to_string(),
215 phase: m.phase(),
216 can_rollback: m.can_rollback(),
217 })
218 .collect();
219
220 Ok(DryRunResult {
221 total: plans.len(),
222 pending: plans,
223 })
224 }
225
226 pub fn dry_run_phase(&self, phase: Phase) -> Result<DryRunResult> {
228 let pending = self.pending_phase(phase)?;
229
230 let plans: Vec<DryRunPlan> = pending
231 .iter()
232 .map(|m| DryRunPlan {
233 version: m.version(),
234 name: m.name().to_string(),
235 phase: m.phase(),
236 can_rollback: m.can_rollback(),
237 })
238 .collect();
239
240 Ok(DryRunResult {
241 total: plans.len(),
242 pending: plans,
243 })
244 }
245
246 pub fn migrate_to(&mut self, ctx: &mut Ctx, target: u64) -> Result<usize> {
248 let current = self.store.current_version()?;
249 let mut count = 0;
250
251 if target > current {
252 let to_apply: Vec<u64> = self
254 .migrations
255 .iter()
256 .filter(|m| m.version() > current && m.version() <= target)
257 .map(|m| m.version())
258 .collect();
259
260 for version in to_apply {
261 self.apply_version(ctx, version)?;
262 count += 1;
263 }
264 } else if target < current {
265 let mut to_rollback: Vec<u64> = self
267 .migrations
268 .iter()
269 .filter(|m| m.version() > target && m.version() <= current)
270 .map(|m| m.version())
271 .collect();
272
273 to_rollback.sort_by(|a, b| b.cmp(a)); for version in to_rollback {
276 self.rollback_version(ctx, version)?;
277 count += 1;
278 }
279 }
280
281 Ok(count)
282 }
283
284 fn apply_version(&mut self, ctx: &mut Ctx, version: u64) -> Result<()> {
286 let migration = self
287 .migrations
288 .iter()
289 .find(|m| m.version() == version)
290 .ok_or(Error::NotFound(version))?;
291
292 let name = migration.name().to_string();
293
294 migration.apply(ctx).map_err(|e| Error::MigrationFailed {
295 version,
296 message: e.to_string(),
297 })?;
298
299 self.store.mark_applied(version, &name)?;
300 Ok(())
301 }
302
303 fn rollback_version(&mut self, ctx: &mut Ctx, version: u64) -> Result<()> {
305 let migration = self
306 .migrations
307 .iter()
308 .find(|m| m.version() == version)
309 .ok_or(Error::NotFound(version))?;
310
311 if !migration.can_rollback() {
312 return Err(Error::RollbackNotSupported(version));
313 }
314
315 migration.rollback(ctx).map_err(|e| Error::MigrationFailed {
316 version,
317 message: e.to_string(),
318 })?;
319
320 self.store.mark_rolled_back(version)?;
321 Ok(())
322 }
323}
324
325#[cfg(test)]
326mod tests {
327 use super::*;
328 use crate::store::MemoryStore;
329 use crate::FnMigration;
330
331 #[test]
332 fn test_runner_migrate() {
333 let store = MemoryStore::new();
334 let mut runner = MigrationRunner::new(store)
335 .add(FnMigration::new(1, "first", |ctx: &mut Vec<i32>| {
336 ctx.push(1);
337 Ok(())
338 }))
339 .add(FnMigration::new(2, "second", |ctx: &mut Vec<i32>| {
340 ctx.push(2);
341 Ok(())
342 }));
343
344 runner.init().unwrap();
345
346 let mut ctx = vec![];
347 let count = runner.migrate(&mut ctx).unwrap();
348
349 assert_eq!(count, 2);
350 assert_eq!(ctx, vec![1, 2]);
351 assert_eq!(runner.current_version().unwrap(), 2);
352 }
353
354 #[test]
355 fn test_runner_migrate_phase() {
356 let store = MemoryStore::new();
357 let mut runner = MigrationRunner::new(store)
358 .add(FnMigration::new(1, "pre_1", |ctx: &mut Vec<&str>| {
359 ctx.push("pre_1");
360 Ok(())
361 }))
362 .add(
363 FnMigration::new(2, "post_1", |ctx: &mut Vec<&str>| {
364 ctx.push("post_1");
365 Ok(())
366 })
367 .phase(Phase::PostDeploy),
368 )
369 .add(FnMigration::new(3, "pre_2", |ctx: &mut Vec<&str>| {
370 ctx.push("pre_2");
371 Ok(())
372 }));
373
374 runner.init().unwrap();
375
376 let mut ctx = vec![];
377
378 let count = runner.migrate_phase(&mut ctx, Phase::PreDeploy).unwrap();
380 assert_eq!(count, 2); assert_eq!(ctx, vec!["pre_1", "pre_2"]);
382 assert_eq!(runner.current_version().unwrap(), 3);
383
384 let pending_post = runner.pending_phase(Phase::PostDeploy).unwrap();
387 assert_eq!(pending_post.len(), 0);
388 }
389
390 #[test]
391 fn test_runner_dry_run() {
392 let store = MemoryStore::new();
393 let mut runner = MigrationRunner::new(store)
394 .add(FnMigration::new(1, "first", |_: &mut ()| Ok(())))
395 .add(
396 FnMigration::new(2, "second", |_: &mut ()| Ok(()))
397 .phase(Phase::PostDeploy)
398 .with_rollback(|_: &mut ()| Ok(())),
399 );
400
401 runner.init().unwrap();
402
403 let result = runner.dry_run().unwrap();
404 assert_eq!(result.total, 2);
405 assert_eq!(result.pending[0].version, 1);
406 assert_eq!(result.pending[0].phase, Phase::PreDeploy);
407 assert!(!result.pending[0].can_rollback);
408 assert_eq!(result.pending[1].version, 2);
409 assert_eq!(result.pending[1].phase, Phase::PostDeploy);
410 assert!(result.pending[1].can_rollback);
411 }
412
413 #[test]
414 fn test_runner_dry_run_phase() {
415 let store = MemoryStore::new();
416 let mut runner = MigrationRunner::new(store)
417 .add(FnMigration::new(1, "pre_1", |_: &mut ()| Ok(())))
418 .add(
419 FnMigration::new(2, "post_1", |_: &mut ()| Ok(())).phase(Phase::PostDeploy),
420 )
421 .add(FnMigration::new(3, "pre_2", |_: &mut ()| Ok(())));
422
423 runner.init().unwrap();
424
425 let pre_result = runner.dry_run_phase(Phase::PreDeploy).unwrap();
426 assert_eq!(pre_result.total, 2);
427 assert_eq!(pre_result.pending[0].name, "pre_1");
428 assert_eq!(pre_result.pending[1].name, "pre_2");
429
430 let post_result = runner.dry_run_phase(Phase::PostDeploy).unwrap();
431 assert_eq!(post_result.total, 1);
432 assert_eq!(post_result.pending[0].name, "post_1");
433 }
434
435 #[test]
436 fn test_runner_migrate_to() {
437 let store = MemoryStore::new();
438 let mut runner = MigrationRunner::new(store)
439 .add(
440 FnMigration::new(1, "first", |ctx: &mut Vec<i32>| {
441 ctx.push(1);
442 Ok(())
443 })
444 .with_rollback(|ctx: &mut Vec<i32>| {
445 ctx.retain(|&x| x != 1);
446 Ok(())
447 }),
448 )
449 .add(
450 FnMigration::new(2, "second", |ctx: &mut Vec<i32>| {
451 ctx.push(2);
452 Ok(())
453 })
454 .with_rollback(|ctx: &mut Vec<i32>| {
455 ctx.retain(|&x| x != 2);
456 Ok(())
457 }),
458 );
459
460 runner.init().unwrap();
461
462 let mut ctx = vec![];
463
464 runner.migrate_to(&mut ctx, 2).unwrap();
466 assert_eq!(ctx, vec![1, 2]);
467 assert_eq!(runner.current_version().unwrap(), 2);
468
469 runner.migrate_to(&mut ctx, 1).unwrap();
471 assert_eq!(ctx, vec![1]);
472 assert_eq!(runner.current_version().unwrap(), 1);
473
474 runner.migrate_to(&mut ctx, 0).unwrap();
476 assert!(ctx.is_empty());
477 assert_eq!(runner.current_version().unwrap(), 0);
478 }
479
480 #[test]
481 fn test_runner_status() {
482 let store = MemoryStore::new();
483 let mut runner = MigrationRunner::new(store)
484 .add(FnMigration::new(1, "first", |_: &mut ()| Ok(())))
485 .add(
486 FnMigration::new(2, "second", |_: &mut ()| Ok(())).phase(Phase::PostDeploy),
487 );
488
489 runner.init().unwrap();
490
491 let status = runner.status().unwrap();
492 assert_eq!(status.len(), 2);
493 assert!(!status[0].applied);
494 assert_eq!(status[0].phase, Phase::PreDeploy);
495 assert!(!status[1].applied);
496 assert_eq!(status[1].phase, Phase::PostDeploy);
497
498 runner.migrate(&mut ()).unwrap();
499
500 let status = runner.status().unwrap();
501 assert!(status[0].applied);
502 assert!(status[1].applied);
503 }
504
505 #[test]
506 fn test_runner_validates_order() {
507 let store = MemoryStore::new();
508 let mut runner = MigrationRunner::new(store)
509 .add(FnMigration::new(1, "first", |_: &mut ()| Ok(())))
510 .add(FnMigration::new(3, "third", |_: &mut ()| Ok(()))); let result = runner.init();
513 assert!(matches!(result, Err(Error::InvalidOrder(_))));
514 }
515
516 #[test]
517 fn test_runner_validates_duplicates() {
518 let store = MemoryStore::new();
519 let mut runner = MigrationRunner::new(store)
520 .add(FnMigration::new(1, "first", |_: &mut ()| Ok(())))
521 .add(FnMigration::new(1, "duplicate", |_: &mut ()| Ok(()))); let result = runner.init();
524 assert!(matches!(result, Err(Error::InvalidOrder(_))));
525 }
526}