Skip to main content

canic_backup/restore/apply/journal/
mod.rs

1use super::{RestoreApplyDryRun, RestoreApplyDryRunOperation};
2use serde::{Deserialize, Serialize};
3use std::collections::BTreeSet;
4
5mod commands;
6mod counts;
7mod receipts;
8mod reports;
9mod types;
10
11pub use commands::{
12    RestoreApplyCommandConfig, RestoreApplyCommandPreview, RestoreApplyRunnerCommand,
13};
14use counts::RestoreApplyJournalStateCounts;
15pub use counts::RestoreApplyOperationKindCounts;
16pub(in crate::restore) use receipts::RestoreApplyCommandOutputPair;
17pub use receipts::{
18    RestoreApplyCommandOutput, RestoreApplyOperationReceipt, RestoreApplyOperationReceiptOutcome,
19};
20pub(in crate::restore) use reports::RestoreApplyJournalReport;
21pub use reports::{
22    RestoreApplyPendingSummary, RestoreApplyProgressSummary, RestoreApplyReportOperation,
23    RestoreApplyReportOutcome,
24};
25pub use types::{
26    RestoreApplyJournalError, RestoreApplyJournalOperation, RestoreApplyOperationKind,
27    RestoreApplyOperationState,
28};
29use types::{
30    restore_apply_blocked_reasons, validate_apply_journal_count, validate_apply_journal_nonempty,
31    validate_apply_journal_sequences, validate_apply_journal_version,
32};
33
34///
35/// RestoreApplyJournal
36///
37
38#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
39pub struct RestoreApplyJournal {
40    pub journal_version: u16,
41    pub backup_id: String,
42    pub ready: bool,
43    pub blocked_reasons: Vec<String>,
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub backup_root: Option<String>,
46    pub operation_count: usize,
47    pub operation_counts: RestoreApplyOperationKindCounts,
48    pub pending_operations: usize,
49    pub ready_operations: usize,
50    pub blocked_operations: usize,
51    pub completed_operations: usize,
52    pub failed_operations: usize,
53    pub operations: Vec<RestoreApplyJournalOperation>,
54    #[serde(default, skip_serializing_if = "Vec::is_empty")]
55    pub operation_receipts: Vec<RestoreApplyOperationReceipt>,
56}
57
58impl RestoreApplyJournal {
59    /// Build the initial no-mutation restore apply journal from a dry-run.
60    #[must_use]
61    pub fn from_dry_run(dry_run: &RestoreApplyDryRun) -> Self {
62        let blocked_reasons = restore_apply_blocked_reasons(dry_run);
63        let initial_state = if blocked_reasons.is_empty() {
64            RestoreApplyOperationState::Ready
65        } else {
66            RestoreApplyOperationState::Blocked
67        };
68        let operations = dry_run
69            .operations
70            .iter()
71            .map(|operation| {
72                RestoreApplyJournalOperation::from_dry_run_operation(
73                    operation,
74                    initial_state.clone(),
75                    &blocked_reasons,
76                )
77            })
78            .collect::<Vec<_>>();
79        let ready_operations = operations
80            .iter()
81            .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
82            .count();
83        let blocked_operations = operations
84            .iter()
85            .filter(|operation| operation.state == RestoreApplyOperationState::Blocked)
86            .count();
87        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&operations);
88
89        Self {
90            journal_version: 1,
91            backup_id: dry_run.backup_id.clone(),
92            ready: blocked_reasons.is_empty(),
93            blocked_reasons,
94            backup_root: dry_run
95                .artifact_validation
96                .as_ref()
97                .map(|validation| validation.backup_root.clone()),
98            operation_count: operations.len(),
99            operation_counts,
100            pending_operations: 0,
101            ready_operations,
102            blocked_operations,
103            completed_operations: 0,
104            failed_operations: 0,
105            operations,
106            operation_receipts: Vec::new(),
107        }
108    }
109
110    /// Validate the structural consistency of a restore apply journal.
111    pub fn validate(&self) -> Result<(), RestoreApplyJournalError> {
112        validate_apply_journal_version(self.journal_version)?;
113        validate_apply_journal_nonempty("backup_id", &self.backup_id)?;
114        if let Some(backup_root) = &self.backup_root {
115            validate_apply_journal_nonempty("backup_root", backup_root)?;
116        }
117        validate_apply_journal_count(
118            "operation_count",
119            self.operation_count,
120            self.operations.len(),
121        )?;
122
123        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
124        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
125        self.operation_counts.validate_matches(&operation_counts)?;
126        validate_apply_journal_count(
127            "pending_operations",
128            self.pending_operations,
129            state_counts.pending,
130        )?;
131        validate_apply_journal_count(
132            "ready_operations",
133            self.ready_operations,
134            state_counts.ready,
135        )?;
136        validate_apply_journal_count(
137            "blocked_operations",
138            self.blocked_operations,
139            state_counts.blocked,
140        )?;
141        validate_apply_journal_count(
142            "completed_operations",
143            self.completed_operations,
144            state_counts.completed,
145        )?;
146        validate_apply_journal_count(
147            "failed_operations",
148            self.failed_operations,
149            state_counts.failed,
150        )?;
151
152        if self.ready && (!self.blocked_reasons.is_empty() || self.blocked_operations > 0) {
153            return Err(RestoreApplyJournalError::ReadyJournalHasBlockingState);
154        }
155
156        validate_apply_journal_sequences(&self.operations)?;
157        for operation in &self.operations {
158            operation.validate()?;
159        }
160        self.validate_operation_receipt_attempts()?;
161        for receipt in &self.operation_receipts {
162            receipt.validate_against(self)?;
163        }
164
165        Ok(())
166    }
167
168    /// Build an operator-oriented report from this apply journal.
169    #[must_use]
170    pub(in crate::restore) fn report(&self) -> RestoreApplyJournalReport {
171        RestoreApplyJournalReport::from_journal(self)
172    }
173
174    /// Return the next ready or pending operation that controls runner progress.
175    #[must_use]
176    pub(in crate::restore) fn next_transition_operation(
177        &self,
178    ) -> Option<&RestoreApplyJournalOperation> {
179        self.operations
180            .iter()
181            .filter(|operation| {
182                matches!(
183                    operation.state,
184                    RestoreApplyOperationState::Ready
185                        | RestoreApplyOperationState::Pending
186                        | RestoreApplyOperationState::Failed
187                )
188            })
189            .min_by_key(|operation| operation.sequence)
190    }
191
192    /// Render the next transitionable operation as a no-execute command preview.
193    #[must_use]
194    pub fn next_command_preview(&self) -> RestoreApplyCommandPreview {
195        RestoreApplyCommandPreview::from_journal(self)
196    }
197
198    /// Render the next transitionable operation with a configured command preview.
199    #[must_use]
200    pub(in crate::restore) fn next_command_preview_with_config(
201        &self,
202        config: &RestoreApplyCommandConfig,
203    ) -> RestoreApplyCommandPreview {
204        RestoreApplyCommandPreview::from_journal_with_config(self, config)
205    }
206
207    /// Store one durable operation receipt/output and revalidate the journal.
208    pub(in crate::restore) fn record_operation_receipt(
209        &mut self,
210        receipt: RestoreApplyOperationReceipt,
211    ) -> Result<(), RestoreApplyJournalError> {
212        self.operation_receipts.push(receipt);
213        if let Err(error) = self.validate() {
214            self.operation_receipts.pop();
215            return Err(error);
216        }
217
218        Ok(())
219    }
220
221    /// Mark the next transitionable operation pending with an update marker.
222    pub fn mark_next_operation_pending_at(
223        &mut self,
224        updated_at: Option<String>,
225    ) -> Result<(), RestoreApplyJournalError> {
226        let sequence = self
227            .next_transition_sequence()
228            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
229        self.mark_operation_pending_at(sequence, updated_at)
230    }
231
232    /// Mark one restore apply operation pending with an update marker.
233    pub(in crate::restore) fn mark_operation_pending_at(
234        &mut self,
235        sequence: usize,
236        updated_at: Option<String>,
237    ) -> Result<(), RestoreApplyJournalError> {
238        self.transition_operation(
239            sequence,
240            RestoreApplyOperationState::Pending,
241            Vec::new(),
242            updated_at,
243        )
244    }
245
246    /// Mark the current pending operation ready again with an update marker.
247    pub(in crate::restore) fn mark_next_operation_ready_at(
248        &mut self,
249        updated_at: Option<String>,
250    ) -> Result<(), RestoreApplyJournalError> {
251        let operation = self
252            .next_transition_operation()
253            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
254        if operation.state != RestoreApplyOperationState::Pending {
255            return Err(RestoreApplyJournalError::NoPendingOperation);
256        }
257
258        self.mark_operation_ready_at(operation.sequence, updated_at)
259    }
260
261    /// Mark one restore apply operation ready again with an update marker.
262    pub(in crate::restore) fn mark_operation_ready_at(
263        &mut self,
264        sequence: usize,
265        updated_at: Option<String>,
266    ) -> Result<(), RestoreApplyJournalError> {
267        self.transition_operation(
268            sequence,
269            RestoreApplyOperationState::Ready,
270            Vec::new(),
271            updated_at,
272        )
273    }
274
275    /// Retry one failed restore apply operation by moving it back to ready.
276    pub fn retry_failed_operation_at(
277        &mut self,
278        sequence: usize,
279        updated_at: Option<String>,
280    ) -> Result<(), RestoreApplyJournalError> {
281        self.transition_operation(
282            sequence,
283            RestoreApplyOperationState::Ready,
284            Vec::new(),
285            updated_at,
286        )
287    }
288
289    /// Mark one restore apply operation completed with an update marker.
290    pub(in crate::restore) fn mark_operation_completed_at(
291        &mut self,
292        sequence: usize,
293        updated_at: Option<String>,
294    ) -> Result<(), RestoreApplyJournalError> {
295        self.transition_operation(
296            sequence,
297            RestoreApplyOperationState::Completed,
298            Vec::new(),
299            updated_at,
300        )
301    }
302
303    /// Mark one restore apply operation failed with an update marker.
304    pub(in crate::restore) fn mark_operation_failed_at(
305        &mut self,
306        sequence: usize,
307        reason: String,
308        updated_at: Option<String>,
309    ) -> Result<(), RestoreApplyJournalError> {
310        if reason.trim().is_empty() {
311            return Err(RestoreApplyJournalError::FailureReasonRequired(sequence));
312        }
313
314        self.transition_operation(
315            sequence,
316            RestoreApplyOperationState::Failed,
317            vec![reason],
318            updated_at,
319        )
320    }
321
322    // Apply one legal operation state transition and revalidate the journal.
323    fn transition_operation(
324        &mut self,
325        sequence: usize,
326        next_state: RestoreApplyOperationState,
327        blocking_reasons: Vec<String>,
328        updated_at: Option<String>,
329    ) -> Result<(), RestoreApplyJournalError> {
330        let index = self
331            .operations
332            .iter()
333            .position(|operation| operation.sequence == sequence)
334            .ok_or(RestoreApplyJournalError::OperationNotFound(sequence))?;
335        let operation = &self.operations[index];
336
337        if !operation.can_transition_to(&next_state) {
338            return Err(RestoreApplyJournalError::InvalidOperationTransition {
339                sequence,
340                from: operation.state.clone(),
341                to: next_state,
342            });
343        }
344
345        self.validate_operation_transition_order(operation, &next_state)?;
346
347        let operation = &mut self.operations[index];
348        operation.state = next_state;
349        operation.blocking_reasons = blocking_reasons;
350        operation.state_updated_at = updated_at;
351        self.refresh_operation_counts();
352        self.validate()
353    }
354
355    // Ensure fresh operation transitions advance in journal order.
356    fn validate_operation_transition_order(
357        &self,
358        operation: &RestoreApplyJournalOperation,
359        next_state: &RestoreApplyOperationState,
360    ) -> Result<(), RestoreApplyJournalError> {
361        if operation.state == *next_state {
362            return Ok(());
363        }
364
365        let next_sequence = self
366            .next_transition_sequence()
367            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
368
369        if operation.sequence == next_sequence {
370            return Ok(());
371        }
372
373        Err(RestoreApplyJournalError::OutOfOrderOperationTransition {
374            requested: operation.sequence,
375            next: next_sequence,
376        })
377    }
378
379    // Return the next operation sequence that can be advanced by a runner.
380    fn next_transition_sequence(&self) -> Option<usize> {
381        self.next_transition_operation()
382            .map(|operation| operation.sequence)
383    }
384
385    // Recompute operation counts after a journal operation state change.
386    fn refresh_operation_counts(&mut self) {
387        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
388        self.operation_count = self.operations.len();
389        self.operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
390        self.pending_operations = state_counts.pending;
391        self.ready_operations = state_counts.ready;
392        self.blocked_operations = state_counts.blocked;
393        self.completed_operations = state_counts.completed;
394        self.failed_operations = state_counts.failed;
395    }
396
397    // Return whether every planned operation has completed.
398    pub(super) const fn is_complete(&self) -> bool {
399        self.operation_count > 0 && self.completed_operations == self.operation_count
400    }
401
402    // Recompute operation-kind counts from concrete operation rows.
403    pub(super) fn operation_kind_counts(&self) -> RestoreApplyOperationKindCounts {
404        RestoreApplyOperationKindCounts::from_operations(&self.operations)
405    }
406
407    // Ensure one operation attempt has exactly one durable command outcome.
408    fn validate_operation_receipt_attempts(&self) -> Result<(), RestoreApplyJournalError> {
409        let mut attempts = BTreeSet::new();
410        for receipt in &self.operation_receipts {
411            if !attempts.insert((receipt.sequence, receipt.attempt)) {
412                return Err(RestoreApplyJournalError::DuplicateOperationReceiptAttempt {
413                    sequence: receipt.sequence,
414                    attempt: receipt.attempt,
415                });
416            }
417        }
418
419        Ok(())
420    }
421
422    // Find the uploaded target snapshot ID required by one load operation.
423    pub(super) fn uploaded_snapshot_id_for_load(
424        &self,
425        load: &RestoreApplyJournalOperation,
426    ) -> Option<&str> {
427        self.operation_receipts
428            .iter()
429            .find(|receipt| {
430                receipt.matches_load_operation(load)
431                    && self.operations.iter().any(|operation| {
432                        operation.sequence == receipt.sequence
433                            && operation.operation == RestoreApplyOperationKind::UploadSnapshot
434                            && operation.state == RestoreApplyOperationState::Completed
435                    })
436            })
437            .and_then(|receipt| receipt.uploaded_snapshot_id.as_deref())
438    }
439}