Skip to main content

canic_backup/execution/
mod.rs

1//! Module: execution
2//!
3//! Responsibility: build and advance backup execution journals.
4//! Does not own: backup plan construction, artifact IO, or manifest storage.
5//! Boundary: tracks runner progress from validated plans through receipts.
6
7mod operation;
8mod receipt;
9#[cfg(test)]
10mod tests;
11mod types;
12mod validation;
13
14pub use types::*;
15
16use crate::plan::{BackupExecutionPreflightReceipts, BackupOperationKind, BackupPlan};
17use validation::{
18    operation_kind_is_mutating, operation_kind_is_preflight, validate_nonempty,
19    validate_operation_sequences,
20};
21
22const BACKUP_EXECUTION_JOURNAL_VERSION: u16 = 1;
23const PREFLIGHT_NOT_ACCEPTED: &str = "preflight-not-accepted";
24
25impl BackupExecutionJournal {
26    /// Build an execution journal from a validated backup plan.
27    pub fn from_plan(plan: &BackupPlan) -> Result<Self, BackupExecutionJournalError> {
28        plan.validate()
29            .map_err(|error| BackupExecutionJournalError::InvalidPlan(error.to_string()))?;
30        let operations = plan
31            .phases
32            .iter()
33            .map(BackupExecutionJournalOperation::from_plan_operation)
34            .collect::<Vec<_>>();
35        let mut journal = Self {
36            journal_version: BACKUP_EXECUTION_JOURNAL_VERSION,
37            plan_id: plan.plan_id.clone(),
38            run_id: plan.run_id.clone(),
39            preflight_id: None,
40            preflight_accepted: false,
41            restart_required: false,
42            operations,
43            operation_receipts: Vec::new(),
44        };
45        journal.refresh_blocked_operations();
46        journal.validate()?;
47        Ok(journal)
48    }
49
50    /// Validate journal structure and operation receipts.
51    pub fn validate(&self) -> Result<(), BackupExecutionJournalError> {
52        if self.journal_version != BACKUP_EXECUTION_JOURNAL_VERSION {
53            return Err(BackupExecutionJournalError::UnsupportedVersion(
54                self.journal_version,
55            ));
56        }
57        validate_nonempty("plan_id", &self.plan_id)?;
58        validate_nonempty("run_id", &self.run_id)?;
59        if let Some(preflight_id) = &self.preflight_id {
60            validate_nonempty("preflight_id", preflight_id)?;
61        } else if self.preflight_accepted {
62            return Err(BackupExecutionJournalError::AcceptedPreflightMissingId);
63        }
64        if self.restart_required != self.derived_restart_required() {
65            return Err(BackupExecutionJournalError::RestartRequiredMismatch);
66        }
67        validate_operation_sequences(&self.operations)?;
68        for operation in &self.operations {
69            operation.validate()?;
70            if !self.preflight_accepted && operation_kind_is_mutating(&operation.kind) {
71                match operation.state {
72                    BackupExecutionOperationState::Blocked => {}
73                    BackupExecutionOperationState::Ready
74                    | BackupExecutionOperationState::Pending
75                    | BackupExecutionOperationState::Completed
76                    | BackupExecutionOperationState::Failed
77                    | BackupExecutionOperationState::Skipped => {
78                        return Err(BackupExecutionJournalError::MutationReadyBeforePreflight {
79                            sequence: operation.sequence,
80                        });
81                    }
82                }
83            }
84        }
85        for receipt in &self.operation_receipts {
86            receipt.validate_against(self)?;
87        }
88        Ok(())
89    }
90
91    /// Mark all preflight operations completed and unblock mutating operations.
92    pub fn accept_preflight_bundle_at(
93        &mut self,
94        preflight_id: String,
95        updated_at: Option<String>,
96    ) -> Result<(), BackupExecutionJournalError> {
97        validate_nonempty("preflight_id", &preflight_id)?;
98        validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
99        if let Some(existing) = &self.preflight_id
100            && existing != &preflight_id
101        {
102            return Err(BackupExecutionJournalError::PreflightAlreadyAccepted {
103                existing: existing.clone(),
104                attempted: preflight_id,
105            });
106        }
107
108        self.preflight_id = Some(preflight_id);
109        self.preflight_accepted = true;
110        for operation in &mut self.operations {
111            if operation_kind_is_preflight(&operation.kind) {
112                operation.state = BackupExecutionOperationState::Completed;
113                operation.state_updated_at.clone_from(&updated_at);
114                operation.blocking_reasons.clear();
115            } else if operation.state == BackupExecutionOperationState::Blocked {
116                operation.state = BackupExecutionOperationState::Ready;
117                operation.blocking_reasons.clear();
118            }
119        }
120        self.refresh_restart_required();
121        self.validate()
122    }
123
124    /// Accept a typed preflight receipt bundle and unblock mutating operations.
125    pub fn accept_preflight_receipts_at(
126        &mut self,
127        receipts: &BackupExecutionPreflightReceipts,
128        updated_at: Option<String>,
129    ) -> Result<(), BackupExecutionJournalError> {
130        validate_nonempty("preflight_receipts.plan_id", &receipts.plan_id)?;
131        if receipts.plan_id != self.plan_id {
132            return Err(BackupExecutionJournalError::PreflightPlanMismatch {
133                expected: self.plan_id.clone(),
134                actual: receipts.plan_id.clone(),
135            });
136        }
137        self.accept_preflight_bundle_at(receipts.preflight_id.clone(), updated_at)
138    }
139
140    /// Return the next operation that should control runner progress.
141    #[must_use]
142    pub fn next_ready_operation(&self) -> Option<&BackupExecutionJournalOperation> {
143        self.operations
144            .iter()
145            .filter(|operation| {
146                matches!(
147                    operation.state,
148                    BackupExecutionOperationState::Ready
149                        | BackupExecutionOperationState::Pending
150                        | BackupExecutionOperationState::Failed
151                )
152            })
153            .min_by_key(|operation| operation.sequence)
154    }
155
156    /// Mark the next transitionable operation pending.
157    pub fn mark_next_operation_pending_at(
158        &mut self,
159        updated_at: Option<String>,
160    ) -> Result<(), BackupExecutionJournalError> {
161        let sequence = self
162            .next_ready_operation()
163            .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
164            .sequence;
165        self.mark_operation_pending_at(sequence, updated_at)
166    }
167
168    /// Mark one operation pending.
169    pub fn mark_operation_pending_at(
170        &mut self,
171        sequence: usize,
172        updated_at: Option<String>,
173    ) -> Result<(), BackupExecutionJournalError> {
174        validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
175        let expected = self
176            .next_ready_operation()
177            .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
178            .sequence;
179        if sequence != expected {
180            return Err(BackupExecutionJournalError::OutOfOrderOperationTransition {
181                requested: sequence,
182                next: expected,
183            });
184        }
185        let index = self.operation_index(sequence)?;
186        let operation = &self.operations[index];
187        if operation_kind_is_mutating(&operation.kind) && !self.preflight_accepted {
188            return Err(BackupExecutionJournalError::MutationBeforePreflightAccepted { sequence });
189        }
190        if !matches!(
191            operation.state,
192            BackupExecutionOperationState::Ready | BackupExecutionOperationState::Failed
193        ) {
194            return Err(BackupExecutionJournalError::InvalidOperationTransition {
195                sequence,
196                from: operation.state.clone(),
197                to: BackupExecutionOperationState::Pending,
198            });
199        }
200
201        let operation = &mut self.operations[index];
202        operation.state = BackupExecutionOperationState::Pending;
203        operation.state_updated_at = updated_at;
204        operation.blocking_reasons.clear();
205        self.refresh_restart_required();
206        self.validate()
207    }
208
209    /// Record one operation receipt and transition the matching operation.
210    pub fn record_operation_receipt(
211        &mut self,
212        receipt: BackupExecutionOperationReceipt,
213    ) -> Result<(), BackupExecutionJournalError> {
214        receipt.validate_against(self)?;
215        let index = self.operation_index(receipt.sequence)?;
216        let operation = &self.operations[index];
217        if operation.state != BackupExecutionOperationState::Pending {
218            return Err(
219                BackupExecutionJournalError::ReceiptWithoutPendingOperation {
220                    sequence: receipt.sequence,
221                },
222            );
223        }
224
225        let next_state = match receipt.outcome {
226            BackupExecutionOperationReceiptOutcome::Completed => {
227                BackupExecutionOperationState::Completed
228            }
229            BackupExecutionOperationReceiptOutcome::Failed => BackupExecutionOperationState::Failed,
230            BackupExecutionOperationReceiptOutcome::Skipped => {
231                BackupExecutionOperationState::Skipped
232            }
233        };
234        let failure_reason = receipt.failure_reason.clone();
235        let previous_operation = self.operations[index].clone();
236        let previous_restart_required = self.restart_required;
237        self.operation_receipts.push(receipt);
238
239        let operation = &mut self.operations[index];
240        operation.state = next_state;
241        operation.state_updated_at = self
242            .operation_receipts
243            .last()
244            .and_then(|receipt| receipt.updated_at.clone());
245        operation.blocking_reasons = failure_reason.into_iter().collect();
246        self.refresh_restart_required();
247        if let Err(error) = self.validate() {
248            self.operation_receipts.pop();
249            self.operations[index] = previous_operation;
250            self.restart_required = previous_restart_required;
251            return Err(error);
252        }
253        Ok(())
254    }
255
256    /// Move a failed operation back to ready for retry.
257    pub fn retry_failed_operation_at(
258        &mut self,
259        sequence: usize,
260        updated_at: Option<String>,
261    ) -> Result<(), BackupExecutionJournalError> {
262        validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
263        let index = self.operation_index(sequence)?;
264        if self.operations[index].state != BackupExecutionOperationState::Failed {
265            return Err(BackupExecutionJournalError::OperationNotFailed(sequence));
266        }
267        self.operations[index].state = BackupExecutionOperationState::Ready;
268        self.operations[index].state_updated_at = updated_at;
269        self.operations[index].blocking_reasons.clear();
270        self.refresh_restart_required();
271        self.validate()
272    }
273
274    /// Build a compact resumability summary.
275    #[must_use]
276    pub fn resume_summary(&self) -> BackupExecutionResumeSummary {
277        let mut summary = BackupExecutionResumeSummary {
278            plan_id: self.plan_id.clone(),
279            run_id: self.run_id.clone(),
280            preflight_id: self.preflight_id.clone(),
281            preflight_accepted: self.preflight_accepted,
282            restart_required: self.restart_required,
283            total_operations: self.operations.len(),
284            ready_operations: 0,
285            pending_operations: 0,
286            blocked_operations: 0,
287            completed_operations: 0,
288            failed_operations: 0,
289            skipped_operations: 0,
290            next_operation: self.next_ready_operation().cloned(),
291        };
292        for operation in &self.operations {
293            match operation.state {
294                BackupExecutionOperationState::Ready => summary.ready_operations += 1,
295                BackupExecutionOperationState::Pending => summary.pending_operations += 1,
296                BackupExecutionOperationState::Blocked => summary.blocked_operations += 1,
297                BackupExecutionOperationState::Completed => summary.completed_operations += 1,
298                BackupExecutionOperationState::Failed => summary.failed_operations += 1,
299                BackupExecutionOperationState::Skipped => summary.skipped_operations += 1,
300            }
301        }
302        summary
303    }
304
305    fn operation_index(&self, sequence: usize) -> Result<usize, BackupExecutionJournalError> {
306        self.operations
307            .iter()
308            .position(|operation| operation.sequence == sequence)
309            .ok_or(BackupExecutionJournalError::OperationNotFound(sequence))
310    }
311
312    fn refresh_blocked_operations(&mut self) {
313        if self.preflight_accepted {
314            return;
315        }
316        for operation in &mut self.operations {
317            if operation_kind_is_mutating(&operation.kind) {
318                operation.state = BackupExecutionOperationState::Blocked;
319                operation.blocking_reasons = vec![PREFLIGHT_NOT_ACCEPTED.to_string()];
320            }
321        }
322    }
323
324    fn refresh_restart_required(&mut self) {
325        self.restart_required = self.derived_restart_required();
326    }
327
328    fn derived_restart_required(&self) -> bool {
329        let stopped = self.operations.iter().any(|operation| {
330            operation.kind == BackupOperationKind::Stop
331                && operation.state == BackupExecutionOperationState::Completed
332        });
333        let unstarted = self.operations.iter().any(|operation| {
334            operation.kind == BackupOperationKind::Start
335                && !matches!(
336                    operation.state,
337                    BackupExecutionOperationState::Completed
338                        | BackupExecutionOperationState::Skipped
339                )
340        });
341        stopped && unstarted
342    }
343}