Skip to main content

canic_backup/restore/apply/journal/
mod.rs

1use super::{RestoreApplyDryRun, RestoreApplyDryRunOperation};
2use serde::{Deserialize, Serialize};
3
4mod commands;
5mod counts;
6mod receipts;
7mod reports;
8mod types;
9
10pub use commands::{
11    RestoreApplyCommandConfig, RestoreApplyCommandPreview, RestoreApplyRunnerCommand,
12};
13use counts::RestoreApplyJournalStateCounts;
14pub use counts::RestoreApplyOperationKindCounts;
15pub(in crate::restore) use receipts::RestoreApplyCommandOutputPair;
16pub use receipts::{
17    RestoreApplyCommandOutput, RestoreApplyOperationReceipt, RestoreApplyOperationReceiptOutcome,
18};
19pub(in crate::restore) use reports::RestoreApplyJournalReport;
20pub use reports::{
21    RestoreApplyPendingSummary, RestoreApplyProgressSummary, RestoreApplyReportOperation,
22    RestoreApplyReportOutcome,
23};
24pub use types::{
25    RestoreApplyJournalError, RestoreApplyJournalOperation, RestoreApplyOperationKind,
26    RestoreApplyOperationState,
27};
28use types::{
29    restore_apply_blocked_reasons, validate_apply_journal_count, validate_apply_journal_nonempty,
30    validate_apply_journal_sequences, validate_apply_journal_version,
31};
32
33///
34/// RestoreApplyJournal
35///
36
37#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
38pub struct RestoreApplyJournal {
39    pub journal_version: u16,
40    pub backup_id: String,
41    pub ready: bool,
42    pub blocked_reasons: Vec<String>,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub backup_root: Option<String>,
45    pub operation_count: usize,
46    pub operation_counts: RestoreApplyOperationKindCounts,
47    pub pending_operations: usize,
48    pub ready_operations: usize,
49    pub blocked_operations: usize,
50    pub completed_operations: usize,
51    pub failed_operations: usize,
52    pub operations: Vec<RestoreApplyJournalOperation>,
53    #[serde(default, skip_serializing_if = "Vec::is_empty")]
54    pub operation_receipts: Vec<RestoreApplyOperationReceipt>,
55}
56
57impl RestoreApplyJournal {
58    /// Build the initial no-mutation restore apply journal from a dry-run.
59    #[must_use]
60    pub fn from_dry_run(dry_run: &RestoreApplyDryRun) -> Self {
61        let blocked_reasons = restore_apply_blocked_reasons(dry_run);
62        let initial_state = if blocked_reasons.is_empty() {
63            RestoreApplyOperationState::Ready
64        } else {
65            RestoreApplyOperationState::Blocked
66        };
67        let operations = dry_run
68            .operations
69            .iter()
70            .map(|operation| {
71                RestoreApplyJournalOperation::from_dry_run_operation(
72                    operation,
73                    initial_state.clone(),
74                    &blocked_reasons,
75                )
76            })
77            .collect::<Vec<_>>();
78        let ready_operations = operations
79            .iter()
80            .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
81            .count();
82        let blocked_operations = operations
83            .iter()
84            .filter(|operation| operation.state == RestoreApplyOperationState::Blocked)
85            .count();
86        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&operations);
87
88        Self {
89            journal_version: 1,
90            backup_id: dry_run.backup_id.clone(),
91            ready: blocked_reasons.is_empty(),
92            blocked_reasons,
93            backup_root: dry_run
94                .artifact_validation
95                .as_ref()
96                .map(|validation| validation.backup_root.clone()),
97            operation_count: operations.len(),
98            operation_counts,
99            pending_operations: 0,
100            ready_operations,
101            blocked_operations,
102            completed_operations: 0,
103            failed_operations: 0,
104            operations,
105            operation_receipts: Vec::new(),
106        }
107    }
108
109    /// Validate the structural consistency of a restore apply journal.
110    pub fn validate(&self) -> Result<(), RestoreApplyJournalError> {
111        validate_apply_journal_version(self.journal_version)?;
112        validate_apply_journal_nonempty("backup_id", &self.backup_id)?;
113        if let Some(backup_root) = &self.backup_root {
114            validate_apply_journal_nonempty("backup_root", backup_root)?;
115        }
116        validate_apply_journal_count(
117            "operation_count",
118            self.operation_count,
119            self.operations.len(),
120        )?;
121
122        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
123        let operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
124        self.operation_counts.validate_matches(&operation_counts)?;
125        validate_apply_journal_count(
126            "pending_operations",
127            self.pending_operations,
128            state_counts.pending,
129        )?;
130        validate_apply_journal_count(
131            "ready_operations",
132            self.ready_operations,
133            state_counts.ready,
134        )?;
135        validate_apply_journal_count(
136            "blocked_operations",
137            self.blocked_operations,
138            state_counts.blocked,
139        )?;
140        validate_apply_journal_count(
141            "completed_operations",
142            self.completed_operations,
143            state_counts.completed,
144        )?;
145        validate_apply_journal_count(
146            "failed_operations",
147            self.failed_operations,
148            state_counts.failed,
149        )?;
150
151        if self.ready && (!self.blocked_reasons.is_empty() || self.blocked_operations > 0) {
152            return Err(RestoreApplyJournalError::ReadyJournalHasBlockingState);
153        }
154
155        validate_apply_journal_sequences(&self.operations)?;
156        for operation in &self.operations {
157            operation.validate()?;
158        }
159        for receipt in &self.operation_receipts {
160            receipt.validate_against(self)?;
161        }
162
163        Ok(())
164    }
165
166    /// Build an operator-oriented report from this apply journal.
167    #[must_use]
168    pub(in crate::restore) fn report(&self) -> RestoreApplyJournalReport {
169        RestoreApplyJournalReport::from_journal(self)
170    }
171
172    /// Return the next ready or pending operation that controls runner progress.
173    #[must_use]
174    pub(in crate::restore) fn next_transition_operation(
175        &self,
176    ) -> Option<&RestoreApplyJournalOperation> {
177        self.operations
178            .iter()
179            .filter(|operation| {
180                matches!(
181                    operation.state,
182                    RestoreApplyOperationState::Ready
183                        | RestoreApplyOperationState::Pending
184                        | RestoreApplyOperationState::Failed
185                )
186            })
187            .min_by_key(|operation| operation.sequence)
188    }
189
190    /// Render the next transitionable operation as a no-execute command preview.
191    #[must_use]
192    pub fn next_command_preview(&self) -> RestoreApplyCommandPreview {
193        RestoreApplyCommandPreview::from_journal(self)
194    }
195
196    /// Render the next transitionable operation with a configured command preview.
197    #[must_use]
198    pub(in crate::restore) fn next_command_preview_with_config(
199        &self,
200        config: &RestoreApplyCommandConfig,
201    ) -> RestoreApplyCommandPreview {
202        RestoreApplyCommandPreview::from_journal_with_config(self, config)
203    }
204
205    /// Store one durable operation receipt/output and revalidate the journal.
206    pub(in crate::restore) fn record_operation_receipt(
207        &mut self,
208        receipt: RestoreApplyOperationReceipt,
209    ) -> Result<(), RestoreApplyJournalError> {
210        self.operation_receipts.push(receipt);
211        if let Err(error) = self.validate() {
212            self.operation_receipts.pop();
213            return Err(error);
214        }
215
216        Ok(())
217    }
218
219    /// Mark the next transitionable operation pending with an update marker.
220    pub fn mark_next_operation_pending_at(
221        &mut self,
222        updated_at: Option<String>,
223    ) -> Result<(), RestoreApplyJournalError> {
224        let sequence = self
225            .next_transition_sequence()
226            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
227        self.mark_operation_pending_at(sequence, updated_at)
228    }
229
230    /// Mark one restore apply operation pending with an update marker.
231    pub(in crate::restore) fn mark_operation_pending_at(
232        &mut self,
233        sequence: usize,
234        updated_at: Option<String>,
235    ) -> Result<(), RestoreApplyJournalError> {
236        self.transition_operation(
237            sequence,
238            RestoreApplyOperationState::Pending,
239            Vec::new(),
240            updated_at,
241        )
242    }
243
244    /// Mark the current pending operation ready again with an update marker.
245    pub(in crate::restore) fn mark_next_operation_ready_at(
246        &mut self,
247        updated_at: Option<String>,
248    ) -> Result<(), RestoreApplyJournalError> {
249        let operation = self
250            .next_transition_operation()
251            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
252        if operation.state != RestoreApplyOperationState::Pending {
253            return Err(RestoreApplyJournalError::NoPendingOperation);
254        }
255
256        self.mark_operation_ready_at(operation.sequence, updated_at)
257    }
258
259    /// Mark one restore apply operation ready again with an update marker.
260    pub(in crate::restore) fn mark_operation_ready_at(
261        &mut self,
262        sequence: usize,
263        updated_at: Option<String>,
264    ) -> Result<(), RestoreApplyJournalError> {
265        self.transition_operation(
266            sequence,
267            RestoreApplyOperationState::Ready,
268            Vec::new(),
269            updated_at,
270        )
271    }
272
273    /// Retry one failed restore apply operation by moving it back to ready.
274    pub fn retry_failed_operation_at(
275        &mut self,
276        sequence: usize,
277        updated_at: Option<String>,
278    ) -> Result<(), RestoreApplyJournalError> {
279        self.transition_operation(
280            sequence,
281            RestoreApplyOperationState::Ready,
282            Vec::new(),
283            updated_at,
284        )
285    }
286
287    /// Mark one restore apply operation completed with an update marker.
288    pub(in crate::restore) fn mark_operation_completed_at(
289        &mut self,
290        sequence: usize,
291        updated_at: Option<String>,
292    ) -> Result<(), RestoreApplyJournalError> {
293        self.transition_operation(
294            sequence,
295            RestoreApplyOperationState::Completed,
296            Vec::new(),
297            updated_at,
298        )
299    }
300
301    /// Mark one restore apply operation failed with an update marker.
302    pub(in crate::restore) fn mark_operation_failed_at(
303        &mut self,
304        sequence: usize,
305        reason: String,
306        updated_at: Option<String>,
307    ) -> Result<(), RestoreApplyJournalError> {
308        if reason.trim().is_empty() {
309            return Err(RestoreApplyJournalError::FailureReasonRequired(sequence));
310        }
311
312        self.transition_operation(
313            sequence,
314            RestoreApplyOperationState::Failed,
315            vec![reason],
316            updated_at,
317        )
318    }
319
320    // Apply one legal operation state transition and revalidate the journal.
321    fn transition_operation(
322        &mut self,
323        sequence: usize,
324        next_state: RestoreApplyOperationState,
325        blocking_reasons: Vec<String>,
326        updated_at: Option<String>,
327    ) -> Result<(), RestoreApplyJournalError> {
328        let index = self
329            .operations
330            .iter()
331            .position(|operation| operation.sequence == sequence)
332            .ok_or(RestoreApplyJournalError::OperationNotFound(sequence))?;
333        let operation = &self.operations[index];
334
335        if !operation.can_transition_to(&next_state) {
336            return Err(RestoreApplyJournalError::InvalidOperationTransition {
337                sequence,
338                from: operation.state.clone(),
339                to: next_state,
340            });
341        }
342
343        self.validate_operation_transition_order(operation, &next_state)?;
344
345        let operation = &mut self.operations[index];
346        operation.state = next_state;
347        operation.blocking_reasons = blocking_reasons;
348        operation.state_updated_at = updated_at;
349        self.refresh_operation_counts();
350        self.validate()
351    }
352
353    // Ensure fresh operation transitions advance in journal order.
354    fn validate_operation_transition_order(
355        &self,
356        operation: &RestoreApplyJournalOperation,
357        next_state: &RestoreApplyOperationState,
358    ) -> Result<(), RestoreApplyJournalError> {
359        if operation.state == *next_state {
360            return Ok(());
361        }
362
363        let next_sequence = self
364            .next_transition_sequence()
365            .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
366
367        if operation.sequence == next_sequence {
368            return Ok(());
369        }
370
371        Err(RestoreApplyJournalError::OutOfOrderOperationTransition {
372            requested: operation.sequence,
373            next: next_sequence,
374        })
375    }
376
377    // Return the next operation sequence that can be advanced by a runner.
378    fn next_transition_sequence(&self) -> Option<usize> {
379        self.next_transition_operation()
380            .map(|operation| operation.sequence)
381    }
382
383    // Recompute operation counts after a journal operation state change.
384    fn refresh_operation_counts(&mut self) {
385        let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
386        self.operation_count = self.operations.len();
387        self.operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
388        self.pending_operations = state_counts.pending;
389        self.ready_operations = state_counts.ready;
390        self.blocked_operations = state_counts.blocked;
391        self.completed_operations = state_counts.completed;
392        self.failed_operations = state_counts.failed;
393    }
394
395    // Return whether every planned operation has completed.
396    pub(super) const fn is_complete(&self) -> bool {
397        self.operation_count > 0 && self.completed_operations == self.operation_count
398    }
399
400    // Recompute operation-kind counts from concrete operation rows.
401    pub(super) fn operation_kind_counts(&self) -> RestoreApplyOperationKindCounts {
402        RestoreApplyOperationKindCounts::from_operations(&self.operations)
403    }
404
405    // Find the uploaded target snapshot ID required by one load operation.
406    pub(super) fn uploaded_snapshot_id_for_load(
407        &self,
408        load: &RestoreApplyJournalOperation,
409    ) -> Option<&str> {
410        self.operation_receipts
411            .iter()
412            .find(|receipt| {
413                receipt.matches_load_operation(load)
414                    && self.operations.iter().any(|operation| {
415                        operation.sequence == receipt.sequence
416                            && operation.operation == RestoreApplyOperationKind::UploadSnapshot
417                            && operation.state == RestoreApplyOperationState::Completed
418                    })
419            })
420            .and_then(|receipt| receipt.uploaded_snapshot_id.as_deref())
421    }
422}