1#[cfg(test)]
7mod tests;
8
9use crate::{
10 db::{
11 Db,
12 codec::deserialize_persisted_payload,
13 commit::{
14 CommitMarker, CommitRowOp, begin_commit_with_migration_state,
15 clear_migration_state_bytes, finish_commit, load_migration_state_bytes,
16 },
17 },
18 error::InternalError,
19 serialize::serialize,
20 traits::CanisterKind,
21};
22use serde::{Deserialize, Serialize};
23
/// Byte limit (64 KiB) handed to `deserialize_persisted_payload` when decoding
/// persisted migration state.
const MAX_MIGRATION_STATE_BYTES: usize = 64 * 1024;
25
/// Position inside a migration plan, stored as the index of the next step to run.
///
/// The derived `Default` yields index `0`, the same position as
/// `MigrationCursor::start()`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct MigrationCursor {
    next_step: usize,
}

impl MigrationCursor {
    /// Returns a cursor positioned at the first step (index `0`).
    #[must_use]
    pub const fn start() -> Self {
        Self::from_step(0)
    }

    /// Returns the index of the next step to execute.
    #[must_use]
    pub const fn next_step(self) -> usize {
        self.next_step
    }

    /// Builds a cursor whose next step is `step_index`.
    const fn from_step(step_index: usize) -> Self {
        Self {
            next_step: step_index,
        }
    }

    /// Returns a cursor one step further along.
    ///
    /// The increment saturates, so a cursor at `usize::MAX` stays there
    /// instead of overflowing.
    const fn advance(self) -> Self {
        Self::from_step(self.next_step.saturating_add(1))
    }
}
65
/// Serialized form of the durable migration cursor.
///
/// `deny_unknown_fields` makes serde reject payloads that carry fields this
/// struct does not declare.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct PersistedMigrationState {
    /// Id of the plan this state belongs to; compared against the plan id on load.
    migration_id: String,
    /// Version of the plan this state belongs to; compared against the plan version on load.
    migration_version: u64,
    /// Index of the next step to run, stored as `u64` and converted to `usize` on load.
    step_index: u64,
    /// Key of the last row op of the step that was just applied, when one was supplied.
    last_applied_row_key: Option<Vec<u8>>,
}
84
/// Public description of a single row operation inside a migration step.
///
/// Every field is forwarded unchanged to `CommitRowOp::new` when the op is
/// converted for commit.
#[derive(Clone, Debug)]
pub struct MigrationRowOp {
    /// Path of the entity the row belongs to.
    pub entity_path: String,
    /// Raw row key bytes.
    pub key: Vec<u8>,
    /// Optional bytes passed as the `before` argument of `CommitRowOp::new`
    /// (presumably the row image prior to the change — confirm against `CommitRowOp`).
    pub before: Option<Vec<u8>>,
    /// Optional bytes passed as the `after` argument of `CommitRowOp::new`
    /// (presumably the row image after the change — confirm against `CommitRowOp`).
    pub after: Option<Vec<u8>>,
    /// 16-byte schema fingerprint carried alongside the row op.
    pub schema_fingerprint: [u8; 16],
}

impl MigrationRowOp {
    /// Assembles a row op from its parts, converting `entity_path` into an
    /// owned `String`.
    #[must_use]
    pub fn new(
        entity_path: impl Into<String>,
        key: Vec<u8>,
        before: Option<Vec<u8>>,
        after: Option<Vec<u8>>,
        schema_fingerprint: [u8; 16],
    ) -> Self {
        let entity_path: String = entity_path.into();

        Self {
            entity_path,
            key,
            before,
            after,
            schema_fingerprint,
        }
    }
}
126
127impl From<MigrationRowOp> for CommitRowOp {
128 fn from(op: MigrationRowOp) -> Self {
129 Self::new(
130 op.entity_path,
131 op.key,
132 op.before,
133 op.after,
134 op.schema_fingerprint,
135 )
136 }
137}
138
/// One named step of a migration plan.
///
/// The constructors reject a blank name and an empty `row_ops` list.
#[derive(Clone, Debug)]
pub struct MigrationStep {
    /// Step label; also used in step error messages.
    name: String,
    /// Row operations executed together when the step runs.
    row_ops: Vec<CommitRowOp>,
}
152
153impl MigrationStep {
154 pub fn from_row_ops(
156 name: impl Into<String>,
157 row_ops: Vec<MigrationRowOp>,
158 ) -> Result<Self, InternalError> {
159 let commit_row_ops = row_ops.into_iter().map(CommitRowOp::from).collect();
160 Self::new(name, commit_row_ops)
161 }
162
163 pub(in crate::db) fn new(
165 name: impl Into<String>,
166 row_ops: Vec<CommitRowOp>,
167 ) -> Result<Self, InternalError> {
168 let name = name.into();
169 validate_non_empty_label(name.as_str(), "migration step name")?;
170
171 if row_ops.is_empty() {
172 return Err(InternalError::store_unsupported(format!(
173 "migration step '{name}' must include at least one row op",
174 )));
175 }
176
177 Ok(Self { name, row_ops })
178 }
179
180 #[must_use]
182 pub const fn name(&self) -> &str {
183 self.name.as_str()
184 }
185
186 #[must_use]
188 pub const fn row_op_count(&self) -> usize {
189 self.row_ops.len()
190 }
191}
192
/// An identified, versioned, ordered list of migration steps.
///
/// `MigrationPlan::new` rejects a blank id, a zero version, and an empty
/// step list.
#[derive(Clone, Debug)]
pub struct MigrationPlan {
    /// Plan identifier; matched against persisted state when resuming.
    id: String,
    /// Plan version; matched against persisted state when resuming.
    version: u64,
    /// Steps in execution order.
    steps: Vec<MigrationStep>,
}
208
209impl MigrationPlan {
210 pub fn new(
212 id: impl Into<String>,
213 version: u64,
214 steps: Vec<MigrationStep>,
215 ) -> Result<Self, InternalError> {
216 let id = id.into();
217 validate_non_empty_label(id.as_str(), "migration plan id")?;
218 if version == 0 {
219 return Err(InternalError::store_unsupported(format!(
220 "migration plan '{id}' version must be > 0",
221 )));
222 }
223
224 if steps.is_empty() {
225 return Err(InternalError::store_unsupported(format!(
226 "migration plan '{id}' must include at least one step",
227 )));
228 }
229
230 Ok(Self { id, version, steps })
231 }
232
233 #[must_use]
235 pub const fn id(&self) -> &str {
236 self.id.as_str()
237 }
238
239 #[must_use]
241 pub const fn version(&self) -> u64 {
242 self.version
243 }
244
245 #[must_use]
247 pub const fn len(&self) -> usize {
248 self.steps.len()
249 }
250
251 #[must_use]
253 pub const fn is_empty(&self) -> bool {
254 self.steps.is_empty()
255 }
256
257 fn step_at(&self, index: usize) -> Result<&MigrationStep, InternalError> {
258 self.steps.get(index).ok_or_else(|| {
259 InternalError::store_unsupported(format!(
260 "migration '{}@{}' cursor out of bounds: next_step={} total_steps={}",
261 self.id(),
262 self.version(),
263 index,
264 self.len(),
265 ))
266 })
267 }
268}
269
/// Completion status reported by one bounded migration run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MigrationRunState {
    /// The cursor reached the end of the plan.
    Complete,
    /// Steps remain; another run is needed to continue.
    NeedsResume,
}
283
284#[derive(Clone, Copy, Debug, Eq, PartialEq)]
293pub struct MigrationRunOutcome {
294 cursor: MigrationCursor,
295 applied_steps: usize,
296 applied_row_ops: usize,
297 state: MigrationRunState,
298}
299
300impl MigrationRunOutcome {
301 const fn new(
302 cursor: MigrationCursor,
303 applied_steps: usize,
304 applied_row_ops: usize,
305 state: MigrationRunState,
306 ) -> Self {
307 Self {
308 cursor,
309 applied_steps,
310 applied_row_ops,
311 state,
312 }
313 }
314
315 #[must_use]
317 pub const fn cursor(self) -> MigrationCursor {
318 self.cursor
319 }
320
321 #[must_use]
323 pub const fn applied_steps(self) -> usize {
324 self.applied_steps
325 }
326
327 #[must_use]
329 pub const fn applied_row_ops(self) -> usize {
330 self.applied_row_ops
331 }
332
333 #[must_use]
335 pub const fn state(self) -> MigrationRunState {
336 self.state
337 }
338}
339
340pub(in crate::db) fn execute_migration_plan<C: CanisterKind>(
349 db: &Db<C>,
350 plan: &MigrationPlan,
351 max_steps: usize,
352) -> Result<MigrationRunOutcome, InternalError> {
353 if max_steps == 0 {
355 return Err(InternalError::store_unsupported(format!(
356 "migration '{}' execution requires max_steps > 0",
357 plan.id(),
358 )));
359 }
360
361 db.ensure_recovered_state()?;
363
364 let mut next_cursor = load_durable_cursor_for_plan(plan)?;
366
367 let mut applied_steps = 0usize;
369 let mut applied_row_ops = 0usize;
370 while applied_steps < max_steps {
371 if next_cursor.next_step() >= plan.len() {
372 break;
373 }
374
375 let step_index = next_cursor.next_step();
376 let step = plan.step_at(step_index)?;
377 let next_cursor_after_step = next_cursor.advance();
378 let next_state_bytes =
379 encode_durable_cursor_state(plan, next_cursor_after_step, step.row_ops.last())?;
380 execute_migration_step(db, plan, step_index, step, next_state_bytes)?;
381
382 applied_steps = applied_steps.saturating_add(1);
383 applied_row_ops = applied_row_ops.saturating_add(step.row_op_count());
384 next_cursor = next_cursor_after_step;
385 }
386
387 let state = if next_cursor.next_step() == plan.len() {
388 clear_migration_state_bytes()?;
389 MigrationRunState::Complete
390 } else {
391 MigrationRunState::NeedsResume
392 };
393
394 Ok(MigrationRunOutcome::new(
395 next_cursor,
396 applied_steps,
397 applied_row_ops,
398 state,
399 ))
400}
401
402fn load_durable_cursor_for_plan(plan: &MigrationPlan) -> Result<MigrationCursor, InternalError> {
403 let Some(bytes) = load_migration_state_bytes()? else {
404 return Ok(MigrationCursor::start());
405 };
406 let state = decode_persisted_migration_state(&bytes)?;
407 if state.migration_id != plan.id() || state.migration_version != plan.version() {
408 return Err(InternalError::store_unsupported(format!(
409 "migration '{}@{}' cannot execute while migration '{}@{}' is in progress",
410 plan.id(),
411 plan.version(),
412 state.migration_id,
413 state.migration_version,
414 )));
415 }
416
417 let step_index = usize::try_from(state.step_index).map_err(|_| {
418 InternalError::store_corruption(format!(
419 "migration '{}@{}' persisted step index does not fit runtime usize: {}",
420 plan.id(),
421 plan.version(),
422 state.step_index,
423 ))
424 })?;
425 if step_index > plan.len() {
426 return Err(InternalError::store_corruption(format!(
427 "migration '{}@{}' persisted step index out of bounds: {} > {}",
428 plan.id(),
429 plan.version(),
430 step_index,
431 plan.len(),
432 )));
433 }
434
435 if step_index == plan.len() {
436 clear_migration_state_bytes()?;
437 }
438
439 Ok(MigrationCursor::from_step(step_index))
440}
441
442fn encode_durable_cursor_state(
443 plan: &MigrationPlan,
444 cursor: MigrationCursor,
445 last_applied_row_op: Option<&CommitRowOp>,
446) -> Result<Vec<u8>, InternalError> {
447 let step_index = u64::try_from(cursor.next_step()).map_err(|_| {
448 InternalError::store_internal(format!(
449 "migration '{}@{}' next step index does not fit persisted u64 cursor",
450 plan.id(),
451 plan.version(),
452 ))
453 })?;
454 let state = PersistedMigrationState {
455 migration_id: plan.id().to_string(),
456 migration_version: plan.version(),
457 step_index,
458 last_applied_row_key: last_applied_row_op.map(|row_op| row_op.key.clone()),
459 };
460
461 encode_persisted_migration_state(&state)
462}
463
/// Decodes persisted migration state bytes into a `PersistedMigrationState`.
///
/// Delegates to `deserialize_persisted_payload`, passing
/// `MAX_MIGRATION_STATE_BYTES` and the label `"migration state"`.
/// NOTE(review): the helper presumably enforces the byte bound and uses the
/// label in its error messages — confirm against `db::codec`.
fn decode_persisted_migration_state(
    bytes: &[u8],
) -> Result<PersistedMigrationState, InternalError> {
    deserialize_persisted_payload::<PersistedMigrationState>(
        bytes,
        MAX_MIGRATION_STATE_BYTES,
        "migration state",
    )
}
473
474fn encode_persisted_migration_state(
475 state: &PersistedMigrationState,
476) -> Result<Vec<u8>, InternalError> {
477 serialize(state).map_err(|err| {
478 InternalError::serialize_internal(format!("failed to serialize migration state: {err}"))
479 })
480}
481
482fn execute_migration_step<C: CanisterKind>(
483 db: &Db<C>,
484 plan: &MigrationPlan,
485 step_index: usize,
486 step: &MigrationStep,
487 next_state_bytes: Vec<u8>,
488) -> Result<(), InternalError> {
489 let marker = CommitMarker::new(step.row_ops.clone())
491 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
492 let commit = begin_commit_with_migration_state(marker, next_state_bytes)
493 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
494
495 finish_commit(commit, |guard| {
497 apply_marker_row_ops(db, &guard.marker.row_ops)
498 })
499 .map_err(|err| annotate_step_error(plan, step_index, step.name(), err))?;
500
501 Ok(())
502}
503
504fn apply_marker_row_ops<C: CanisterKind>(
505 db: &Db<C>,
506 row_ops: &[CommitRowOp],
507) -> Result<(), InternalError> {
508 let mut prepared = Vec::with_capacity(row_ops.len());
510 for row_op in row_ops {
511 prepared.push(db.prepare_row_commit_op(row_op)?);
512 }
513
514 for prepared_op in prepared {
516 prepared_op.apply();
517 }
518
519 Ok(())
520}
521
522fn annotate_step_error(
523 plan: &MigrationPlan,
524 step_index: usize,
525 step_name: &str,
526 err: InternalError,
527) -> InternalError {
528 let source_message = err.message().to_string();
529
530 err.with_message(format!(
531 "migration '{}' step {} ('{}') failed: {}",
532 plan.id(),
533 step_index,
534 step_name,
535 source_message,
536 ))
537}
538
539fn validate_non_empty_label(value: &str, label: &str) -> Result<(), InternalError> {
540 if value.trim().is_empty() {
541 return Err(InternalError::store_unsupported(format!(
542 "{label} cannot be empty",
543 )));
544 }
545
546 Ok(())
547}