Skip to main content

canic_backup/execution/
mod.rs

1mod operation;
2mod receipt;
3mod types;
4mod validation;
5
6pub use types::*;
7
8use crate::plan::{BackupExecutionPreflightReceipts, BackupOperationKind, BackupPlan};
9use validation::{
10    operation_kind_is_mutating, operation_kind_is_preflight, validate_nonempty,
11    validate_operation_sequences,
12};
13
14const BACKUP_EXECUTION_JOURNAL_VERSION: u16 = 1;
15const PREFLIGHT_NOT_ACCEPTED: &str = "preflight-not-accepted";
16
17impl BackupExecutionJournal {
18    /// Build an execution journal from a validated backup plan.
19    pub fn from_plan(plan: &BackupPlan) -> Result<Self, BackupExecutionJournalError> {
20        plan.validate()
21            .map_err(|error| BackupExecutionJournalError::InvalidPlan(error.to_string()))?;
22        let operations = plan
23            .phases
24            .iter()
25            .map(BackupExecutionJournalOperation::from_plan_operation)
26            .collect::<Vec<_>>();
27        let mut journal = Self {
28            journal_version: BACKUP_EXECUTION_JOURNAL_VERSION,
29            plan_id: plan.plan_id.clone(),
30            run_id: plan.run_id.clone(),
31            preflight_id: None,
32            preflight_accepted: false,
33            restart_required: false,
34            operations,
35            operation_receipts: Vec::new(),
36        };
37        journal.refresh_blocked_operations();
38        journal.validate()?;
39        Ok(journal)
40    }
41
42    /// Validate journal structure and operation receipts.
43    pub fn validate(&self) -> Result<(), BackupExecutionJournalError> {
44        if self.journal_version != BACKUP_EXECUTION_JOURNAL_VERSION {
45            return Err(BackupExecutionJournalError::UnsupportedVersion(
46                self.journal_version,
47            ));
48        }
49        validate_nonempty("plan_id", &self.plan_id)?;
50        validate_nonempty("run_id", &self.run_id)?;
51        if let Some(preflight_id) = &self.preflight_id {
52            validate_nonempty("preflight_id", preflight_id)?;
53        } else if self.preflight_accepted {
54            return Err(BackupExecutionJournalError::AcceptedPreflightMissingId);
55        }
56        if self.restart_required != self.derived_restart_required() {
57            return Err(BackupExecutionJournalError::RestartRequiredMismatch);
58        }
59        validate_operation_sequences(&self.operations)?;
60        for operation in &self.operations {
61            operation.validate()?;
62            if !self.preflight_accepted && operation_kind_is_mutating(&operation.kind) {
63                match operation.state {
64                    BackupExecutionOperationState::Blocked => {}
65                    BackupExecutionOperationState::Ready
66                    | BackupExecutionOperationState::Pending
67                    | BackupExecutionOperationState::Completed
68                    | BackupExecutionOperationState::Failed
69                    | BackupExecutionOperationState::Skipped => {
70                        return Err(BackupExecutionJournalError::MutationReadyBeforePreflight {
71                            sequence: operation.sequence,
72                        });
73                    }
74                }
75            }
76        }
77        for receipt in &self.operation_receipts {
78            receipt.validate_against(self)?;
79        }
80        Ok(())
81    }
82
83    /// Mark all preflight operations completed and unblock mutating operations.
84    pub fn accept_preflight_bundle_at(
85        &mut self,
86        preflight_id: String,
87        updated_at: Option<String>,
88    ) -> Result<(), BackupExecutionJournalError> {
89        validate_nonempty("preflight_id", &preflight_id)?;
90        validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
91        if let Some(existing) = &self.preflight_id
92            && existing != &preflight_id
93        {
94            return Err(BackupExecutionJournalError::PreflightAlreadyAccepted {
95                existing: existing.clone(),
96                attempted: preflight_id,
97            });
98        }
99
100        self.preflight_id = Some(preflight_id);
101        self.preflight_accepted = true;
102        for operation in &mut self.operations {
103            if operation_kind_is_preflight(&operation.kind) {
104                operation.state = BackupExecutionOperationState::Completed;
105                operation.state_updated_at.clone_from(&updated_at);
106                operation.blocking_reasons.clear();
107            } else if operation.state == BackupExecutionOperationState::Blocked {
108                operation.state = BackupExecutionOperationState::Ready;
109                operation.blocking_reasons.clear();
110            }
111        }
112        self.refresh_restart_required();
113        self.validate()
114    }
115
116    /// Accept a typed preflight receipt bundle and unblock mutating operations.
117    pub fn accept_preflight_receipts_at(
118        &mut self,
119        receipts: &BackupExecutionPreflightReceipts,
120        updated_at: Option<String>,
121    ) -> Result<(), BackupExecutionJournalError> {
122        validate_nonempty("preflight_receipts.plan_id", &receipts.plan_id)?;
123        if receipts.plan_id != self.plan_id {
124            return Err(BackupExecutionJournalError::PreflightPlanMismatch {
125                expected: self.plan_id.clone(),
126                actual: receipts.plan_id.clone(),
127            });
128        }
129        self.accept_preflight_bundle_at(receipts.preflight_id.clone(), updated_at)
130    }
131
132    /// Return the next operation that should control runner progress.
133    #[must_use]
134    pub fn next_ready_operation(&self) -> Option<&BackupExecutionJournalOperation> {
135        self.operations
136            .iter()
137            .filter(|operation| {
138                matches!(
139                    operation.state,
140                    BackupExecutionOperationState::Ready
141                        | BackupExecutionOperationState::Pending
142                        | BackupExecutionOperationState::Failed
143                )
144            })
145            .min_by_key(|operation| operation.sequence)
146    }
147
148    /// Mark the next transitionable operation pending.
149    pub fn mark_next_operation_pending_at(
150        &mut self,
151        updated_at: Option<String>,
152    ) -> Result<(), BackupExecutionJournalError> {
153        let sequence = self
154            .next_ready_operation()
155            .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
156            .sequence;
157        self.mark_operation_pending_at(sequence, updated_at)
158    }
159
160    /// Mark one operation pending.
161    pub fn mark_operation_pending_at(
162        &mut self,
163        sequence: usize,
164        updated_at: Option<String>,
165    ) -> Result<(), BackupExecutionJournalError> {
166        validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
167        let expected = self
168            .next_ready_operation()
169            .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
170            .sequence;
171        if sequence != expected {
172            return Err(BackupExecutionJournalError::OutOfOrderOperationTransition {
173                requested: sequence,
174                next: expected,
175            });
176        }
177        let index = self.operation_index(sequence)?;
178        let operation = &self.operations[index];
179        if operation_kind_is_mutating(&operation.kind) && !self.preflight_accepted {
180            return Err(BackupExecutionJournalError::MutationBeforePreflightAccepted { sequence });
181        }
182        if !matches!(
183            operation.state,
184            BackupExecutionOperationState::Ready | BackupExecutionOperationState::Failed
185        ) {
186            return Err(BackupExecutionJournalError::InvalidOperationTransition {
187                sequence,
188                from: operation.state.clone(),
189                to: BackupExecutionOperationState::Pending,
190            });
191        }
192
193        let operation = &mut self.operations[index];
194        operation.state = BackupExecutionOperationState::Pending;
195        operation.state_updated_at = updated_at;
196        operation.blocking_reasons.clear();
197        self.refresh_restart_required();
198        self.validate()
199    }
200
201    /// Record one operation receipt and transition the matching operation.
202    pub fn record_operation_receipt(
203        &mut self,
204        receipt: BackupExecutionOperationReceipt,
205    ) -> Result<(), BackupExecutionJournalError> {
206        receipt.validate_against(self)?;
207        let index = self.operation_index(receipt.sequence)?;
208        let operation = &self.operations[index];
209        if operation.state != BackupExecutionOperationState::Pending {
210            return Err(
211                BackupExecutionJournalError::ReceiptWithoutPendingOperation {
212                    sequence: receipt.sequence,
213                },
214            );
215        }
216
217        let next_state = match receipt.outcome {
218            BackupExecutionOperationReceiptOutcome::Completed => {
219                BackupExecutionOperationState::Completed
220            }
221            BackupExecutionOperationReceiptOutcome::Failed => BackupExecutionOperationState::Failed,
222            BackupExecutionOperationReceiptOutcome::Skipped => {
223                BackupExecutionOperationState::Skipped
224            }
225        };
226        let failure_reason = receipt.failure_reason.clone();
227        let previous_operation = self.operations[index].clone();
228        let previous_restart_required = self.restart_required;
229        self.operation_receipts.push(receipt);
230
231        let operation = &mut self.operations[index];
232        operation.state = next_state;
233        operation.state_updated_at = self
234            .operation_receipts
235            .last()
236            .and_then(|receipt| receipt.updated_at.clone());
237        operation.blocking_reasons = failure_reason.into_iter().collect();
238        self.refresh_restart_required();
239        if let Err(error) = self.validate() {
240            self.operation_receipts.pop();
241            self.operations[index] = previous_operation;
242            self.restart_required = previous_restart_required;
243            return Err(error);
244        }
245        Ok(())
246    }
247
248    /// Move a failed operation back to ready for retry.
249    pub fn retry_failed_operation_at(
250        &mut self,
251        sequence: usize,
252        updated_at: Option<String>,
253    ) -> Result<(), BackupExecutionJournalError> {
254        validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
255        let index = self.operation_index(sequence)?;
256        if self.operations[index].state != BackupExecutionOperationState::Failed {
257            return Err(BackupExecutionJournalError::OperationNotFailed(sequence));
258        }
259        self.operations[index].state = BackupExecutionOperationState::Ready;
260        self.operations[index].state_updated_at = updated_at;
261        self.operations[index].blocking_reasons.clear();
262        self.refresh_restart_required();
263        self.validate()
264    }
265
266    /// Build a compact resumability summary.
267    #[must_use]
268    pub fn resume_summary(&self) -> BackupExecutionResumeSummary {
269        let mut summary = BackupExecutionResumeSummary {
270            plan_id: self.plan_id.clone(),
271            run_id: self.run_id.clone(),
272            preflight_id: self.preflight_id.clone(),
273            preflight_accepted: self.preflight_accepted,
274            restart_required: self.restart_required,
275            total_operations: self.operations.len(),
276            ready_operations: 0,
277            pending_operations: 0,
278            blocked_operations: 0,
279            completed_operations: 0,
280            failed_operations: 0,
281            skipped_operations: 0,
282            next_operation: self.next_ready_operation().cloned(),
283        };
284        for operation in &self.operations {
285            match operation.state {
286                BackupExecutionOperationState::Ready => summary.ready_operations += 1,
287                BackupExecutionOperationState::Pending => summary.pending_operations += 1,
288                BackupExecutionOperationState::Blocked => summary.blocked_operations += 1,
289                BackupExecutionOperationState::Completed => summary.completed_operations += 1,
290                BackupExecutionOperationState::Failed => summary.failed_operations += 1,
291                BackupExecutionOperationState::Skipped => summary.skipped_operations += 1,
292            }
293        }
294        summary
295    }
296
297    fn operation_index(&self, sequence: usize) -> Result<usize, BackupExecutionJournalError> {
298        self.operations
299            .iter()
300            .position(|operation| operation.sequence == sequence)
301            .ok_or(BackupExecutionJournalError::OperationNotFound(sequence))
302    }
303
304    fn refresh_blocked_operations(&mut self) {
305        if self.preflight_accepted {
306            return;
307        }
308        for operation in &mut self.operations {
309            if operation_kind_is_mutating(&operation.kind) {
310                operation.state = BackupExecutionOperationState::Blocked;
311                operation.blocking_reasons = vec![PREFLIGHT_NOT_ACCEPTED.to_string()];
312            }
313        }
314    }
315
316    fn refresh_restart_required(&mut self) {
317        self.restart_required = self.derived_restart_required();
318    }
319
320    fn derived_restart_required(&self) -> bool {
321        let stopped = self.operations.iter().any(|operation| {
322            operation.kind == BackupOperationKind::Stop
323                && operation.state == BackupExecutionOperationState::Completed
324        });
325        let unstarted = self.operations.iter().any(|operation| {
326            operation.kind == BackupOperationKind::Start
327                && !matches!(
328                    operation.state,
329                    BackupExecutionOperationState::Completed
330                        | BackupExecutionOperationState::Skipped
331                )
332        });
333        stopped && unstarted
334    }
335}
336
337#[cfg(test)]
338mod tests;