Skip to main content

canic_backup/restore/apply/journal/
mod.rs

//! Module: restore::apply::journal
//!
//! Responsibility: persist and advance restore apply operation state.
//! Does not own: restore plan construction, command execution, or backup artifact checksums.
//! Boundary: turns dry-runs into durable journals, reports, previews, and receipts.
6
7mod commands;
8mod counts;
9mod receipts;
10mod reports;
11mod types;
12
13pub use commands::{
14    RestoreApplyCommandConfig, RestoreApplyCommandPreview, RestoreApplyRunnerCommand,
15};
16use counts::RestoreApplyJournalStateCounts;
17pub use counts::RestoreApplyOperationKindCounts;
18pub(in crate::restore) use receipts::RestoreApplyCommandOutputPair;
19pub use receipts::{
20    RestoreApplyCommandOutput, RestoreApplyOperationReceipt, RestoreApplyOperationReceiptOutcome,
21};
22pub(in crate::restore) use reports::RestoreApplyJournalReport;
23pub use reports::{
24    RestoreApplyPendingSummary, RestoreApplyProgressSummary, RestoreApplyReportOperation,
25    RestoreApplyReportOutcome,
26};
27pub use types::{
28    RestoreApplyJournalError, RestoreApplyJournalOperation, RestoreApplyOperationKind,
29    RestoreApplyOperationState,
30};
31
32use crate::restore::{RestoreApplyDryRun, RestoreApplyDryRunOperation};
33
34use std::collections::BTreeSet;
35
36use serde::{Deserialize, Serialize};
37
38use types::{
39    restore_apply_blocked_reasons, validate_apply_journal_count, validate_apply_journal_nonempty,
40    validate_apply_journal_sequences, validate_apply_journal_version,
41};
42
43///
44/// RestoreApplyJournal
45///
46/// Durable restore apply operation journal.
47/// Owned by restore apply journaling and consumed by restore runners and reports.
48///
49
50#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
51pub struct RestoreApplyJournal {
52    pub journal_version: u16,
53    pub backup_id: String,
54    pub ready: bool,
55    pub blocked_reasons: Vec<String>,
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub backup_root: Option<String>,
58    pub operation_count: usize,
59    pub operation_counts: RestoreApplyOperationKindCounts,
60    pub pending_operations: usize,
61    pub ready_operations: usize,
62    pub blocked_operations: usize,
63    pub completed_operations: usize,
64    pub failed_operations: usize,
65    pub operations: Vec<RestoreApplyJournalOperation>,
66    #[serde(default, skip_serializing_if = "Vec::is_empty")]
67    pub operation_receipts: Vec<RestoreApplyOperationReceipt>,
68}
69
70impl RestoreApplyJournal {
71    /// Build the initial no-mutation restore apply journal from a dry-run.
72    #[must_use]
73    pub fn from_dry_run(dry_run: &RestoreApplyDryRun) -> Self {
74        let blocked_reasons = restore_apply_blocked_reasons(dry_run);
75        let initial_state = if blocked_reasons.is_empty() {
76            RestoreApplyOperationState::Ready
77        } else {
78            RestoreApplyOperationState::Blocked
79        };
80        let operations = dry_run
81            .operations
82            .iter()
83            .map(|operation| {
84                RestoreApplyJournalOperation::from_dry_run_operation(
85                    operation,
86                    initial_state.clone(),
87                    &blocked_reasons,
88                )
89            })
90            .collect::<Vec<_>>();
91        let ready_operations = operations
92            .iter()
93            .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
94            .count();
95        let blocked_operations = operations
96            .iter()
97            .filter(|operation| operation.state == RestoreApplyOperationState::Blocked)
98            .count();
99        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&operations);
100
101        Self {
102            journal_version: 1,
103            backup_id: dry_run.backup_id.clone(),
104            ready: blocked_reasons.is_empty(),
105            blocked_reasons,
106            backup_root: dry_run
107                .artifact_validation
108                .as_ref()
109                .map(|validation| validation.backup_root.clone()),
110            operation_count: operations.len(),
111            operation_counts,
112            pending_operations: 0,
113            ready_operations,
114            blocked_operations,
115            completed_operations: 0,
116            failed_operations: 0,
117            operations,
118            operation_receipts: Vec::new(),
119        }
120    }
121
122    /// Validate the structural consistency of a restore apply journal.
123    pub fn validate(&self) -> Result<(), RestoreApplyJournalError> {
124        validate_apply_journal_version(self.journal_version)?;
125        validate_apply_journal_nonempty("backup_id", &self.backup_id)?;
126        if let Some(backup_root) = &self.backup_root {
127            validate_apply_journal_nonempty("backup_root", backup_root)?;
128        }
129        validate_apply_journal_count(
130            "operation_count",
131            self.operation_count,
132            self.operations.len(),
133        )?;
134
135        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
136        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
137        self.operation_counts.validate_matches(&operation_counts)?;
138        validate_apply_journal_count(
139            "pending_operations",
140            self.pending_operations,
141            state_counts.pending,
142        )?;
143        validate_apply_journal_count(
144            "ready_operations",
145            self.ready_operations,
146            state_counts.ready,
147        )?;
148        validate_apply_journal_count(
149            "blocked_operations",
150            self.blocked_operations,
151            state_counts.blocked,
152        )?;
153        validate_apply_journal_count(
154            "completed_operations",
155            self.completed_operations,
156            state_counts.completed,
157        )?;
158        validate_apply_journal_count(
159            "failed_operations",
160            self.failed_operations,
161            state_counts.failed,
162        )?;
163
164        if self.ready && (!self.blocked_reasons.is_empty() || self.blocked_operations > 0) {
165            return Err(RestoreApplyJournalError::ReadyJournalHasBlockingState);
166        }
167
168        validate_apply_journal_sequences(&self.operations)?;
169        for operation in &self.operations {
170            operation.validate()?;
171        }
172        self.validate_operation_receipt_attempts()?;
173        for receipt in &self.operation_receipts {
174            receipt.validate_against(self)?;
175        }
176
177        Ok(())
178    }
179
180    /// Build an operator-oriented report from this apply journal.
181    #[must_use]
182    pub(in crate::restore) fn report(&self) -> RestoreApplyJournalReport {
183        RestoreApplyJournalReport::from_journal(self)
184    }
185
186    /// Return the next ready or pending operation that controls runner progress.
187    #[must_use]
188    pub(in crate::restore) fn next_transition_operation(
189        &self,
190    ) -> Option<&RestoreApplyJournalOperation> {
191        self.operations
192            .iter()
193            .filter(|operation| {
194                matches!(
195                    operation.state,
196                    RestoreApplyOperationState::Ready
197                        | RestoreApplyOperationState::Pending
198                        | RestoreApplyOperationState::Failed
199                )
200            })
201            .min_by_key(|operation| operation.sequence)
202    }
203
204    /// Render the next transitionable operation as a no-execute command preview.
205    #[must_use]
206    pub fn next_command_preview(&self) -> RestoreApplyCommandPreview {
207        RestoreApplyCommandPreview::from_journal(self)
208    }
209
210    /// Render the next transitionable operation with a configured command preview.
211    #[must_use]
212    pub(in crate::restore) fn next_command_preview_with_config(
213        &self,
214        config: &RestoreApplyCommandConfig,
215    ) -> RestoreApplyCommandPreview {
216        RestoreApplyCommandPreview::from_journal_with_config(self, config)
217    }
218
219    /// Store one durable operation receipt/output and revalidate the journal.
220    pub(in crate::restore) fn record_operation_receipt(
221        &mut self,
222        receipt: RestoreApplyOperationReceipt,
223    ) -> Result<(), RestoreApplyJournalError> {
224        self.operation_receipts.push(receipt);
225        if let Err(error) = self.validate() {
226            self.operation_receipts.pop();
227            return Err(error);
228        }
229
230        Ok(())
231    }
232
233    /// Mark the next transitionable operation pending with an update marker.
234    pub fn mark_next_operation_pending_at(
235        &mut self,
236        updated_at: Option<String>,
237    ) -> Result<(), RestoreApplyJournalError> {
238        let sequence = self
239            .next_transition_sequence()
240            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
241        self.mark_operation_pending_at(sequence, updated_at)
242    }
243
244    /// Mark one restore apply operation pending with an update marker.
245    pub(in crate::restore) fn mark_operation_pending_at(
246        &mut self,
247        sequence: usize,
248        updated_at: Option<String>,
249    ) -> Result<(), RestoreApplyJournalError> {
250        self.transition_operation(
251            sequence,
252            RestoreApplyOperationState::Pending,
253            Vec::new(),
254            updated_at,
255        )
256    }
257
258    /// Mark the current pending operation ready again with an update marker.
259    pub(in crate::restore) fn mark_next_operation_ready_at(
260        &mut self,
261        updated_at: Option<String>,
262    ) -> Result<(), RestoreApplyJournalError> {
263        let operation = self
264            .next_transition_operation()
265            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
266        if operation.state != RestoreApplyOperationState::Pending {
267            return Err(RestoreApplyJournalError::NoPendingOperation);
268        }
269
270        self.mark_operation_ready_at(operation.sequence, updated_at)
271    }
272
273    /// Mark one restore apply operation ready again with an update marker.
274    pub(in crate::restore) fn mark_operation_ready_at(
275        &mut self,
276        sequence: usize,
277        updated_at: Option<String>,
278    ) -> Result<(), RestoreApplyJournalError> {
279        self.transition_operation(
280            sequence,
281            RestoreApplyOperationState::Ready,
282            Vec::new(),
283            updated_at,
284        )
285    }
286
287    /// Retry one failed restore apply operation by moving it back to ready.
288    pub fn retry_failed_operation_at(
289        &mut self,
290        sequence: usize,
291        updated_at: Option<String>,
292    ) -> Result<(), RestoreApplyJournalError> {
293        self.transition_operation(
294            sequence,
295            RestoreApplyOperationState::Ready,
296            Vec::new(),
297            updated_at,
298        )
299    }
300
301    /// Mark one restore apply operation completed with an update marker.
302    pub(in crate::restore) fn mark_operation_completed_at(
303        &mut self,
304        sequence: usize,
305        updated_at: Option<String>,
306    ) -> Result<(), RestoreApplyJournalError> {
307        self.transition_operation(
308            sequence,
309            RestoreApplyOperationState::Completed,
310            Vec::new(),
311            updated_at,
312        )
313    }
314
315    /// Mark one restore apply operation failed with an update marker.
316    pub(in crate::restore) fn mark_operation_failed_at(
317        &mut self,
318        sequence: usize,
319        reason: String,
320        updated_at: Option<String>,
321    ) -> Result<(), RestoreApplyJournalError> {
322        if reason.trim().is_empty() {
323            return Err(RestoreApplyJournalError::FailureReasonRequired(sequence));
324        }
325
326        self.transition_operation(
327            sequence,
328            RestoreApplyOperationState::Failed,
329            vec![reason],
330            updated_at,
331        )
332    }
333
334    // Apply one legal operation state transition and revalidate the journal.
335    fn transition_operation(
336        &mut self,
337        sequence: usize,
338        next_state: RestoreApplyOperationState,
339        blocking_reasons: Vec<String>,
340        updated_at: Option<String>,
341    ) -> Result<(), RestoreApplyJournalError> {
342        let index = self
343            .operations
344            .iter()
345            .position(|operation| operation.sequence == sequence)
346            .ok_or(RestoreApplyJournalError::OperationNotFound(sequence))?;
347        let operation = &self.operations[index];
348
349        if !operation.can_transition_to(&next_state) {
350            return Err(RestoreApplyJournalError::InvalidOperationTransition {
351                sequence,
352                from: operation.state.clone(),
353                to: next_state,
354            });
355        }
356
357        self.validate_operation_transition_order(operation, &next_state)?;
358
359        let operation = &mut self.operations[index];
360        operation.state = next_state;
361        operation.blocking_reasons = blocking_reasons;
362        operation.state_updated_at = updated_at;
363        self.refresh_operation_counts();
364        self.validate()
365    }
366
367    // Ensure fresh operation transitions advance in journal order.
368    fn validate_operation_transition_order(
369        &self,
370        operation: &RestoreApplyJournalOperation,
371        next_state: &RestoreApplyOperationState,
372    ) -> Result<(), RestoreApplyJournalError> {
373        if operation.state == *next_state {
374            return Ok(());
375        }
376
377        let next_sequence = self
378            .next_transition_sequence()
379            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
380
381        if operation.sequence == next_sequence {
382            return Ok(());
383        }
384
385        Err(RestoreApplyJournalError::OutOfOrderOperationTransition {
386            requested: operation.sequence,
387            next: next_sequence,
388        })
389    }
390
391    // Return the next operation sequence that can be advanced by a runner.
392    fn next_transition_sequence(&self) -> Option<usize> {
393        self.next_transition_operation()
394            .map(|operation| operation.sequence)
395    }
396
397    // Recompute operation counts after a journal operation state change.
398    fn refresh_operation_counts(&mut self) {
399        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
400        self.operation_count = self.operations.len();
401        self.operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
402        self.pending_operations = state_counts.pending;
403        self.ready_operations = state_counts.ready;
404        self.blocked_operations = state_counts.blocked;
405        self.completed_operations = state_counts.completed;
406        self.failed_operations = state_counts.failed;
407    }
408
409    // Return whether every planned operation has completed.
410    pub(super) const fn is_complete(&self) -> bool {
411        self.operation_count > 0 && self.completed_operations == self.operation_count
412    }
413
414    // Recompute operation-kind counts from concrete operation rows.
415    pub(super) fn operation_kind_counts(&self) -> RestoreApplyOperationKindCounts {
416        RestoreApplyOperationKindCounts::from_operations(&self.operations)
417    }
418
419    // Ensure one operation attempt has exactly one durable command outcome.
420    fn validate_operation_receipt_attempts(&self) -> Result<(), RestoreApplyJournalError> {
421        let mut attempts = BTreeSet::new();
422        for receipt in &self.operation_receipts {
423            if !attempts.insert((receipt.sequence, receipt.attempt)) {
424                return Err(RestoreApplyJournalError::DuplicateOperationReceiptAttempt {
425                    sequence: receipt.sequence,
426                    attempt: receipt.attempt,
427                });
428            }
429        }
430
431        Ok(())
432    }
433
434    // Find the uploaded target snapshot ID required by one load operation.
435    pub(super) fn uploaded_snapshot_id_for_load(
436        &self,
437        load: &RestoreApplyJournalOperation,
438    ) -> Option<&str> {
439        self.operation_receipts
440            .iter()
441            .find(|receipt| {
442                receipt.matches_load_operation(load)
443                    && self.operations.iter().any(|operation| {
444                        operation.sequence == receipt.sequence
445                            && operation.operation == RestoreApplyOperationKind::UploadSnapshot
446                            && operation.state == RestoreApplyOperationState::Completed
447                    })
448            })
449            .and_then(|receipt| receipt.uploaded_snapshot_id.as_deref())
450    }
451}