Skip to main content

icydb_core/db/migration/
mod.rs

1//! Module: db::migration
2//! Responsibility: explicit migration plan contracts and commit-marker-backed execution.
3//! Does not own: migration row-op derivation policy or schema transformation design.
4//! Boundary: callers provide explicit row-op steps; this module executes them durably.
5
6#[cfg(test)]
7mod tests;
8
9use crate::{
10    db::{
11        Db,
12        codec::deserialize_persisted_payload,
13        commit::{
14            CommitMarker, CommitRowOp, begin_commit_with_migration_state,
15            clear_migration_state_bytes, finish_commit, load_migration_state_bytes,
16        },
17    },
18    error::InternalError,
19    serialize::serialize,
20    traits::CanisterKind,
21};
22use serde::{Deserialize, Serialize};
23
/// Byte limit (64 KiB) passed to the decoder when reading persisted migration state.
const MAX_MIGRATION_STATE_BYTES: usize = 1 << 16;
25
///
/// MigrationCursor
///
/// Resume position inside one migration plan.
/// Holds the index of the next step that has not been executed yet.
/// The migration runtime persists this position durably between executions.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct MigrationCursor {
    next_step: usize,
}

impl MigrationCursor {
    /// Build the cursor that points at step index `0`.
    #[must_use]
    pub const fn start() -> Self {
        Self::from_step(0)
    }

    /// Index of the step the next execution should run.
    #[must_use]
    pub const fn next_step(self) -> usize {
        self.next_step
    }

    // Build a cursor positioned at `step_index`.
    const fn from_step(step_index: usize) -> Self {
        Self {
            next_step: step_index,
        }
    }

    // Move one step forward; saturates at `usize::MAX` instead of wrapping.
    const fn advance(self) -> Self {
        Self::from_step(self.next_step.saturating_add(1))
    }
}
65
66///
67/// PersistedMigrationState
68///
69/// Durable migration-progress record stored in commit control state.
70/// `step_index` stores the next step to execute for `migration_id` and
71/// `migration_version`.
72/// `last_applied_row_key` records the last row key from the last successful
73/// migration step for diagnostics and recovery observability.
74///
75
76#[derive(Clone, Debug, Deserialize, Serialize)]
77#[serde(deny_unknown_fields)]
78struct PersistedMigrationState {
79    migration_id: String,
80    migration_version: u64,
81    step_index: u64,
82    last_applied_row_key: Option<Vec<u8>>,
83}
84
///
/// MigrationRowOp
///
/// Public row-operation DTO from which explicit migration steps are built.
/// Its fields mirror the commit row-op payload without exposing commit internals.
/// Migration execution turns each DTO into a commit marker row operation.
///

#[derive(Clone, Debug)]
pub struct MigrationRowOp {
    /// Entity path that commit runtime hooks resolve during execution.
    pub entity_path: String,
    /// Encoded raw data key bytes identifying the target row.
    pub key: Vec<u8>,
    /// Encoded before-image row payload, when there is one.
    pub before: Option<Vec<u8>>,
    /// Encoded after-image row payload, when there is one.
    pub after: Option<Vec<u8>>,
    /// Schema fingerprint that commit prepare/replay expects for this row op.
    pub schema_fingerprint: [u8; 16],
}

impl MigrationRowOp {
    /// Assemble one migration row operation from its parts.
    ///
    /// `entity_path` accepts anything convertible into `String`; the other
    /// arguments are stored unchanged.
    #[must_use]
    pub fn new(
        entity_path: impl Into<String>,
        key: Vec<u8>,
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: [u8; 16],
    ) -> Self {
        let entity_path: String = entity_path.into();

        Self {
            entity_path,
            key,
            before,
            after,
            schema_fingerprint,
        }
    }
}
126
127impl From<MigrationRowOp> for CommitRowOp {
128    fn from(op: MigrationRowOp) -> Self {
129        Self::new(
130            op.entity_path,
131            op.key,
132            op.before,
133            op.after,
134            op.schema_fingerprint,
135        )
136    }
137}
138
139///
140/// MigrationStep
141///
142/// One explicit migration step represented as ordered commit row operations.
143/// Step ordering is deterministic and preserved exactly at execution time.
144/// Empty step names and empty row-op vectors are rejected by constructor.
145///
146
147#[derive(Clone, Debug)]
148pub struct MigrationStep {
149    name: String,
150    row_ops: Vec<CommitRowOp>,
151}
152
153impl MigrationStep {
154    /// Build one validated migration step from public migration row-op DTOs.
155    pub fn from_row_ops(
156        name: impl Into<String>,
157        row_ops: Vec<MigrationRowOp>,
158    ) -> Result<Self, InternalError> {
159        let commit_row_ops = row_ops.into_iter().map(CommitRowOp::from).collect();
160        Self::new(name, commit_row_ops)
161    }
162
163    /// Build one validated migration step.
164    pub(in crate::db) fn new(
165        name: impl Into<String>,
166        row_ops: Vec<CommitRowOp>,
167    ) -> Result<Self, InternalError> {
168        let name = name.into();
169        validate_non_empty_label(name.as_str(), "migration step name")?;
170
171        if row_ops.is_empty() {
172            return Err(InternalError::store_unsupported(format!(
173                "migration step '{name}' must include at least one row op",
174            )));
175        }
176
177        Ok(Self { name, row_ops })
178    }
179
180    /// Return this step's stable display name.
181    #[must_use]
182    pub const fn name(&self) -> &str {
183        self.name.as_str()
184    }
185
186    /// Return the number of row operations in this step.
187    #[must_use]
188    pub const fn row_op_count(&self) -> usize {
189        self.row_ops.len()
190    }
191}
192
193///
194/// MigrationPlan
195///
196/// Explicit, ordered migration contract composed of named row-op steps.
197/// The plan id is stable caller-owned metadata for observability and retries.
198/// The plan version is caller-owned monotonic metadata for upgrade safety.
199/// Steps are executed sequentially in insertion order and never reordered.
200///
201
202#[derive(Clone, Debug)]
203pub struct MigrationPlan {
204    id: String,
205    version: u64,
206    steps: Vec<MigrationStep>,
207}
208
209impl MigrationPlan {
210    /// Build one validated migration plan.
211    pub fn new(
212        id: impl Into<String>,
213        version: u64,
214        steps: Vec<MigrationStep>,
215    ) -> Result<Self, InternalError> {
216        let id = id.into();
217        validate_non_empty_label(id.as_str(), "migration plan id")?;
218        if version == 0 {
219            return Err(InternalError::store_unsupported(format!(
220                "migration plan '{id}' version must be > 0",
221            )));
222        }
223
224        if steps.is_empty() {
225            return Err(InternalError::store_unsupported(format!(
226                "migration plan '{id}' must include at least one step",
227            )));
228        }
229
230        Ok(Self { id, version, steps })
231    }
232
233    /// Return this plan's stable id.
234    #[must_use]
235    pub const fn id(&self) -> &str {
236        self.id.as_str()
237    }
238
239    /// Return this plan's stable version.
240    #[must_use]
241    pub const fn version(&self) -> u64 {
242        self.version
243    }
244
245    /// Return the number of steps in this plan.
246    #[must_use]
247    pub const fn len(&self) -> usize {
248        self.steps.len()
249    }
250
251    /// Return whether this plan has no steps.
252    #[must_use]
253    pub const fn is_empty(&self) -> bool {
254        self.steps.is_empty()
255    }
256
257    fn step_at(&self, index: usize) -> Result<&MigrationStep, InternalError> {
258        self.steps.get(index).ok_or_else(|| {
259            InternalError::store_unsupported(format!(
260                "migration '{}@{}' cursor out of bounds: next_step={} total_steps={}",
261                self.id(),
262                self.version(),
263                index,
264                self.len(),
265            ))
266        })
267    }
268}
269
///
/// MigrationRunState
///
/// Completion status reported by one bounded migration run.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MigrationRunState {
    /// Every step has run; the returned cursor sits at the end of the plan.
    Complete,
    /// Steps remain; run the same plan again to continue from durable state.
    NeedsResume,
}
283
284///
285/// MigrationRunOutcome
286///
287/// Summary of one bounded migration-execution run.
288/// This captures the next cursor plus applied-step/row-op counters.
289/// Durable cursor persistence is internal to migration runtime state.
290///
291
292#[derive(Clone, Copy, Debug, Eq, PartialEq)]
293pub struct MigrationRunOutcome {
294    cursor: MigrationCursor,
295    applied_steps: usize,
296    applied_row_ops: usize,
297    state: MigrationRunState,
298}
299
300impl MigrationRunOutcome {
301    const fn new(
302        cursor: MigrationCursor,
303        applied_steps: usize,
304        applied_row_ops: usize,
305        state: MigrationRunState,
306    ) -> Self {
307        Self {
308            cursor,
309            applied_steps,
310            applied_row_ops,
311            state,
312        }
313    }
314
315    /// Return the next migration cursor.
316    #[must_use]
317    pub const fn cursor(self) -> MigrationCursor {
318        self.cursor
319    }
320
321    /// Return the number of steps applied in this bounded run.
322    #[must_use]
323    pub const fn applied_steps(self) -> usize {
324        self.applied_steps
325    }
326
327    /// Return the number of row ops applied in this bounded run.
328    #[must_use]
329    pub const fn applied_row_ops(self) -> usize {
330        self.applied_row_ops
331    }
332
333    /// Return bounded-run completion state.
334    #[must_use]
335    pub const fn state(self) -> MigrationRunState {
336        self.state
337    }
338}
339
340/// Execute one bounded migration run from durable internal cursor state.
341///
342/// Contract:
343/// - always runs commit recovery before applying migration steps
344/// - executes at most `max_steps` deterministic steps in-order
345/// - each step is persisted through commit-marker protocol
346/// - migration cursor progress is atomically persisted with step marker writes
347/// - step failures preserve marker authority for explicit fail-closed recovery
348pub(in crate::db) fn execute_migration_plan<C: CanisterKind>(
349    db: &Db<C>,
350    plan: &MigrationPlan,
351    max_steps: usize,
352) -> Result<MigrationRunOutcome, InternalError> {
353    // Phase 1: validate run-shape controls before touching commit state.
354    if max_steps == 0 {
355        return Err(InternalError::store_unsupported(format!(
356            "migration '{}' execution requires max_steps > 0",
357            plan.id(),
358        )));
359    }
360
361    // Phase 2: recover any in-flight commit marker before migration execution.
362    db.ensure_recovered_state()?;
363
364    // Phase 3: load durable migration cursor state from commit control storage.
365    let mut next_cursor = load_durable_cursor_for_plan(plan)?;
366
367    // Phase 4: execute a bounded number of deterministic migration steps.
368    let mut applied_steps = 0usize;
369    let mut applied_row_ops = 0usize;
370    while applied_steps < max_steps {
371        if next_cursor.next_step() >= plan.len() {
372            break;
373        }
374
375        let step_index = next_cursor.next_step();
376        let step = plan.step_at(step_index)?;
377        let next_cursor_after_step = next_cursor.advance();
378        let next_state_bytes =
379            encode_durable_cursor_state(plan, next_cursor_after_step, step.row_ops.last())?;
380        execute_migration_step(db, plan, step_index, step, next_state_bytes)?;
381
382        applied_steps = applied_steps.saturating_add(1);
383        applied_row_ops = applied_row_ops.saturating_add(step.row_op_count());
384        next_cursor = next_cursor_after_step;
385    }
386
387    let state = if next_cursor.next_step() == plan.len() {
388        clear_migration_state_bytes()?;
389        MigrationRunState::Complete
390    } else {
391        MigrationRunState::NeedsResume
392    };
393
394    Ok(MigrationRunOutcome::new(
395        next_cursor,
396        applied_steps,
397        applied_row_ops,
398        state,
399    ))
400}
401
402fn load_durable_cursor_for_plan(plan: &MigrationPlan) -> Result<MigrationCursor, InternalError> {
403    let Some(bytes) = load_migration_state_bytes()? else {
404        return Ok(MigrationCursor::start());
405    };
406    let state = decode_persisted_migration_state(&bytes)?;
407    if state.migration_id != plan.id() || state.migration_version != plan.version() {
408        return Err(InternalError::store_unsupported(format!(
409            "migration '{}@{}' cannot execute while migration '{}@{}' is in progress",
410            plan.id(),
411            plan.version(),
412            state.migration_id,
413            state.migration_version,
414        )));
415    }
416
417    let step_index = usize::try_from(state.step_index).map_err(|_| {
418        InternalError::store_corruption(format!(
419            "migration '{}@{}' persisted step index does not fit runtime usize: {}",
420            plan.id(),
421            plan.version(),
422            state.step_index,
423        ))
424    })?;
425    if step_index > plan.len() {
426        return Err(InternalError::store_corruption(format!(
427            "migration '{}@{}' persisted step index out of bounds: {} > {}",
428            plan.id(),
429            plan.version(),
430            step_index,
431            plan.len(),
432        )));
433    }
434
435    if step_index == plan.len() {
436        clear_migration_state_bytes()?;
437    }
438
439    Ok(MigrationCursor::from_step(step_index))
440}
441
442fn encode_durable_cursor_state(
443    plan: &MigrationPlan,
444    cursor: MigrationCursor,
445    last_applied_row_op: Option<&CommitRowOp>,
446) -> Result<Vec<u8>, InternalError> {
447    let step_index = u64::try_from(cursor.next_step()).map_err(|_| {
448        InternalError::store_internal(format!(
449            "migration '{}@{}' next step index does not fit persisted u64 cursor",
450            plan.id(),
451            plan.version(),
452        ))
453    })?;
454    let state = PersistedMigrationState {
455        migration_id: plan.id().to_string(),
456        migration_version: plan.version(),
457        step_index,
458        last_applied_row_key: last_applied_row_op.map(|row_op| row_op.key.clone()),
459    };
460
461    encode_persisted_migration_state(&state)
462}
463
464fn decode_persisted_migration_state(
465    bytes: &[u8],
466) -> Result<PersistedMigrationState, InternalError> {
467    deserialize_persisted_payload::<PersistedMigrationState>(
468        bytes,
469        MAX_MIGRATION_STATE_BYTES,
470        "migration state",
471    )
472}
473
474fn encode_persisted_migration_state(
475    state: &PersistedMigrationState,
476) -> Result<Vec<u8>, InternalError> {
477    serialize(state).map_err(|err| {
478        InternalError::serialize_internal(format!("failed to serialize migration state: {err}"))
479    })
480}
481
482fn execute_migration_step<C: CanisterKind>(
483    db: &Db<C>,
484    plan: &MigrationPlan,
485    step_index: usize,
486    step: &MigrationStep,
487    next_state_bytes: Vec<u8>,
488) -> Result<(), InternalError> {
489    // Phase 1: persist marker authority + next-step cursor state atomically.
490    let marker = CommitMarker::new(step.row_ops.clone())
491        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
492    let commit = begin_commit_with_migration_state(marker, next_state_bytes)
493        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
494
495    // Phase 2: apply step row ops under commit-window durability semantics.
496    finish_commit(commit, |guard| {
497        apply_marker_row_ops(db, &guard.marker.row_ops)
498    })
499    .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
500
501    Ok(())
502}
503
504fn apply_marker_row_ops<C: CanisterKind>(
505    db: &Db<C>,
506    row_ops: &[CommitRowOp],
507) -> Result<(), InternalError> {
508    // Phase 1: pre-prepare all row operations before mutating stores.
509    let mut prepared = Vec::with_capacity(row_ops.len());
510    for row_op in row_ops {
511        prepared.push(db.prepare_row_commit_op(row_op)?);
512    }
513
514    // Phase 2: apply the prepared operations in stable marker order.
515    for prepared_op in prepared {
516        prepared_op.apply();
517    }
518
519    Ok(())
520}
521
522fn annotate_step_error(
523    plan: &MigrationPlan,
524    step_index: usize,
525    step_name: &str,
526    err: InternalError,
527) -> InternalError {
528    let source_message = err.message().to_string();
529
530    err.with_message(format!(
531        "migration '{}' step {} ('{}') failed: {}",
532        plan.id(),
533        step_index,
534        step_name,
535        source_message,
536    ))
537}
538
539fn validate_non_empty_label(value: &str, label: &str) -> Result<(), InternalError> {
540    if value.trim().is_empty() {
541        return Err(InternalError::store_unsupported(format!(
542            "{label} cannot be empty",
543        )));
544    }
545
546    Ok(())
547}