Skip to main content

icydb_core/db/migration/
mod.rs

1//! Module: db::migration
2//! Responsibility: explicit migration plan contracts and commit-marker-backed execution.
3//! Does not own: migration row-op derivation policy or schema transformation design.
4//! Boundary: callers provide explicit row-op steps; this module executes them durably.
5
6#[cfg(test)]
7mod tests;
8
9use crate::{
10    db::{
11        Db,
12        codec::deserialize_persisted_payload,
13        commit::{
14            CommitMarker, CommitRowOp, begin_commit_with_migration_state,
15            clear_migration_state_bytes, finish_commit, load_migration_state_bytes,
16        },
17    },
18    error::InternalError,
19    serialize::serialize,
20    traits::CanisterKind,
21};
22use serde::{Deserialize, Serialize};
23
/// Size limit in bytes (64 KiB) passed to `deserialize_persisted_payload`
/// when decoding the persisted migration state.
const MAX_MIGRATION_STATE_BYTES: usize = 64 * 1024;
25
///
/// MigrationCursor
///
/// Resume position inside one migration plan.
/// Holds the index of the step that should run next; the migration runtime
/// stores this position durably so a later run can continue from it.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct MigrationCursor {
    next_step: usize,
}

impl MigrationCursor {
    /// Cursor positioned at the first step (index 0) of a plan.
    #[must_use]
    pub const fn start() -> Self {
        Self::from_step(0)
    }

    /// Index of the step this cursor will execute next.
    #[must_use]
    pub const fn next_step(self) -> usize {
        self.next_step
    }

    // Build a cursor pointing at `step_index`; performs no bounds checking.
    const fn from_step(step_index: usize) -> Self {
        Self {
            next_step: step_index,
        }
    }

    // Cursor for the following step; saturates at `usize::MAX` instead of overflowing.
    const fn advance(self) -> Self {
        Self::from_step(self.next_step.saturating_add(1))
    }
}
65
///
/// PersistedMigrationState
///
/// Durable migration-progress record stored in commit control state.
/// `step_index` stores the next step to execute for `migration_id` and
/// `migration_version`.
/// `last_applied_row_key` records the last row key from the last successful
/// migration step for diagnostics and recovery observability.
/// Deserialization rejects unknown fields (`deny_unknown_fields`).
/// NOTE(review): field names/order may be part of the persisted encoding,
/// depending on the serializer in use — confirm before changing this struct.
///

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct PersistedMigrationState {
    // Id of the plan this record belongs to; compared with the running plan on load.
    migration_id: String,
    // Version of the plan this record belongs to; compared with the running plan on load.
    migration_version: u64,
    // Next step to execute, stored as u64 and converted back to usize on load.
    step_index: u64,
    // Key of the last row op of the most recently executed step, when there was one.
    last_applied_row_key: Option<Vec<u8>>,
}
84
///
/// MigrationRowOp
///
/// Public row-operation DTO from which explicit migration steps are built.
/// It has the same payload shape as a commit row op while keeping commit
/// internals private; migration execution turns each DTO into a commit
/// marker row operation.
///

#[derive(Clone, Debug)]
pub struct MigrationRowOp {
    /// Entity path; commit runtime hooks resolve it during execution.
    pub entity_path: String,
    /// Encoded raw data key identifying the target row.
    pub key: Vec<u8>,
    /// Encoded before-image of the row payload, if any.
    pub before: Option<Vec<u8>>,
    /// Encoded after-image of the row payload, if any.
    pub after: Option<Vec<u8>>,
    /// Schema fingerprint that commit prepare/replay expects for this row op.
    pub schema_fingerprint: [u8; 16],
}

impl MigrationRowOp {
    /// Assemble a row-operation DTO from its parts.
    ///
    /// `entity_path` accepts anything convertible into `String`; every other
    /// argument is stored unchanged.
    #[must_use]
    pub fn new(
        entity_path: impl Into<String>,
        key: Vec<u8>,
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: [u8; 16],
    ) -> Self {
        let entity_path: String = entity_path.into();

        Self {
            entity_path,
            key,
            before,
            after,
            schema_fingerprint,
        }
    }
}
126
127impl From<MigrationRowOp> for CommitRowOp {
128    fn from(op: MigrationRowOp) -> Self {
129        Self::new(
130            op.entity_path,
131            op.key,
132            op.before,
133            op.after,
134            op.schema_fingerprint,
135        )
136    }
137}
138
139///
140/// MigrationStep
141///
142/// One explicit migration step represented as ordered commit row operations.
143/// Step ordering is deterministic and preserved exactly at execution time.
144/// Empty step names and empty row-op vectors are rejected by constructor.
145///
146
147#[derive(Clone, Debug)]
148pub struct MigrationStep {
149    name: String,
150    row_ops: Vec<CommitRowOp>,
151}
152
153impl MigrationStep {
154    /// Build one validated migration step from public migration row-op DTOs.
155    pub fn from_row_ops(
156        name: impl Into<String>,
157        row_ops: Vec<MigrationRowOp>,
158    ) -> Result<Self, InternalError> {
159        let commit_row_ops = row_ops.into_iter().map(CommitRowOp::from).collect();
160        Self::new(name, commit_row_ops)
161    }
162
163    /// Build one validated migration step.
164    pub(in crate::db) fn new(
165        name: impl Into<String>,
166        row_ops: Vec<CommitRowOp>,
167    ) -> Result<Self, InternalError> {
168        let name = name.into();
169        validate_non_empty_label(name.as_str(), "migration step name")?;
170
171        if row_ops.is_empty() {
172            return Err(InternalError::migration_step_row_ops_required(&name));
173        }
174
175        Ok(Self { name, row_ops })
176    }
177
178    /// Return this step's stable display name.
179    #[must_use]
180    pub const fn name(&self) -> &str {
181        self.name.as_str()
182    }
183
184    /// Return the number of row operations in this step.
185    #[must_use]
186    pub const fn row_op_count(&self) -> usize {
187        self.row_ops.len()
188    }
189}
190
191///
192/// MigrationPlan
193///
194/// Explicit, ordered migration contract composed of named row-op steps.
195/// The plan id is stable caller-owned metadata for observability and retries.
196/// The plan version is caller-owned monotonic metadata for upgrade safety.
197/// Steps are executed sequentially in insertion order and never reordered.
198///
199
200#[derive(Clone, Debug)]
201pub struct MigrationPlan {
202    id: String,
203    version: u64,
204    steps: Vec<MigrationStep>,
205}
206
207impl MigrationPlan {
208    /// Build one validated migration plan.
209    pub fn new(
210        id: impl Into<String>,
211        version: u64,
212        steps: Vec<MigrationStep>,
213    ) -> Result<Self, InternalError> {
214        let id = id.into();
215        validate_non_empty_label(id.as_str(), "migration plan id")?;
216        if version == 0 {
217            return Err(InternalError::migration_plan_version_required(&id));
218        }
219
220        if steps.is_empty() {
221            return Err(InternalError::migration_plan_steps_required(&id));
222        }
223
224        Ok(Self { id, version, steps })
225    }
226
227    /// Return this plan's stable id.
228    #[must_use]
229    pub const fn id(&self) -> &str {
230        self.id.as_str()
231    }
232
233    /// Return this plan's stable version.
234    #[must_use]
235    pub const fn version(&self) -> u64 {
236        self.version
237    }
238
239    /// Return the number of steps in this plan.
240    #[must_use]
241    pub const fn len(&self) -> usize {
242        self.steps.len()
243    }
244
245    /// Return whether this plan has no steps.
246    #[must_use]
247    pub const fn is_empty(&self) -> bool {
248        self.steps.is_empty()
249    }
250
251    fn step_at(&self, index: usize) -> Result<&MigrationStep, InternalError> {
252        self.steps.get(index).ok_or_else(|| {
253            InternalError::migration_cursor_out_of_bounds(
254                self.id(),
255                self.version(),
256                index,
257                self.len(),
258            )
259        })
260    }
261}
262
///
/// MigrationRunState
///
/// Bounded migration-execution completion status.
/// Reported by a run according to whether its final cursor has reached the
/// plan's step count.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MigrationRunState {
    /// No remaining steps; migration plan is complete at returned cursor.
    Complete,
    /// Remaining steps exist; rerun the same plan to resume from durable state.
    NeedsResume,
}
276
277///
278/// MigrationRunOutcome
279///
280/// Summary of one bounded migration-execution run.
281/// This captures the next cursor plus applied-step/row-op counters.
282/// Durable cursor persistence is internal to migration runtime state.
283///
284
285#[derive(Clone, Copy, Debug, Eq, PartialEq)]
286pub struct MigrationRunOutcome {
287    cursor: MigrationCursor,
288    applied_steps: usize,
289    applied_row_ops: usize,
290    state: MigrationRunState,
291}
292
293impl MigrationRunOutcome {
294    const fn new(
295        cursor: MigrationCursor,
296        applied_steps: usize,
297        applied_row_ops: usize,
298        state: MigrationRunState,
299    ) -> Self {
300        Self {
301            cursor,
302            applied_steps,
303            applied_row_ops,
304            state,
305        }
306    }
307
308    /// Return the next migration cursor.
309    #[must_use]
310    pub const fn cursor(self) -> MigrationCursor {
311        self.cursor
312    }
313
314    /// Return the number of steps applied in this bounded run.
315    #[must_use]
316    pub const fn applied_steps(self) -> usize {
317        self.applied_steps
318    }
319
320    /// Return the number of row ops applied in this bounded run.
321    #[must_use]
322    pub const fn applied_row_ops(self) -> usize {
323        self.applied_row_ops
324    }
325
326    /// Return bounded-run completion state.
327    #[must_use]
328    pub const fn state(self) -> MigrationRunState {
329        self.state
330    }
331}
332
333/// Execute one bounded migration run from durable internal cursor state.
334///
335/// Contract:
336/// - always runs commit recovery before applying migration steps
337/// - executes at most `max_steps` deterministic steps in-order
338/// - each step is persisted through commit-marker protocol
339/// - migration cursor progress is atomically persisted with step marker writes
340/// - step failures preserve marker authority for explicit fail-closed recovery
341pub(in crate::db) fn execute_migration_plan<C: CanisterKind>(
342    db: &Db<C>,
343    plan: &MigrationPlan,
344    max_steps: usize,
345) -> Result<MigrationRunOutcome, InternalError> {
346    // Phase 1: validate run-shape controls before touching commit state.
347    if max_steps == 0 {
348        return Err(InternalError::migration_execution_requires_max_steps(
349            plan.id(),
350        ));
351    }
352
353    // Phase 2: recover any in-flight commit marker before migration execution.
354    db.ensure_recovered_state()?;
355
356    // Phase 3: load durable migration cursor state from commit control storage.
357    let mut next_cursor = load_durable_cursor_for_plan(plan)?;
358
359    // Phase 4: execute a bounded number of deterministic migration steps.
360    let mut applied_steps = 0usize;
361    let mut applied_row_ops = 0usize;
362    while applied_steps < max_steps {
363        if next_cursor.next_step() >= plan.len() {
364            break;
365        }
366
367        let step_index = next_cursor.next_step();
368        let step = plan.step_at(step_index)?;
369        let next_cursor_after_step = next_cursor.advance();
370        let next_state_bytes =
371            encode_durable_cursor_state(plan, next_cursor_after_step, step.row_ops.last())?;
372        execute_migration_step(db, plan, step_index, step, next_state_bytes)?;
373
374        applied_steps = applied_steps.saturating_add(1);
375        applied_row_ops = applied_row_ops.saturating_add(step.row_op_count());
376        next_cursor = next_cursor_after_step;
377    }
378
379    let state = if next_cursor.next_step() == plan.len() {
380        clear_migration_state_bytes()?;
381        MigrationRunState::Complete
382    } else {
383        MigrationRunState::NeedsResume
384    };
385
386    Ok(MigrationRunOutcome::new(
387        next_cursor,
388        applied_steps,
389        applied_row_ops,
390        state,
391    ))
392}
393
394fn load_durable_cursor_for_plan(plan: &MigrationPlan) -> Result<MigrationCursor, InternalError> {
395    let Some(bytes) = load_migration_state_bytes()? else {
396        return Ok(MigrationCursor::start());
397    };
398    let state = decode_persisted_migration_state(&bytes)?;
399    if state.migration_id != plan.id() || state.migration_version != plan.version() {
400        return Err(InternalError::migration_in_progress_conflict(
401            plan.id(),
402            plan.version(),
403            &state.migration_id,
404            state.migration_version,
405        ));
406    }
407
408    let step_index = usize::try_from(state.step_index).map_err(|_| {
409        InternalError::migration_persisted_step_index_invalid_usize(
410            plan.id(),
411            plan.version(),
412            state.step_index,
413        )
414    })?;
415    if step_index > plan.len() {
416        return Err(InternalError::migration_persisted_step_index_out_of_bounds(
417            plan.id(),
418            plan.version(),
419            step_index,
420            plan.len(),
421        ));
422    }
423
424    if step_index == plan.len() {
425        clear_migration_state_bytes()?;
426    }
427
428    Ok(MigrationCursor::from_step(step_index))
429}
430
431fn encode_durable_cursor_state(
432    plan: &MigrationPlan,
433    cursor: MigrationCursor,
434    last_applied_row_op: Option<&CommitRowOp>,
435) -> Result<Vec<u8>, InternalError> {
436    let step_index = u64::try_from(cursor.next_step()).map_err(|_| {
437        InternalError::migration_next_step_index_u64_required(plan.id(), plan.version())
438    })?;
439    let state = PersistedMigrationState {
440        migration_id: plan.id().to_string(),
441        migration_version: plan.version(),
442        step_index,
443        last_applied_row_key: last_applied_row_op.map(|row_op| row_op.key.clone()),
444    };
445
446    encode_persisted_migration_state(&state)
447}
448
449fn decode_persisted_migration_state(
450    bytes: &[u8],
451) -> Result<PersistedMigrationState, InternalError> {
452    deserialize_persisted_payload::<PersistedMigrationState>(
453        bytes,
454        MAX_MIGRATION_STATE_BYTES,
455        "migration state",
456    )
457}
458
459fn encode_persisted_migration_state(
460    state: &PersistedMigrationState,
461) -> Result<Vec<u8>, InternalError> {
462    serialize(state).map_err(InternalError::migration_state_serialize_failed)
463}
464
465fn execute_migration_step<C: CanisterKind>(
466    db: &Db<C>,
467    plan: &MigrationPlan,
468    step_index: usize,
469    step: &MigrationStep,
470    next_state_bytes: Vec<u8>,
471) -> Result<(), InternalError> {
472    // Phase 1: persist marker authority + next-step cursor state atomically.
473    let marker = CommitMarker::new(step.row_ops.clone())
474        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
475    let commit = begin_commit_with_migration_state(marker, next_state_bytes)
476        .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
477
478    // Phase 2: apply step row ops under commit-window durability semantics.
479    finish_commit(commit, |guard| {
480        apply_marker_row_ops(db, &guard.marker.row_ops)
481    })
482    .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
483
484    Ok(())
485}
486
487fn apply_marker_row_ops<C: CanisterKind>(
488    db: &Db<C>,
489    row_ops: &[CommitRowOp],
490) -> Result<(), InternalError> {
491    // Phase 1: pre-prepare all row operations before mutating stores.
492    let mut prepared = Vec::with_capacity(row_ops.len());
493    for row_op in row_ops {
494        prepared.push(db.prepare_row_commit_op(row_op)?);
495    }
496
497    // Phase 2: apply the prepared operations in stable marker order.
498    for prepared_op in prepared {
499        prepared_op.apply();
500    }
501
502    Ok(())
503}
504
505fn annotate_step_error(
506    plan: &MigrationPlan,
507    step_index: usize,
508    step_name: &str,
509    err: InternalError,
510) -> InternalError {
511    let source_message = err.message().to_string();
512
513    err.with_message(format!(
514        "migration '{}' step {} ('{}') failed: {}",
515        plan.id(),
516        step_index,
517        step_name,
518        source_message,
519    ))
520}
521
522fn validate_non_empty_label(value: &str, label: &str) -> Result<(), InternalError> {
523    if value.trim().is_empty() {
524        return Err(InternalError::migration_label_empty(label));
525    }
526
527    Ok(())
528}