1#[cfg(test)]
7mod tests;
8
9use crate::{
10 db::{
11 Db,
12 codec::deserialize_persisted_payload,
13 commit::{
14 CommitMarker, CommitRowOp, begin_commit_with_migration_state,
15 clear_migration_state_bytes, finish_commit, load_migration_state_bytes,
16 },
17 },
18 error::InternalError,
19 serialize::serialize,
20 traits::CanisterKind,
21};
22use serde::{Deserialize, Serialize};
23
24const MAX_MIGRATION_STATE_BYTES: usize = 64 * 1024;
25
26#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
35pub struct MigrationCursor {
36 next_step: usize,
37}
38
39impl MigrationCursor {
40 #[must_use]
42 pub const fn start() -> Self {
43 Self { next_step: 0 }
44 }
45
46 #[must_use]
48 pub const fn next_step(self) -> usize {
49 self.next_step
50 }
51
52 const fn from_step(step_index: usize) -> Self {
53 Self {
54 next_step: step_index,
55 }
56 }
57
58 const fn advance(self) -> Self {
60 Self {
61 next_step: self.next_step.saturating_add(1),
62 }
63 }
64}
65
66#[derive(Clone, Debug, Deserialize, Serialize)]
77#[serde(deny_unknown_fields)]
78struct PersistedMigrationState {
79 migration_id: String,
80 migration_version: u64,
81 step_index: u64,
82 last_applied_row_key: Option<Vec<u8>>,
83}
84
85#[derive(Clone, Debug)]
94pub struct MigrationRowOp {
95 pub entity_path: String,
97 pub key: Vec<u8>,
99 pub before: Option<Vec<u8>>,
101 pub after: Option<Vec<u8>>,
103 pub schema_fingerprint: [u8; 16],
105}
106
107impl MigrationRowOp {
108 #[must_use]
110 pub fn new(
111 entity_path: impl Into<String>,
112 key: Vec<u8>,
113 before: Option<Vec<u8>>,
114 after: Option<Vec<u8>>,
115 schema_fingerprint: [u8; 16],
116 ) -> Self {
117 Self {
118 entity_path: entity_path.into(),
119 key,
120 before,
121 after,
122 schema_fingerprint,
123 }
124 }
125}
126
127impl From<MigrationRowOp> for CommitRowOp {
128 fn from(op: MigrationRowOp) -> Self {
129 Self::new(
130 op.entity_path,
131 op.key,
132 op.before,
133 op.after,
134 op.schema_fingerprint,
135 )
136 }
137}
138
139#[derive(Clone, Debug)]
148pub struct MigrationStep {
149 name: String,
150 row_ops: Vec<CommitRowOp>,
151}
152
153impl MigrationStep {
154 pub fn from_row_ops(
156 name: impl Into<String>,
157 row_ops: Vec<MigrationRowOp>,
158 ) -> Result<Self, InternalError> {
159 let commit_row_ops = row_ops.into_iter().map(CommitRowOp::from).collect();
160 Self::new(name, commit_row_ops)
161 }
162
163 pub(in crate::db) fn new(
165 name: impl Into<String>,
166 row_ops: Vec<CommitRowOp>,
167 ) -> Result<Self, InternalError> {
168 let name = name.into();
169 validate_non_empty_label(name.as_str(), "migration step name")?;
170
171 if row_ops.is_empty() {
172 return Err(InternalError::migration_step_row_ops_required(&name));
173 }
174
175 Ok(Self { name, row_ops })
176 }
177
178 #[must_use]
180 pub const fn name(&self) -> &str {
181 self.name.as_str()
182 }
183
184 #[must_use]
186 pub const fn row_op_count(&self) -> usize {
187 self.row_ops.len()
188 }
189}
190
191#[derive(Clone, Debug)]
201pub struct MigrationPlan {
202 id: String,
203 version: u64,
204 steps: Vec<MigrationStep>,
205}
206
207impl MigrationPlan {
208 pub fn new(
210 id: impl Into<String>,
211 version: u64,
212 steps: Vec<MigrationStep>,
213 ) -> Result<Self, InternalError> {
214 let id = id.into();
215 validate_non_empty_label(id.as_str(), "migration plan id")?;
216 if version == 0 {
217 return Err(InternalError::migration_plan_version_required(&id));
218 }
219
220 if steps.is_empty() {
221 return Err(InternalError::migration_plan_steps_required(&id));
222 }
223
224 Ok(Self { id, version, steps })
225 }
226
227 #[must_use]
229 pub const fn id(&self) -> &str {
230 self.id.as_str()
231 }
232
233 #[must_use]
235 pub const fn version(&self) -> u64 {
236 self.version
237 }
238
239 #[must_use]
241 pub const fn len(&self) -> usize {
242 self.steps.len()
243 }
244
245 #[must_use]
247 pub const fn is_empty(&self) -> bool {
248 self.steps.is_empty()
249 }
250
251 fn step_at(&self, index: usize) -> Result<&MigrationStep, InternalError> {
252 self.steps.get(index).ok_or_else(|| {
253 InternalError::migration_cursor_out_of_bounds(
254 self.id(),
255 self.version(),
256 index,
257 self.len(),
258 )
259 })
260 }
261}
262
263#[derive(Clone, Copy, Debug, Eq, PartialEq)]
270pub enum MigrationRunState {
271 Complete,
273 NeedsResume,
275}
276
277#[derive(Clone, Copy, Debug, Eq, PartialEq)]
286pub struct MigrationRunOutcome {
287 cursor: MigrationCursor,
288 applied_steps: usize,
289 applied_row_ops: usize,
290 state: MigrationRunState,
291}
292
293impl MigrationRunOutcome {
294 const fn new(
295 cursor: MigrationCursor,
296 applied_steps: usize,
297 applied_row_ops: usize,
298 state: MigrationRunState,
299 ) -> Self {
300 Self {
301 cursor,
302 applied_steps,
303 applied_row_ops,
304 state,
305 }
306 }
307
308 #[must_use]
310 pub const fn cursor(self) -> MigrationCursor {
311 self.cursor
312 }
313
314 #[must_use]
316 pub const fn applied_steps(self) -> usize {
317 self.applied_steps
318 }
319
320 #[must_use]
322 pub const fn applied_row_ops(self) -> usize {
323 self.applied_row_ops
324 }
325
326 #[must_use]
328 pub const fn state(self) -> MigrationRunState {
329 self.state
330 }
331}
332
333pub(in crate::db) fn execute_migration_plan<C: CanisterKind>(
342 db: &Db<C>,
343 plan: &MigrationPlan,
344 max_steps: usize,
345) -> Result<MigrationRunOutcome, InternalError> {
346 if max_steps == 0 {
348 return Err(InternalError::migration_execution_requires_max_steps(
349 plan.id(),
350 ));
351 }
352
353 db.ensure_recovered_state()?;
355
356 let mut next_cursor = load_durable_cursor_for_plan(plan)?;
358
359 let mut applied_steps = 0usize;
361 let mut applied_row_ops = 0usize;
362 while applied_steps < max_steps {
363 if next_cursor.next_step() >= plan.len() {
364 break;
365 }
366
367 let step_index = next_cursor.next_step();
368 let step = plan.step_at(step_index)?;
369 let next_cursor_after_step = next_cursor.advance();
370 let next_state_bytes =
371 encode_durable_cursor_state(plan, next_cursor_after_step, step.row_ops.last())?;
372 execute_migration_step(db, plan, step_index, step, next_state_bytes)?;
373
374 applied_steps = applied_steps.saturating_add(1);
375 applied_row_ops = applied_row_ops.saturating_add(step.row_op_count());
376 next_cursor = next_cursor_after_step;
377 }
378
379 let state = if next_cursor.next_step() == plan.len() {
380 clear_migration_state_bytes()?;
381 MigrationRunState::Complete
382 } else {
383 MigrationRunState::NeedsResume
384 };
385
386 Ok(MigrationRunOutcome::new(
387 next_cursor,
388 applied_steps,
389 applied_row_ops,
390 state,
391 ))
392}
393
394fn load_durable_cursor_for_plan(plan: &MigrationPlan) -> Result<MigrationCursor, InternalError> {
395 let Some(bytes) = load_migration_state_bytes()? else {
396 return Ok(MigrationCursor::start());
397 };
398 let state = decode_persisted_migration_state(&bytes)?;
399 if state.migration_id != plan.id() || state.migration_version != plan.version() {
400 return Err(InternalError::migration_in_progress_conflict(
401 plan.id(),
402 plan.version(),
403 &state.migration_id,
404 state.migration_version,
405 ));
406 }
407
408 let step_index = usize::try_from(state.step_index).map_err(|_| {
409 InternalError::migration_persisted_step_index_invalid_usize(
410 plan.id(),
411 plan.version(),
412 state.step_index,
413 )
414 })?;
415 if step_index > plan.len() {
416 return Err(InternalError::migration_persisted_step_index_out_of_bounds(
417 plan.id(),
418 plan.version(),
419 step_index,
420 plan.len(),
421 ));
422 }
423
424 if step_index == plan.len() {
425 clear_migration_state_bytes()?;
426 }
427
428 Ok(MigrationCursor::from_step(step_index))
429}
430
431fn encode_durable_cursor_state(
432 plan: &MigrationPlan,
433 cursor: MigrationCursor,
434 last_applied_row_op: Option<&CommitRowOp>,
435) -> Result<Vec<u8>, InternalError> {
436 let step_index = u64::try_from(cursor.next_step()).map_err(|_| {
437 InternalError::migration_next_step_index_u64_required(plan.id(), plan.version())
438 })?;
439 let state = PersistedMigrationState {
440 migration_id: plan.id().to_string(),
441 migration_version: plan.version(),
442 step_index,
443 last_applied_row_key: last_applied_row_op.map(|row_op| row_op.key.clone()),
444 };
445
446 encode_persisted_migration_state(&state)
447}
448
449fn decode_persisted_migration_state(
450 bytes: &[u8],
451) -> Result<PersistedMigrationState, InternalError> {
452 deserialize_persisted_payload::<PersistedMigrationState>(
453 bytes,
454 MAX_MIGRATION_STATE_BYTES,
455 "migration state",
456 )
457}
458
459fn encode_persisted_migration_state(
460 state: &PersistedMigrationState,
461) -> Result<Vec<u8>, InternalError> {
462 serialize(state).map_err(InternalError::migration_state_serialize_failed)
463}
464
465fn execute_migration_step<C: CanisterKind>(
466 db: &Db<C>,
467 plan: &MigrationPlan,
468 step_index: usize,
469 step: &MigrationStep,
470 next_state_bytes: Vec<u8>,
471) -> Result<(), InternalError> {
472 let marker = CommitMarker::new(step.row_ops.clone())
474 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
475 let commit = begin_commit_with_migration_state(marker, next_state_bytes)
476 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
477
478 finish_commit(commit, |guard| {
480 apply_marker_row_ops(db, &guard.marker.row_ops)
481 })
482 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
483
484 Ok(())
485}
486
487fn apply_marker_row_ops<C: CanisterKind>(
488 db: &Db<C>,
489 row_ops: &[CommitRowOp],
490) -> Result<(), InternalError> {
491 let mut prepared = Vec::with_capacity(row_ops.len());
493 for row_op in row_ops {
494 prepared.push(db.prepare_row_commit_op(row_op)?);
495 }
496
497 for prepared_op in prepared {
499 prepared_op.apply();
500 }
501
502 Ok(())
503}
504
505fn annotate_step_error(
506 plan: &MigrationPlan,
507 step_index: usize,
508 step_name: &str,
509 err: InternalError,
510) -> InternalError {
511 let source_message = err.message().to_string();
512
513 err.with_message(format!(
514 "migration '{}' step {} ('{}') failed: {}",
515 plan.id(),
516 step_index,
517 step_name,
518 source_message,
519 ))
520}
521
522fn validate_non_empty_label(value: &str, label: &str) -> Result<(), InternalError> {
523 if value.trim().is_empty() {
524 return Err(InternalError::migration_label_empty(label));
525 }
526
527 Ok(())
528}