1#[cfg(test)]
7mod tests;
8
9use crate::{
10 db::{
11 Db,
12 commit::{
13 CommitMarker, CommitRowOp, begin_commit_with_migration_state,
14 clear_migration_state_bytes, finish_commit, load_migration_state_bytes,
15 },
16 data::RawDataKey,
17 },
18 error::InternalError,
19 traits::CanisterKind,
20};
21
22const MAX_MIGRATION_STATE_BYTES: usize = 64 * 1024;
23const MIGRATION_STATE_MAGIC: [u8; 2] = *b"MS";
24const MIGRATION_STATE_VERSION_CURRENT: u8 = 1;
25const MIGRATION_STATE_NONE_ROW_KEY_TAG: u8 = 0;
26const MIGRATION_STATE_SOME_ROW_KEY_TAG: u8 = 1;
27
28#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
37pub struct MigrationCursor {
38 next_step: usize,
39}
40
41impl MigrationCursor {
42 #[must_use]
44 pub const fn start() -> Self {
45 Self { next_step: 0 }
46 }
47
48 #[must_use]
50 pub const fn next_step(self) -> usize {
51 self.next_step
52 }
53
54 const fn from_step(step_index: usize) -> Self {
55 Self {
56 next_step: step_index,
57 }
58 }
59
60 const fn advance(self) -> Self {
62 Self {
63 next_step: self.next_step.saturating_add(1),
64 }
65 }
66}
67
68#[derive(Clone, Debug, Eq, PartialEq)]
79struct PersistedMigrationState {
80 migration_id: String,
81 migration_version: u64,
82 step_index: u64,
83 last_applied_row_key: Option<Vec<u8>>,
84}
85
86#[derive(Clone, Debug)]
95pub struct MigrationRowOp {
96 entity_path: String,
97 key: RawDataKey,
98 before: Option<Vec<u8>>,
99 after: Option<Vec<u8>>,
100 schema_fingerprint: [u8; 16],
101}
102
103impl MigrationRowOp {
104 pub fn new(
111 entity_path: impl Into<String>,
112 key: Vec<u8>,
113 before: Option<Vec<u8>>,
114 after: Option<Vec<u8>>,
115 schema_fingerprint: [u8; 16],
116 ) -> Result<Self, InternalError> {
117 let entity_path = entity_path.into();
118 validate_non_empty_label(entity_path.as_str(), "migration row op entity path")?;
119 if before.is_none() && after.is_none() {
120 return Err(InternalError::migration_row_op_payload_required(
121 entity_path.as_str(),
122 ));
123 }
124
125 let validated = CommitRowOp::try_new_bytes(
126 entity_path.clone(),
127 key.as_slice(),
128 before.clone(),
129 after.clone(),
130 schema_fingerprint,
131 )?;
132
133 Ok(Self {
134 entity_path,
135 key: validated.key,
136 before,
137 after,
138 schema_fingerprint,
139 })
140 }
141
142 #[must_use]
144 pub const fn entity_path(&self) -> &str {
145 self.entity_path.as_str()
146 }
147
148 #[must_use]
150 pub const fn key(&self) -> &[u8] {
151 self.key.as_bytes()
152 }
153
154 #[must_use]
156 pub fn before(&self) -> Option<&[u8]> {
157 self.before.as_deref()
158 }
159
160 #[must_use]
162 pub fn after(&self) -> Option<&[u8]> {
163 self.after.as_deref()
164 }
165
166 #[must_use]
168 pub const fn schema_fingerprint(&self) -> [u8; 16] {
169 self.schema_fingerprint
170 }
171}
172
173impl TryFrom<MigrationRowOp> for CommitRowOp {
174 type Error = InternalError;
175
176 fn try_from(op: MigrationRowOp) -> Result<Self, Self::Error> {
177 Ok(Self::new(
178 op.entity_path,
179 op.key,
180 op.before,
181 op.after,
182 op.schema_fingerprint,
183 ))
184 }
185}
186
187#[derive(Clone, Debug)]
196pub struct MigrationStep {
197 name: String,
198 row_ops: Vec<CommitRowOp>,
199}
200
201impl MigrationStep {
202 pub fn from_row_ops(
204 name: impl Into<String>,
205 row_ops: Vec<MigrationRowOp>,
206 ) -> Result<Self, InternalError> {
207 let commit_row_ops = row_ops
208 .into_iter()
209 .map(CommitRowOp::try_from)
210 .collect::<Result<Vec<_>, _>>()?;
211 Self::new(name, commit_row_ops)
212 }
213
214 pub(in crate::db) fn new(
216 name: impl Into<String>,
217 row_ops: Vec<CommitRowOp>,
218 ) -> Result<Self, InternalError> {
219 let name = name.into();
220 validate_non_empty_label(name.as_str(), "migration step name")?;
221
222 if row_ops.is_empty() {
223 return Err(InternalError::migration_step_row_ops_required(&name));
224 }
225
226 Ok(Self { name, row_ops })
227 }
228
229 #[must_use]
231 pub const fn name(&self) -> &str {
232 self.name.as_str()
233 }
234
235 #[must_use]
237 pub const fn row_op_count(&self) -> usize {
238 self.row_ops.len()
239 }
240}
241
242#[derive(Clone, Debug)]
252pub struct MigrationPlan {
253 id: String,
254 version: u64,
255 steps: Vec<MigrationStep>,
256}
257
258impl MigrationPlan {
259 pub fn new(
261 id: impl Into<String>,
262 version: u64,
263 steps: Vec<MigrationStep>,
264 ) -> Result<Self, InternalError> {
265 let id = id.into();
266 validate_non_empty_label(id.as_str(), "migration plan id")?;
267 if version == 0 {
268 return Err(InternalError::migration_plan_version_required(&id));
269 }
270
271 if steps.is_empty() {
272 return Err(InternalError::migration_plan_steps_required(&id));
273 }
274
275 Ok(Self { id, version, steps })
276 }
277
278 #[must_use]
280 pub const fn id(&self) -> &str {
281 self.id.as_str()
282 }
283
284 #[must_use]
286 pub const fn version(&self) -> u64 {
287 self.version
288 }
289
290 #[must_use]
292 pub const fn len(&self) -> usize {
293 self.steps.len()
294 }
295
296 #[must_use]
298 pub const fn is_empty(&self) -> bool {
299 self.steps.is_empty()
300 }
301
302 fn step_at(&self, index: usize) -> Result<&MigrationStep, InternalError> {
303 self.steps.get(index).ok_or_else(|| {
304 InternalError::migration_cursor_out_of_bounds(
305 self.id(),
306 self.version(),
307 index,
308 self.len(),
309 )
310 })
311 }
312}
313
314#[derive(Clone, Copy, Debug, Eq, PartialEq)]
321pub enum MigrationRunState {
322 Complete,
324 NeedsResume,
326}
327
328#[derive(Clone, Copy, Debug, Eq, PartialEq)]
337pub struct MigrationRunOutcome {
338 cursor: MigrationCursor,
339 applied_steps: usize,
340 applied_row_ops: usize,
341 state: MigrationRunState,
342}
343
344impl MigrationRunOutcome {
345 const fn new(
346 cursor: MigrationCursor,
347 applied_steps: usize,
348 applied_row_ops: usize,
349 state: MigrationRunState,
350 ) -> Self {
351 Self {
352 cursor,
353 applied_steps,
354 applied_row_ops,
355 state,
356 }
357 }
358
359 #[must_use]
361 pub const fn cursor(self) -> MigrationCursor {
362 self.cursor
363 }
364
365 #[must_use]
367 pub const fn applied_steps(self) -> usize {
368 self.applied_steps
369 }
370
371 #[must_use]
373 pub const fn applied_row_ops(self) -> usize {
374 self.applied_row_ops
375 }
376
377 #[must_use]
379 pub const fn state(self) -> MigrationRunState {
380 self.state
381 }
382}
383
384pub(in crate::db) fn execute_migration_plan<C: CanisterKind>(
393 db: &Db<C>,
394 plan: &MigrationPlan,
395 max_steps: usize,
396) -> Result<MigrationRunOutcome, InternalError> {
397 if max_steps == 0 {
399 return Err(InternalError::migration_execution_requires_max_steps(
400 plan.id(),
401 ));
402 }
403
404 db.ensure_recovered_state()?;
406
407 let mut next_cursor = load_durable_cursor_for_plan(plan)?;
409
410 let mut applied_steps = 0usize;
412 let mut applied_row_ops = 0usize;
413 while applied_steps < max_steps {
414 if next_cursor.next_step() >= plan.len() {
415 break;
416 }
417
418 let step_index = next_cursor.next_step();
419 let step = plan.step_at(step_index)?;
420 let next_cursor_after_step = next_cursor.advance();
421 let next_state_bytes =
422 encode_durable_cursor_state(plan, next_cursor_after_step, step.row_ops.last())?;
423 execute_migration_step(db, plan, step_index, step, next_state_bytes)?;
424
425 applied_steps = applied_steps.saturating_add(1);
426 applied_row_ops = applied_row_ops.saturating_add(step.row_op_count());
427 next_cursor = next_cursor_after_step;
428 }
429
430 let state = if next_cursor.next_step() == plan.len() {
431 clear_migration_state_bytes()?;
432 MigrationRunState::Complete
433 } else {
434 MigrationRunState::NeedsResume
435 };
436
437 Ok(MigrationRunOutcome::new(
438 next_cursor,
439 applied_steps,
440 applied_row_ops,
441 state,
442 ))
443}
444
445fn load_durable_cursor_for_plan(plan: &MigrationPlan) -> Result<MigrationCursor, InternalError> {
446 let Some(bytes) = load_migration_state_bytes()? else {
447 return Ok(MigrationCursor::start());
448 };
449 let state = decode_persisted_migration_state(&bytes)?;
450 if state.migration_id != plan.id() || state.migration_version != plan.version() {
451 return Err(InternalError::migration_in_progress_conflict(
452 plan.id(),
453 plan.version(),
454 &state.migration_id,
455 state.migration_version,
456 ));
457 }
458
459 let step_index = usize::try_from(state.step_index).map_err(|_| {
460 InternalError::migration_persisted_step_index_invalid_usize(
461 plan.id(),
462 plan.version(),
463 state.step_index,
464 )
465 })?;
466 if step_index > plan.len() {
467 return Err(InternalError::migration_persisted_step_index_out_of_bounds(
468 plan.id(),
469 plan.version(),
470 step_index,
471 plan.len(),
472 ));
473 }
474
475 if step_index == plan.len() {
476 clear_migration_state_bytes()?;
477 }
478
479 Ok(MigrationCursor::from_step(step_index))
480}
481
482fn encode_durable_cursor_state(
483 plan: &MigrationPlan,
484 cursor: MigrationCursor,
485 last_applied_row_op: Option<&CommitRowOp>,
486) -> Result<Vec<u8>, InternalError> {
487 let step_index = u64::try_from(cursor.next_step()).map_err(|_| {
488 InternalError::migration_next_step_index_u64_required(plan.id(), plan.version())
489 })?;
490 let state = PersistedMigrationState {
491 migration_id: plan.id().to_string(),
492 migration_version: plan.version(),
493 step_index,
494 last_applied_row_key: last_applied_row_op.map(|row_op| row_op.key.as_bytes().to_vec()),
495 };
496
497 encode_persisted_migration_state(&state)
498}
499
500fn decode_persisted_migration_state(
501 bytes: &[u8],
502) -> Result<PersistedMigrationState, InternalError> {
503 if bytes.len() > MAX_MIGRATION_STATE_BYTES {
505 return Err(InternalError::serialize_corruption(format!(
506 "migration state decode failed: payload size {} exceeds limit {MAX_MIGRATION_STATE_BYTES}",
507 bytes.len(),
508 )));
509 }
510
511 let mut cursor = bytes;
513 decode_migration_state_magic(&mut cursor)?;
514 let format_version = decode_migration_state_u8(&mut cursor, "format version")?;
515 validate_migration_state_format_version(format_version)?;
516 let migration_id = decode_migration_state_string(&mut cursor, "migration_id")?;
517 let migration_version = decode_migration_state_u64(&mut cursor, "migration_version")?;
518 let step_index = decode_migration_state_u64(&mut cursor, "step_index")?;
519 let last_applied_row_key =
520 decode_migration_state_optional_bytes(&mut cursor, "last_applied_row_key")?;
521
522 if !cursor.is_empty() {
524 return Err(InternalError::serialize_corruption(
525 "migration state decode failed: trailing bytes",
526 ));
527 }
528
529 Ok(PersistedMigrationState {
530 migration_id,
531 migration_version,
532 step_index,
533 last_applied_row_key,
534 })
535}
536
537fn encode_persisted_migration_state(
538 state: &PersistedMigrationState,
539) -> Result<Vec<u8>, InternalError> {
540 let row_key_len = state.last_applied_row_key.as_ref().map_or(0usize, Vec::len);
543 let encoded_len = MIGRATION_STATE_MAGIC
544 .len()
545 .saturating_add(1)
546 .saturating_add(4)
547 .saturating_add(state.migration_id.len())
548 .saturating_add(8)
549 .saturating_add(8)
550 .saturating_add(1)
551 .saturating_add(if state.last_applied_row_key.is_some() {
552 4usize.saturating_add(row_key_len)
553 } else {
554 0
555 });
556 if encoded_len > MAX_MIGRATION_STATE_BYTES {
557 return Err(InternalError::migration_state_serialize_failed(format!(
558 "payload size {encoded_len} exceeds limit {MAX_MIGRATION_STATE_BYTES}",
559 )));
560 }
561
562 let migration_id_len = u32::try_from(state.migration_id.len()).map_err(|_| {
563 InternalError::migration_state_serialize_failed("migration_id exceeds u32 length")
564 })?;
565 let row_key_len_u32 = u32::try_from(row_key_len).map_err(|_| {
566 InternalError::migration_state_serialize_failed("last_applied_row_key exceeds u32 length")
567 })?;
568
569 let mut encoded = Vec::with_capacity(encoded_len);
571 encoded.extend_from_slice(&MIGRATION_STATE_MAGIC);
572 encoded.push(MIGRATION_STATE_VERSION_CURRENT);
573 encoded.extend_from_slice(&migration_id_len.to_be_bytes());
574 encoded.extend_from_slice(state.migration_id.as_bytes());
575 encoded.extend_from_slice(&state.migration_version.to_be_bytes());
576 encoded.extend_from_slice(&state.step_index.to_be_bytes());
577
578 match state.last_applied_row_key.as_ref() {
579 Some(row_key) => {
580 encoded.push(MIGRATION_STATE_SOME_ROW_KEY_TAG);
581 encoded.extend_from_slice(&row_key_len_u32.to_be_bytes());
582 encoded.extend_from_slice(row_key);
583 }
584 None => encoded.push(MIGRATION_STATE_NONE_ROW_KEY_TAG),
585 }
586
587 Ok(encoded)
588}
589
590fn decode_migration_state_magic(bytes: &mut &[u8]) -> Result<(), InternalError> {
592 let magic = take_migration_state_bytes(bytes, MIGRATION_STATE_MAGIC.len(), "magic")?;
593 if magic != MIGRATION_STATE_MAGIC {
594 return Err(InternalError::serialize_corruption(
595 "migration state decode failed: invalid magic",
596 ));
597 }
598
599 Ok(())
600}
601
602fn decode_migration_state_u8(bytes: &mut &[u8], label: &'static str) -> Result<u8, InternalError> {
604 Ok(take_migration_state_bytes(bytes, 1, label)?[0])
605}
606
607fn decode_migration_state_u64(
609 bytes: &mut &[u8],
610 label: &'static str,
611) -> Result<u64, InternalError> {
612 let raw = take_migration_state_bytes(bytes, 8, label)?;
613
614 Ok(u64::from_be_bytes([
615 raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
616 ]))
617}
618
619fn decode_migration_state_string(
621 bytes: &mut &[u8],
622 label: &'static str,
623) -> Result<String, InternalError> {
624 let raw = decode_migration_state_length_prefixed_bytes(bytes, label)?;
625
626 String::from_utf8(raw.to_vec()).map_err(|_| {
627 InternalError::serialize_corruption(format!(
628 "migration state decode failed: {label} is not valid UTF-8",
629 ))
630 })
631}
632
633fn decode_migration_state_optional_bytes(
635 bytes: &mut &[u8],
636 label: &'static str,
637) -> Result<Option<Vec<u8>>, InternalError> {
638 let tag = decode_migration_state_u8(bytes, label)?;
639
640 match tag {
641 MIGRATION_STATE_NONE_ROW_KEY_TAG => Ok(None),
642 MIGRATION_STATE_SOME_ROW_KEY_TAG => Ok(Some(
643 decode_migration_state_length_prefixed_bytes(bytes, label)?.to_vec(),
644 )),
645 _ => Err(InternalError::serialize_corruption(format!(
646 "migration state decode failed: invalid {label} tag {tag}",
647 ))),
648 }
649}
650
651fn decode_migration_state_length_prefixed_bytes<'a>(
653 bytes: &mut &'a [u8],
654 label: &'static str,
655) -> Result<&'a [u8], InternalError> {
656 let raw_len = take_migration_state_bytes(bytes, 4, label)?;
657 let len = usize::try_from(u32::from_be_bytes([
658 raw_len[0], raw_len[1], raw_len[2], raw_len[3],
659 ]))
660 .map_err(|_| {
661 InternalError::serialize_corruption(format!(
662 "migration state decode failed: {label} length out of range",
663 ))
664 })?;
665
666 take_migration_state_bytes(bytes, len, label)
667}
668
669fn take_migration_state_bytes<'a>(
671 bytes: &mut &'a [u8],
672 len: usize,
673 label: &'static str,
674) -> Result<&'a [u8], InternalError> {
675 if bytes.len() < len {
676 return Err(InternalError::serialize_corruption(format!(
677 "migration state decode failed: truncated {label}",
678 )));
679 }
680
681 let (head, tail) = bytes.split_at(len);
682 *bytes = tail;
683
684 Ok(head)
685}
686
687fn validate_migration_state_format_version(format_version: u8) -> Result<(), InternalError> {
689 if format_version == MIGRATION_STATE_VERSION_CURRENT {
690 return Ok(());
691 }
692
693 Err(InternalError::serialize_incompatible_persisted_format(
694 format!(
695 "migration state format version {format_version} is unsupported by runtime version {MIGRATION_STATE_VERSION_CURRENT}",
696 ),
697 ))
698}
699
700fn execute_migration_step<C: CanisterKind>(
701 db: &Db<C>,
702 plan: &MigrationPlan,
703 step_index: usize,
704 step: &MigrationStep,
705 next_state_bytes: Vec<u8>,
706) -> Result<(), InternalError> {
707 let marker = CommitMarker::new(step.row_ops.clone())
709 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
710 let commit = begin_commit_with_migration_state(marker, next_state_bytes)
711 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
712
713 finish_commit(commit, |_| apply_marker_row_ops(db, &step.row_ops))
715 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
716
717 Ok(())
718}
719
720fn apply_marker_row_ops<C: CanisterKind>(
721 db: &Db<C>,
722 row_ops: &[CommitRowOp],
723) -> Result<(), InternalError> {
724 let mut prepared = Vec::with_capacity(row_ops.len());
726 for row_op in row_ops {
727 prepared.push(db.prepare_row_commit_op(row_op)?);
728 }
729
730 for prepared_op in prepared {
732 prepared_op.apply();
733 }
734
735 Ok(())
736}
737
738fn annotate_step_error(
739 plan: &MigrationPlan,
740 step_index: usize,
741 step_name: &str,
742 err: InternalError,
743) -> InternalError {
744 let source_message = err.message().to_string();
745
746 err.with_message(format!(
747 "migration '{}' step {} ('{}') failed: {}",
748 plan.id(),
749 step_index,
750 step_name,
751 source_message,
752 ))
753}
754
755fn validate_non_empty_label(value: &str, label: &str) -> Result<(), InternalError> {
756 if value.trim().is_empty() {
757 return Err(InternalError::migration_label_empty(label));
758 }
759
760 Ok(())
761}