// canic_backup/execution/mod.rs

1mod types;
2
3pub use types::*;
4
5use crate::plan::{BackupExecutionPreflightReceipts, BackupOperationKind, BackupPlan};
6
/// Journal format version stamped on journals built by `from_plan`;
/// `BackupExecutionJournal::validate` rejects any other value.
const BACKUP_EXECUTION_JOURNAL_VERSION: u16 = 1;
/// Blocking reason recorded on mutating operations while preflight has not
/// been accepted.
const PREFLIGHT_NOT_ACCEPTED: &str = "preflight-not-accepted";
9
10impl BackupExecutionJournal {
11    /// Build an execution journal from a validated backup plan.
12    pub fn from_plan(plan: &BackupPlan) -> Result<Self, BackupExecutionJournalError> {
13        plan.validate()
14            .map_err(|error| BackupExecutionJournalError::InvalidPlan(error.to_string()))?;
15        let operations = plan
16            .phases
17            .iter()
18            .map(BackupExecutionJournalOperation::from_plan_operation)
19            .collect::<Vec<_>>();
20        let mut journal = Self {
21            journal_version: BACKUP_EXECUTION_JOURNAL_VERSION,
22            plan_id: plan.plan_id.clone(),
23            run_id: plan.run_id.clone(),
24            preflight_id: None,
25            preflight_accepted: false,
26            restart_required: false,
27            operations,
28            operation_receipts: Vec::new(),
29        };
30        journal.refresh_blocked_operations();
31        journal.validate()?;
32        Ok(journal)
33    }
34
35    /// Validate journal structure and operation receipts.
36    pub fn validate(&self) -> Result<(), BackupExecutionJournalError> {
37        if self.journal_version != BACKUP_EXECUTION_JOURNAL_VERSION {
38            return Err(BackupExecutionJournalError::UnsupportedVersion(
39                self.journal_version,
40            ));
41        }
42        validate_nonempty("plan_id", &self.plan_id)?;
43        validate_nonempty("run_id", &self.run_id)?;
44        if let Some(preflight_id) = &self.preflight_id {
45            validate_nonempty("preflight_id", preflight_id)?;
46        } else if self.preflight_accepted {
47            return Err(BackupExecutionJournalError::AcceptedPreflightMissingId);
48        }
49        validate_operation_sequences(&self.operations)?;
50        for operation in &self.operations {
51            operation.validate()?;
52            if !self.preflight_accepted && operation_kind_is_mutating(&operation.kind) {
53                match operation.state {
54                    BackupExecutionOperationState::Blocked => {}
55                    BackupExecutionOperationState::Ready
56                    | BackupExecutionOperationState::Pending
57                    | BackupExecutionOperationState::Completed
58                    | BackupExecutionOperationState::Failed
59                    | BackupExecutionOperationState::Skipped => {
60                        return Err(BackupExecutionJournalError::MutationReadyBeforePreflight {
61                            sequence: operation.sequence,
62                        });
63                    }
64                }
65            }
66        }
67        for receipt in &self.operation_receipts {
68            receipt.validate_against(self)?;
69        }
70        Ok(())
71    }
72
73    /// Mark all preflight operations completed and unblock mutating operations.
74    pub fn accept_preflight_bundle_at(
75        &mut self,
76        preflight_id: String,
77        updated_at: Option<String>,
78    ) -> Result<(), BackupExecutionJournalError> {
79        validate_nonempty("preflight_id", &preflight_id)?;
80        validate_optional_nonempty("updated_at", updated_at.as_deref())?;
81        if let Some(existing) = &self.preflight_id
82            && existing != &preflight_id
83        {
84            return Err(BackupExecutionJournalError::PreflightAlreadyAccepted {
85                existing: existing.clone(),
86                attempted: preflight_id,
87            });
88        }
89
90        self.preflight_id = Some(preflight_id);
91        self.preflight_accepted = true;
92        for operation in &mut self.operations {
93            if operation_kind_is_preflight(&operation.kind) {
94                operation.state = BackupExecutionOperationState::Completed;
95                operation.state_updated_at.clone_from(&updated_at);
96                operation.blocking_reasons.clear();
97            } else if operation.state == BackupExecutionOperationState::Blocked {
98                operation.state = BackupExecutionOperationState::Ready;
99                operation.blocking_reasons.clear();
100            }
101        }
102        self.refresh_restart_required();
103        self.validate()
104    }
105
106    /// Accept a typed preflight receipt bundle and unblock mutating operations.
107    pub fn accept_preflight_receipts_at(
108        &mut self,
109        receipts: &BackupExecutionPreflightReceipts,
110        updated_at: Option<String>,
111    ) -> Result<(), BackupExecutionJournalError> {
112        validate_nonempty("preflight_receipts.plan_id", &receipts.plan_id)?;
113        if receipts.plan_id != self.plan_id {
114            return Err(BackupExecutionJournalError::PreflightPlanMismatch {
115                expected: self.plan_id.clone(),
116                actual: receipts.plan_id.clone(),
117            });
118        }
119        self.accept_preflight_bundle_at(receipts.preflight_id.clone(), updated_at)
120    }
121
122    /// Return the next operation that should control runner progress.
123    #[must_use]
124    pub fn next_ready_operation(&self) -> Option<&BackupExecutionJournalOperation> {
125        self.operations
126            .iter()
127            .filter(|operation| {
128                matches!(
129                    operation.state,
130                    BackupExecutionOperationState::Ready
131                        | BackupExecutionOperationState::Pending
132                        | BackupExecutionOperationState::Failed
133                )
134            })
135            .min_by_key(|operation| operation.sequence)
136    }
137
138    /// Mark the next transitionable operation pending.
139    pub fn mark_next_operation_pending_at(
140        &mut self,
141        updated_at: Option<String>,
142    ) -> Result<(), BackupExecutionJournalError> {
143        let sequence = self
144            .next_ready_operation()
145            .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
146            .sequence;
147        self.mark_operation_pending_at(sequence, updated_at)
148    }
149
150    /// Mark one operation pending.
151    pub fn mark_operation_pending_at(
152        &mut self,
153        sequence: usize,
154        updated_at: Option<String>,
155    ) -> Result<(), BackupExecutionJournalError> {
156        validate_optional_nonempty("updated_at", updated_at.as_deref())?;
157        let expected = self
158            .next_ready_operation()
159            .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
160            .sequence;
161        if sequence != expected {
162            return Err(BackupExecutionJournalError::OutOfOrderOperationTransition {
163                requested: sequence,
164                next: expected,
165            });
166        }
167        let index = self.operation_index(sequence)?;
168        let operation = &self.operations[index];
169        if operation_kind_is_mutating(&operation.kind) && !self.preflight_accepted {
170            return Err(BackupExecutionJournalError::MutationBeforePreflightAccepted { sequence });
171        }
172        if !matches!(
173            operation.state,
174            BackupExecutionOperationState::Ready | BackupExecutionOperationState::Failed
175        ) {
176            return Err(BackupExecutionJournalError::InvalidOperationTransition {
177                sequence,
178                from: operation.state.clone(),
179                to: BackupExecutionOperationState::Pending,
180            });
181        }
182
183        let operation = &mut self.operations[index];
184        operation.state = BackupExecutionOperationState::Pending;
185        operation.state_updated_at = updated_at;
186        operation.blocking_reasons.clear();
187        self.refresh_restart_required();
188        self.validate()
189    }
190
191    /// Record one operation receipt and transition the matching operation.
192    pub fn record_operation_receipt(
193        &mut self,
194        receipt: BackupExecutionOperationReceipt,
195    ) -> Result<(), BackupExecutionJournalError> {
196        receipt.validate_against(self)?;
197        let index = self.operation_index(receipt.sequence)?;
198        let operation = &self.operations[index];
199        if operation.state != BackupExecutionOperationState::Pending {
200            return Err(
201                BackupExecutionJournalError::ReceiptWithoutPendingOperation {
202                    sequence: receipt.sequence,
203                },
204            );
205        }
206
207        let next_state = match receipt.outcome {
208            BackupExecutionOperationReceiptOutcome::Completed => {
209                BackupExecutionOperationState::Completed
210            }
211            BackupExecutionOperationReceiptOutcome::Failed => BackupExecutionOperationState::Failed,
212            BackupExecutionOperationReceiptOutcome::Skipped => {
213                BackupExecutionOperationState::Skipped
214            }
215        };
216        let failure_reason = receipt.failure_reason.clone();
217        self.operation_receipts.push(receipt);
218
219        let operation = &mut self.operations[index];
220        operation.state = next_state;
221        operation.state_updated_at = self
222            .operation_receipts
223            .last()
224            .and_then(|receipt| receipt.updated_at.clone());
225        operation.blocking_reasons = failure_reason.into_iter().collect();
226        self.refresh_restart_required();
227        if let Err(error) = self.validate() {
228            self.operation_receipts.pop();
229            return Err(error);
230        }
231        Ok(())
232    }
233
234    /// Move a failed operation back to ready for retry.
235    pub fn retry_failed_operation_at(
236        &mut self,
237        sequence: usize,
238        updated_at: Option<String>,
239    ) -> Result<(), BackupExecutionJournalError> {
240        validate_optional_nonempty("updated_at", updated_at.as_deref())?;
241        let index = self.operation_index(sequence)?;
242        if self.operations[index].state != BackupExecutionOperationState::Failed {
243            return Err(BackupExecutionJournalError::OperationNotFailed(sequence));
244        }
245        self.operations[index].state = BackupExecutionOperationState::Ready;
246        self.operations[index].state_updated_at = updated_at;
247        self.operations[index].blocking_reasons.clear();
248        self.refresh_restart_required();
249        self.validate()
250    }
251
252    /// Build a compact resumability summary.
253    #[must_use]
254    pub fn resume_summary(&self) -> BackupExecutionResumeSummary {
255        let mut summary = BackupExecutionResumeSummary {
256            plan_id: self.plan_id.clone(),
257            run_id: self.run_id.clone(),
258            preflight_id: self.preflight_id.clone(),
259            preflight_accepted: self.preflight_accepted,
260            restart_required: self.restart_required,
261            total_operations: self.operations.len(),
262            ready_operations: 0,
263            pending_operations: 0,
264            blocked_operations: 0,
265            completed_operations: 0,
266            failed_operations: 0,
267            skipped_operations: 0,
268            next_operation: self.next_ready_operation().cloned(),
269        };
270        for operation in &self.operations {
271            match operation.state {
272                BackupExecutionOperationState::Ready => summary.ready_operations += 1,
273                BackupExecutionOperationState::Pending => summary.pending_operations += 1,
274                BackupExecutionOperationState::Blocked => summary.blocked_operations += 1,
275                BackupExecutionOperationState::Completed => summary.completed_operations += 1,
276                BackupExecutionOperationState::Failed => summary.failed_operations += 1,
277                BackupExecutionOperationState::Skipped => summary.skipped_operations += 1,
278            }
279        }
280        summary
281    }
282
283    fn operation_index(&self, sequence: usize) -> Result<usize, BackupExecutionJournalError> {
284        self.operations
285            .iter()
286            .position(|operation| operation.sequence == sequence)
287            .ok_or(BackupExecutionJournalError::OperationNotFound(sequence))
288    }
289
290    fn refresh_blocked_operations(&mut self) {
291        if self.preflight_accepted {
292            return;
293        }
294        for operation in &mut self.operations {
295            if operation_kind_is_mutating(&operation.kind) {
296                operation.state = BackupExecutionOperationState::Blocked;
297                operation.blocking_reasons = vec![PREFLIGHT_NOT_ACCEPTED.to_string()];
298            }
299        }
300    }
301
302    fn refresh_restart_required(&mut self) {
303        let stopped = self.operations.iter().any(|operation| {
304            operation.kind == BackupOperationKind::Stop
305                && operation.state == BackupExecutionOperationState::Completed
306        });
307        let unstarted = self.operations.iter().any(|operation| {
308            operation.kind == BackupOperationKind::Start
309                && !matches!(
310                    operation.state,
311                    BackupExecutionOperationState::Completed
312                        | BackupExecutionOperationState::Skipped
313                )
314        });
315        self.restart_required = stopped && unstarted;
316    }
317}
318
319impl BackupExecutionJournalOperation {
320    fn from_plan_operation(operation: &crate::plan::BackupOperation) -> Self {
321        Self {
322            sequence: usize::try_from(operation.order).unwrap_or(usize::MAX),
323            operation_id: operation.operation_id.clone(),
324            kind: operation.kind.clone(),
325            target_canister_id: operation.target_canister_id.clone(),
326            state: BackupExecutionOperationState::Ready,
327            state_updated_at: None,
328            blocking_reasons: Vec::new(),
329        }
330    }
331
332    fn validate(&self) -> Result<(), BackupExecutionJournalError> {
333        validate_nonempty("operations[].operation_id", &self.operation_id)?;
334        validate_optional_nonempty(
335            "operations[].state_updated_at",
336            self.state_updated_at.as_deref(),
337        )?;
338        validate_optional_nonempty(
339            "operations[].target_canister_id",
340            self.target_canister_id.as_deref(),
341        )?;
342        match self.state {
343            BackupExecutionOperationState::Blocked | BackupExecutionOperationState::Failed
344                if self.blocking_reasons.is_empty() =>
345            {
346                Err(BackupExecutionJournalError::OperationMissingReason(
347                    self.sequence,
348                ))
349            }
350            BackupExecutionOperationState::Ready
351            | BackupExecutionOperationState::Pending
352            | BackupExecutionOperationState::Completed
353            | BackupExecutionOperationState::Skipped
354                if !self.blocking_reasons.is_empty() =>
355            {
356                Err(BackupExecutionJournalError::UnblockedOperationHasReasons(
357                    self.sequence,
358                ))
359            }
360            BackupExecutionOperationState::Ready
361            | BackupExecutionOperationState::Pending
362            | BackupExecutionOperationState::Blocked
363            | BackupExecutionOperationState::Completed
364            | BackupExecutionOperationState::Failed
365            | BackupExecutionOperationState::Skipped => Ok(()),
366        }
367    }
368}
369
370impl BackupExecutionOperationReceipt {
371    /// Build a completed operation receipt from one journal operation.
372    #[must_use]
373    pub fn completed(
374        journal: &BackupExecutionJournal,
375        operation: &BackupExecutionJournalOperation,
376        updated_at: Option<String>,
377    ) -> Self {
378        Self::from_operation(
379            journal,
380            operation,
381            BackupExecutionOperationReceiptOutcome::Completed,
382            updated_at,
383            None,
384        )
385    }
386
387    /// Build a failed operation receipt from one journal operation.
388    #[must_use]
389    pub fn failed(
390        journal: &BackupExecutionJournal,
391        operation: &BackupExecutionJournalOperation,
392        updated_at: Option<String>,
393        failure_reason: String,
394    ) -> Self {
395        Self::from_operation(
396            journal,
397            operation,
398            BackupExecutionOperationReceiptOutcome::Failed,
399            updated_at,
400            Some(failure_reason),
401        )
402    }
403
404    fn from_operation(
405        journal: &BackupExecutionJournal,
406        operation: &BackupExecutionJournalOperation,
407        outcome: BackupExecutionOperationReceiptOutcome,
408        updated_at: Option<String>,
409        failure_reason: Option<String>,
410    ) -> Self {
411        Self {
412            plan_id: journal.plan_id.clone(),
413            run_id: journal.run_id.clone(),
414            preflight_id: journal.preflight_id.clone(),
415            sequence: operation.sequence,
416            operation_id: operation.operation_id.clone(),
417            kind: operation.kind.clone(),
418            target_canister_id: operation.target_canister_id.clone(),
419            outcome,
420            updated_at,
421            snapshot_id: None,
422            artifact_path: None,
423            checksum: None,
424            failure_reason,
425        }
426    }
427
428    fn validate_against(
429        &self,
430        journal: &BackupExecutionJournal,
431    ) -> Result<(), BackupExecutionJournalError> {
432        validate_nonempty("operation_receipts[].plan_id", &self.plan_id)?;
433        validate_nonempty("operation_receipts[].run_id", &self.run_id)?;
434        validate_nonempty("operation_receipts[].operation_id", &self.operation_id)?;
435        validate_optional_nonempty(
436            "operation_receipts[].updated_at",
437            self.updated_at.as_deref(),
438        )?;
439        validate_optional_nonempty(
440            "operation_receipts[].snapshot_id",
441            self.snapshot_id.as_deref(),
442        )?;
443        validate_optional_nonempty(
444            "operation_receipts[].artifact_path",
445            self.artifact_path.as_deref(),
446        )?;
447        validate_optional_nonempty("operation_receipts[].checksum", self.checksum.as_deref())?;
448
449        if self.plan_id != journal.plan_id || self.run_id != journal.run_id {
450            return Err(BackupExecutionJournalError::ReceiptJournalMismatch {
451                sequence: self.sequence,
452            });
453        }
454        let operation = journal
455            .operations
456            .iter()
457            .find(|operation| operation.sequence == self.sequence)
458            .ok_or(BackupExecutionJournalError::ReceiptOperationNotFound(
459                self.sequence,
460            ))?;
461        if operation.operation_id != self.operation_id
462            || operation.kind != self.kind
463            || operation.target_canister_id != self.target_canister_id
464        {
465            return Err(BackupExecutionJournalError::ReceiptOperationMismatch {
466                sequence: self.sequence,
467            });
468        }
469        if operation_kind_is_mutating(&operation.kind) && self.preflight_id != journal.preflight_id
470        {
471            return Err(BackupExecutionJournalError::ReceiptPreflightMismatch {
472                sequence: self.sequence,
473            });
474        }
475        if self.outcome == BackupExecutionOperationReceiptOutcome::Failed {
476            validate_nonempty(
477                "operation_receipts[].failure_reason",
478                self.failure_reason.as_deref().unwrap_or_default(),
479            )?;
480        }
481        if self.kind == BackupOperationKind::CreateSnapshot
482            && self.outcome == BackupExecutionOperationReceiptOutcome::Completed
483        {
484            validate_nonempty(
485                "operation_receipts[].snapshot_id",
486                self.snapshot_id.as_deref().unwrap_or_default(),
487            )?;
488        }
489        if self.kind == BackupOperationKind::DownloadSnapshot
490            && self.outcome == BackupExecutionOperationReceiptOutcome::Completed
491        {
492            validate_nonempty(
493                "operation_receipts[].artifact_path",
494                self.artifact_path.as_deref().unwrap_or_default(),
495            )?;
496        }
497        if self.kind == BackupOperationKind::VerifyArtifact
498            && self.outcome == BackupExecutionOperationReceiptOutcome::Completed
499        {
500            validate_nonempty(
501                "operation_receipts[].checksum",
502                self.checksum.as_deref().unwrap_or_default(),
503            )?;
504        }
505
506        Ok(())
507    }
508}
509
510const fn operation_kind_is_preflight(kind: &BackupOperationKind) -> bool {
511    matches!(
512        kind,
513        BackupOperationKind::ValidateTopology
514            | BackupOperationKind::ValidateControlAuthority
515            | BackupOperationKind::ValidateSnapshotReadAuthority
516            | BackupOperationKind::ValidateQuiescencePolicy
517    )
518}
519
520const fn operation_kind_is_mutating(kind: &BackupOperationKind) -> bool {
521    !operation_kind_is_preflight(kind)
522}
523
524fn validate_operation_sequences(
525    operations: &[BackupExecutionJournalOperation],
526) -> Result<(), BackupExecutionJournalError> {
527    let mut sequences = std::collections::BTreeSet::new();
528    for operation in operations {
529        if !sequences.insert(operation.sequence) {
530            return Err(BackupExecutionJournalError::DuplicateSequence(
531                operation.sequence,
532            ));
533        }
534    }
535    for expected in 0..operations.len() {
536        if !sequences.contains(&expected) {
537            return Err(BackupExecutionJournalError::MissingSequence(expected));
538        }
539    }
540    Ok(())
541}
542
543fn validate_nonempty(field: &'static str, value: &str) -> Result<(), BackupExecutionJournalError> {
544    if value.trim().is_empty() {
545        Err(BackupExecutionJournalError::MissingField(field))
546    } else {
547        Ok(())
548    }
549}
550
551fn validate_optional_nonempty(
552    field: &'static str,
553    value: Option<&str>,
554) -> Result<(), BackupExecutionJournalError> {
555    match value {
556        Some(value) => validate_nonempty(field, value),
557        None => Ok(()),
558    }
559}
560
561#[cfg(test)]
562mod tests;