1#[cfg(test)]
7mod tests;
8
9use crate::{
10 db::{
11 Db,
12 commit::{
13 CommitMarker, CommitRowOp, begin_commit_with_migration_state,
14 clear_migration_state_bytes, finish_commit, load_migration_state_bytes,
15 },
16 },
17 error::InternalError,
18 traits::CanisterKind,
19};
20
/// Upper bound, in bytes, on an encoded migration-state payload (64 KiB).
const MAX_MIGRATION_STATE_BYTES: usize = 65_536;
/// Two-byte prefix that opens every encoded migration-state payload.
const MIGRATION_STATE_MAGIC: [u8; 2] = [b'M', b'S'];
/// Payload format version written by this runtime; decoding rejects any other value.
const MIGRATION_STATE_VERSION_CURRENT: u8 = 1;
/// Tag byte meaning no `last_applied_row_key` follows.
const MIGRATION_STATE_NONE_ROW_KEY_TAG: u8 = 0;
/// Tag byte meaning a length-prefixed `last_applied_row_key` follows.
const MIGRATION_STATE_SOME_ROW_KEY_TAG: u8 = 1;
26
/// Position of a migration run within its plan: the index of the next step to apply.
///
/// Both `Default` and [`MigrationCursor::start`] point at step 0.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct MigrationCursor {
    next_step: usize,
}

impl MigrationCursor {
    /// Cursor positioned before the first step of a plan.
    #[must_use]
    pub const fn start() -> Self {
        Self::from_step(0)
    }

    /// Index of the next step this cursor would apply.
    #[must_use]
    pub const fn next_step(self) -> usize {
        self.next_step
    }

    /// Cursor positioned at an explicit step index (for example, one restored from persisted state).
    const fn from_step(step_index: usize) -> Self {
        Self {
            next_step: step_index,
        }
    }

    /// Cursor for the following step; saturates at `usize::MAX` instead of overflowing.
    const fn advance(self) -> Self {
        Self::from_step(self.next_step.saturating_add(1))
    }
}
66
/// Durable resume record for an in-progress migration plan.
///
/// Written by `encode_persisted_migration_state` and read back by
/// `decode_persisted_migration_state`.
#[derive(Clone, Debug, Eq, PartialEq)]
struct PersistedMigrationState {
    // Identifier of the plan that owns this record.
    migration_id: String,
    // Version of the plan that owns this record.
    migration_version: u64,
    // Index of the next step to apply; equals the plan length once every step has been applied.
    step_index: u64,
    // Key of the last row op in the most recently applied step, when one was recorded.
    last_applied_row_key: Option<Vec<u8>>,
}
84
/// One row mutation supplied by a migration author.
///
/// It carries the target entity path, the raw row key, optional before and after
/// row payloads, and a 16-byte schema fingerprint. It is converted into a
/// `CommitRowOp` through `TryFrom` when a `MigrationStep` is built.
#[derive(Clone, Debug)]
pub struct MigrationRowOp {
    /// Path identifying the entity the row belongs to.
    pub entity_path: String,
    /// Raw row key bytes.
    pub key: Vec<u8>,
    /// Optional row payload before the change.
    pub before: Option<Vec<u8>>,
    /// Optional row payload after the change.
    pub after: Option<Vec<u8>>,
    /// Schema fingerprint associated with the row payloads.
    pub schema_fingerprint: [u8; 16],
}

impl MigrationRowOp {
    /// Builds a row op, converting `entity_path` into an owned `String`.
    #[must_use]
    pub fn new(
        entity_path: impl Into<String>,
        key: Vec<u8>,
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: [u8; 16],
    ) -> Self {
        let entity_path: String = entity_path.into();

        Self {
            schema_fingerprint,
            after,
            before,
            key,
            entity_path,
        }
    }
}
126
127impl TryFrom<MigrationRowOp> for CommitRowOp {
128 type Error = InternalError;
129
130 fn try_from(op: MigrationRowOp) -> Result<Self, Self::Error> {
131 Self::try_new_bytes(
132 op.entity_path,
133 op.key.as_slice(),
134 op.before,
135 op.after,
136 op.schema_fingerprint,
137 )
138 }
139}
140
141#[derive(Clone, Debug)]
150pub struct MigrationStep {
151 name: String,
152 row_ops: Vec<CommitRowOp>,
153}
154
155impl MigrationStep {
156 pub fn from_row_ops(
158 name: impl Into<String>,
159 row_ops: Vec<MigrationRowOp>,
160 ) -> Result<Self, InternalError> {
161 let commit_row_ops = row_ops
162 .into_iter()
163 .map(CommitRowOp::try_from)
164 .collect::<Result<Vec<_>, _>>()?;
165 Self::new(name, commit_row_ops)
166 }
167
168 pub(in crate::db) fn new(
170 name: impl Into<String>,
171 row_ops: Vec<CommitRowOp>,
172 ) -> Result<Self, InternalError> {
173 let name = name.into();
174 validate_non_empty_label(name.as_str(), "migration step name")?;
175
176 if row_ops.is_empty() {
177 return Err(InternalError::migration_step_row_ops_required(&name));
178 }
179
180 Ok(Self { name, row_ops })
181 }
182
183 #[must_use]
185 pub const fn name(&self) -> &str {
186 self.name.as_str()
187 }
188
189 #[must_use]
191 pub const fn row_op_count(&self) -> usize {
192 self.row_ops.len()
193 }
194}
195
196#[derive(Clone, Debug)]
206pub struct MigrationPlan {
207 id: String,
208 version: u64,
209 steps: Vec<MigrationStep>,
210}
211
212impl MigrationPlan {
213 pub fn new(
215 id: impl Into<String>,
216 version: u64,
217 steps: Vec<MigrationStep>,
218 ) -> Result<Self, InternalError> {
219 let id = id.into();
220 validate_non_empty_label(id.as_str(), "migration plan id")?;
221 if version == 0 {
222 return Err(InternalError::migration_plan_version_required(&id));
223 }
224
225 if steps.is_empty() {
226 return Err(InternalError::migration_plan_steps_required(&id));
227 }
228
229 Ok(Self { id, version, steps })
230 }
231
232 #[must_use]
234 pub const fn id(&self) -> &str {
235 self.id.as_str()
236 }
237
238 #[must_use]
240 pub const fn version(&self) -> u64 {
241 self.version
242 }
243
244 #[must_use]
246 pub const fn len(&self) -> usize {
247 self.steps.len()
248 }
249
250 #[must_use]
252 pub const fn is_empty(&self) -> bool {
253 self.steps.is_empty()
254 }
255
256 fn step_at(&self, index: usize) -> Result<&MigrationStep, InternalError> {
257 self.steps.get(index).ok_or_else(|| {
258 InternalError::migration_cursor_out_of_bounds(
259 self.id(),
260 self.version(),
261 index,
262 self.len(),
263 )
264 })
265 }
266}
267
/// Whether a call to `execute_migration_plan` reached the end of the plan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MigrationRunState {
    /// The cursor reached the end of the plan, and the persisted migration state was cleared.
    Complete,
    /// The step budget ran out before the end of the plan. A later call with the
    /// same plan resumes from the persisted cursor.
    NeedsResume,
}
281
282#[derive(Clone, Copy, Debug, Eq, PartialEq)]
291pub struct MigrationRunOutcome {
292 cursor: MigrationCursor,
293 applied_steps: usize,
294 applied_row_ops: usize,
295 state: MigrationRunState,
296}
297
298impl MigrationRunOutcome {
299 const fn new(
300 cursor: MigrationCursor,
301 applied_steps: usize,
302 applied_row_ops: usize,
303 state: MigrationRunState,
304 ) -> Self {
305 Self {
306 cursor,
307 applied_steps,
308 applied_row_ops,
309 state,
310 }
311 }
312
313 #[must_use]
315 pub const fn cursor(self) -> MigrationCursor {
316 self.cursor
317 }
318
319 #[must_use]
321 pub const fn applied_steps(self) -> usize {
322 self.applied_steps
323 }
324
325 #[must_use]
327 pub const fn applied_row_ops(self) -> usize {
328 self.applied_row_ops
329 }
330
331 #[must_use]
333 pub const fn state(self) -> MigrationRunState {
334 self.state
335 }
336}
337
338pub(in crate::db) fn execute_migration_plan<C: CanisterKind>(
347 db: &Db<C>,
348 plan: &MigrationPlan,
349 max_steps: usize,
350) -> Result<MigrationRunOutcome, InternalError> {
351 if max_steps == 0 {
353 return Err(InternalError::migration_execution_requires_max_steps(
354 plan.id(),
355 ));
356 }
357
358 db.ensure_recovered_state()?;
360
361 let mut next_cursor = load_durable_cursor_for_plan(plan)?;
363
364 let mut applied_steps = 0usize;
366 let mut applied_row_ops = 0usize;
367 while applied_steps < max_steps {
368 if next_cursor.next_step() >= plan.len() {
369 break;
370 }
371
372 let step_index = next_cursor.next_step();
373 let step = plan.step_at(step_index)?;
374 let next_cursor_after_step = next_cursor.advance();
375 let next_state_bytes =
376 encode_durable_cursor_state(plan, next_cursor_after_step, step.row_ops.last())?;
377 execute_migration_step(db, plan, step_index, step, next_state_bytes)?;
378
379 applied_steps = applied_steps.saturating_add(1);
380 applied_row_ops = applied_row_ops.saturating_add(step.row_op_count());
381 next_cursor = next_cursor_after_step;
382 }
383
384 let state = if next_cursor.next_step() == plan.len() {
385 clear_migration_state_bytes()?;
386 MigrationRunState::Complete
387 } else {
388 MigrationRunState::NeedsResume
389 };
390
391 Ok(MigrationRunOutcome::new(
392 next_cursor,
393 applied_steps,
394 applied_row_ops,
395 state,
396 ))
397}
398
399fn load_durable_cursor_for_plan(plan: &MigrationPlan) -> Result<MigrationCursor, InternalError> {
400 let Some(bytes) = load_migration_state_bytes()? else {
401 return Ok(MigrationCursor::start());
402 };
403 let state = decode_persisted_migration_state(&bytes)?;
404 if state.migration_id != plan.id() || state.migration_version != plan.version() {
405 return Err(InternalError::migration_in_progress_conflict(
406 plan.id(),
407 plan.version(),
408 &state.migration_id,
409 state.migration_version,
410 ));
411 }
412
413 let step_index = usize::try_from(state.step_index).map_err(|_| {
414 InternalError::migration_persisted_step_index_invalid_usize(
415 plan.id(),
416 plan.version(),
417 state.step_index,
418 )
419 })?;
420 if step_index > plan.len() {
421 return Err(InternalError::migration_persisted_step_index_out_of_bounds(
422 plan.id(),
423 plan.version(),
424 step_index,
425 plan.len(),
426 ));
427 }
428
429 if step_index == plan.len() {
430 clear_migration_state_bytes()?;
431 }
432
433 Ok(MigrationCursor::from_step(step_index))
434}
435
436fn encode_durable_cursor_state(
437 plan: &MigrationPlan,
438 cursor: MigrationCursor,
439 last_applied_row_op: Option<&CommitRowOp>,
440) -> Result<Vec<u8>, InternalError> {
441 let step_index = u64::try_from(cursor.next_step()).map_err(|_| {
442 InternalError::migration_next_step_index_u64_required(plan.id(), plan.version())
443 })?;
444 let state = PersistedMigrationState {
445 migration_id: plan.id().to_string(),
446 migration_version: plan.version(),
447 step_index,
448 last_applied_row_key: last_applied_row_op.map(|row_op| row_op.key.as_bytes().to_vec()),
449 };
450
451 encode_persisted_migration_state(&state)
452}
453
454fn decode_persisted_migration_state(
455 bytes: &[u8],
456) -> Result<PersistedMigrationState, InternalError> {
457 if bytes.len() > MAX_MIGRATION_STATE_BYTES {
459 return Err(InternalError::serialize_corruption(format!(
460 "migration state decode failed: payload size {} exceeds limit {MAX_MIGRATION_STATE_BYTES}",
461 bytes.len(),
462 )));
463 }
464
465 let mut cursor = bytes;
467 decode_migration_state_magic(&mut cursor)?;
468 let format_version = decode_migration_state_u8(&mut cursor, "format version")?;
469 validate_migration_state_format_version(format_version)?;
470 let migration_id = decode_migration_state_string(&mut cursor, "migration_id")?;
471 let migration_version = decode_migration_state_u64(&mut cursor, "migration_version")?;
472 let step_index = decode_migration_state_u64(&mut cursor, "step_index")?;
473 let last_applied_row_key =
474 decode_migration_state_optional_bytes(&mut cursor, "last_applied_row_key")?;
475
476 if !cursor.is_empty() {
478 return Err(InternalError::serialize_corruption(
479 "migration state decode failed: trailing bytes",
480 ));
481 }
482
483 Ok(PersistedMigrationState {
484 migration_id,
485 migration_version,
486 step_index,
487 last_applied_row_key,
488 })
489}
490
491fn encode_persisted_migration_state(
492 state: &PersistedMigrationState,
493) -> Result<Vec<u8>, InternalError> {
494 let row_key_len = state.last_applied_row_key.as_ref().map_or(0usize, Vec::len);
497 let encoded_len = MIGRATION_STATE_MAGIC
498 .len()
499 .saturating_add(1)
500 .saturating_add(4)
501 .saturating_add(state.migration_id.len())
502 .saturating_add(8)
503 .saturating_add(8)
504 .saturating_add(1)
505 .saturating_add(if state.last_applied_row_key.is_some() {
506 4usize.saturating_add(row_key_len)
507 } else {
508 0
509 });
510 if encoded_len > MAX_MIGRATION_STATE_BYTES {
511 return Err(InternalError::migration_state_serialize_failed(format!(
512 "payload size {encoded_len} exceeds limit {MAX_MIGRATION_STATE_BYTES}",
513 )));
514 }
515
516 let migration_id_len = u32::try_from(state.migration_id.len()).map_err(|_| {
517 InternalError::migration_state_serialize_failed("migration_id exceeds u32 length")
518 })?;
519 let row_key_len_u32 = u32::try_from(row_key_len).map_err(|_| {
520 InternalError::migration_state_serialize_failed("last_applied_row_key exceeds u32 length")
521 })?;
522
523 let mut encoded = Vec::with_capacity(encoded_len);
525 encoded.extend_from_slice(&MIGRATION_STATE_MAGIC);
526 encoded.push(MIGRATION_STATE_VERSION_CURRENT);
527 encoded.extend_from_slice(&migration_id_len.to_be_bytes());
528 encoded.extend_from_slice(state.migration_id.as_bytes());
529 encoded.extend_from_slice(&state.migration_version.to_be_bytes());
530 encoded.extend_from_slice(&state.step_index.to_be_bytes());
531
532 match state.last_applied_row_key.as_ref() {
533 Some(row_key) => {
534 encoded.push(MIGRATION_STATE_SOME_ROW_KEY_TAG);
535 encoded.extend_from_slice(&row_key_len_u32.to_be_bytes());
536 encoded.extend_from_slice(row_key);
537 }
538 None => encoded.push(MIGRATION_STATE_NONE_ROW_KEY_TAG),
539 }
540
541 Ok(encoded)
542}
543
544fn decode_migration_state_magic(bytes: &mut &[u8]) -> Result<(), InternalError> {
546 let magic = take_migration_state_bytes(bytes, MIGRATION_STATE_MAGIC.len(), "magic")?;
547 if magic != MIGRATION_STATE_MAGIC {
548 return Err(InternalError::serialize_corruption(
549 "migration state decode failed: invalid magic",
550 ));
551 }
552
553 Ok(())
554}
555
556fn decode_migration_state_u8(bytes: &mut &[u8], label: &'static str) -> Result<u8, InternalError> {
558 Ok(take_migration_state_bytes(bytes, 1, label)?[0])
559}
560
561fn decode_migration_state_u64(
563 bytes: &mut &[u8],
564 label: &'static str,
565) -> Result<u64, InternalError> {
566 let raw = take_migration_state_bytes(bytes, 8, label)?;
567
568 Ok(u64::from_be_bytes([
569 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
570 ]))
571}
572
573fn decode_migration_state_string(
575 bytes: &mut &[u8],
576 label: &'static str,
577) -> Result<String, InternalError> {
578 let raw = decode_migration_state_length_prefixed_bytes(bytes, label)?;
579
580 String::from_utf8(raw.to_vec()).map_err(|_| {
581 InternalError::serialize_corruption(format!(
582 "migration state decode failed: {label} is not valid UTF-8",
583 ))
584 })
585}
586
587fn decode_migration_state_optional_bytes(
589 bytes: &mut &[u8],
590 label: &'static str,
591) -> Result<Option<Vec<u8>>, InternalError> {
592 let tag = decode_migration_state_u8(bytes, label)?;
593
594 match tag {
595 MIGRATION_STATE_NONE_ROW_KEY_TAG => Ok(None),
596 MIGRATION_STATE_SOME_ROW_KEY_TAG => Ok(Some(
597 decode_migration_state_length_prefixed_bytes(bytes, label)?.to_vec(),
598 )),
599 _ => Err(InternalError::serialize_corruption(format!(
600 "migration state decode failed: invalid {label} tag {tag}",
601 ))),
602 }
603}
604
605fn decode_migration_state_length_prefixed_bytes<'a>(
607 bytes: &mut &'a [u8],
608 label: &'static str,
609) -> Result<&'a [u8], InternalError> {
610 let raw_len = take_migration_state_bytes(bytes, 4, label)?;
611 let len = usize::try_from(u32::from_be_bytes([
612 raw_len[0], raw_len[1], raw_len[2], raw_len[3],
613 ]))
614 .map_err(|_| {
615 InternalError::serialize_corruption(format!(
616 "migration state decode failed: {label} length out of range",
617 ))
618 })?;
619
620 take_migration_state_bytes(bytes, len, label)
621}
622
623fn take_migration_state_bytes<'a>(
625 bytes: &mut &'a [u8],
626 len: usize,
627 label: &'static str,
628) -> Result<&'a [u8], InternalError> {
629 if bytes.len() < len {
630 return Err(InternalError::serialize_corruption(format!(
631 "migration state decode failed: truncated {label}",
632 )));
633 }
634
635 let (head, tail) = bytes.split_at(len);
636 *bytes = tail;
637
638 Ok(head)
639}
640
641fn validate_migration_state_format_version(format_version: u8) -> Result<(), InternalError> {
643 if format_version == MIGRATION_STATE_VERSION_CURRENT {
644 return Ok(());
645 }
646
647 Err(InternalError::serialize_incompatible_persisted_format(
648 format!(
649 "migration state format version {format_version} is unsupported by runtime version {MIGRATION_STATE_VERSION_CURRENT}",
650 ),
651 ))
652}
653
654fn execute_migration_step<C: CanisterKind>(
655 db: &Db<C>,
656 plan: &MigrationPlan,
657 step_index: usize,
658 step: &MigrationStep,
659 next_state_bytes: Vec<u8>,
660) -> Result<(), InternalError> {
661 let marker = CommitMarker::new(step.row_ops.clone())
663 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
664 let commit = begin_commit_with_migration_state(marker, next_state_bytes)
665 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
666
667 finish_commit(commit, |_| apply_marker_row_ops(db, &step.row_ops))
669 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
670
671 Ok(())
672}
673
674fn apply_marker_row_ops<C: CanisterKind>(
675 db: &Db<C>,
676 row_ops: &[CommitRowOp],
677) -> Result<(), InternalError> {
678 let mut prepared = Vec::with_capacity(row_ops.len());
680 for row_op in row_ops {
681 prepared.push(db.prepare_row_commit_op(row_op)?);
682 }
683
684 for prepared_op in prepared {
686 prepared_op.apply();
687 }
688
689 Ok(())
690}
691
692fn annotate_step_error(
693 plan: &MigrationPlan,
694 step_index: usize,
695 step_name: &str,
696 err: InternalError,
697) -> InternalError {
698 let source_message = err.message().to_string();
699
700 err.with_message(format!(
701 "migration '{}' step {} ('{}') failed: {}",
702 plan.id(),
703 step_index,
704 step_name,
705 source_message,
706 ))
707}
708
709fn validate_non_empty_label(value: &str, label: &str) -> Result<(), InternalError> {
710 if value.trim().is_empty() {
711 return Err(InternalError::migration_label_empty(label));
712 }
713
714 Ok(())
715}