Skip to main content

icydb_core/db/migration/
mod.rs

1//! Module: db::migration
2//! Responsibility: explicit migration plan contracts and commit-marker-backed execution.
3//! Does not own: migration row-op derivation policy or schema transformation design.
4//! Boundary: callers provide explicit row-op steps; this module executes them durably.
5
6#[cfg(test)]
7mod tests;
8
9use crate::{
10    db::{
11        Db,
12        commit::{
13            CommitMarker, CommitRowOp, begin_commit_with_migration_state,
14            clear_migration_state_bytes, finish_commit, load_migration_state_bytes,
15        },
16    },
17    error::InternalError,
18    traits::CanisterKind,
19};
20
/// Upper bound (64 KiB) on one encoded migration-state payload; enforced by
/// both the encoder and the decoder.
const MAX_MIGRATION_STATE_BYTES: usize = 1 << 16;
/// Two-byte prefix identifying an encoded migration-state payload.
const MIGRATION_STATE_MAGIC: [u8; 2] = [b'M', b'S'];
/// The single migration-state format version this runtime writes and accepts.
const MIGRATION_STATE_VERSION_CURRENT: u8 = 1;
/// Tag byte emitted when `last_applied_row_key` is absent.
const MIGRATION_STATE_NONE_ROW_KEY_TAG: u8 = 0;
/// Tag byte emitted when `last_applied_row_key` is present; a big-endian u32
/// length and the key bytes follow it.
const MIGRATION_STATE_SOME_ROW_KEY_TAG: u8 = 1;
26
///
/// MigrationCursor
///
/// Resume position inside one migration plan.
/// Holds the zero-based index of the next step that still has to run; the
/// migration runtime stores it durably so an interrupted plan can continue.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct MigrationCursor {
    next_step: usize,
}

impl MigrationCursor {
    /// Cursor positioned before the first step (index zero).
    #[must_use]
    pub const fn start() -> Self {
        Self::from_step(0)
    }

    /// Index of the next migration step to execute.
    #[must_use]
    pub const fn next_step(self) -> usize {
        self.next_step
    }

    // Build a cursor pointing directly at `next_step`.
    const fn from_step(next_step: usize) -> Self {
        Self { next_step }
    }

    // Move past one successfully executed step (saturates at `usize::MAX`).
    const fn advance(self) -> Self {
        Self::from_step(self.next_step.saturating_add(1))
    }
}
66
///
/// PersistedMigrationState
///
/// Decoded form of the migration-progress record kept in commit control state.
/// It names the plan (`migration_id` + `migration_version`), the next step to
/// run (`step_index`), and, for diagnostics, the key of the final row op in the
/// most recently applied step.
///

#[derive(Clone, Debug, Eq, PartialEq)]
struct PersistedMigrationState {
    // Stable id of the plan this progress record belongs to.
    migration_id: String,
    // Version of the plan this progress record belongs to.
    migration_version: u64,
    // Index of the next step to execute; equals the plan length once every
    // step has been applied.
    step_index: u64,
    // Raw key bytes of the last row op of the last applied step, if recorded.
    last_applied_row_key: Option<Vec<u8>>,
}
84
///
/// MigrationRowOp
///
/// Public DTO describing one row operation inside a migration step.
/// Its fields mirror the commit row-op payload so callers never need commit
/// internals; execution converts each DTO into a commit-marker row operation.
///

#[derive(Clone, Debug)]
pub struct MigrationRowOp {
    /// Runtime entity path resolved by commit runtime hooks during execution.
    pub entity_path: String,
    /// Encoded raw data key bytes for target row identity.
    pub key: Vec<u8>,
    /// Optional encoded before-image row payload.
    pub before: Option<Vec<u8>>,
    /// Optional encoded after-image row payload.
    pub after: Option<Vec<u8>>,
    /// Schema fingerprint expected by commit prepare/replay for this row op.
    pub schema_fingerprint: [u8; 16],
}

impl MigrationRowOp {
    /// Assemble one migration row-op DTO from its parts.
    ///
    /// No validation happens here; it is deferred to the conversion into a
    /// commit row op.
    #[must_use]
    pub fn new(
        entity_path: impl Into<String>,
        key: Vec<u8>,
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: [u8; 16],
    ) -> Self {
        let entity_path: String = entity_path.into();

        Self {
            entity_path,
            key,
            before,
            after,
            schema_fingerprint,
        }
    }
}
126
127impl TryFrom<MigrationRowOp> for CommitRowOp {
128    type Error = InternalError;
129
130    fn try_from(op: MigrationRowOp) -> Result<Self, Self::Error> {
131        Self::try_new_bytes(
132            op.entity_path,
133            op.key.as_slice(),
134            op.before,
135            op.after,
136            op.schema_fingerprint,
137        )
138    }
139}
140
141///
142/// MigrationStep
143///
144/// One explicit migration step represented as ordered commit row operations.
145/// Step ordering is deterministic and preserved exactly at execution time.
146/// Empty step names and empty row-op vectors are rejected by constructor.
147///
148
149#[derive(Clone, Debug)]
150pub struct MigrationStep {
151    name: String,
152    row_ops: Vec<CommitRowOp>,
153}
154
155impl MigrationStep {
156    /// Build one validated migration step from public migration row-op DTOs.
157    pub fn from_row_ops(
158        name: impl Into<String>,
159        row_ops: Vec<MigrationRowOp>,
160    ) -> Result<Self, InternalError> {
161        let commit_row_ops = row_ops
162            .into_iter()
163            .map(CommitRowOp::try_from)
164            .collect::<Result<Vec<_>, _>>()?;
165        Self::new(name, commit_row_ops)
166    }
167
168    /// Build one validated migration step.
169    pub(in crate::db) fn new(
170        name: impl Into<String>,
171        row_ops: Vec<CommitRowOp>,
172    ) -> Result<Self, InternalError> {
173        let name = name.into();
174        validate_non_empty_label(name.as_str(), "migration step name")?;
175
176        if row_ops.is_empty() {
177            return Err(InternalError::migration_step_row_ops_required(&name));
178        }
179
180        Ok(Self { name, row_ops })
181    }
182
183    /// Return this step's stable display name.
184    #[must_use]
185    pub const fn name(&self) -> &str {
186        self.name.as_str()
187    }
188
189    /// Return the number of row operations in this step.
190    #[must_use]
191    pub const fn row_op_count(&self) -> usize {
192        self.row_ops.len()
193    }
194}
195
196///
197/// MigrationPlan
198///
199/// Explicit, ordered migration contract composed of named row-op steps.
200/// The plan id is stable caller-owned metadata for observability and retries.
201/// The plan version is caller-owned monotonic metadata for upgrade safety.
202/// Steps are executed sequentially in insertion order and never reordered.
203///
204
205#[derive(Clone, Debug)]
206pub struct MigrationPlan {
207    id: String,
208    version: u64,
209    steps: Vec<MigrationStep>,
210}
211
212impl MigrationPlan {
213    /// Build one validated migration plan.
214    pub fn new(
215        id: impl Into<String>,
216        version: u64,
217        steps: Vec<MigrationStep>,
218    ) -> Result<Self, InternalError> {
219        let id = id.into();
220        validate_non_empty_label(id.as_str(), "migration plan id")?;
221        if version == 0 {
222            return Err(InternalError::migration_plan_version_required(&id));
223        }
224
225        if steps.is_empty() {
226            return Err(InternalError::migration_plan_steps_required(&id));
227        }
228
229        Ok(Self { id, version, steps })
230    }
231
232    /// Return this plan's stable id.
233    #[must_use]
234    pub const fn id(&self) -> &str {
235        self.id.as_str()
236    }
237
238    /// Return this plan's stable version.
239    #[must_use]
240    pub const fn version(&self) -> u64 {
241        self.version
242    }
243
244    /// Return the number of steps in this plan.
245    #[must_use]
246    pub const fn len(&self) -> usize {
247        self.steps.len()
248    }
249
250    /// Return whether this plan has no steps.
251    #[must_use]
252    pub const fn is_empty(&self) -> bool {
253        self.steps.is_empty()
254    }
255
256    fn step_at(&self, index: usize) -> Result<&MigrationStep, InternalError> {
257        self.steps.get(index).ok_or_else(|| {
258            InternalError::migration_cursor_out_of_bounds(
259                self.id(),
260                self.version(),
261                index,
262                self.len(),
263            )
264        })
265    }
266}
267
///
/// MigrationRunState
///
/// Completion status reported by one bounded migration run.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MigrationRunState {
    /// Every step has been applied; the returned cursor sits at the plan end.
    Complete,
    /// Steps remain; run the same plan again to continue from durable state.
    NeedsResume,
}
281
282///
283/// MigrationRunOutcome
284///
285/// Summary of one bounded migration-execution run.
286/// This captures the next cursor plus applied-step/row-op counters.
287/// Durable cursor persistence is internal to migration runtime state.
288///
289
290#[derive(Clone, Copy, Debug, Eq, PartialEq)]
291pub struct MigrationRunOutcome {
292    cursor: MigrationCursor,
293    applied_steps: usize,
294    applied_row_ops: usize,
295    state: MigrationRunState,
296}
297
298impl MigrationRunOutcome {
299    const fn new(
300        cursor: MigrationCursor,
301        applied_steps: usize,
302        applied_row_ops: usize,
303        state: MigrationRunState,
304    ) -> Self {
305        Self {
306            cursor,
307            applied_steps,
308            applied_row_ops,
309            state,
310        }
311    }
312
313    /// Return the next migration cursor.
314    #[must_use]
315    pub const fn cursor(self) -> MigrationCursor {
316        self.cursor
317    }
318
319    /// Return the number of steps applied in this bounded run.
320    #[must_use]
321    pub const fn applied_steps(self) -> usize {
322        self.applied_steps
323    }
324
325    /// Return the number of row ops applied in this bounded run.
326    #[must_use]
327    pub const fn applied_row_ops(self) -> usize {
328        self.applied_row_ops
329    }
330
331    /// Return bounded-run completion state.
332    #[must_use]
333    pub const fn state(self) -> MigrationRunState {
334        self.state
335    }
336}
337
338/// Execute one bounded migration run from durable internal cursor state.
339///
340/// Contract:
341/// - always runs commit recovery before applying migration steps
342/// - executes at most `max_steps` deterministic steps in-order
343/// - each step is persisted through commit-marker protocol
344/// - migration cursor progress is atomically persisted with step marker writes
345/// - step failures preserve marker authority for explicit fail-closed recovery
346pub(in crate::db) fn execute_migration_plan<C: CanisterKind>(
347    db: &Db<C>,
348    plan: &MigrationPlan,
349    max_steps: usize,
350) -> Result<MigrationRunOutcome, InternalError> {
351    // Phase 1: validate run-shape controls before touching commit state.
352    if max_steps == 0 {
353        return Err(InternalError::migration_execution_requires_max_steps(
354            plan.id(),
355        ));
356    }
357
358    // Phase 2: recover any in-flight commit marker before migration execution.
359    db.ensure_recovered_state()?;
360
361    // Phase 3: load durable migration cursor state from commit control storage.
362    let mut next_cursor = load_durable_cursor_for_plan(plan)?;
363
364    // Phase 4: execute a bounded number of deterministic migration steps.
365    let mut applied_steps = 0usize;
366    let mut applied_row_ops = 0usize;
367    while applied_steps < max_steps {
368        if next_cursor.next_step() >= plan.len() {
369            break;
370        }
371
372        let step_index = next_cursor.next_step();
373        let step = plan.step_at(step_index)?;
374        let next_cursor_after_step = next_cursor.advance();
375        let next_state_bytes =
376            encode_durable_cursor_state(plan, next_cursor_after_step, step.row_ops.last())?;
377        execute_migration_step(db, plan, step_index, step, next_state_bytes)?;
378
379        applied_steps = applied_steps.saturating_add(1);
380        applied_row_ops = applied_row_ops.saturating_add(step.row_op_count());
381        next_cursor = next_cursor_after_step;
382    }
383
384    let state = if next_cursor.next_step() == plan.len() {
385        clear_migration_state_bytes()?;
386        MigrationRunState::Complete
387    } else {
388        MigrationRunState::NeedsResume
389    };
390
391    Ok(MigrationRunOutcome::new(
392        next_cursor,
393        applied_steps,
394        applied_row_ops,
395        state,
396    ))
397}
398
399fn load_durable_cursor_for_plan(plan: &MigrationPlan) -> Result<MigrationCursor, InternalError> {
400    let Some(bytes) = load_migration_state_bytes()? else {
401        return Ok(MigrationCursor::start());
402    };
403    let state = decode_persisted_migration_state(&bytes)?;
404    if state.migration_id != plan.id() || state.migration_version != plan.version() {
405        return Err(InternalError::migration_in_progress_conflict(
406            plan.id(),
407            plan.version(),
408            &state.migration_id,
409            state.migration_version,
410        ));
411    }
412
413    let step_index = usize::try_from(state.step_index).map_err(|_| {
414        InternalError::migration_persisted_step_index_invalid_usize(
415            plan.id(),
416            plan.version(),
417            state.step_index,
418        )
419    })?;
420    if step_index > plan.len() {
421        return Err(InternalError::migration_persisted_step_index_out_of_bounds(
422            plan.id(),
423            plan.version(),
424            step_index,
425            plan.len(),
426        ));
427    }
428
429    if step_index == plan.len() {
430        clear_migration_state_bytes()?;
431    }
432
433    Ok(MigrationCursor::from_step(step_index))
434}
435
436fn encode_durable_cursor_state(
437    plan: &MigrationPlan,
438    cursor: MigrationCursor,
439    last_applied_row_op: Option<&CommitRowOp>,
440) -> Result<Vec<u8>, InternalError> {
441    let step_index = u64::try_from(cursor.next_step()).map_err(|_| {
442        InternalError::migration_next_step_index_u64_required(plan.id(), plan.version())
443    })?;
444    let state = PersistedMigrationState {
445        migration_id: plan.id().to_string(),
446        migration_version: plan.version(),
447        step_index,
448        last_applied_row_key: last_applied_row_op.map(|row_op| row_op.key.as_bytes().to_vec()),
449    };
450
451    encode_persisted_migration_state(&state)
452}
453
454fn decode_persisted_migration_state(
455    bytes: &[u8],
456) -> Result<PersistedMigrationState, InternalError> {
457    // Phase 1: reject oversized payloads before any structural decode work.
458    if bytes.len() > MAX_MIGRATION_STATE_BYTES {
459        return Err(InternalError::serialize_corruption(format!(
460            "migration state decode failed: payload size {} exceeds limit {MAX_MIGRATION_STATE_BYTES}",
461            bytes.len(),
462        )));
463    }
464
465    // Phase 2: validate the fixed header and read the fixed-width fields.
466    let mut cursor = bytes;
467    decode_migration_state_magic(&mut cursor)?;
468    let format_version = decode_migration_state_u8(&mut cursor, "format version")?;
469    validate_migration_state_format_version(format_version)?;
470    let migration_id = decode_migration_state_string(&mut cursor, "migration_id")?;
471    let migration_version = decode_migration_state_u64(&mut cursor, "migration_version")?;
472    let step_index = decode_migration_state_u64(&mut cursor, "step_index")?;
473    let last_applied_row_key =
474        decode_migration_state_optional_bytes(&mut cursor, "last_applied_row_key")?;
475
476    // Phase 3: reject trailing bytes so the codec stays single-version and exact.
477    if !cursor.is_empty() {
478        return Err(InternalError::serialize_corruption(
479            "migration state decode failed: trailing bytes",
480        ));
481    }
482
483    Ok(PersistedMigrationState {
484        migration_id,
485        migration_version,
486        step_index,
487        last_applied_row_key,
488    })
489}
490
491fn encode_persisted_migration_state(
492    state: &PersistedMigrationState,
493) -> Result<Vec<u8>, InternalError> {
494    // Phase 1: pre-compute the bounded payload size so overflow and policy
495    // violations fail before any bytes are emitted.
496    let row_key_len = state.last_applied_row_key.as_ref().map_or(0usize, Vec::len);
497    let encoded_len = MIGRATION_STATE_MAGIC
498        .len()
499        .saturating_add(1)
500        .saturating_add(4)
501        .saturating_add(state.migration_id.len())
502        .saturating_add(8)
503        .saturating_add(8)
504        .saturating_add(1)
505        .saturating_add(if state.last_applied_row_key.is_some() {
506            4usize.saturating_add(row_key_len)
507        } else {
508            0
509        });
510    if encoded_len > MAX_MIGRATION_STATE_BYTES {
511        return Err(InternalError::migration_state_serialize_failed(format!(
512            "payload size {encoded_len} exceeds limit {MAX_MIGRATION_STATE_BYTES}",
513        )));
514    }
515
516    let migration_id_len = u32::try_from(state.migration_id.len()).map_err(|_| {
517        InternalError::migration_state_serialize_failed("migration_id exceeds u32 length")
518    })?;
519    let row_key_len_u32 = u32::try_from(row_key_len).map_err(|_| {
520        InternalError::migration_state_serialize_failed("last_applied_row_key exceeds u32 length")
521    })?;
522
523    // Phase 2: write the fixed binary payload in one exact stable order.
524    let mut encoded = Vec::with_capacity(encoded_len);
525    encoded.extend_from_slice(&MIGRATION_STATE_MAGIC);
526    encoded.push(MIGRATION_STATE_VERSION_CURRENT);
527    encoded.extend_from_slice(&migration_id_len.to_be_bytes());
528    encoded.extend_from_slice(state.migration_id.as_bytes());
529    encoded.extend_from_slice(&state.migration_version.to_be_bytes());
530    encoded.extend_from_slice(&state.step_index.to_be_bytes());
531
532    match state.last_applied_row_key.as_ref() {
533        Some(row_key) => {
534            encoded.push(MIGRATION_STATE_SOME_ROW_KEY_TAG);
535            encoded.extend_from_slice(&row_key_len_u32.to_be_bytes());
536            encoded.extend_from_slice(row_key);
537        }
538        None => encoded.push(MIGRATION_STATE_NONE_ROW_KEY_TAG),
539    }
540
541    Ok(encoded)
542}
543
544// Decode and validate the fixed migration-state magic prefix.
545fn decode_migration_state_magic(bytes: &mut &[u8]) -> Result<(), InternalError> {
546    let magic = take_migration_state_bytes(bytes, MIGRATION_STATE_MAGIC.len(), "magic")?;
547    if magic != MIGRATION_STATE_MAGIC {
548        return Err(InternalError::serialize_corruption(
549            "migration state decode failed: invalid magic",
550        ));
551    }
552
553    Ok(())
554}
555
556// Decode one fixed-width u8 field from the migration-state payload.
557fn decode_migration_state_u8(bytes: &mut &[u8], label: &'static str) -> Result<u8, InternalError> {
558    Ok(take_migration_state_bytes(bytes, 1, label)?[0])
559}
560
561// Decode one fixed-width u64 field from the migration-state payload.
562fn decode_migration_state_u64(
563    bytes: &mut &[u8],
564    label: &'static str,
565) -> Result<u64, InternalError> {
566    let raw = take_migration_state_bytes(bytes, 8, label)?;
567
568    Ok(u64::from_be_bytes([
569        raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
570    ]))
571}
572
573// Decode one length-prefixed UTF-8 string field from the migration-state payload.
574fn decode_migration_state_string(
575    bytes: &mut &[u8],
576    label: &'static str,
577) -> Result<String, InternalError> {
578    let raw = decode_migration_state_length_prefixed_bytes(bytes, label)?;
579
580    String::from_utf8(raw.to_vec()).map_err(|_| {
581        InternalError::serialize_corruption(format!(
582            "migration state decode failed: {label} is not valid UTF-8",
583        ))
584    })
585}
586
587// Decode one tagged optional byte vector from the migration-state payload.
588fn decode_migration_state_optional_bytes(
589    bytes: &mut &[u8],
590    label: &'static str,
591) -> Result<Option<Vec<u8>>, InternalError> {
592    let tag = decode_migration_state_u8(bytes, label)?;
593
594    match tag {
595        MIGRATION_STATE_NONE_ROW_KEY_TAG => Ok(None),
596        MIGRATION_STATE_SOME_ROW_KEY_TAG => Ok(Some(
597            decode_migration_state_length_prefixed_bytes(bytes, label)?.to_vec(),
598        )),
599        _ => Err(InternalError::serialize_corruption(format!(
600            "migration state decode failed: invalid {label} tag {tag}",
601        ))),
602    }
603}
604
605// Decode one length-prefixed byte slice from the migration-state payload.
606fn decode_migration_state_length_prefixed_bytes<'a>(
607    bytes: &mut &'a [u8],
608    label: &'static str,
609) -> Result<&'a [u8], InternalError> {
610    let raw_len = take_migration_state_bytes(bytes, 4, label)?;
611    let len = usize::try_from(u32::from_be_bytes([
612        raw_len[0], raw_len[1], raw_len[2], raw_len[3],
613    ]))
614    .map_err(|_| {
615        InternalError::serialize_corruption(format!(
616            "migration state decode failed: {label} length out of range",
617        ))
618    })?;
619
620    take_migration_state_bytes(bytes, len, label)
621}
622
623// Borrow one exact byte span from the migration-state payload.
624fn take_migration_state_bytes<'a>(
625    bytes: &mut &'a [u8],
626    len: usize,
627    label: &'static str,
628) -> Result<&'a [u8], InternalError> {
629    if bytes.len() < len {
630        return Err(InternalError::serialize_corruption(format!(
631            "migration state decode failed: truncated {label}",
632        )));
633    }
634
635    let (head, tail) = bytes.split_at(len);
636    *bytes = tail;
637
638    Ok(head)
639}
640
641// Validate the single supported migration-state format version.
642fn validate_migration_state_format_version(format_version: u8) -> Result<(), InternalError> {
643    if format_version == MIGRATION_STATE_VERSION_CURRENT {
644        return Ok(());
645    }
646
647    Err(InternalError::serialize_incompatible_persisted_format(
648        format!(
649            "migration state format version {format_version} is unsupported by runtime version {MIGRATION_STATE_VERSION_CURRENT}",
650        ),
651    ))
652}
653
654fn execute_migration_step<C: CanisterKind>(
655    db: &Db<C>,
656    plan: &MigrationPlan,
657    step_index: usize,
658    step: &MigrationStep,
659    next_state_bytes: Vec<u8>,
660) -> Result<(), InternalError> {
661    // Phase 1: persist marker authority + next-step cursor state atomically.
662    let marker = CommitMarker::new(step.row_ops.clone())
663        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
664    let commit = begin_commit_with_migration_state(marker, next_state_bytes)
665        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
666
667    // Phase 2: apply step row ops under commit-window durability semantics.
668    finish_commit(commit, |_| apply_marker_row_ops(db, &step.row_ops))
669        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
670
671    Ok(())
672}
673
674fn apply_marker_row_ops<C: CanisterKind>(
675    db: &Db<C>,
676    row_ops: &[CommitRowOp],
677) -> Result<(), InternalError> {
678    // Phase 1: pre-prepare all row operations before mutating stores.
679    let mut prepared = Vec::with_capacity(row_ops.len());
680    for row_op in row_ops {
681        prepared.push(db.prepare_row_commit_op(row_op)?);
682    }
683
684    // Phase 2: apply the prepared operations in stable marker order.
685    for prepared_op in prepared {
686        prepared_op.apply();
687    }
688
689    Ok(())
690}
691
692fn annotate_step_error(
693    plan: &MigrationPlan,
694    step_index: usize,
695    step_name: &str,
696    err: InternalError,
697) -> InternalError {
698    let source_message = err.message().to_string();
699
700    err.with_message(format!(
701        "migration '{}' step {} ('{}') failed: {}",
702        plan.id(),
703        step_index,
704        step_name,
705        source_message,
706    ))
707}
708
709fn validate_non_empty_label(value: &str, label: &str) -> Result<(), InternalError> {
710    if value.trim().is_empty() {
711        return Err(InternalError::migration_label_empty(label));
712    }
713
714    Ok(())
715}