Skip to main content

icydb_core/db/migration/
mod.rs

1//! Module: db::migration
2//! Responsibility: explicit migration plan contracts and commit-marker-backed execution.
3//! Does not own: migration row-op derivation policy or schema transformation design.
4//! Boundary: callers provide explicit row-op steps; this module executes them durably.
5
6#[cfg(test)]
7mod tests;
8
9use crate::{
10    db::{
11        Db,
12        commit::{
13            CommitMarker, CommitRowOp, begin_commit_with_migration_state,
14            clear_migration_state_bytes, finish_commit, load_migration_state_bytes,
15        },
16        data::RawDataKey,
17    },
18    error::InternalError,
19    traits::CanisterKind,
20};
21
// Upper bound (64 KiB) on one encoded migration-state payload; checked by both
// the migration-state encoder and decoder in this module.
const MAX_MIGRATION_STATE_BYTES: usize = 64 * 1024;
// Two-byte magic prefix written first in every encoded migration-state payload.
const MIGRATION_STATE_MAGIC: [u8; 2] = *b"MS";
// The single payload format version this module writes and accepts on decode.
const MIGRATION_STATE_VERSION_CURRENT: u8 = 1;
// Tag byte marking an absent `last_applied_row_key`.
const MIGRATION_STATE_NONE_ROW_KEY_TAG: u8 = 0;
// Tag byte marking a present, length-prefixed `last_applied_row_key`.
const MIGRATION_STATE_SOME_ROW_KEY_TAG: u8 = 1;
27
///
/// MigrationCursor
///
/// Resume position inside one migration plan.
/// The cursor stores the index of the next step that still has to execute;
/// index `0` is the starting position.
/// The migration runtime persists this cursor durably between executions.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct MigrationCursor {
    next_step: usize,
}

impl MigrationCursor {
    /// Cursor positioned before the first step of a plan.
    #[must_use]
    pub const fn start() -> Self {
        Self::from_step(0)
    }

    /// Index of the migration step that should execute next.
    #[must_use]
    pub const fn next_step(self) -> usize {
        let Self { next_step } = self;
        next_step
    }

    // Build a cursor pointing at an arbitrary step index.
    const fn from_step(step_index: usize) -> Self {
        Self {
            next_step: step_index,
        }
    }

    // Move one step forward after a step succeeded; saturates at `usize::MAX`
    // rather than wrapping.
    const fn advance(self) -> Self {
        Self::from_step(self.next_step.saturating_add(1))
    }
}
67
///
/// PersistedMigrationState
///
/// Durable migration-progress record stored in commit control state.
/// `step_index` stores the next step to execute for `migration_id` and
/// `migration_version`.
/// `last_applied_row_key` records the last row key from the last successful
/// migration step for diagnostics and recovery observability.
///

#[derive(Clone, Debug, Eq, PartialEq)]
struct PersistedMigrationState {
    // Id of the plan this record belongs to; compared against the running
    // plan's id when the record is loaded.
    migration_id: String,
    // Version of the plan this record belongs to; compared against the running
    // plan's version when the record is loaded.
    migration_version: u64,
    // Index of the next step to execute (the cursor position after the step
    // whose commit carried this record).
    step_index: u64,
    // Raw key bytes of the final row op of the step that wrote this record;
    // `None` is encoded with the "none" tag byte.
    last_applied_row_key: Option<Vec<u8>>,
}
85
86///
87/// MigrationRowOp
88///
89/// Public migration row operation DTO used to build explicit migration steps.
90/// This DTO mirrors commit row-op payload shape without exposing commit internals.
91/// Migration execution converts these DTOs into commit marker row operations.
92///
93
94#[derive(Clone, Debug)]
95pub struct MigrationRowOp {
96    entity_path: String,
97    key: RawDataKey,
98    before: Option<Vec<u8>>,
99    after: Option<Vec<u8>>,
100    schema_fingerprint: [u8; 16],
101}
102
103impl MigrationRowOp {
104    /// Construct one validated migration row operation DTO.
105    ///
106    /// The constructor is the only public construction boundary for migration
107    /// row operations. It validates the runtime entity path, rejects no-op row
108    /// operations, and decodes the raw data key once before the operation enters
109    /// the lower migration engine.
110    pub fn new(
111        entity_path: impl Into<String>,
112        key: Vec<u8>,
113        before: Option<Vec<u8>>,
114        after: Option<Vec<u8>>,
115        schema_fingerprint: [u8; 16],
116    ) -> Result<Self, InternalError> {
117        let entity_path = entity_path.into();
118        validate_non_empty_label(entity_path.as_str(), "migration row op entity path")?;
119        if before.is_none() && after.is_none() {
120            return Err(InternalError::migration_row_op_payload_required(
121                entity_path.as_str(),
122            ));
123        }
124
125        let validated = CommitRowOp::try_new_bytes(
126            entity_path.clone(),
127            key.as_slice(),
128            before.clone(),
129            after.clone(),
130            schema_fingerprint,
131        )?;
132
133        Ok(Self {
134            entity_path,
135            key: validated.key,
136            before,
137            after,
138            schema_fingerprint,
139        })
140    }
141
142    /// Return the runtime entity path resolved by commit runtime hooks.
143    #[must_use]
144    pub const fn entity_path(&self) -> &str {
145        self.entity_path.as_str()
146    }
147
148    /// Borrow the encoded raw data key bytes for the target row identity.
149    #[must_use]
150    pub const fn key(&self) -> &[u8] {
151        self.key.as_bytes()
152    }
153
154    /// Borrow the optional encoded before-image row payload.
155    #[must_use]
156    pub fn before(&self) -> Option<&[u8]> {
157        self.before.as_deref()
158    }
159
160    /// Borrow the optional encoded after-image row payload.
161    #[must_use]
162    pub fn after(&self) -> Option<&[u8]> {
163        self.after.as_deref()
164    }
165
166    /// Return the schema fingerprint expected by commit prepare/replay.
167    #[must_use]
168    pub const fn schema_fingerprint(&self) -> [u8; 16] {
169        self.schema_fingerprint
170    }
171}
172
173impl TryFrom<MigrationRowOp> for CommitRowOp {
174    type Error = InternalError;
175
176    fn try_from(op: MigrationRowOp) -> Result<Self, Self::Error> {
177        Ok(Self::new(
178            op.entity_path,
179            op.key,
180            op.before,
181            op.after,
182            op.schema_fingerprint,
183        ))
184    }
185}
186
187///
188/// MigrationStep
189///
190/// One explicit migration step represented as ordered commit row operations.
191/// Step ordering is deterministic and preserved exactly at execution time.
192/// Empty step names and empty row-op vectors are rejected by constructor.
193///
194
195#[derive(Clone, Debug)]
196pub struct MigrationStep {
197    name: String,
198    row_ops: Vec<CommitRowOp>,
199}
200
201impl MigrationStep {
202    /// Build one validated migration step from public migration row-op DTOs.
203    pub fn from_row_ops(
204        name: impl Into<String>,
205        row_ops: Vec<MigrationRowOp>,
206    ) -> Result<Self, InternalError> {
207        let commit_row_ops = row_ops
208            .into_iter()
209            .map(CommitRowOp::try_from)
210            .collect::<Result<Vec<_>, _>>()?;
211        Self::new(name, commit_row_ops)
212    }
213
214    /// Build one validated migration step.
215    pub(in crate::db) fn new(
216        name: impl Into<String>,
217        row_ops: Vec<CommitRowOp>,
218    ) -> Result<Self, InternalError> {
219        let name = name.into();
220        validate_non_empty_label(name.as_str(), "migration step name")?;
221
222        if row_ops.is_empty() {
223            return Err(InternalError::migration_step_row_ops_required(&name));
224        }
225
226        Ok(Self { name, row_ops })
227    }
228
229    /// Return this step's stable display name.
230    #[must_use]
231    pub const fn name(&self) -> &str {
232        self.name.as_str()
233    }
234
235    /// Return the number of row operations in this step.
236    #[must_use]
237    pub const fn row_op_count(&self) -> usize {
238        self.row_ops.len()
239    }
240}
241
242///
243/// MigrationPlan
244///
245/// Explicit, ordered migration contract composed of named row-op steps.
246/// The plan id is stable caller-owned metadata for observability and retries.
247/// The plan version is caller-owned monotonic metadata for upgrade safety.
248/// Steps are executed sequentially in insertion order and never reordered.
249///
250
251#[derive(Clone, Debug)]
252pub struct MigrationPlan {
253    id: String,
254    version: u64,
255    steps: Vec<MigrationStep>,
256}
257
258impl MigrationPlan {
259    /// Build one validated migration plan.
260    pub fn new(
261        id: impl Into<String>,
262        version: u64,
263        steps: Vec<MigrationStep>,
264    ) -> Result<Self, InternalError> {
265        let id = id.into();
266        validate_non_empty_label(id.as_str(), "migration plan id")?;
267        if version == 0 {
268            return Err(InternalError::migration_plan_version_required(&id));
269        }
270
271        if steps.is_empty() {
272            return Err(InternalError::migration_plan_steps_required(&id));
273        }
274
275        Ok(Self { id, version, steps })
276    }
277
278    /// Return this plan's stable id.
279    #[must_use]
280    pub const fn id(&self) -> &str {
281        self.id.as_str()
282    }
283
284    /// Return this plan's stable version.
285    #[must_use]
286    pub const fn version(&self) -> u64 {
287        self.version
288    }
289
290    /// Return the number of steps in this plan.
291    #[must_use]
292    pub const fn len(&self) -> usize {
293        self.steps.len()
294    }
295
296    /// Return whether this plan has no steps.
297    #[must_use]
298    pub const fn is_empty(&self) -> bool {
299        self.steps.is_empty()
300    }
301
302    fn step_at(&self, index: usize) -> Result<&MigrationStep, InternalError> {
303        self.steps.get(index).ok_or_else(|| {
304            InternalError::migration_cursor_out_of_bounds(
305                self.id(),
306                self.version(),
307                index,
308                self.len(),
309            )
310        })
311    }
312}
313
///
/// MigrationRunState
///
/// Completion status reported by one bounded migration-execution run.
/// The executor reports `Complete` when the cursor after the run equals the
/// plan's step count and `NeedsResume` otherwise.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MigrationRunState {
    /// No remaining steps; migration plan is complete at returned cursor.
    Complete,
    /// Remaining steps exist; rerun the same plan to resume from durable state.
    NeedsResume,
}
327
328///
329/// MigrationRunOutcome
330///
331/// Summary of one bounded migration-execution run.
332/// This captures the next cursor plus applied-step/row-op counters.
333/// Durable cursor persistence is internal to migration runtime state.
334///
335
336#[derive(Clone, Copy, Debug, Eq, PartialEq)]
337pub struct MigrationRunOutcome {
338    cursor: MigrationCursor,
339    applied_steps: usize,
340    applied_row_ops: usize,
341    state: MigrationRunState,
342}
343
344impl MigrationRunOutcome {
345    const fn new(
346        cursor: MigrationCursor,
347        applied_steps: usize,
348        applied_row_ops: usize,
349        state: MigrationRunState,
350    ) -> Self {
351        Self {
352            cursor,
353            applied_steps,
354            applied_row_ops,
355            state,
356        }
357    }
358
359    /// Return the next migration cursor.
360    #[must_use]
361    pub const fn cursor(self) -> MigrationCursor {
362        self.cursor
363    }
364
365    /// Return the number of steps applied in this bounded run.
366    #[must_use]
367    pub const fn applied_steps(self) -> usize {
368        self.applied_steps
369    }
370
371    /// Return the number of row ops applied in this bounded run.
372    #[must_use]
373    pub const fn applied_row_ops(self) -> usize {
374        self.applied_row_ops
375    }
376
377    /// Return bounded-run completion state.
378    #[must_use]
379    pub const fn state(self) -> MigrationRunState {
380        self.state
381    }
382}
383
384/// Execute one bounded migration run from durable internal cursor state.
385///
386/// Contract:
387/// - always runs commit recovery before applying migration steps
388/// - executes at most `max_steps` deterministic steps in-order
389/// - each step is persisted through commit-marker protocol
390/// - migration cursor progress is atomically persisted with step marker writes
391/// - step failures preserve marker authority for explicit fail-closed recovery
392pub(in crate::db) fn execute_migration_plan<C: CanisterKind>(
393    db: &Db<C>,
394    plan: &MigrationPlan,
395    max_steps: usize,
396) -> Result<MigrationRunOutcome, InternalError> {
397    // Phase 1: validate run-shape controls before touching commit state.
398    if max_steps == 0 {
399        return Err(InternalError::migration_execution_requires_max_steps(
400            plan.id(),
401        ));
402    }
403
404    // Phase 2: recover any in-flight commit marker before migration execution.
405    db.ensure_recovered_state()?;
406
407    // Phase 3: load durable migration cursor state from commit control storage.
408    let mut next_cursor = load_durable_cursor_for_plan(plan)?;
409
410    // Phase 4: execute a bounded number of deterministic migration steps.
411    let mut applied_steps = 0usize;
412    let mut applied_row_ops = 0usize;
413    while applied_steps < max_steps {
414        if next_cursor.next_step() >= plan.len() {
415            break;
416        }
417
418        let step_index = next_cursor.next_step();
419        let step = plan.step_at(step_index)?;
420        let next_cursor_after_step = next_cursor.advance();
421        let next_state_bytes =
422            encode_durable_cursor_state(plan, next_cursor_after_step, step.row_ops.last())?;
423        execute_migration_step(db, plan, step_index, step, next_state_bytes)?;
424
425        applied_steps = applied_steps.saturating_add(1);
426        applied_row_ops = applied_row_ops.saturating_add(step.row_op_count());
427        next_cursor = next_cursor_after_step;
428    }
429
430    let state = if next_cursor.next_step() == plan.len() {
431        clear_migration_state_bytes()?;
432        MigrationRunState::Complete
433    } else {
434        MigrationRunState::NeedsResume
435    };
436
437    Ok(MigrationRunOutcome::new(
438        next_cursor,
439        applied_steps,
440        applied_row_ops,
441        state,
442    ))
443}
444
445fn load_durable_cursor_for_plan(plan: &MigrationPlan) -> Result<MigrationCursor, InternalError> {
446    let Some(bytes) = load_migration_state_bytes()? else {
447        return Ok(MigrationCursor::start());
448    };
449    let state = decode_persisted_migration_state(&bytes)?;
450    if state.migration_id != plan.id() || state.migration_version != plan.version() {
451        return Err(InternalError::migration_in_progress_conflict(
452            plan.id(),
453            plan.version(),
454            &state.migration_id,
455            state.migration_version,
456        ));
457    }
458
459    let step_index = usize::try_from(state.step_index).map_err(|_| {
460        InternalError::migration_persisted_step_index_invalid_usize(
461            plan.id(),
462            plan.version(),
463            state.step_index,
464        )
465    })?;
466    if step_index > plan.len() {
467        return Err(InternalError::migration_persisted_step_index_out_of_bounds(
468            plan.id(),
469            plan.version(),
470            step_index,
471            plan.len(),
472        ));
473    }
474
475    if step_index == plan.len() {
476        clear_migration_state_bytes()?;
477    }
478
479    Ok(MigrationCursor::from_step(step_index))
480}
481
482fn encode_durable_cursor_state(
483    plan: &MigrationPlan,
484    cursor: MigrationCursor,
485    last_applied_row_op: Option<&CommitRowOp>,
486) -> Result<Vec<u8>, InternalError> {
487    let step_index = u64::try_from(cursor.next_step()).map_err(|_| {
488        InternalError::migration_next_step_index_u64_required(plan.id(), plan.version())
489    })?;
490    let state = PersistedMigrationState {
491        migration_id: plan.id().to_string(),
492        migration_version: plan.version(),
493        step_index,
494        last_applied_row_key: last_applied_row_op.map(|row_op| row_op.key.as_bytes().to_vec()),
495    };
496
497    encode_persisted_migration_state(&state)
498}
499
500fn decode_persisted_migration_state(
501    bytes: &[u8],
502) -> Result<PersistedMigrationState, InternalError> {
503    // Phase 1: reject oversized payloads before any structural decode work.
504    if bytes.len() > MAX_MIGRATION_STATE_BYTES {
505        return Err(InternalError::serialize_corruption(format!(
506            "migration state decode failed: payload size {} exceeds limit {MAX_MIGRATION_STATE_BYTES}",
507            bytes.len(),
508        )));
509    }
510
511    // Phase 2: validate the fixed header and read the fixed-width fields.
512    let mut cursor = bytes;
513    decode_migration_state_magic(&mut cursor)?;
514    let format_version = decode_migration_state_u8(&mut cursor, "format version")?;
515    validate_migration_state_format_version(format_version)?;
516    let migration_id = decode_migration_state_string(&mut cursor, "migration_id")?;
517    let migration_version = decode_migration_state_u64(&mut cursor, "migration_version")?;
518    let step_index = decode_migration_state_u64(&mut cursor, "step_index")?;
519    let last_applied_row_key =
520        decode_migration_state_optional_bytes(&mut cursor, "last_applied_row_key")?;
521
522    // Phase 3: reject trailing bytes so the codec stays single-version and exact.
523    if !cursor.is_empty() {
524        return Err(InternalError::serialize_corruption(
525            "migration state decode failed: trailing bytes",
526        ));
527    }
528
529    Ok(PersistedMigrationState {
530        migration_id,
531        migration_version,
532        step_index,
533        last_applied_row_key,
534    })
535}
536
537fn encode_persisted_migration_state(
538    state: &PersistedMigrationState,
539) -> Result<Vec<u8>, InternalError> {
540    // Phase 1: pre-compute the bounded payload size so overflow and policy
541    // violations fail before any bytes are emitted.
542    let row_key_len = state.last_applied_row_key.as_ref().map_or(0usize, Vec::len);
543    let encoded_len = MIGRATION_STATE_MAGIC
544        .len()
545        .saturating_add(1)
546        .saturating_add(4)
547        .saturating_add(state.migration_id.len())
548        .saturating_add(8)
549        .saturating_add(8)
550        .saturating_add(1)
551        .saturating_add(if state.last_applied_row_key.is_some() {
552            4usize.saturating_add(row_key_len)
553        } else {
554            0
555        });
556    if encoded_len > MAX_MIGRATION_STATE_BYTES {
557        return Err(InternalError::migration_state_serialize_failed(format!(
558            "payload size {encoded_len} exceeds limit {MAX_MIGRATION_STATE_BYTES}",
559        )));
560    }
561
562    let migration_id_len = u32::try_from(state.migration_id.len()).map_err(|_| {
563        InternalError::migration_state_serialize_failed("migration_id exceeds u32 length")
564    })?;
565    let row_key_len_u32 = u32::try_from(row_key_len).map_err(|_| {
566        InternalError::migration_state_serialize_failed("last_applied_row_key exceeds u32 length")
567    })?;
568
569    // Phase 2: write the fixed binary payload in one exact stable order.
570    let mut encoded = Vec::with_capacity(encoded_len);
571    encoded.extend_from_slice(&MIGRATION_STATE_MAGIC);
572    encoded.push(MIGRATION_STATE_VERSION_CURRENT);
573    encoded.extend_from_slice(&migration_id_len.to_be_bytes());
574    encoded.extend_from_slice(state.migration_id.as_bytes());
575    encoded.extend_from_slice(&state.migration_version.to_be_bytes());
576    encoded.extend_from_slice(&state.step_index.to_be_bytes());
577
578    match state.last_applied_row_key.as_ref() {
579        Some(row_key) => {
580            encoded.push(MIGRATION_STATE_SOME_ROW_KEY_TAG);
581            encoded.extend_from_slice(&row_key_len_u32.to_be_bytes());
582            encoded.extend_from_slice(row_key);
583        }
584        None => encoded.push(MIGRATION_STATE_NONE_ROW_KEY_TAG),
585    }
586
587    Ok(encoded)
588}
589
590// Decode and validate the fixed migration-state magic prefix.
591fn decode_migration_state_magic(bytes: &mut &[u8]) -> Result<(), InternalError> {
592    let magic = take_migration_state_bytes(bytes, MIGRATION_STATE_MAGIC.len(), "magic")?;
593    if magic != MIGRATION_STATE_MAGIC {
594        return Err(InternalError::serialize_corruption(
595            "migration state decode failed: invalid magic",
596        ));
597    }
598
599    Ok(())
600}
601
602// Decode one fixed-width u8 field from the migration-state payload.
603fn decode_migration_state_u8(bytes: &mut &[u8], label: &'static str) -> Result<u8, InternalError> {
604    Ok(take_migration_state_bytes(bytes, 1, label)?[0])
605}
606
607// Decode one fixed-width u64 field from the migration-state payload.
608fn decode_migration_state_u64(
609    bytes: &mut &[u8],
610    label: &'static str,
611) -> Result<u64, InternalError> {
612    let raw = take_migration_state_bytes(bytes, 8, label)?;
613
614    Ok(u64::from_be_bytes([
615        raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
616    ]))
617}
618
619// Decode one length-prefixed UTF-8 string field from the migration-state payload.
620fn decode_migration_state_string(
621    bytes: &mut &[u8],
622    label: &'static str,
623) -> Result<String, InternalError> {
624    let raw = decode_migration_state_length_prefixed_bytes(bytes, label)?;
625
626    String::from_utf8(raw.to_vec()).map_err(|_| {
627        InternalError::serialize_corruption(format!(
628            "migration state decode failed: {label} is not valid UTF-8",
629        ))
630    })
631}
632
633// Decode one tagged optional byte vector from the migration-state payload.
634fn decode_migration_state_optional_bytes(
635    bytes: &mut &[u8],
636    label: &'static str,
637) -> Result<Option<Vec<u8>>, InternalError> {
638    let tag = decode_migration_state_u8(bytes, label)?;
639
640    match tag {
641        MIGRATION_STATE_NONE_ROW_KEY_TAG => Ok(None),
642        MIGRATION_STATE_SOME_ROW_KEY_TAG => Ok(Some(
643            decode_migration_state_length_prefixed_bytes(bytes, label)?.to_vec(),
644        )),
645        _ => Err(InternalError::serialize_corruption(format!(
646            "migration state decode failed: invalid {label} tag {tag}",
647        ))),
648    }
649}
650
651// Decode one length-prefixed byte slice from the migration-state payload.
652fn decode_migration_state_length_prefixed_bytes<'a>(
653    bytes: &mut &'a [u8],
654    label: &'static str,
655) -> Result<&'a [u8], InternalError> {
656    let raw_len = take_migration_state_bytes(bytes, 4, label)?;
657    let len = usize::try_from(u32::from_be_bytes([
658        raw_len[0], raw_len[1], raw_len[2], raw_len[3],
659    ]))
660    .map_err(|_| {
661        InternalError::serialize_corruption(format!(
662            "migration state decode failed: {label} length out of range",
663        ))
664    })?;
665
666    take_migration_state_bytes(bytes, len, label)
667}
668
669// Borrow one exact byte span from the migration-state payload.
670fn take_migration_state_bytes<'a>(
671    bytes: &mut &'a [u8],
672    len: usize,
673    label: &'static str,
674) -> Result<&'a [u8], InternalError> {
675    if bytes.len() < len {
676        return Err(InternalError::serialize_corruption(format!(
677            "migration state decode failed: truncated {label}",
678        )));
679    }
680
681    let (head, tail) = bytes.split_at(len);
682    *bytes = tail;
683
684    Ok(head)
685}
686
687// Validate the single supported migration-state format version.
688fn validate_migration_state_format_version(format_version: u8) -> Result<(), InternalError> {
689    if format_version == MIGRATION_STATE_VERSION_CURRENT {
690        return Ok(());
691    }
692
693    Err(InternalError::serialize_incompatible_persisted_format(
694        format!(
695            "migration state format version {format_version} is unsupported by runtime version {MIGRATION_STATE_VERSION_CURRENT}",
696        ),
697    ))
698}
699
700fn execute_migration_step<C: CanisterKind>(
701    db: &Db<C>,
702    plan: &MigrationPlan,
703    step_index: usize,
704    step: &MigrationStep,
705    next_state_bytes: Vec<u8>,
706) -> Result<(), InternalError> {
707    // Phase 1: persist marker authority + next-step cursor state atomically.
708    let marker = CommitMarker::new(step.row_ops.clone())
709        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
710    let commit = begin_commit_with_migration_state(marker, next_state_bytes)
711        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
712
713    // Phase 2: apply step row ops under commit-window durability semantics.
714    finish_commit(commit, |_| apply_marker_row_ops(db, &step.row_ops))
715        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
716
717    Ok(())
718}
719
720fn apply_marker_row_ops<C: CanisterKind>(
721    db: &Db<C>,
722    row_ops: &[CommitRowOp],
723) -> Result<(), InternalError> {
724    // Phase 1: pre-prepare all row operations before mutating stores.
725    let mut prepared = Vec::with_capacity(row_ops.len());
726    for row_op in row_ops {
727        prepared.push(db.prepare_row_commit_op(row_op)?);
728    }
729
730    // Phase 2: apply the prepared operations in stable marker order.
731    for prepared_op in prepared {
732        prepared_op.apply();
733    }
734
735    Ok(())
736}
737
738fn annotate_step_error(
739    plan: &MigrationPlan,
740    step_index: usize,
741    step_name: &str,
742    err: InternalError,
743) -> InternalError {
744    let source_message = err.message().to_string();
745
746    err.with_message(format!(
747        "migration '{}' step {} ('{}') failed: {}",
748        plan.id(),
749        step_index,
750        step_name,
751        source_message,
752    ))
753}
754
755fn validate_non_empty_label(value: &str, label: &str) -> Result<(), InternalError> {
756    if value.trim().is_empty() {
757        return Err(InternalError::migration_label_empty(label));
758    }
759
760    Ok(())
761}