1mod types;
2
3pub use types::*;
4
5use crate::plan::{BackupExecutionPreflightReceipts, BackupOperationKind, BackupPlan};
6
/// Blocking reason attached to every mutating operation while the journal is
/// still waiting for a preflight bundle to be accepted.
const PREFLIGHT_NOT_ACCEPTED: &str = "preflight-not-accepted";
/// Journal format version written by this module; validation rejects any other.
const BACKUP_EXECUTION_JOURNAL_VERSION: u16 = 1;
9
10impl BackupExecutionJournal {
11 pub fn from_plan(plan: &BackupPlan) -> Result<Self, BackupExecutionJournalError> {
13 plan.validate()
14 .map_err(|error| BackupExecutionJournalError::InvalidPlan(error.to_string()))?;
15 let operations = plan
16 .phases
17 .iter()
18 .map(BackupExecutionJournalOperation::from_plan_operation)
19 .collect::<Vec<_>>();
20 let mut journal = Self {
21 journal_version: BACKUP_EXECUTION_JOURNAL_VERSION,
22 plan_id: plan.plan_id.clone(),
23 run_id: plan.run_id.clone(),
24 preflight_id: None,
25 preflight_accepted: false,
26 restart_required: false,
27 operations,
28 operation_receipts: Vec::new(),
29 };
30 journal.refresh_blocked_operations();
31 journal.validate()?;
32 Ok(journal)
33 }
34
35 pub fn validate(&self) -> Result<(), BackupExecutionJournalError> {
37 if self.journal_version != BACKUP_EXECUTION_JOURNAL_VERSION {
38 return Err(BackupExecutionJournalError::UnsupportedVersion(
39 self.journal_version,
40 ));
41 }
42 validate_nonempty("plan_id", &self.plan_id)?;
43 validate_nonempty("run_id", &self.run_id)?;
44 if let Some(preflight_id) = &self.preflight_id {
45 validate_nonempty("preflight_id", preflight_id)?;
46 } else if self.preflight_accepted {
47 return Err(BackupExecutionJournalError::AcceptedPreflightMissingId);
48 }
49 validate_operation_sequences(&self.operations)?;
50 for operation in &self.operations {
51 operation.validate()?;
52 if !self.preflight_accepted && operation_kind_is_mutating(&operation.kind) {
53 match operation.state {
54 BackupExecutionOperationState::Blocked => {}
55 BackupExecutionOperationState::Ready
56 | BackupExecutionOperationState::Pending
57 | BackupExecutionOperationState::Completed
58 | BackupExecutionOperationState::Failed
59 | BackupExecutionOperationState::Skipped => {
60 return Err(BackupExecutionJournalError::MutationReadyBeforePreflight {
61 sequence: operation.sequence,
62 });
63 }
64 }
65 }
66 }
67 for receipt in &self.operation_receipts {
68 receipt.validate_against(self)?;
69 }
70 Ok(())
71 }
72
73 pub fn accept_preflight_bundle_at(
75 &mut self,
76 preflight_id: String,
77 updated_at: Option<String>,
78 ) -> Result<(), BackupExecutionJournalError> {
79 validate_nonempty("preflight_id", &preflight_id)?;
80 validate_optional_nonempty("updated_at", updated_at.as_deref())?;
81 if let Some(existing) = &self.preflight_id
82 && existing != &preflight_id
83 {
84 return Err(BackupExecutionJournalError::PreflightAlreadyAccepted {
85 existing: existing.clone(),
86 attempted: preflight_id,
87 });
88 }
89
90 self.preflight_id = Some(preflight_id);
91 self.preflight_accepted = true;
92 for operation in &mut self.operations {
93 if operation_kind_is_preflight(&operation.kind) {
94 operation.state = BackupExecutionOperationState::Completed;
95 operation.state_updated_at.clone_from(&updated_at);
96 operation.blocking_reasons.clear();
97 } else if operation.state == BackupExecutionOperationState::Blocked {
98 operation.state = BackupExecutionOperationState::Ready;
99 operation.blocking_reasons.clear();
100 }
101 }
102 self.refresh_restart_required();
103 self.validate()
104 }
105
106 pub fn accept_preflight_receipts_at(
108 &mut self,
109 receipts: &BackupExecutionPreflightReceipts,
110 updated_at: Option<String>,
111 ) -> Result<(), BackupExecutionJournalError> {
112 validate_nonempty("preflight_receipts.plan_id", &receipts.plan_id)?;
113 if receipts.plan_id != self.plan_id {
114 return Err(BackupExecutionJournalError::PreflightPlanMismatch {
115 expected: self.plan_id.clone(),
116 actual: receipts.plan_id.clone(),
117 });
118 }
119 self.accept_preflight_bundle_at(receipts.preflight_id.clone(), updated_at)
120 }
121
122 #[must_use]
124 pub fn next_ready_operation(&self) -> Option<&BackupExecutionJournalOperation> {
125 self.operations
126 .iter()
127 .filter(|operation| {
128 matches!(
129 operation.state,
130 BackupExecutionOperationState::Ready
131 | BackupExecutionOperationState::Pending
132 | BackupExecutionOperationState::Failed
133 )
134 })
135 .min_by_key(|operation| operation.sequence)
136 }
137
138 pub fn mark_next_operation_pending_at(
140 &mut self,
141 updated_at: Option<String>,
142 ) -> Result<(), BackupExecutionJournalError> {
143 let sequence = self
144 .next_ready_operation()
145 .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
146 .sequence;
147 self.mark_operation_pending_at(sequence, updated_at)
148 }
149
150 pub fn mark_operation_pending_at(
152 &mut self,
153 sequence: usize,
154 updated_at: Option<String>,
155 ) -> Result<(), BackupExecutionJournalError> {
156 validate_optional_nonempty("updated_at", updated_at.as_deref())?;
157 let expected = self
158 .next_ready_operation()
159 .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
160 .sequence;
161 if sequence != expected {
162 return Err(BackupExecutionJournalError::OutOfOrderOperationTransition {
163 requested: sequence,
164 next: expected,
165 });
166 }
167 let index = self.operation_index(sequence)?;
168 let operation = &self.operations[index];
169 if operation_kind_is_mutating(&operation.kind) && !self.preflight_accepted {
170 return Err(BackupExecutionJournalError::MutationBeforePreflightAccepted { sequence });
171 }
172 if !matches!(
173 operation.state,
174 BackupExecutionOperationState::Ready | BackupExecutionOperationState::Failed
175 ) {
176 return Err(BackupExecutionJournalError::InvalidOperationTransition {
177 sequence,
178 from: operation.state.clone(),
179 to: BackupExecutionOperationState::Pending,
180 });
181 }
182
183 let operation = &mut self.operations[index];
184 operation.state = BackupExecutionOperationState::Pending;
185 operation.state_updated_at = updated_at;
186 operation.blocking_reasons.clear();
187 self.refresh_restart_required();
188 self.validate()
189 }
190
191 pub fn record_operation_receipt(
193 &mut self,
194 receipt: BackupExecutionOperationReceipt,
195 ) -> Result<(), BackupExecutionJournalError> {
196 receipt.validate_against(self)?;
197 let index = self.operation_index(receipt.sequence)?;
198 let operation = &self.operations[index];
199 if operation.state != BackupExecutionOperationState::Pending {
200 return Err(
201 BackupExecutionJournalError::ReceiptWithoutPendingOperation {
202 sequence: receipt.sequence,
203 },
204 );
205 }
206
207 let next_state = match receipt.outcome {
208 BackupExecutionOperationReceiptOutcome::Completed => {
209 BackupExecutionOperationState::Completed
210 }
211 BackupExecutionOperationReceiptOutcome::Failed => BackupExecutionOperationState::Failed,
212 BackupExecutionOperationReceiptOutcome::Skipped => {
213 BackupExecutionOperationState::Skipped
214 }
215 };
216 let failure_reason = receipt.failure_reason.clone();
217 self.operation_receipts.push(receipt);
218
219 let operation = &mut self.operations[index];
220 operation.state = next_state;
221 operation.state_updated_at = self
222 .operation_receipts
223 .last()
224 .and_then(|receipt| receipt.updated_at.clone());
225 operation.blocking_reasons = failure_reason.into_iter().collect();
226 self.refresh_restart_required();
227 if let Err(error) = self.validate() {
228 self.operation_receipts.pop();
229 return Err(error);
230 }
231 Ok(())
232 }
233
234 pub fn retry_failed_operation_at(
236 &mut self,
237 sequence: usize,
238 updated_at: Option<String>,
239 ) -> Result<(), BackupExecutionJournalError> {
240 validate_optional_nonempty("updated_at", updated_at.as_deref())?;
241 let index = self.operation_index(sequence)?;
242 if self.operations[index].state != BackupExecutionOperationState::Failed {
243 return Err(BackupExecutionJournalError::OperationNotFailed(sequence));
244 }
245 self.operations[index].state = BackupExecutionOperationState::Ready;
246 self.operations[index].state_updated_at = updated_at;
247 self.operations[index].blocking_reasons.clear();
248 self.refresh_restart_required();
249 self.validate()
250 }
251
252 #[must_use]
254 pub fn resume_summary(&self) -> BackupExecutionResumeSummary {
255 let mut summary = BackupExecutionResumeSummary {
256 plan_id: self.plan_id.clone(),
257 run_id: self.run_id.clone(),
258 preflight_id: self.preflight_id.clone(),
259 preflight_accepted: self.preflight_accepted,
260 restart_required: self.restart_required,
261 total_operations: self.operations.len(),
262 ready_operations: 0,
263 pending_operations: 0,
264 blocked_operations: 0,
265 completed_operations: 0,
266 failed_operations: 0,
267 skipped_operations: 0,
268 next_operation: self.next_ready_operation().cloned(),
269 };
270 for operation in &self.operations {
271 match operation.state {
272 BackupExecutionOperationState::Ready => summary.ready_operations += 1,
273 BackupExecutionOperationState::Pending => summary.pending_operations += 1,
274 BackupExecutionOperationState::Blocked => summary.blocked_operations += 1,
275 BackupExecutionOperationState::Completed => summary.completed_operations += 1,
276 BackupExecutionOperationState::Failed => summary.failed_operations += 1,
277 BackupExecutionOperationState::Skipped => summary.skipped_operations += 1,
278 }
279 }
280 summary
281 }
282
283 fn operation_index(&self, sequence: usize) -> Result<usize, BackupExecutionJournalError> {
284 self.operations
285 .iter()
286 .position(|operation| operation.sequence == sequence)
287 .ok_or(BackupExecutionJournalError::OperationNotFound(sequence))
288 }
289
290 fn refresh_blocked_operations(&mut self) {
291 if self.preflight_accepted {
292 return;
293 }
294 for operation in &mut self.operations {
295 if operation_kind_is_mutating(&operation.kind) {
296 operation.state = BackupExecutionOperationState::Blocked;
297 operation.blocking_reasons = vec![PREFLIGHT_NOT_ACCEPTED.to_string()];
298 }
299 }
300 }
301
302 fn refresh_restart_required(&mut self) {
303 let stopped = self.operations.iter().any(|operation| {
304 operation.kind == BackupOperationKind::Stop
305 && operation.state == BackupExecutionOperationState::Completed
306 });
307 let unstarted = self.operations.iter().any(|operation| {
308 operation.kind == BackupOperationKind::Start
309 && !matches!(
310 operation.state,
311 BackupExecutionOperationState::Completed
312 | BackupExecutionOperationState::Skipped
313 )
314 });
315 self.restart_required = stopped && unstarted;
316 }
317}
318
319impl BackupExecutionJournalOperation {
320 fn from_plan_operation(operation: &crate::plan::BackupOperation) -> Self {
321 Self {
322 sequence: usize::try_from(operation.order).unwrap_or(usize::MAX),
323 operation_id: operation.operation_id.clone(),
324 kind: operation.kind.clone(),
325 target_canister_id: operation.target_canister_id.clone(),
326 state: BackupExecutionOperationState::Ready,
327 state_updated_at: None,
328 blocking_reasons: Vec::new(),
329 }
330 }
331
332 fn validate(&self) -> Result<(), BackupExecutionJournalError> {
333 validate_nonempty("operations[].operation_id", &self.operation_id)?;
334 validate_optional_nonempty(
335 "operations[].state_updated_at",
336 self.state_updated_at.as_deref(),
337 )?;
338 validate_optional_nonempty(
339 "operations[].target_canister_id",
340 self.target_canister_id.as_deref(),
341 )?;
342 match self.state {
343 BackupExecutionOperationState::Blocked | BackupExecutionOperationState::Failed
344 if self.blocking_reasons.is_empty() =>
345 {
346 Err(BackupExecutionJournalError::OperationMissingReason(
347 self.sequence,
348 ))
349 }
350 BackupExecutionOperationState::Ready
351 | BackupExecutionOperationState::Pending
352 | BackupExecutionOperationState::Completed
353 | BackupExecutionOperationState::Skipped
354 if !self.blocking_reasons.is_empty() =>
355 {
356 Err(BackupExecutionJournalError::UnblockedOperationHasReasons(
357 self.sequence,
358 ))
359 }
360 BackupExecutionOperationState::Ready
361 | BackupExecutionOperationState::Pending
362 | BackupExecutionOperationState::Blocked
363 | BackupExecutionOperationState::Completed
364 | BackupExecutionOperationState::Failed
365 | BackupExecutionOperationState::Skipped => Ok(()),
366 }
367 }
368}
369
370impl BackupExecutionOperationReceipt {
371 #[must_use]
373 pub fn completed(
374 journal: &BackupExecutionJournal,
375 operation: &BackupExecutionJournalOperation,
376 updated_at: Option<String>,
377 ) -> Self {
378 Self::from_operation(
379 journal,
380 operation,
381 BackupExecutionOperationReceiptOutcome::Completed,
382 updated_at,
383 None,
384 )
385 }
386
387 #[must_use]
389 pub fn failed(
390 journal: &BackupExecutionJournal,
391 operation: &BackupExecutionJournalOperation,
392 updated_at: Option<String>,
393 failure_reason: String,
394 ) -> Self {
395 Self::from_operation(
396 journal,
397 operation,
398 BackupExecutionOperationReceiptOutcome::Failed,
399 updated_at,
400 Some(failure_reason),
401 )
402 }
403
404 fn from_operation(
405 journal: &BackupExecutionJournal,
406 operation: &BackupExecutionJournalOperation,
407 outcome: BackupExecutionOperationReceiptOutcome,
408 updated_at: Option<String>,
409 failure_reason: Option<String>,
410 ) -> Self {
411 Self {
412 plan_id: journal.plan_id.clone(),
413 run_id: journal.run_id.clone(),
414 preflight_id: journal.preflight_id.clone(),
415 sequence: operation.sequence,
416 operation_id: operation.operation_id.clone(),
417 kind: operation.kind.clone(),
418 target_canister_id: operation.target_canister_id.clone(),
419 outcome,
420 updated_at,
421 snapshot_id: None,
422 artifact_path: None,
423 checksum: None,
424 failure_reason,
425 }
426 }
427
428 fn validate_against(
429 &self,
430 journal: &BackupExecutionJournal,
431 ) -> Result<(), BackupExecutionJournalError> {
432 validate_nonempty("operation_receipts[].plan_id", &self.plan_id)?;
433 validate_nonempty("operation_receipts[].run_id", &self.run_id)?;
434 validate_nonempty("operation_receipts[].operation_id", &self.operation_id)?;
435 validate_optional_nonempty(
436 "operation_receipts[].updated_at",
437 self.updated_at.as_deref(),
438 )?;
439 validate_optional_nonempty(
440 "operation_receipts[].snapshot_id",
441 self.snapshot_id.as_deref(),
442 )?;
443 validate_optional_nonempty(
444 "operation_receipts[].artifact_path",
445 self.artifact_path.as_deref(),
446 )?;
447 validate_optional_nonempty("operation_receipts[].checksum", self.checksum.as_deref())?;
448
449 if self.plan_id != journal.plan_id || self.run_id != journal.run_id {
450 return Err(BackupExecutionJournalError::ReceiptJournalMismatch {
451 sequence: self.sequence,
452 });
453 }
454 let operation = journal
455 .operations
456 .iter()
457 .find(|operation| operation.sequence == self.sequence)
458 .ok_or(BackupExecutionJournalError::ReceiptOperationNotFound(
459 self.sequence,
460 ))?;
461 if operation.operation_id != self.operation_id
462 || operation.kind != self.kind
463 || operation.target_canister_id != self.target_canister_id
464 {
465 return Err(BackupExecutionJournalError::ReceiptOperationMismatch {
466 sequence: self.sequence,
467 });
468 }
469 if operation_kind_is_mutating(&operation.kind) && self.preflight_id != journal.preflight_id
470 {
471 return Err(BackupExecutionJournalError::ReceiptPreflightMismatch {
472 sequence: self.sequence,
473 });
474 }
475 if self.outcome == BackupExecutionOperationReceiptOutcome::Failed {
476 validate_nonempty(
477 "operation_receipts[].failure_reason",
478 self.failure_reason.as_deref().unwrap_or_default(),
479 )?;
480 }
481 if self.kind == BackupOperationKind::CreateSnapshot
482 && self.outcome == BackupExecutionOperationReceiptOutcome::Completed
483 {
484 validate_nonempty(
485 "operation_receipts[].snapshot_id",
486 self.snapshot_id.as_deref().unwrap_or_default(),
487 )?;
488 }
489 if self.kind == BackupOperationKind::DownloadSnapshot
490 && self.outcome == BackupExecutionOperationReceiptOutcome::Completed
491 {
492 validate_nonempty(
493 "operation_receipts[].artifact_path",
494 self.artifact_path.as_deref().unwrap_or_default(),
495 )?;
496 }
497 if self.kind == BackupOperationKind::VerifyArtifact
498 && self.outcome == BackupExecutionOperationReceiptOutcome::Completed
499 {
500 validate_nonempty(
501 "operation_receipts[].checksum",
502 self.checksum.as_deref().unwrap_or_default(),
503 )?;
504 }
505
506 Ok(())
507 }
508}
509
510const fn operation_kind_is_preflight(kind: &BackupOperationKind) -> bool {
511 matches!(
512 kind,
513 BackupOperationKind::ValidateTopology
514 | BackupOperationKind::ValidateControlAuthority
515 | BackupOperationKind::ValidateSnapshotReadAuthority
516 | BackupOperationKind::ValidateQuiescencePolicy
517 )
518}
519
520const fn operation_kind_is_mutating(kind: &BackupOperationKind) -> bool {
521 !operation_kind_is_preflight(kind)
522}
523
524fn validate_operation_sequences(
525 operations: &[BackupExecutionJournalOperation],
526) -> Result<(), BackupExecutionJournalError> {
527 let mut sequences = std::collections::BTreeSet::new();
528 for operation in operations {
529 if !sequences.insert(operation.sequence) {
530 return Err(BackupExecutionJournalError::DuplicateSequence(
531 operation.sequence,
532 ));
533 }
534 }
535 for expected in 0..operations.len() {
536 if !sequences.contains(&expected) {
537 return Err(BackupExecutionJournalError::MissingSequence(expected));
538 }
539 }
540 Ok(())
541}
542
543fn validate_nonempty(field: &'static str, value: &str) -> Result<(), BackupExecutionJournalError> {
544 if value.trim().is_empty() {
545 Err(BackupExecutionJournalError::MissingField(field))
546 } else {
547 Ok(())
548 }
549}
550
551fn validate_optional_nonempty(
552 field: &'static str,
553 value: Option<&str>,
554) -> Result<(), BackupExecutionJournalError> {
555 match value {
556 Some(value) => validate_nonempty(field, value),
557 None => Ok(()),
558 }
559}
560
561#[cfg(test)]
562mod tests;