Skip to main content

canic_backup/execution/
mod.rs

1mod operation;
2mod receipt;
3mod types;
4mod validation;
5
6pub use types::*;
7
8use crate::plan::{BackupExecutionPreflightReceipts, BackupOperationKind, BackupPlan};
9use validation::{
10    operation_kind_is_mutating, operation_kind_is_preflight, validate_nonempty,
11    validate_operation_sequences, validate_optional_nonempty,
12};
13
14const BACKUP_EXECUTION_JOURNAL_VERSION: u16 = 1;
15const PREFLIGHT_NOT_ACCEPTED: &str = "preflight-not-accepted";
16
17impl BackupExecutionJournal {
18    /// Build an execution journal from a validated backup plan.
19    pub fn from_plan(plan: &BackupPlan) -> Result<Self, BackupExecutionJournalError> {
20        plan.validate()
21            .map_err(|error| BackupExecutionJournalError::InvalidPlan(error.to_string()))?;
22        let operations = plan
23            .phases
24            .iter()
25            .map(BackupExecutionJournalOperation::from_plan_operation)
26            .collect::<Vec<_>>();
27        let mut journal = Self {
28            journal_version: BACKUP_EXECUTION_JOURNAL_VERSION,
29            plan_id: plan.plan_id.clone(),
30            run_id: plan.run_id.clone(),
31            preflight_id: None,
32            preflight_accepted: false,
33            restart_required: false,
34            operations,
35            operation_receipts: Vec::new(),
36        };
37        journal.refresh_blocked_operations();
38        journal.validate()?;
39        Ok(journal)
40    }
41
42    /// Validate journal structure and operation receipts.
43    pub fn validate(&self) -> Result<(), BackupExecutionJournalError> {
44        if self.journal_version != BACKUP_EXECUTION_JOURNAL_VERSION {
45            return Err(BackupExecutionJournalError::UnsupportedVersion(
46                self.journal_version,
47            ));
48        }
49        validate_nonempty("plan_id", &self.plan_id)?;
50        validate_nonempty("run_id", &self.run_id)?;
51        if let Some(preflight_id) = &self.preflight_id {
52            validate_nonempty("preflight_id", preflight_id)?;
53        } else if self.preflight_accepted {
54            return Err(BackupExecutionJournalError::AcceptedPreflightMissingId);
55        }
56        validate_operation_sequences(&self.operations)?;
57        for operation in &self.operations {
58            operation.validate()?;
59            if !self.preflight_accepted && operation_kind_is_mutating(&operation.kind) {
60                match operation.state {
61                    BackupExecutionOperationState::Blocked => {}
62                    BackupExecutionOperationState::Ready
63                    | BackupExecutionOperationState::Pending
64                    | BackupExecutionOperationState::Completed
65                    | BackupExecutionOperationState::Failed
66                    | BackupExecutionOperationState::Skipped => {
67                        return Err(BackupExecutionJournalError::MutationReadyBeforePreflight {
68                            sequence: operation.sequence,
69                        });
70                    }
71                }
72            }
73        }
74        for receipt in &self.operation_receipts {
75            receipt.validate_against(self)?;
76        }
77        Ok(())
78    }
79
80    /// Mark all preflight operations completed and unblock mutating operations.
81    pub fn accept_preflight_bundle_at(
82        &mut self,
83        preflight_id: String,
84        updated_at: Option<String>,
85    ) -> Result<(), BackupExecutionJournalError> {
86        validate_nonempty("preflight_id", &preflight_id)?;
87        validate_optional_nonempty("updated_at", updated_at.as_deref())?;
88        if let Some(existing) = &self.preflight_id
89            && existing != &preflight_id
90        {
91            return Err(BackupExecutionJournalError::PreflightAlreadyAccepted {
92                existing: existing.clone(),
93                attempted: preflight_id,
94            });
95        }
96
97        self.preflight_id = Some(preflight_id);
98        self.preflight_accepted = true;
99        for operation in &mut self.operations {
100            if operation_kind_is_preflight(&operation.kind) {
101                operation.state = BackupExecutionOperationState::Completed;
102                operation.state_updated_at.clone_from(&updated_at);
103                operation.blocking_reasons.clear();
104            } else if operation.state == BackupExecutionOperationState::Blocked {
105                operation.state = BackupExecutionOperationState::Ready;
106                operation.blocking_reasons.clear();
107            }
108        }
109        self.refresh_restart_required();
110        self.validate()
111    }
112
113    /// Accept a typed preflight receipt bundle and unblock mutating operations.
114    pub fn accept_preflight_receipts_at(
115        &mut self,
116        receipts: &BackupExecutionPreflightReceipts,
117        updated_at: Option<String>,
118    ) -> Result<(), BackupExecutionJournalError> {
119        validate_nonempty("preflight_receipts.plan_id", &receipts.plan_id)?;
120        if receipts.plan_id != self.plan_id {
121            return Err(BackupExecutionJournalError::PreflightPlanMismatch {
122                expected: self.plan_id.clone(),
123                actual: receipts.plan_id.clone(),
124            });
125        }
126        self.accept_preflight_bundle_at(receipts.preflight_id.clone(), updated_at)
127    }
128
129    /// Return the next operation that should control runner progress.
130    #[must_use]
131    pub fn next_ready_operation(&self) -> Option<&BackupExecutionJournalOperation> {
132        self.operations
133            .iter()
134            .filter(|operation| {
135                matches!(
136                    operation.state,
137                    BackupExecutionOperationState::Ready
138                        | BackupExecutionOperationState::Pending
139                        | BackupExecutionOperationState::Failed
140                )
141            })
142            .min_by_key(|operation| operation.sequence)
143    }
144
145    /// Mark the next transitionable operation pending.
146    pub fn mark_next_operation_pending_at(
147        &mut self,
148        updated_at: Option<String>,
149    ) -> Result<(), BackupExecutionJournalError> {
150        let sequence = self
151            .next_ready_operation()
152            .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
153            .sequence;
154        self.mark_operation_pending_at(sequence, updated_at)
155    }
156
157    /// Mark one operation pending.
158    pub fn mark_operation_pending_at(
159        &mut self,
160        sequence: usize,
161        updated_at: Option<String>,
162    ) -> Result<(), BackupExecutionJournalError> {
163        validate_optional_nonempty("updated_at", updated_at.as_deref())?;
164        let expected = self
165            .next_ready_operation()
166            .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
167            .sequence;
168        if sequence != expected {
169            return Err(BackupExecutionJournalError::OutOfOrderOperationTransition {
170                requested: sequence,
171                next: expected,
172            });
173        }
174        let index = self.operation_index(sequence)?;
175        let operation = &self.operations[index];
176        if operation_kind_is_mutating(&operation.kind) && !self.preflight_accepted {
177            return Err(BackupExecutionJournalError::MutationBeforePreflightAccepted { sequence });
178        }
179        if !matches!(
180            operation.state,
181            BackupExecutionOperationState::Ready | BackupExecutionOperationState::Failed
182        ) {
183            return Err(BackupExecutionJournalError::InvalidOperationTransition {
184                sequence,
185                from: operation.state.clone(),
186                to: BackupExecutionOperationState::Pending,
187            });
188        }
189
190        let operation = &mut self.operations[index];
191        operation.state = BackupExecutionOperationState::Pending;
192        operation.state_updated_at = updated_at;
193        operation.blocking_reasons.clear();
194        self.refresh_restart_required();
195        self.validate()
196    }
197
198    /// Record one operation receipt and transition the matching operation.
199    pub fn record_operation_receipt(
200        &mut self,
201        receipt: BackupExecutionOperationReceipt,
202    ) -> Result<(), BackupExecutionJournalError> {
203        receipt.validate_against(self)?;
204        let index = self.operation_index(receipt.sequence)?;
205        let operation = &self.operations[index];
206        if operation.state != BackupExecutionOperationState::Pending {
207            return Err(
208                BackupExecutionJournalError::ReceiptWithoutPendingOperation {
209                    sequence: receipt.sequence,
210                },
211            );
212        }
213
214        let next_state = match receipt.outcome {
215            BackupExecutionOperationReceiptOutcome::Completed => {
216                BackupExecutionOperationState::Completed
217            }
218            BackupExecutionOperationReceiptOutcome::Failed => BackupExecutionOperationState::Failed,
219            BackupExecutionOperationReceiptOutcome::Skipped => {
220                BackupExecutionOperationState::Skipped
221            }
222        };
223        let failure_reason = receipt.failure_reason.clone();
224        self.operation_receipts.push(receipt);
225
226        let operation = &mut self.operations[index];
227        operation.state = next_state;
228        operation.state_updated_at = self
229            .operation_receipts
230            .last()
231            .and_then(|receipt| receipt.updated_at.clone());
232        operation.blocking_reasons = failure_reason.into_iter().collect();
233        self.refresh_restart_required();
234        if let Err(error) = self.validate() {
235            self.operation_receipts.pop();
236            return Err(error);
237        }
238        Ok(())
239    }
240
241    /// Move a failed operation back to ready for retry.
242    pub fn retry_failed_operation_at(
243        &mut self,
244        sequence: usize,
245        updated_at: Option<String>,
246    ) -> Result<(), BackupExecutionJournalError> {
247        validate_optional_nonempty("updated_at", updated_at.as_deref())?;
248        let index = self.operation_index(sequence)?;
249        if self.operations[index].state != BackupExecutionOperationState::Failed {
250            return Err(BackupExecutionJournalError::OperationNotFailed(sequence));
251        }
252        self.operations[index].state = BackupExecutionOperationState::Ready;
253        self.operations[index].state_updated_at = updated_at;
254        self.operations[index].blocking_reasons.clear();
255        self.refresh_restart_required();
256        self.validate()
257    }
258
259    /// Build a compact resumability summary.
260    #[must_use]
261    pub fn resume_summary(&self) -> BackupExecutionResumeSummary {
262        let mut summary = BackupExecutionResumeSummary {
263            plan_id: self.plan_id.clone(),
264            run_id: self.run_id.clone(),
265            preflight_id: self.preflight_id.clone(),
266            preflight_accepted: self.preflight_accepted,
267            restart_required: self.restart_required,
268            total_operations: self.operations.len(),
269            ready_operations: 0,
270            pending_operations: 0,
271            blocked_operations: 0,
272            completed_operations: 0,
273            failed_operations: 0,
274            skipped_operations: 0,
275            next_operation: self.next_ready_operation().cloned(),
276        };
277        for operation in &self.operations {
278            match operation.state {
279                BackupExecutionOperationState::Ready => summary.ready_operations += 1,
280                BackupExecutionOperationState::Pending => summary.pending_operations += 1,
281                BackupExecutionOperationState::Blocked => summary.blocked_operations += 1,
282                BackupExecutionOperationState::Completed => summary.completed_operations += 1,
283                BackupExecutionOperationState::Failed => summary.failed_operations += 1,
284                BackupExecutionOperationState::Skipped => summary.skipped_operations += 1,
285            }
286        }
287        summary
288    }
289
290    fn operation_index(&self, sequence: usize) -> Result<usize, BackupExecutionJournalError> {
291        self.operations
292            .iter()
293            .position(|operation| operation.sequence == sequence)
294            .ok_or(BackupExecutionJournalError::OperationNotFound(sequence))
295    }
296
297    fn refresh_blocked_operations(&mut self) {
298        if self.preflight_accepted {
299            return;
300        }
301        for operation in &mut self.operations {
302            if operation_kind_is_mutating(&operation.kind) {
303                operation.state = BackupExecutionOperationState::Blocked;
304                operation.blocking_reasons = vec![PREFLIGHT_NOT_ACCEPTED.to_string()];
305            }
306        }
307    }
308
309    fn refresh_restart_required(&mut self) {
310        let stopped = self.operations.iter().any(|operation| {
311            operation.kind == BackupOperationKind::Stop
312                && operation.state == BackupExecutionOperationState::Completed
313        });
314        let unstarted = self.operations.iter().any(|operation| {
315            operation.kind == BackupOperationKind::Start
316                && !matches!(
317                    operation.state,
318                    BackupExecutionOperationState::Completed
319                        | BackupExecutionOperationState::Skipped
320                )
321        });
322        self.restart_required = stopped && unstarted;
323    }
324}
325
326#[cfg(test)]
327mod tests;