Skip to main content

canic_backup/restore/apply/journal/
mod.rs

1use super::{RestoreApplyDryRun, RestoreApplyDryRunOperation, RestoreApplyDryRunPhase};
2use serde::{Deserialize, Serialize};
3use std::collections::BTreeSet;
4use thiserror::Error as ThisError;
5
6mod commands;
7mod receipts;
8mod reports;
9
10pub use commands::{
11    RestoreApplyCommandConfig, RestoreApplyCommandPreview, RestoreApplyRunnerCommand,
12};
13pub use receipts::{
14    RestoreApplyCommandOutput, RestoreApplyCommandOutputPair, RestoreApplyOperationReceipt,
15    RestoreApplyOperationReceiptOutcome,
16};
17pub use reports::{
18    RestoreApplyJournalReport, RestoreApplyJournalStatus, RestoreApplyNextOperation,
19    RestoreApplyPendingSummary, RestoreApplyProgressSummary, RestoreApplyReportOperation,
20    RestoreApplyReportOutcome,
21};
22
23///
24/// RestoreApplyJournal
25///
26
27#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
28pub struct RestoreApplyJournal {
29    pub journal_version: u16,
30    pub backup_id: String,
31    pub ready: bool,
32    pub blocked_reasons: Vec<String>,
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub backup_root: Option<String>,
35    pub operation_count: usize,
36    #[serde(default)]
37    pub operation_counts: RestoreApplyOperationKindCounts,
38    pub pending_operations: usize,
39    pub ready_operations: usize,
40    pub blocked_operations: usize,
41    pub completed_operations: usize,
42    pub failed_operations: usize,
43    pub operations: Vec<RestoreApplyJournalOperation>,
44    #[serde(default, skip_serializing_if = "Vec::is_empty")]
45    pub operation_receipts: Vec<RestoreApplyOperationReceipt>,
46}
47
48impl RestoreApplyJournal {
49    /// Build the initial no-mutation restore apply journal from a dry-run.
50    #[must_use]
51    pub fn from_dry_run(dry_run: &RestoreApplyDryRun) -> Self {
52        let blocked_reasons = restore_apply_blocked_reasons(dry_run);
53        let initial_state = if blocked_reasons.is_empty() {
54            RestoreApplyOperationState::Ready
55        } else {
56            RestoreApplyOperationState::Blocked
57        };
58        let operations = dry_run
59            .phases
60            .iter()
61            .flat_map(|phase| phase.operations.iter())
62            .map(|operation| {
63                RestoreApplyJournalOperation::from_dry_run_operation(
64                    operation,
65                    initial_state.clone(),
66                    &blocked_reasons,
67                )
68            })
69            .collect::<Vec<_>>();
70        let ready_operations = operations
71            .iter()
72            .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
73            .count();
74        let blocked_operations = operations
75            .iter()
76            .filter(|operation| operation.state == RestoreApplyOperationState::Blocked)
77            .count();
78        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&operations);
79
80        Self {
81            journal_version: 1,
82            backup_id: dry_run.backup_id.clone(),
83            ready: blocked_reasons.is_empty(),
84            blocked_reasons,
85            backup_root: dry_run
86                .artifact_validation
87                .as_ref()
88                .map(|validation| validation.backup_root.clone()),
89            operation_count: operations.len(),
90            operation_counts,
91            pending_operations: 0,
92            ready_operations,
93            blocked_operations,
94            completed_operations: 0,
95            failed_operations: 0,
96            operations,
97            operation_receipts: Vec::new(),
98        }
99    }
100
101    /// Validate the structural consistency of a restore apply journal.
102    pub fn validate(&self) -> Result<(), RestoreApplyJournalError> {
103        validate_apply_journal_version(self.journal_version)?;
104        validate_apply_journal_nonempty("backup_id", &self.backup_id)?;
105        if let Some(backup_root) = &self.backup_root {
106            validate_apply_journal_nonempty("backup_root", backup_root)?;
107        }
108        validate_apply_journal_count(
109            "operation_count",
110            self.operation_count,
111            self.operations.len(),
112        )?;
113
114        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
115        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
116        self.operation_counts
117            .validate_matches_if_supplied(&operation_counts)?;
118        validate_apply_journal_count(
119            "pending_operations",
120            self.pending_operations,
121            state_counts.pending,
122        )?;
123        validate_apply_journal_count(
124            "ready_operations",
125            self.ready_operations,
126            state_counts.ready,
127        )?;
128        validate_apply_journal_count(
129            "blocked_operations",
130            self.blocked_operations,
131            state_counts.blocked,
132        )?;
133        validate_apply_journal_count(
134            "completed_operations",
135            self.completed_operations,
136            state_counts.completed,
137        )?;
138        validate_apply_journal_count(
139            "failed_operations",
140            self.failed_operations,
141            state_counts.failed,
142        )?;
143
144        if self.ready && (!self.blocked_reasons.is_empty() || self.blocked_operations > 0) {
145            return Err(RestoreApplyJournalError::ReadyJournalHasBlockingState);
146        }
147
148        validate_apply_journal_sequences(&self.operations)?;
149        for operation in &self.operations {
150            operation.validate()?;
151        }
152        for receipt in &self.operation_receipts {
153            receipt.validate_against(self)?;
154        }
155
156        Ok(())
157    }
158
159    /// Summarize this apply journal for operators and automation.
160    #[must_use]
161    pub fn status(&self) -> RestoreApplyJournalStatus {
162        RestoreApplyJournalStatus::from_journal(self)
163    }
164
165    /// Build an operator-oriented report from this apply journal.
166    #[must_use]
167    pub fn report(&self) -> RestoreApplyJournalReport {
168        RestoreApplyJournalReport::from_journal(self)
169    }
170
171    /// Return the full next ready operation row, if one is available.
172    #[must_use]
173    pub fn next_ready_operation(&self) -> Option<&RestoreApplyJournalOperation> {
174        self.operations
175            .iter()
176            .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
177            .min_by_key(|operation| operation.sequence)
178    }
179
180    /// Return the next ready or pending operation that controls runner progress.
181    #[must_use]
182    pub fn next_transition_operation(&self) -> Option<&RestoreApplyJournalOperation> {
183        self.operations
184            .iter()
185            .filter(|operation| {
186                matches!(
187                    operation.state,
188                    RestoreApplyOperationState::Ready
189                        | RestoreApplyOperationState::Pending
190                        | RestoreApplyOperationState::Failed
191                )
192            })
193            .min_by_key(|operation| operation.sequence)
194    }
195
196    /// Render the next transitionable operation as a compact runner response.
197    #[must_use]
198    pub fn next_operation(&self) -> RestoreApplyNextOperation {
199        RestoreApplyNextOperation::from_journal(self)
200    }
201
202    /// Render the next transitionable operation as a no-execute command preview.
203    #[must_use]
204    pub fn next_command_preview(&self) -> RestoreApplyCommandPreview {
205        RestoreApplyCommandPreview::from_journal(self)
206    }
207
208    /// Render the next transitionable operation with a configured command preview.
209    #[must_use]
210    pub fn next_command_preview_with_config(
211        &self,
212        config: &RestoreApplyCommandConfig,
213    ) -> RestoreApplyCommandPreview {
214        RestoreApplyCommandPreview::from_journal_with_config(self, config)
215    }
216
217    /// Store one durable operation receipt/output and revalidate the journal.
218    pub fn record_operation_receipt(
219        &mut self,
220        receipt: RestoreApplyOperationReceipt,
221    ) -> Result<(), RestoreApplyJournalError> {
222        self.operation_receipts.push(receipt);
223        if let Err(error) = self.validate() {
224            self.operation_receipts.pop();
225            return Err(error);
226        }
227
228        Ok(())
229    }
230
231    /// Mark the next transitionable operation pending and refresh journal counts.
232    pub fn mark_next_operation_pending(&mut self) -> Result<(), RestoreApplyJournalError> {
233        self.mark_next_operation_pending_at(None)
234    }
235
236    /// Mark the next transitionable operation pending with an update marker.
237    pub fn mark_next_operation_pending_at(
238        &mut self,
239        updated_at: Option<String>,
240    ) -> Result<(), RestoreApplyJournalError> {
241        let sequence = self
242            .next_transition_sequence()
243            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
244        self.mark_operation_pending_at(sequence, updated_at)
245    }
246
247    /// Mark one restore apply operation pending and refresh journal counts.
248    pub fn mark_operation_pending(
249        &mut self,
250        sequence: usize,
251    ) -> Result<(), RestoreApplyJournalError> {
252        self.mark_operation_pending_at(sequence, None)
253    }
254
255    /// Mark one restore apply operation pending with an update marker.
256    pub fn mark_operation_pending_at(
257        &mut self,
258        sequence: usize,
259        updated_at: Option<String>,
260    ) -> Result<(), RestoreApplyJournalError> {
261        self.transition_operation(
262            sequence,
263            RestoreApplyOperationState::Pending,
264            Vec::new(),
265            updated_at,
266        )
267    }
268
269    /// Mark the current pending operation ready again and refresh counts.
270    pub fn mark_next_operation_ready(&mut self) -> Result<(), RestoreApplyJournalError> {
271        self.mark_next_operation_ready_at(None)
272    }
273
274    /// Mark the current pending operation ready again with an update marker.
275    pub fn mark_next_operation_ready_at(
276        &mut self,
277        updated_at: Option<String>,
278    ) -> Result<(), RestoreApplyJournalError> {
279        let operation = self
280            .next_transition_operation()
281            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
282        if operation.state != RestoreApplyOperationState::Pending {
283            return Err(RestoreApplyJournalError::NoPendingOperation);
284        }
285
286        self.mark_operation_ready_at(operation.sequence, updated_at)
287    }
288
289    /// Mark one restore apply operation ready again and refresh journal counts.
290    pub fn mark_operation_ready(
291        &mut self,
292        sequence: usize,
293    ) -> Result<(), RestoreApplyJournalError> {
294        self.mark_operation_ready_at(sequence, None)
295    }
296
297    /// Mark one restore apply operation ready again with an update marker.
298    pub fn mark_operation_ready_at(
299        &mut self,
300        sequence: usize,
301        updated_at: Option<String>,
302    ) -> Result<(), RestoreApplyJournalError> {
303        self.transition_operation(
304            sequence,
305            RestoreApplyOperationState::Ready,
306            Vec::new(),
307            updated_at,
308        )
309    }
310
311    /// Retry one failed restore apply operation by moving it back to ready.
312    pub fn retry_failed_operation_at(
313        &mut self,
314        sequence: usize,
315        updated_at: Option<String>,
316    ) -> Result<(), RestoreApplyJournalError> {
317        self.transition_operation(
318            sequence,
319            RestoreApplyOperationState::Ready,
320            Vec::new(),
321            updated_at,
322        )
323    }
324
325    /// Mark one restore apply operation completed and refresh journal counts.
326    pub fn mark_operation_completed(
327        &mut self,
328        sequence: usize,
329    ) -> Result<(), RestoreApplyJournalError> {
330        self.mark_operation_completed_at(sequence, None)
331    }
332
333    /// Mark one restore apply operation completed with an update marker.
334    pub fn mark_operation_completed_at(
335        &mut self,
336        sequence: usize,
337        updated_at: Option<String>,
338    ) -> Result<(), RestoreApplyJournalError> {
339        self.transition_operation(
340            sequence,
341            RestoreApplyOperationState::Completed,
342            Vec::new(),
343            updated_at,
344        )
345    }
346
347    /// Mark one restore apply operation failed and refresh journal counts.
348    pub fn mark_operation_failed(
349        &mut self,
350        sequence: usize,
351        reason: String,
352    ) -> Result<(), RestoreApplyJournalError> {
353        self.mark_operation_failed_at(sequence, reason, None)
354    }
355
356    /// Mark one restore apply operation failed with an update marker.
357    pub fn mark_operation_failed_at(
358        &mut self,
359        sequence: usize,
360        reason: String,
361        updated_at: Option<String>,
362    ) -> Result<(), RestoreApplyJournalError> {
363        if reason.trim().is_empty() {
364            return Err(RestoreApplyJournalError::FailureReasonRequired(sequence));
365        }
366
367        self.transition_operation(
368            sequence,
369            RestoreApplyOperationState::Failed,
370            vec![reason],
371            updated_at,
372        )
373    }
374
375    // Apply one legal operation state transition and revalidate the journal.
376    fn transition_operation(
377        &mut self,
378        sequence: usize,
379        next_state: RestoreApplyOperationState,
380        blocking_reasons: Vec<String>,
381        updated_at: Option<String>,
382    ) -> Result<(), RestoreApplyJournalError> {
383        let index = self
384            .operations
385            .iter()
386            .position(|operation| operation.sequence == sequence)
387            .ok_or(RestoreApplyJournalError::OperationNotFound(sequence))?;
388        let operation = &self.operations[index];
389
390        if !operation.can_transition_to(&next_state) {
391            return Err(RestoreApplyJournalError::InvalidOperationTransition {
392                sequence,
393                from: operation.state.clone(),
394                to: next_state,
395            });
396        }
397
398        self.validate_operation_transition_order(operation, &next_state)?;
399
400        let operation = &mut self.operations[index];
401        operation.state = next_state;
402        operation.blocking_reasons = blocking_reasons;
403        operation.state_updated_at = updated_at;
404        self.refresh_operation_counts();
405        self.validate()
406    }
407
408    // Ensure fresh operation transitions advance in journal order.
409    fn validate_operation_transition_order(
410        &self,
411        operation: &RestoreApplyJournalOperation,
412        next_state: &RestoreApplyOperationState,
413    ) -> Result<(), RestoreApplyJournalError> {
414        if operation.state == *next_state {
415            return Ok(());
416        }
417
418        let next_sequence = self
419            .next_transition_sequence()
420            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
421
422        if operation.sequence == next_sequence {
423            return Ok(());
424        }
425
426        Err(RestoreApplyJournalError::OutOfOrderOperationTransition {
427            requested: operation.sequence,
428            next: next_sequence,
429        })
430    }
431
432    // Return the next operation sequence that can be advanced by a runner.
433    fn next_transition_sequence(&self) -> Option<usize> {
434        self.next_transition_operation()
435            .map(|operation| operation.sequence)
436    }
437
438    // Recompute operation counts after a journal operation state change.
439    fn refresh_operation_counts(&mut self) {
440        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
441        self.operation_count = self.operations.len();
442        self.operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
443        self.pending_operations = state_counts.pending;
444        self.ready_operations = state_counts.ready;
445        self.blocked_operations = state_counts.blocked;
446        self.completed_operations = state_counts.completed;
447        self.failed_operations = state_counts.failed;
448    }
449
450    // Return whether this journal carried a persisted operation-kind receipt.
451    pub(super) const fn operation_counts_supplied(&self) -> bool {
452        !self.operation_counts.is_empty() || self.operations.is_empty()
453    }
454
455    // Return whether every planned operation has completed.
456    pub(super) const fn is_complete(&self) -> bool {
457        self.operation_count > 0 && self.completed_operations == self.operation_count
458    }
459
460    // Recompute operation-kind counts from concrete operation rows.
461    pub(super) fn operation_kind_counts(&self) -> RestoreApplyOperationKindCounts {
462        RestoreApplyOperationKindCounts::from_operations(&self.operations)
463    }
464
465    // Find the uploaded target snapshot ID required by one load operation.
466    pub(super) fn uploaded_snapshot_id_for_load(
467        &self,
468        load: &RestoreApplyJournalOperation,
469    ) -> Option<&str> {
470        self.operation_receipts
471            .iter()
472            .find(|receipt| {
473                receipt.matches_load_operation(load)
474                    && self.operations.iter().any(|operation| {
475                        operation.sequence == receipt.sequence
476                            && operation.operation == RestoreApplyOperationKind::UploadSnapshot
477                            && operation.state == RestoreApplyOperationState::Completed
478                    })
479            })
480            .and_then(|receipt| receipt.uploaded_snapshot_id.as_deref())
481    }
482}
483
484// Validate the supported restore apply journal format version.
485const fn validate_apply_journal_version(version: u16) -> Result<(), RestoreApplyJournalError> {
486    if version == 1 {
487        return Ok(());
488    }
489
490    Err(RestoreApplyJournalError::UnsupportedVersion(version))
491}
492
493// Validate required nonempty restore apply journal fields.
494fn validate_apply_journal_nonempty(
495    field: &'static str,
496    value: &str,
497) -> Result<(), RestoreApplyJournalError> {
498    if !value.trim().is_empty() {
499        return Ok(());
500    }
501
502    Err(RestoreApplyJournalError::MissingField(field))
503}
504
505// Validate one reported restore apply journal count.
506const fn validate_apply_journal_count(
507    field: &'static str,
508    reported: usize,
509    actual: usize,
510) -> Result<(), RestoreApplyJournalError> {
511    if reported == actual {
512        return Ok(());
513    }
514
515    Err(RestoreApplyJournalError::CountMismatch {
516        field,
517        reported,
518        actual,
519    })
520}
521
522// Validate operation sequence values are unique and contiguous from zero.
523fn validate_apply_journal_sequences(
524    operations: &[RestoreApplyJournalOperation],
525) -> Result<(), RestoreApplyJournalError> {
526    let mut sequences = BTreeSet::new();
527    for operation in operations {
528        if !sequences.insert(operation.sequence) {
529            return Err(RestoreApplyJournalError::DuplicateSequence(
530                operation.sequence,
531            ));
532        }
533    }
534
535    for expected in 0..operations.len() {
536        if !sequences.contains(&expected) {
537            return Err(RestoreApplyJournalError::MissingSequence(expected));
538        }
539    }
540
541    Ok(())
542}
543
544///
545/// RestoreApplyJournalStateCounts
546///
547
548#[derive(Clone, Debug, Default, Eq, PartialEq)]
549struct RestoreApplyJournalStateCounts {
550    pending: usize,
551    ready: usize,
552    blocked: usize,
553    completed: usize,
554    failed: usize,
555}
556
557impl RestoreApplyJournalStateCounts {
558    // Count operation states from concrete journal operation rows.
559    fn from_operations(operations: &[RestoreApplyJournalOperation]) -> Self {
560        let mut counts = Self::default();
561        for operation in operations {
562            match operation.state {
563                RestoreApplyOperationState::Pending => counts.pending += 1,
564                RestoreApplyOperationState::Ready => counts.ready += 1,
565                RestoreApplyOperationState::Blocked => counts.blocked += 1,
566                RestoreApplyOperationState::Completed => counts.completed += 1,
567                RestoreApplyOperationState::Failed => counts.failed += 1,
568            }
569        }
570        counts
571    }
572}
573
574///
575/// RestoreApplyOperationKindCounts
576///
577
578#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
579pub struct RestoreApplyOperationKindCounts {
580    pub snapshot_uploads: usize,
581    pub snapshot_loads: usize,
582    pub code_reinstalls: usize,
583    pub member_verifications: usize,
584    pub fleet_verifications: usize,
585    pub verification_operations: usize,
586}
587
588impl RestoreApplyOperationKindCounts {
589    /// Count restore apply journal operations by runner operation kind.
590    #[must_use]
591    pub fn from_operations(operations: &[RestoreApplyJournalOperation]) -> Self {
592        let mut counts = Self::default();
593        for operation in operations {
594            counts.record(&operation.operation);
595        }
596        counts
597    }
598
599    /// Validate this count object against concrete operations when it was supplied.
600    pub fn validate_matches_if_supplied(
601        &self,
602        expected: &Self,
603    ) -> Result<(), RestoreApplyJournalError> {
604        if self.is_empty() && !expected.is_empty() {
605            return Ok(());
606        }
607
608        validate_apply_journal_count(
609            "operation_counts.snapshot_uploads",
610            self.snapshot_uploads,
611            expected.snapshot_uploads,
612        )?;
613        validate_apply_journal_count(
614            "operation_counts.snapshot_loads",
615            self.snapshot_loads,
616            expected.snapshot_loads,
617        )?;
618        validate_apply_journal_count(
619            "operation_counts.code_reinstalls",
620            self.code_reinstalls,
621            expected.code_reinstalls,
622        )?;
623        validate_apply_journal_count(
624            "operation_counts.member_verifications",
625            self.member_verifications,
626            expected.member_verifications,
627        )?;
628        validate_apply_journal_count(
629            "operation_counts.fleet_verifications",
630            self.fleet_verifications,
631            expected.fleet_verifications,
632        )?;
633        validate_apply_journal_count(
634            "operation_counts.verification_operations",
635            self.verification_operations,
636            expected.verification_operations,
637        )
638    }
639
640    // Return whether no operation-kind counts are present.
641    const fn is_empty(&self) -> bool {
642        self.snapshot_uploads == 0
643            && self.snapshot_loads == 0
644            && self.code_reinstalls == 0
645            && self.member_verifications == 0
646            && self.fleet_verifications == 0
647            && self.verification_operations == 0
648    }
649
650    /// Count restore apply dry-run operations by runner operation kind.
651    #[must_use]
652    pub fn from_dry_run_phases(phases: &[RestoreApplyDryRunPhase]) -> Self {
653        let mut counts = Self::default();
654        for operation in phases.iter().flat_map(|phase| {
655            phase
656                .operations
657                .iter()
658                .map(|operation| &operation.operation)
659        }) {
660            counts.record(operation);
661        }
662        counts
663    }
664
665    // Record one operation kind in the aggregate count object.
666    const fn record(&mut self, operation: &RestoreApplyOperationKind) {
667        match operation {
668            RestoreApplyOperationKind::UploadSnapshot => self.snapshot_uploads += 1,
669            RestoreApplyOperationKind::LoadSnapshot => self.snapshot_loads += 1,
670            RestoreApplyOperationKind::ReinstallCode => self.code_reinstalls += 1,
671            RestoreApplyOperationKind::VerifyMember => {
672                self.member_verifications += 1;
673                self.verification_operations += 1;
674            }
675            RestoreApplyOperationKind::VerifyFleet => {
676                self.fleet_verifications += 1;
677                self.verification_operations += 1;
678            }
679        }
680    }
681}
682
683// Explain why an apply journal is blocked before mutation is allowed.
684fn restore_apply_blocked_reasons(dry_run: &RestoreApplyDryRun) -> Vec<String> {
685    let mut reasons = dry_run.readiness_reasons.clone();
686
687    match &dry_run.artifact_validation {
688        Some(validation) => {
689            if !validation.artifacts_present {
690                reasons.push("missing-artifacts".to_string());
691            }
692            if !validation.checksums_verified {
693                reasons.push("artifact-checksum-validation-incomplete".to_string());
694            }
695        }
696        None => reasons.push("missing-artifact-validation".to_string()),
697    }
698
699    reasons.sort();
700    reasons.dedup();
701    reasons
702}
703
704///
705/// RestoreApplyJournalOperation
706///
707
708#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
709pub struct RestoreApplyJournalOperation {
710    pub sequence: usize,
711    pub operation: RestoreApplyOperationKind,
712    pub state: RestoreApplyOperationState,
713    #[serde(default, skip_serializing_if = "Option::is_none")]
714    pub state_updated_at: Option<String>,
715    pub blocking_reasons: Vec<String>,
716    pub restore_group: u16,
717    pub phase_order: usize,
718    pub source_canister: String,
719    pub target_canister: String,
720    pub role: String,
721    pub snapshot_id: Option<String>,
722    pub artifact_path: Option<String>,
723    pub verification_kind: Option<String>,
724    pub verification_method: Option<String>,
725}
726
727impl RestoreApplyJournalOperation {
728    // Build one initial journal operation from the dry-run operation row.
729    fn from_dry_run_operation(
730        operation: &RestoreApplyDryRunOperation,
731        state: RestoreApplyOperationState,
732        blocked_reasons: &[String],
733    ) -> Self {
734        Self {
735            sequence: operation.sequence,
736            operation: operation.operation.clone(),
737            state: state.clone(),
738            state_updated_at: None,
739            blocking_reasons: if state == RestoreApplyOperationState::Blocked {
740                blocked_reasons.to_vec()
741            } else {
742                Vec::new()
743            },
744            restore_group: operation.restore_group,
745            phase_order: operation.phase_order,
746            source_canister: operation.source_canister.clone(),
747            target_canister: operation.target_canister.clone(),
748            role: operation.role.clone(),
749            snapshot_id: operation.snapshot_id.clone(),
750            artifact_path: operation.artifact_path.clone(),
751            verification_kind: operation.verification_kind.clone(),
752            verification_method: operation.verification_method.clone(),
753        }
754    }
755
756    // Validate one restore apply journal operation row.
757    fn validate(&self) -> Result<(), RestoreApplyJournalError> {
758        validate_apply_journal_nonempty("operations[].source_canister", &self.source_canister)?;
759        validate_apply_journal_nonempty("operations[].target_canister", &self.target_canister)?;
760        validate_apply_journal_nonempty("operations[].role", &self.role)?;
761        if let Some(updated_at) = &self.state_updated_at {
762            validate_apply_journal_nonempty("operations[].state_updated_at", updated_at)?;
763        }
764        self.validate_operation_fields()?;
765
766        match self.state {
767            RestoreApplyOperationState::Blocked if self.blocking_reasons.is_empty() => Err(
768                RestoreApplyJournalError::BlockedOperationMissingReason(self.sequence),
769            ),
770            RestoreApplyOperationState::Failed if self.blocking_reasons.is_empty() => Err(
771                RestoreApplyJournalError::FailureReasonRequired(self.sequence),
772            ),
773            RestoreApplyOperationState::Pending
774            | RestoreApplyOperationState::Ready
775            | RestoreApplyOperationState::Completed
776                if !self.blocking_reasons.is_empty() =>
777            {
778                Err(RestoreApplyJournalError::UnblockedOperationHasReasons(
779                    self.sequence,
780                ))
781            }
782            RestoreApplyOperationState::Blocked
783            | RestoreApplyOperationState::Failed
784            | RestoreApplyOperationState::Pending
785            | RestoreApplyOperationState::Ready
786            | RestoreApplyOperationState::Completed => Ok(()),
787        }
788    }
789
790    // Validate fields required by the operation kind before runner command rendering.
791    fn validate_operation_fields(&self) -> Result<(), RestoreApplyJournalError> {
792        match self.operation {
793            RestoreApplyOperationKind::UploadSnapshot => self
794                .validate_required_field("operations[].artifact_path", self.artifact_path.as_ref())
795                .map(|_| ()),
796            RestoreApplyOperationKind::LoadSnapshot => self
797                .validate_required_field("operations[].snapshot_id", self.snapshot_id.as_ref())
798                .map(|_| ()),
799            RestoreApplyOperationKind::ReinstallCode => Ok(()),
800            RestoreApplyOperationKind::VerifyMember | RestoreApplyOperationKind::VerifyFleet => {
801                let kind = self.validate_required_field(
802                    "operations[].verification_kind",
803                    self.verification_kind.as_ref(),
804                )?;
805                if kind == "status" {
806                    return Ok(());
807                }
808                self.validate_required_field(
809                    "operations[].verification_method",
810                    self.verification_method.as_ref(),
811                )
812                .map(|_| ())
813            }
814        }
815    }
816
817    // Return one required optional field after checking it is present and nonempty.
818    fn validate_required_field<'a>(
819        &self,
820        field: &'static str,
821        value: Option<&'a String>,
822    ) -> Result<&'a str, RestoreApplyJournalError> {
823        let value = value.map(String::as_str).ok_or_else(|| {
824            RestoreApplyJournalError::OperationMissingField {
825                sequence: self.sequence,
826                operation: self.operation.clone(),
827                field,
828            }
829        })?;
830        if value.trim().is_empty() {
831            return Err(RestoreApplyJournalError::OperationMissingField {
832                sequence: self.sequence,
833                operation: self.operation.clone(),
834                field,
835            });
836        }
837
838        Ok(value)
839    }
840
841    // Decide whether an operation can move to the requested next state.
842    const fn can_transition_to(&self, next_state: &RestoreApplyOperationState) -> bool {
843        match (&self.state, next_state) {
844            (
845                RestoreApplyOperationState::Ready | RestoreApplyOperationState::Pending,
846                RestoreApplyOperationState::Pending,
847            )
848            | (
849                RestoreApplyOperationState::Pending | RestoreApplyOperationState::Failed,
850                RestoreApplyOperationState::Ready,
851            )
852            | (
853                RestoreApplyOperationState::Ready
854                | RestoreApplyOperationState::Pending
855                | RestoreApplyOperationState::Completed,
856                RestoreApplyOperationState::Completed,
857            )
858            | (
859                RestoreApplyOperationState::Ready
860                | RestoreApplyOperationState::Pending
861                | RestoreApplyOperationState::Failed,
862                RestoreApplyOperationState::Failed,
863            ) => true,
864            (
865                RestoreApplyOperationState::Blocked
866                | RestoreApplyOperationState::Completed
867                | RestoreApplyOperationState::Failed
868                | RestoreApplyOperationState::Pending
869                | RestoreApplyOperationState::Ready,
870                _,
871            ) => false,
872        }
873    }
874}
875
876///
877/// RestoreApplyOperationState
878///
879
880#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
881#[serde(rename_all = "kebab-case")]
882pub enum RestoreApplyOperationState {
883    Pending,
884    Ready,
885    Blocked,
886    Completed,
887    Failed,
888}
889
890///
891/// RestoreApplyJournalError
892///
893
894#[derive(Debug, ThisError)]
895pub enum RestoreApplyJournalError {
896    #[error("unsupported restore apply journal version {0}")]
897    UnsupportedVersion(u16),
898
899    #[error("restore apply journal field {0} is required")]
900    MissingField(&'static str),
901
902    #[error("restore apply journal count {field} mismatch: reported={reported}, actual={actual}")]
903    CountMismatch {
904        field: &'static str,
905        reported: usize,
906        actual: usize,
907    },
908
909    #[error("restore apply journal has duplicate operation sequence {0}")]
910    DuplicateSequence(usize),
911
912    #[error("restore apply journal is missing operation sequence {0}")]
913    MissingSequence(usize),
914
915    #[error("ready restore apply journal cannot include blocked reasons or blocked operations")]
916    ReadyJournalHasBlockingState,
917
918    #[error("blocked restore apply journal operation {0} is missing a blocking reason")]
919    BlockedOperationMissingReason(usize),
920
921    #[error("unblocked restore apply journal operation {0} cannot have blocking reasons")]
922    UnblockedOperationHasReasons(usize),
923
924    #[error("restore apply journal operation {sequence} {operation:?} is missing field {field}")]
925    OperationMissingField {
926        sequence: usize,
927        operation: RestoreApplyOperationKind,
928        field: &'static str,
929    },
930
931    #[error("restore apply journal operation {0} was not found")]
932    OperationNotFound(usize),
933
934    #[error("restore apply journal operation {sequence} cannot transition from {from:?} to {to:?}")]
935    InvalidOperationTransition {
936        sequence: usize,
937        from: RestoreApplyOperationState,
938        to: RestoreApplyOperationState,
939    },
940
941    #[error("failed restore apply journal operation {0} requires a reason")]
942    FailureReasonRequired(usize),
943
944    #[error("restore apply journal has no operation that can be advanced")]
945    NoTransitionableOperation,
946
947    #[error("restore apply journal has no pending operation to release")]
948    NoPendingOperation,
949
950    #[error("restore apply journal operation {requested} cannot advance before operation {next}")]
951    OutOfOrderOperationTransition { requested: usize, next: usize },
952
953    #[error("restore apply journal receipt references missing operation {0}")]
954    OperationReceiptOperationNotFound(usize),
955
956    #[error("restore apply journal receipt does not match operation {sequence}")]
957    OperationReceiptMismatch { sequence: usize },
958}
959///
960/// RestoreApplyOperationKind
961///
962
963#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
964#[serde(rename_all = "kebab-case")]
965pub enum RestoreApplyOperationKind {
966    UploadSnapshot,
967    LoadSnapshot,
968    ReinstallCode,
969    VerifyMember,
970    VerifyFleet,
971}