Skip to main content

canic_backup/restore/apply/journal/
mod.rs

1use super::{RestoreApplyDryRun, RestoreApplyDryRunOperation};
2use serde::{Deserialize, Serialize};
3use std::collections::BTreeSet;
4use thiserror::Error as ThisError;
5
6mod commands;
7mod receipts;
8mod reports;
9
10pub use commands::{
11    RestoreApplyCommandConfig, RestoreApplyCommandPreview, RestoreApplyRunnerCommand,
12};
13pub(in crate::restore) use receipts::RestoreApplyCommandOutputPair;
14pub use receipts::{
15    RestoreApplyCommandOutput, RestoreApplyOperationReceipt, RestoreApplyOperationReceiptOutcome,
16};
17pub(in crate::restore) use reports::RestoreApplyJournalReport;
18pub use reports::{
19    RestoreApplyPendingSummary, RestoreApplyProgressSummary, RestoreApplyReportOperation,
20    RestoreApplyReportOutcome,
21};
22
23///
24/// RestoreApplyJournal
25///
26
27#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
28pub struct RestoreApplyJournal {
29    pub journal_version: u16,
30    pub backup_id: String,
31    pub ready: bool,
32    pub blocked_reasons: Vec<String>,
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub backup_root: Option<String>,
35    pub operation_count: usize,
36    pub operation_counts: RestoreApplyOperationKindCounts,
37    pub pending_operations: usize,
38    pub ready_operations: usize,
39    pub blocked_operations: usize,
40    pub completed_operations: usize,
41    pub failed_operations: usize,
42    pub operations: Vec<RestoreApplyJournalOperation>,
43    #[serde(default, skip_serializing_if = "Vec::is_empty")]
44    pub operation_receipts: Vec<RestoreApplyOperationReceipt>,
45}
46
47impl RestoreApplyJournal {
48    /// Build the initial no-mutation restore apply journal from a dry-run.
49    #[must_use]
50    pub fn from_dry_run(dry_run: &RestoreApplyDryRun) -> Self {
51        let blocked_reasons = restore_apply_blocked_reasons(dry_run);
52        let initial_state = if blocked_reasons.is_empty() {
53            RestoreApplyOperationState::Ready
54        } else {
55            RestoreApplyOperationState::Blocked
56        };
57        let operations = dry_run
58            .operations
59            .iter()
60            .map(|operation| {
61                RestoreApplyJournalOperation::from_dry_run_operation(
62                    operation,
63                    initial_state.clone(),
64                    &blocked_reasons,
65                )
66            })
67            .collect::<Vec<_>>();
68        let ready_operations = operations
69            .iter()
70            .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
71            .count();
72        let blocked_operations = operations
73            .iter()
74            .filter(|operation| operation.state == RestoreApplyOperationState::Blocked)
75            .count();
76        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&operations);
77
78        Self {
79            journal_version: 1,
80            backup_id: dry_run.backup_id.clone(),
81            ready: blocked_reasons.is_empty(),
82            blocked_reasons,
83            backup_root: dry_run
84                .artifact_validation
85                .as_ref()
86                .map(|validation| validation.backup_root.clone()),
87            operation_count: operations.len(),
88            operation_counts,
89            pending_operations: 0,
90            ready_operations,
91            blocked_operations,
92            completed_operations: 0,
93            failed_operations: 0,
94            operations,
95            operation_receipts: Vec::new(),
96        }
97    }
98
99    /// Validate the structural consistency of a restore apply journal.
100    pub fn validate(&self) -> Result<(), RestoreApplyJournalError> {
101        validate_apply_journal_version(self.journal_version)?;
102        validate_apply_journal_nonempty("backup_id", &self.backup_id)?;
103        if let Some(backup_root) = &self.backup_root {
104            validate_apply_journal_nonempty("backup_root", backup_root)?;
105        }
106        validate_apply_journal_count(
107            "operation_count",
108            self.operation_count,
109            self.operations.len(),
110        )?;
111
112        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
113        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
114        self.operation_counts.validate_matches(&operation_counts)?;
115        validate_apply_journal_count(
116            "pending_operations",
117            self.pending_operations,
118            state_counts.pending,
119        )?;
120        validate_apply_journal_count(
121            "ready_operations",
122            self.ready_operations,
123            state_counts.ready,
124        )?;
125        validate_apply_journal_count(
126            "blocked_operations",
127            self.blocked_operations,
128            state_counts.blocked,
129        )?;
130        validate_apply_journal_count(
131            "completed_operations",
132            self.completed_operations,
133            state_counts.completed,
134        )?;
135        validate_apply_journal_count(
136            "failed_operations",
137            self.failed_operations,
138            state_counts.failed,
139        )?;
140
141        if self.ready && (!self.blocked_reasons.is_empty() || self.blocked_operations > 0) {
142            return Err(RestoreApplyJournalError::ReadyJournalHasBlockingState);
143        }
144
145        validate_apply_journal_sequences(&self.operations)?;
146        for operation in &self.operations {
147            operation.validate()?;
148        }
149        for receipt in &self.operation_receipts {
150            receipt.validate_against(self)?;
151        }
152
153        Ok(())
154    }
155
156    /// Build an operator-oriented report from this apply journal.
157    #[must_use]
158    pub(in crate::restore) fn report(&self) -> RestoreApplyJournalReport {
159        RestoreApplyJournalReport::from_journal(self)
160    }
161
162    /// Return the next ready or pending operation that controls runner progress.
163    #[must_use]
164    pub(in crate::restore) fn next_transition_operation(
165        &self,
166    ) -> Option<&RestoreApplyJournalOperation> {
167        self.operations
168            .iter()
169            .filter(|operation| {
170                matches!(
171                    operation.state,
172                    RestoreApplyOperationState::Ready
173                        | RestoreApplyOperationState::Pending
174                        | RestoreApplyOperationState::Failed
175                )
176            })
177            .min_by_key(|operation| operation.sequence)
178    }
179
180    /// Render the next transitionable operation as a no-execute command preview.
181    #[must_use]
182    pub fn next_command_preview(&self) -> RestoreApplyCommandPreview {
183        RestoreApplyCommandPreview::from_journal(self)
184    }
185
186    /// Render the next transitionable operation with a configured command preview.
187    #[must_use]
188    pub(in crate::restore) fn next_command_preview_with_config(
189        &self,
190        config: &RestoreApplyCommandConfig,
191    ) -> RestoreApplyCommandPreview {
192        RestoreApplyCommandPreview::from_journal_with_config(self, config)
193    }
194
195    /// Store one durable operation receipt/output and revalidate the journal.
196    pub(in crate::restore) fn record_operation_receipt(
197        &mut self,
198        receipt: RestoreApplyOperationReceipt,
199    ) -> Result<(), RestoreApplyJournalError> {
200        self.operation_receipts.push(receipt);
201        if let Err(error) = self.validate() {
202            self.operation_receipts.pop();
203            return Err(error);
204        }
205
206        Ok(())
207    }
208
209    /// Mark the next transitionable operation pending with an update marker.
210    pub fn mark_next_operation_pending_at(
211        &mut self,
212        updated_at: Option<String>,
213    ) -> Result<(), RestoreApplyJournalError> {
214        let sequence = self
215            .next_transition_sequence()
216            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
217        self.mark_operation_pending_at(sequence, updated_at)
218    }
219
220    /// Mark one restore apply operation pending with an update marker.
221    pub(in crate::restore) fn mark_operation_pending_at(
222        &mut self,
223        sequence: usize,
224        updated_at: Option<String>,
225    ) -> Result<(), RestoreApplyJournalError> {
226        self.transition_operation(
227            sequence,
228            RestoreApplyOperationState::Pending,
229            Vec::new(),
230            updated_at,
231        )
232    }
233
234    /// Mark the current pending operation ready again with an update marker.
235    pub(in crate::restore) fn mark_next_operation_ready_at(
236        &mut self,
237        updated_at: Option<String>,
238    ) -> Result<(), RestoreApplyJournalError> {
239        let operation = self
240            .next_transition_operation()
241            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
242        if operation.state != RestoreApplyOperationState::Pending {
243            return Err(RestoreApplyJournalError::NoPendingOperation);
244        }
245
246        self.mark_operation_ready_at(operation.sequence, updated_at)
247    }
248
249    /// Mark one restore apply operation ready again with an update marker.
250    pub(in crate::restore) fn mark_operation_ready_at(
251        &mut self,
252        sequence: usize,
253        updated_at: Option<String>,
254    ) -> Result<(), RestoreApplyJournalError> {
255        self.transition_operation(
256            sequence,
257            RestoreApplyOperationState::Ready,
258            Vec::new(),
259            updated_at,
260        )
261    }
262
263    /// Retry one failed restore apply operation by moving it back to ready.
264    pub fn retry_failed_operation_at(
265        &mut self,
266        sequence: usize,
267        updated_at: Option<String>,
268    ) -> Result<(), RestoreApplyJournalError> {
269        self.transition_operation(
270            sequence,
271            RestoreApplyOperationState::Ready,
272            Vec::new(),
273            updated_at,
274        )
275    }
276
277    /// Mark one restore apply operation completed with an update marker.
278    pub(in crate::restore) fn mark_operation_completed_at(
279        &mut self,
280        sequence: usize,
281        updated_at: Option<String>,
282    ) -> Result<(), RestoreApplyJournalError> {
283        self.transition_operation(
284            sequence,
285            RestoreApplyOperationState::Completed,
286            Vec::new(),
287            updated_at,
288        )
289    }
290
291    /// Mark one restore apply operation failed with an update marker.
292    pub(in crate::restore) fn mark_operation_failed_at(
293        &mut self,
294        sequence: usize,
295        reason: String,
296        updated_at: Option<String>,
297    ) -> Result<(), RestoreApplyJournalError> {
298        if reason.trim().is_empty() {
299            return Err(RestoreApplyJournalError::FailureReasonRequired(sequence));
300        }
301
302        self.transition_operation(
303            sequence,
304            RestoreApplyOperationState::Failed,
305            vec![reason],
306            updated_at,
307        )
308    }
309
310    // Apply one legal operation state transition and revalidate the journal.
311    fn transition_operation(
312        &mut self,
313        sequence: usize,
314        next_state: RestoreApplyOperationState,
315        blocking_reasons: Vec<String>,
316        updated_at: Option<String>,
317    ) -> Result<(), RestoreApplyJournalError> {
318        let index = self
319            .operations
320            .iter()
321            .position(|operation| operation.sequence == sequence)
322            .ok_or(RestoreApplyJournalError::OperationNotFound(sequence))?;
323        let operation = &self.operations[index];
324
325        if !operation.can_transition_to(&next_state) {
326            return Err(RestoreApplyJournalError::InvalidOperationTransition {
327                sequence,
328                from: operation.state.clone(),
329                to: next_state,
330            });
331        }
332
333        self.validate_operation_transition_order(operation, &next_state)?;
334
335        let operation = &mut self.operations[index];
336        operation.state = next_state;
337        operation.blocking_reasons = blocking_reasons;
338        operation.state_updated_at = updated_at;
339        self.refresh_operation_counts();
340        self.validate()
341    }
342
343    // Ensure fresh operation transitions advance in journal order.
344    fn validate_operation_transition_order(
345        &self,
346        operation: &RestoreApplyJournalOperation,
347        next_state: &RestoreApplyOperationState,
348    ) -> Result<(), RestoreApplyJournalError> {
349        if operation.state == *next_state {
350            return Ok(());
351        }
352
353        let next_sequence = self
354            .next_transition_sequence()
355            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
356
357        if operation.sequence == next_sequence {
358            return Ok(());
359        }
360
361        Err(RestoreApplyJournalError::OutOfOrderOperationTransition {
362            requested: operation.sequence,
363            next: next_sequence,
364        })
365    }
366
367    // Return the next operation sequence that can be advanced by a runner.
368    fn next_transition_sequence(&self) -> Option<usize> {
369        self.next_transition_operation()
370            .map(|operation| operation.sequence)
371    }
372
373    // Recompute operation counts after a journal operation state change.
374    fn refresh_operation_counts(&mut self) {
375        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
376        self.operation_count = self.operations.len();
377        self.operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
378        self.pending_operations = state_counts.pending;
379        self.ready_operations = state_counts.ready;
380        self.blocked_operations = state_counts.blocked;
381        self.completed_operations = state_counts.completed;
382        self.failed_operations = state_counts.failed;
383    }
384
385    // Return whether every planned operation has completed.
386    pub(super) const fn is_complete(&self) -> bool {
387        self.operation_count > 0 && self.completed_operations == self.operation_count
388    }
389
390    // Recompute operation-kind counts from concrete operation rows.
391    pub(super) fn operation_kind_counts(&self) -> RestoreApplyOperationKindCounts {
392        RestoreApplyOperationKindCounts::from_operations(&self.operations)
393    }
394
395    // Find the uploaded target snapshot ID required by one load operation.
396    pub(super) fn uploaded_snapshot_id_for_load(
397        &self,
398        load: &RestoreApplyJournalOperation,
399    ) -> Option<&str> {
400        self.operation_receipts
401            .iter()
402            .find(|receipt| {
403                receipt.matches_load_operation(load)
404                    && self.operations.iter().any(|operation| {
405                        operation.sequence == receipt.sequence
406                            && operation.operation == RestoreApplyOperationKind::UploadSnapshot
407                            && operation.state == RestoreApplyOperationState::Completed
408                    })
409            })
410            .and_then(|receipt| receipt.uploaded_snapshot_id.as_deref())
411    }
412}
413
414// Validate the supported restore apply journal format version.
415const fn validate_apply_journal_version(version: u16) -> Result<(), RestoreApplyJournalError> {
416    if version == 1 {
417        return Ok(());
418    }
419
420    Err(RestoreApplyJournalError::UnsupportedVersion(version))
421}
422
423// Validate required nonempty restore apply journal fields.
424fn validate_apply_journal_nonempty(
425    field: &'static str,
426    value: &str,
427) -> Result<(), RestoreApplyJournalError> {
428    if !value.trim().is_empty() {
429        return Ok(());
430    }
431
432    Err(RestoreApplyJournalError::MissingField(field))
433}
434
435// Validate one reported restore apply journal count.
436const fn validate_apply_journal_count(
437    field: &'static str,
438    reported: usize,
439    actual: usize,
440) -> Result<(), RestoreApplyJournalError> {
441    if reported == actual {
442        return Ok(());
443    }
444
445    Err(RestoreApplyJournalError::CountMismatch {
446        field,
447        reported,
448        actual,
449    })
450}
451
452// Validate operation sequence values are unique and contiguous from zero.
453fn validate_apply_journal_sequences(
454    operations: &[RestoreApplyJournalOperation],
455) -> Result<(), RestoreApplyJournalError> {
456    let mut sequences = BTreeSet::new();
457    for operation in operations {
458        if !sequences.insert(operation.sequence) {
459            return Err(RestoreApplyJournalError::DuplicateSequence(
460                operation.sequence,
461            ));
462        }
463    }
464
465    for expected in 0..operations.len() {
466        if !sequences.contains(&expected) {
467            return Err(RestoreApplyJournalError::MissingSequence(expected));
468        }
469    }
470
471    Ok(())
472}
473
474///
475/// RestoreApplyJournalStateCounts
476///
477
478#[derive(Clone, Debug, Default, Eq, PartialEq)]
479struct RestoreApplyJournalStateCounts {
480    pending: usize,
481    ready: usize,
482    blocked: usize,
483    completed: usize,
484    failed: usize,
485}
486
487impl RestoreApplyJournalStateCounts {
488    // Count operation states from concrete journal operation rows.
489    fn from_operations(operations: &[RestoreApplyJournalOperation]) -> Self {
490        let mut counts = Self::default();
491        for operation in operations {
492            match operation.state {
493                RestoreApplyOperationState::Pending => counts.pending += 1,
494                RestoreApplyOperationState::Ready => counts.ready += 1,
495                RestoreApplyOperationState::Blocked => counts.blocked += 1,
496                RestoreApplyOperationState::Completed => counts.completed += 1,
497                RestoreApplyOperationState::Failed => counts.failed += 1,
498            }
499        }
500        counts
501    }
502}
503
504///
505/// RestoreApplyOperationKindCounts
506///
507
508#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
509pub struct RestoreApplyOperationKindCounts {
510    pub snapshot_uploads: usize,
511    pub snapshot_loads: usize,
512    pub member_verifications: usize,
513    pub fleet_verifications: usize,
514    pub verification_operations: usize,
515}
516
517impl RestoreApplyOperationKindCounts {
518    /// Count restore apply journal operations by runner operation kind.
519    #[must_use]
520    pub fn from_operations(operations: &[RestoreApplyJournalOperation]) -> Self {
521        let mut counts = Self::default();
522        for operation in operations {
523            counts.record(&operation.operation);
524        }
525        counts
526    }
527
528    /// Validate this count object against concrete operations.
529    pub fn validate_matches(&self, expected: &Self) -> Result<(), RestoreApplyJournalError> {
530        validate_apply_journal_count(
531            "operation_counts.snapshot_uploads",
532            self.snapshot_uploads,
533            expected.snapshot_uploads,
534        )?;
535        validate_apply_journal_count(
536            "operation_counts.snapshot_loads",
537            self.snapshot_loads,
538            expected.snapshot_loads,
539        )?;
540        validate_apply_journal_count(
541            "operation_counts.member_verifications",
542            self.member_verifications,
543            expected.member_verifications,
544        )?;
545        validate_apply_journal_count(
546            "operation_counts.fleet_verifications",
547            self.fleet_verifications,
548            expected.fleet_verifications,
549        )?;
550        validate_apply_journal_count(
551            "operation_counts.verification_operations",
552            self.verification_operations,
553            expected.verification_operations,
554        )
555    }
556
557    /// Count restore apply dry-run operations by runner operation kind.
558    #[must_use]
559    pub fn from_dry_run_operations(operations: &[RestoreApplyDryRunOperation]) -> Self {
560        let mut counts = Self::default();
561        for operation in operations {
562            counts.record(&operation.operation);
563        }
564        counts
565    }
566
567    // Record one operation kind in the aggregate count object.
568    const fn record(&mut self, operation: &RestoreApplyOperationKind) {
569        match operation {
570            RestoreApplyOperationKind::UploadSnapshot => self.snapshot_uploads += 1,
571            RestoreApplyOperationKind::LoadSnapshot => self.snapshot_loads += 1,
572            RestoreApplyOperationKind::VerifyMember => {
573                self.member_verifications += 1;
574                self.verification_operations += 1;
575            }
576            RestoreApplyOperationKind::VerifyFleet => {
577                self.fleet_verifications += 1;
578                self.verification_operations += 1;
579            }
580        }
581    }
582}
583
584// Explain why an apply journal is blocked before mutation is allowed.
585fn restore_apply_blocked_reasons(dry_run: &RestoreApplyDryRun) -> Vec<String> {
586    let mut reasons = dry_run.readiness_reasons.clone();
587
588    match &dry_run.artifact_validation {
589        Some(validation) => {
590            if !validation.artifacts_present {
591                reasons.push("missing-artifacts".to_string());
592            }
593            if !validation.checksums_verified {
594                reasons.push("artifact-checksum-validation-incomplete".to_string());
595            }
596        }
597        None => reasons.push("missing-artifact-validation".to_string()),
598    }
599
600    reasons.sort();
601    reasons.dedup();
602    reasons
603}
604
605///
606/// RestoreApplyJournalOperation
607///
608
609#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
610pub struct RestoreApplyJournalOperation {
611    pub sequence: usize,
612    pub operation: RestoreApplyOperationKind,
613    pub state: RestoreApplyOperationState,
614    #[serde(default, skip_serializing_if = "Option::is_none")]
615    pub state_updated_at: Option<String>,
616    pub blocking_reasons: Vec<String>,
617    pub member_order: usize,
618    pub source_canister: String,
619    pub target_canister: String,
620    pub role: String,
621    pub snapshot_id: Option<String>,
622    pub artifact_path: Option<String>,
623    pub verification_kind: Option<String>,
624}
625
626impl RestoreApplyJournalOperation {
627    // Build one initial journal operation from the dry-run operation row.
628    fn from_dry_run_operation(
629        operation: &RestoreApplyDryRunOperation,
630        state: RestoreApplyOperationState,
631        blocked_reasons: &[String],
632    ) -> Self {
633        Self {
634            sequence: operation.sequence,
635            operation: operation.operation.clone(),
636            state: state.clone(),
637            state_updated_at: None,
638            blocking_reasons: if state == RestoreApplyOperationState::Blocked {
639                blocked_reasons.to_vec()
640            } else {
641                Vec::new()
642            },
643            member_order: operation.member_order,
644            source_canister: operation.source_canister.clone(),
645            target_canister: operation.target_canister.clone(),
646            role: operation.role.clone(),
647            snapshot_id: operation.snapshot_id.clone(),
648            artifact_path: operation.artifact_path.clone(),
649            verification_kind: operation.verification_kind.clone(),
650        }
651    }
652
653    // Validate one restore apply journal operation row.
654    fn validate(&self) -> Result<(), RestoreApplyJournalError> {
655        validate_apply_journal_nonempty("operations[].source_canister", &self.source_canister)?;
656        validate_apply_journal_nonempty("operations[].target_canister", &self.target_canister)?;
657        validate_apply_journal_nonempty("operations[].role", &self.role)?;
658        if let Some(updated_at) = &self.state_updated_at {
659            validate_apply_journal_nonempty("operations[].state_updated_at", updated_at)?;
660        }
661        self.validate_operation_fields()?;
662
663        match self.state {
664            RestoreApplyOperationState::Blocked if self.blocking_reasons.is_empty() => Err(
665                RestoreApplyJournalError::BlockedOperationMissingReason(self.sequence),
666            ),
667            RestoreApplyOperationState::Failed if self.blocking_reasons.is_empty() => Err(
668                RestoreApplyJournalError::FailureReasonRequired(self.sequence),
669            ),
670            RestoreApplyOperationState::Pending
671            | RestoreApplyOperationState::Ready
672            | RestoreApplyOperationState::Completed
673                if !self.blocking_reasons.is_empty() =>
674            {
675                Err(RestoreApplyJournalError::UnblockedOperationHasReasons(
676                    self.sequence,
677                ))
678            }
679            RestoreApplyOperationState::Blocked
680            | RestoreApplyOperationState::Failed
681            | RestoreApplyOperationState::Pending
682            | RestoreApplyOperationState::Ready
683            | RestoreApplyOperationState::Completed => Ok(()),
684        }
685    }
686
687    // Validate fields required by the operation kind before runner command rendering.
688    fn validate_operation_fields(&self) -> Result<(), RestoreApplyJournalError> {
689        match self.operation {
690            RestoreApplyOperationKind::UploadSnapshot => self
691                .validate_required_field("operations[].artifact_path", self.artifact_path.as_ref())
692                .map(|_| ()),
693            RestoreApplyOperationKind::LoadSnapshot => self
694                .validate_required_field("operations[].snapshot_id", self.snapshot_id.as_ref())
695                .map(|_| ()),
696            RestoreApplyOperationKind::VerifyMember | RestoreApplyOperationKind::VerifyFleet => {
697                let kind = self.validate_required_field(
698                    "operations[].verification_kind",
699                    self.verification_kind.as_ref(),
700                )?;
701                if kind != "status" {
702                    return Err(RestoreApplyJournalError::UnsupportedVerificationKind {
703                        sequence: self.sequence,
704                        kind: kind.to_string(),
705                    });
706                }
707                Ok(())
708            }
709        }
710    }
711
712    // Return one required optional field after checking it is present and nonempty.
713    fn validate_required_field<'a>(
714        &self,
715        field: &'static str,
716        value: Option<&'a String>,
717    ) -> Result<&'a str, RestoreApplyJournalError> {
718        let value = value.map(String::as_str).ok_or_else(|| {
719            RestoreApplyJournalError::OperationMissingField {
720                sequence: self.sequence,
721                operation: self.operation.clone(),
722                field,
723            }
724        })?;
725        if value.trim().is_empty() {
726            return Err(RestoreApplyJournalError::OperationMissingField {
727                sequence: self.sequence,
728                operation: self.operation.clone(),
729                field,
730            });
731        }
732
733        Ok(value)
734    }
735
736    // Decide whether an operation can move to the requested next state.
737    const fn can_transition_to(&self, next_state: &RestoreApplyOperationState) -> bool {
738        match (&self.state, next_state) {
739            (
740                RestoreApplyOperationState::Ready | RestoreApplyOperationState::Pending,
741                RestoreApplyOperationState::Pending,
742            )
743            | (
744                RestoreApplyOperationState::Pending | RestoreApplyOperationState::Failed,
745                RestoreApplyOperationState::Ready,
746            )
747            | (
748                RestoreApplyOperationState::Ready
749                | RestoreApplyOperationState::Pending
750                | RestoreApplyOperationState::Completed,
751                RestoreApplyOperationState::Completed,
752            )
753            | (
754                RestoreApplyOperationState::Ready
755                | RestoreApplyOperationState::Pending
756                | RestoreApplyOperationState::Failed,
757                RestoreApplyOperationState::Failed,
758            ) => true,
759            (
760                RestoreApplyOperationState::Blocked
761                | RestoreApplyOperationState::Completed
762                | RestoreApplyOperationState::Failed
763                | RestoreApplyOperationState::Pending
764                | RestoreApplyOperationState::Ready,
765                _,
766            ) => false,
767        }
768    }
769}
770
771///
772/// RestoreApplyOperationState
773///
774
775#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
776#[serde(rename_all = "kebab-case")]
777pub enum RestoreApplyOperationState {
778    Pending,
779    Ready,
780    Blocked,
781    Completed,
782    Failed,
783}
784
785///
786/// RestoreApplyJournalError
787///
788
789#[derive(Debug, ThisError)]
790pub enum RestoreApplyJournalError {
791    #[error("unsupported restore apply journal version {0}")]
792    UnsupportedVersion(u16),
793
794    #[error("restore apply journal field {0} is required")]
795    MissingField(&'static str),
796
797    #[error("restore apply journal count {field} mismatch: reported={reported}, actual={actual}")]
798    CountMismatch {
799        field: &'static str,
800        reported: usize,
801        actual: usize,
802    },
803
804    #[error("restore apply journal has duplicate operation sequence {0}")]
805    DuplicateSequence(usize),
806
807    #[error("restore apply journal is missing operation sequence {0}")]
808    MissingSequence(usize),
809
810    #[error("ready restore apply journal cannot include blocked reasons or blocked operations")]
811    ReadyJournalHasBlockingState,
812
813    #[error("blocked restore apply journal operation {0} is missing a blocking reason")]
814    BlockedOperationMissingReason(usize),
815
816    #[error("unblocked restore apply journal operation {0} cannot have blocking reasons")]
817    UnblockedOperationHasReasons(usize),
818
819    #[error("restore apply journal operation {sequence} {operation:?} is missing field {field}")]
820    OperationMissingField {
821        sequence: usize,
822        operation: RestoreApplyOperationKind,
823        field: &'static str,
824    },
825
826    #[error("restore apply journal operation {sequence} uses unsupported verification kind {kind}")]
827    UnsupportedVerificationKind { sequence: usize, kind: String },
828
829    #[error("restore apply journal operation {0} was not found")]
830    OperationNotFound(usize),
831
832    #[error("restore apply journal operation {sequence} cannot transition from {from:?} to {to:?}")]
833    InvalidOperationTransition {
834        sequence: usize,
835        from: RestoreApplyOperationState,
836        to: RestoreApplyOperationState,
837    },
838
839    #[error("failed restore apply journal operation {0} requires a reason")]
840    FailureReasonRequired(usize),
841
842    #[error("restore apply journal has no operation that can be advanced")]
843    NoTransitionableOperation,
844
845    #[error("restore apply journal has no pending operation to release")]
846    NoPendingOperation,
847
848    #[error("restore apply journal operation {requested} cannot advance before operation {next}")]
849    OutOfOrderOperationTransition { requested: usize, next: usize },
850
851    #[error("restore apply journal receipt references missing operation {0}")]
852    OperationReceiptOperationNotFound(usize),
853
854    #[error("restore apply journal receipt does not match operation {sequence}")]
855    OperationReceiptMismatch { sequence: usize },
856}
857///
858/// RestoreApplyOperationKind
859///
860
861#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
862#[serde(rename_all = "kebab-case")]
863pub enum RestoreApplyOperationKind {
864    UploadSnapshot,
865    LoadSnapshot,
866    VerifyMember,
867    VerifyFleet,
868}