1mod operation;
8mod receipt;
9#[cfg(test)]
10mod tests;
11mod types;
12mod validation;
13
14pub use types::*;
15
16use crate::plan::{BackupExecutionPreflightReceipts, BackupOperationKind, BackupPlan};
17use validation::{
18 operation_kind_is_mutating, operation_kind_is_preflight, validate_nonempty,
19 validate_operation_sequences,
20};
21
/// Journal format version stamped on every journal built by `from_plan`; `validate` rejects
/// any journal whose `journal_version` differs from this value.
const BACKUP_EXECUTION_JOURNAL_VERSION: u16 = 1;
/// Blocking reason attached to mutating operations while preflight has not been accepted.
const PREFLIGHT_NOT_ACCEPTED: &str = "preflight-not-accepted";
24
25impl BackupExecutionJournal {
26 pub fn from_plan(plan: &BackupPlan) -> Result<Self, BackupExecutionJournalError> {
28 plan.validate()
29 .map_err(|error| BackupExecutionJournalError::InvalidPlan(error.to_string()))?;
30 let operations = plan
31 .phases
32 .iter()
33 .map(BackupExecutionJournalOperation::from_plan_operation)
34 .collect::<Vec<_>>();
35 let mut journal = Self {
36 journal_version: BACKUP_EXECUTION_JOURNAL_VERSION,
37 plan_id: plan.plan_id.clone(),
38 run_id: plan.run_id.clone(),
39 preflight_id: None,
40 preflight_accepted: false,
41 restart_required: false,
42 operations,
43 operation_receipts: Vec::new(),
44 };
45 journal.refresh_blocked_operations();
46 journal.validate()?;
47 Ok(journal)
48 }
49
50 pub fn validate(&self) -> Result<(), BackupExecutionJournalError> {
52 if self.journal_version != BACKUP_EXECUTION_JOURNAL_VERSION {
53 return Err(BackupExecutionJournalError::UnsupportedVersion(
54 self.journal_version,
55 ));
56 }
57 validate_nonempty("plan_id", &self.plan_id)?;
58 validate_nonempty("run_id", &self.run_id)?;
59 if let Some(preflight_id) = &self.preflight_id {
60 validate_nonempty("preflight_id", preflight_id)?;
61 } else if self.preflight_accepted {
62 return Err(BackupExecutionJournalError::AcceptedPreflightMissingId);
63 }
64 if self.restart_required != self.derived_restart_required() {
65 return Err(BackupExecutionJournalError::RestartRequiredMismatch);
66 }
67 validate_operation_sequences(&self.operations)?;
68 for operation in &self.operations {
69 operation.validate()?;
70 if !self.preflight_accepted && operation_kind_is_mutating(&operation.kind) {
71 match operation.state {
72 BackupExecutionOperationState::Blocked => {}
73 BackupExecutionOperationState::Ready
74 | BackupExecutionOperationState::Pending
75 | BackupExecutionOperationState::Completed
76 | BackupExecutionOperationState::Failed
77 | BackupExecutionOperationState::Skipped => {
78 return Err(BackupExecutionJournalError::MutationReadyBeforePreflight {
79 sequence: operation.sequence,
80 });
81 }
82 }
83 }
84 }
85 for receipt in &self.operation_receipts {
86 receipt.validate_against(self)?;
87 }
88 Ok(())
89 }
90
91 pub fn accept_preflight_bundle_at(
93 &mut self,
94 preflight_id: String,
95 updated_at: Option<String>,
96 ) -> Result<(), BackupExecutionJournalError> {
97 validate_nonempty("preflight_id", &preflight_id)?;
98 validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
99 if let Some(existing) = &self.preflight_id
100 && existing != &preflight_id
101 {
102 return Err(BackupExecutionJournalError::PreflightAlreadyAccepted {
103 existing: existing.clone(),
104 attempted: preflight_id,
105 });
106 }
107
108 self.preflight_id = Some(preflight_id);
109 self.preflight_accepted = true;
110 for operation in &mut self.operations {
111 if operation_kind_is_preflight(&operation.kind) {
112 operation.state = BackupExecutionOperationState::Completed;
113 operation.state_updated_at.clone_from(&updated_at);
114 operation.blocking_reasons.clear();
115 } else if operation.state == BackupExecutionOperationState::Blocked {
116 operation.state = BackupExecutionOperationState::Ready;
117 operation.blocking_reasons.clear();
118 }
119 }
120 self.refresh_restart_required();
121 self.validate()
122 }
123
124 pub fn accept_preflight_receipts_at(
126 &mut self,
127 receipts: &BackupExecutionPreflightReceipts,
128 updated_at: Option<String>,
129 ) -> Result<(), BackupExecutionJournalError> {
130 validate_nonempty("preflight_receipts.plan_id", &receipts.plan_id)?;
131 if receipts.plan_id != self.plan_id {
132 return Err(BackupExecutionJournalError::PreflightPlanMismatch {
133 expected: self.plan_id.clone(),
134 actual: receipts.plan_id.clone(),
135 });
136 }
137 self.accept_preflight_bundle_at(receipts.preflight_id.clone(), updated_at)
138 }
139
140 #[must_use]
142 pub fn next_ready_operation(&self) -> Option<&BackupExecutionJournalOperation> {
143 self.operations
144 .iter()
145 .filter(|operation| {
146 matches!(
147 operation.state,
148 BackupExecutionOperationState::Ready
149 | BackupExecutionOperationState::Pending
150 | BackupExecutionOperationState::Failed
151 )
152 })
153 .min_by_key(|operation| operation.sequence)
154 }
155
156 pub fn mark_next_operation_pending_at(
158 &mut self,
159 updated_at: Option<String>,
160 ) -> Result<(), BackupExecutionJournalError> {
161 let sequence = self
162 .next_ready_operation()
163 .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
164 .sequence;
165 self.mark_operation_pending_at(sequence, updated_at)
166 }
167
168 pub fn mark_operation_pending_at(
170 &mut self,
171 sequence: usize,
172 updated_at: Option<String>,
173 ) -> Result<(), BackupExecutionJournalError> {
174 validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
175 let expected = self
176 .next_ready_operation()
177 .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
178 .sequence;
179 if sequence != expected {
180 return Err(BackupExecutionJournalError::OutOfOrderOperationTransition {
181 requested: sequence,
182 next: expected,
183 });
184 }
185 let index = self.operation_index(sequence)?;
186 let operation = &self.operations[index];
187 if operation_kind_is_mutating(&operation.kind) && !self.preflight_accepted {
188 return Err(BackupExecutionJournalError::MutationBeforePreflightAccepted { sequence });
189 }
190 if !matches!(
191 operation.state,
192 BackupExecutionOperationState::Ready | BackupExecutionOperationState::Failed
193 ) {
194 return Err(BackupExecutionJournalError::InvalidOperationTransition {
195 sequence,
196 from: operation.state.clone(),
197 to: BackupExecutionOperationState::Pending,
198 });
199 }
200
201 let operation = &mut self.operations[index];
202 operation.state = BackupExecutionOperationState::Pending;
203 operation.state_updated_at = updated_at;
204 operation.blocking_reasons.clear();
205 self.refresh_restart_required();
206 self.validate()
207 }
208
209 pub fn record_operation_receipt(
211 &mut self,
212 receipt: BackupExecutionOperationReceipt,
213 ) -> Result<(), BackupExecutionJournalError> {
214 receipt.validate_against(self)?;
215 let index = self.operation_index(receipt.sequence)?;
216 let operation = &self.operations[index];
217 if operation.state != BackupExecutionOperationState::Pending {
218 return Err(
219 BackupExecutionJournalError::ReceiptWithoutPendingOperation {
220 sequence: receipt.sequence,
221 },
222 );
223 }
224
225 let next_state = match receipt.outcome {
226 BackupExecutionOperationReceiptOutcome::Completed => {
227 BackupExecutionOperationState::Completed
228 }
229 BackupExecutionOperationReceiptOutcome::Failed => BackupExecutionOperationState::Failed,
230 BackupExecutionOperationReceiptOutcome::Skipped => {
231 BackupExecutionOperationState::Skipped
232 }
233 };
234 let failure_reason = receipt.failure_reason.clone();
235 let previous_operation = self.operations[index].clone();
236 let previous_restart_required = self.restart_required;
237 self.operation_receipts.push(receipt);
238
239 let operation = &mut self.operations[index];
240 operation.state = next_state;
241 operation.state_updated_at = self
242 .operation_receipts
243 .last()
244 .and_then(|receipt| receipt.updated_at.clone());
245 operation.blocking_reasons = failure_reason.into_iter().collect();
246 self.refresh_restart_required();
247 if let Err(error) = self.validate() {
248 self.operation_receipts.pop();
249 self.operations[index] = previous_operation;
250 self.restart_required = previous_restart_required;
251 return Err(error);
252 }
253 Ok(())
254 }
255
256 pub fn retry_failed_operation_at(
258 &mut self,
259 sequence: usize,
260 updated_at: Option<String>,
261 ) -> Result<(), BackupExecutionJournalError> {
262 validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
263 let index = self.operation_index(sequence)?;
264 if self.operations[index].state != BackupExecutionOperationState::Failed {
265 return Err(BackupExecutionJournalError::OperationNotFailed(sequence));
266 }
267 self.operations[index].state = BackupExecutionOperationState::Ready;
268 self.operations[index].state_updated_at = updated_at;
269 self.operations[index].blocking_reasons.clear();
270 self.refresh_restart_required();
271 self.validate()
272 }
273
274 #[must_use]
276 pub fn resume_summary(&self) -> BackupExecutionResumeSummary {
277 let mut summary = BackupExecutionResumeSummary {
278 plan_id: self.plan_id.clone(),
279 run_id: self.run_id.clone(),
280 preflight_id: self.preflight_id.clone(),
281 preflight_accepted: self.preflight_accepted,
282 restart_required: self.restart_required,
283 total_operations: self.operations.len(),
284 ready_operations: 0,
285 pending_operations: 0,
286 blocked_operations: 0,
287 completed_operations: 0,
288 failed_operations: 0,
289 skipped_operations: 0,
290 next_operation: self.next_ready_operation().cloned(),
291 };
292 for operation in &self.operations {
293 match operation.state {
294 BackupExecutionOperationState::Ready => summary.ready_operations += 1,
295 BackupExecutionOperationState::Pending => summary.pending_operations += 1,
296 BackupExecutionOperationState::Blocked => summary.blocked_operations += 1,
297 BackupExecutionOperationState::Completed => summary.completed_operations += 1,
298 BackupExecutionOperationState::Failed => summary.failed_operations += 1,
299 BackupExecutionOperationState::Skipped => summary.skipped_operations += 1,
300 }
301 }
302 summary
303 }
304
305 fn operation_index(&self, sequence: usize) -> Result<usize, BackupExecutionJournalError> {
306 self.operations
307 .iter()
308 .position(|operation| operation.sequence == sequence)
309 .ok_or(BackupExecutionJournalError::OperationNotFound(sequence))
310 }
311
312 fn refresh_blocked_operations(&mut self) {
313 if self.preflight_accepted {
314 return;
315 }
316 for operation in &mut self.operations {
317 if operation_kind_is_mutating(&operation.kind) {
318 operation.state = BackupExecutionOperationState::Blocked;
319 operation.blocking_reasons = vec![PREFLIGHT_NOT_ACCEPTED.to_string()];
320 }
321 }
322 }
323
324 fn refresh_restart_required(&mut self) {
325 self.restart_required = self.derived_restart_required();
326 }
327
328 fn derived_restart_required(&self) -> bool {
329 let stopped = self.operations.iter().any(|operation| {
330 operation.kind == BackupOperationKind::Stop
331 && operation.state == BackupExecutionOperationState::Completed
332 });
333 let unstarted = self.operations.iter().any(|operation| {
334 operation.kind == BackupOperationKind::Start
335 && !matches!(
336 operation.state,
337 BackupExecutionOperationState::Completed
338 | BackupExecutionOperationState::Skipped
339 )
340 });
341 stopped && unstarted
342 }
343}