1mod operation;
2mod receipt;
3mod types;
4mod validation;
5
6pub use types::*;
7
8use crate::plan::{BackupExecutionPreflightReceipts, BackupOperationKind, BackupPlan};
9use validation::{
10 operation_kind_is_mutating, operation_kind_is_preflight, validate_nonempty,
11 validate_operation_sequences,
12};
13
/// Journal format version stamped on journals built by `from_plan`; `validate`
/// rejects any journal whose `journal_version` differs from it.
14const BACKUP_EXECUTION_JOURNAL_VERSION: u16 = 1;
/// Blocking reason recorded on mutating operations while no preflight has
/// been accepted (see `refresh_blocked_operations`).
15const PREFLIGHT_NOT_ACCEPTED: &str = "preflight-not-accepted";
16
17impl BackupExecutionJournal {
18 pub fn from_plan(plan: &BackupPlan) -> Result<Self, BackupExecutionJournalError> {
20 plan.validate()
21 .map_err(|error| BackupExecutionJournalError::InvalidPlan(error.to_string()))?;
22 let operations = plan
23 .phases
24 .iter()
25 .map(BackupExecutionJournalOperation::from_plan_operation)
26 .collect::<Vec<_>>();
27 let mut journal = Self {
28 journal_version: BACKUP_EXECUTION_JOURNAL_VERSION,
29 plan_id: plan.plan_id.clone(),
30 run_id: plan.run_id.clone(),
31 preflight_id: None,
32 preflight_accepted: false,
33 restart_required: false,
34 operations,
35 operation_receipts: Vec::new(),
36 };
37 journal.refresh_blocked_operations();
38 journal.validate()?;
39 Ok(journal)
40 }
41
42 pub fn validate(&self) -> Result<(), BackupExecutionJournalError> {
44 if self.journal_version != BACKUP_EXECUTION_JOURNAL_VERSION {
45 return Err(BackupExecutionJournalError::UnsupportedVersion(
46 self.journal_version,
47 ));
48 }
49 validate_nonempty("plan_id", &self.plan_id)?;
50 validate_nonempty("run_id", &self.run_id)?;
51 if let Some(preflight_id) = &self.preflight_id {
52 validate_nonempty("preflight_id", preflight_id)?;
53 } else if self.preflight_accepted {
54 return Err(BackupExecutionJournalError::AcceptedPreflightMissingId);
55 }
56 if self.restart_required != self.derived_restart_required() {
57 return Err(BackupExecutionJournalError::RestartRequiredMismatch);
58 }
59 validate_operation_sequences(&self.operations)?;
60 for operation in &self.operations {
61 operation.validate()?;
62 if !self.preflight_accepted && operation_kind_is_mutating(&operation.kind) {
63 match operation.state {
64 BackupExecutionOperationState::Blocked => {}
65 BackupExecutionOperationState::Ready
66 | BackupExecutionOperationState::Pending
67 | BackupExecutionOperationState::Completed
68 | BackupExecutionOperationState::Failed
69 | BackupExecutionOperationState::Skipped => {
70 return Err(BackupExecutionJournalError::MutationReadyBeforePreflight {
71 sequence: operation.sequence,
72 });
73 }
74 }
75 }
76 }
77 for receipt in &self.operation_receipts {
78 receipt.validate_against(self)?;
79 }
80 Ok(())
81 }
82
83 pub fn accept_preflight_bundle_at(
85 &mut self,
86 preflight_id: String,
87 updated_at: Option<String>,
88 ) -> Result<(), BackupExecutionJournalError> {
89 validate_nonempty("preflight_id", &preflight_id)?;
90 validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
91 if let Some(existing) = &self.preflight_id
92 && existing != &preflight_id
93 {
94 return Err(BackupExecutionJournalError::PreflightAlreadyAccepted {
95 existing: existing.clone(),
96 attempted: preflight_id,
97 });
98 }
99
100 self.preflight_id = Some(preflight_id);
101 self.preflight_accepted = true;
102 for operation in &mut self.operations {
103 if operation_kind_is_preflight(&operation.kind) {
104 operation.state = BackupExecutionOperationState::Completed;
105 operation.state_updated_at.clone_from(&updated_at);
106 operation.blocking_reasons.clear();
107 } else if operation.state == BackupExecutionOperationState::Blocked {
108 operation.state = BackupExecutionOperationState::Ready;
109 operation.blocking_reasons.clear();
110 }
111 }
112 self.refresh_restart_required();
113 self.validate()
114 }
115
116 pub fn accept_preflight_receipts_at(
118 &mut self,
119 receipts: &BackupExecutionPreflightReceipts,
120 updated_at: Option<String>,
121 ) -> Result<(), BackupExecutionJournalError> {
122 validate_nonempty("preflight_receipts.plan_id", &receipts.plan_id)?;
123 if receipts.plan_id != self.plan_id {
124 return Err(BackupExecutionJournalError::PreflightPlanMismatch {
125 expected: self.plan_id.clone(),
126 actual: receipts.plan_id.clone(),
127 });
128 }
129 self.accept_preflight_bundle_at(receipts.preflight_id.clone(), updated_at)
130 }
131
132 #[must_use]
134 pub fn next_ready_operation(&self) -> Option<&BackupExecutionJournalOperation> {
135 self.operations
136 .iter()
137 .filter(|operation| {
138 matches!(
139 operation.state,
140 BackupExecutionOperationState::Ready
141 | BackupExecutionOperationState::Pending
142 | BackupExecutionOperationState::Failed
143 )
144 })
145 .min_by_key(|operation| operation.sequence)
146 }
147
148 pub fn mark_next_operation_pending_at(
150 &mut self,
151 updated_at: Option<String>,
152 ) -> Result<(), BackupExecutionJournalError> {
153 let sequence = self
154 .next_ready_operation()
155 .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
156 .sequence;
157 self.mark_operation_pending_at(sequence, updated_at)
158 }
159
160 pub fn mark_operation_pending_at(
162 &mut self,
163 sequence: usize,
164 updated_at: Option<String>,
165 ) -> Result<(), BackupExecutionJournalError> {
166 validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
167 let expected = self
168 .next_ready_operation()
169 .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
170 .sequence;
171 if sequence != expected {
172 return Err(BackupExecutionJournalError::OutOfOrderOperationTransition {
173 requested: sequence,
174 next: expected,
175 });
176 }
177 let index = self.operation_index(sequence)?;
178 let operation = &self.operations[index];
179 if operation_kind_is_mutating(&operation.kind) && !self.preflight_accepted {
180 return Err(BackupExecutionJournalError::MutationBeforePreflightAccepted { sequence });
181 }
182 if !matches!(
183 operation.state,
184 BackupExecutionOperationState::Ready | BackupExecutionOperationState::Failed
185 ) {
186 return Err(BackupExecutionJournalError::InvalidOperationTransition {
187 sequence,
188 from: operation.state.clone(),
189 to: BackupExecutionOperationState::Pending,
190 });
191 }
192
193 let operation = &mut self.operations[index];
194 operation.state = BackupExecutionOperationState::Pending;
195 operation.state_updated_at = updated_at;
196 operation.blocking_reasons.clear();
197 self.refresh_restart_required();
198 self.validate()
199 }
200
201 pub fn record_operation_receipt(
203 &mut self,
204 receipt: BackupExecutionOperationReceipt,
205 ) -> Result<(), BackupExecutionJournalError> {
206 receipt.validate_against(self)?;
207 let index = self.operation_index(receipt.sequence)?;
208 let operation = &self.operations[index];
209 if operation.state != BackupExecutionOperationState::Pending {
210 return Err(
211 BackupExecutionJournalError::ReceiptWithoutPendingOperation {
212 sequence: receipt.sequence,
213 },
214 );
215 }
216
217 let next_state = match receipt.outcome {
218 BackupExecutionOperationReceiptOutcome::Completed => {
219 BackupExecutionOperationState::Completed
220 }
221 BackupExecutionOperationReceiptOutcome::Failed => BackupExecutionOperationState::Failed,
222 BackupExecutionOperationReceiptOutcome::Skipped => {
223 BackupExecutionOperationState::Skipped
224 }
225 };
226 let failure_reason = receipt.failure_reason.clone();
227 let previous_operation = self.operations[index].clone();
228 let previous_restart_required = self.restart_required;
229 self.operation_receipts.push(receipt);
230
231 let operation = &mut self.operations[index];
232 operation.state = next_state;
233 operation.state_updated_at = self
234 .operation_receipts
235 .last()
236 .and_then(|receipt| receipt.updated_at.clone());
237 operation.blocking_reasons = failure_reason.into_iter().collect();
238 self.refresh_restart_required();
239 if let Err(error) = self.validate() {
240 self.operation_receipts.pop();
241 self.operations[index] = previous_operation;
242 self.restart_required = previous_restart_required;
243 return Err(error);
244 }
245 Ok(())
246 }
247
248 pub fn retry_failed_operation_at(
250 &mut self,
251 sequence: usize,
252 updated_at: Option<String>,
253 ) -> Result<(), BackupExecutionJournalError> {
254 validate_nonempty("updated_at", updated_at.as_deref().unwrap_or_default())?;
255 let index = self.operation_index(sequence)?;
256 if self.operations[index].state != BackupExecutionOperationState::Failed {
257 return Err(BackupExecutionJournalError::OperationNotFailed(sequence));
258 }
259 self.operations[index].state = BackupExecutionOperationState::Ready;
260 self.operations[index].state_updated_at = updated_at;
261 self.operations[index].blocking_reasons.clear();
262 self.refresh_restart_required();
263 self.validate()
264 }
265
266 #[must_use]
268 pub fn resume_summary(&self) -> BackupExecutionResumeSummary {
269 let mut summary = BackupExecutionResumeSummary {
270 plan_id: self.plan_id.clone(),
271 run_id: self.run_id.clone(),
272 preflight_id: self.preflight_id.clone(),
273 preflight_accepted: self.preflight_accepted,
274 restart_required: self.restart_required,
275 total_operations: self.operations.len(),
276 ready_operations: 0,
277 pending_operations: 0,
278 blocked_operations: 0,
279 completed_operations: 0,
280 failed_operations: 0,
281 skipped_operations: 0,
282 next_operation: self.next_ready_operation().cloned(),
283 };
284 for operation in &self.operations {
285 match operation.state {
286 BackupExecutionOperationState::Ready => summary.ready_operations += 1,
287 BackupExecutionOperationState::Pending => summary.pending_operations += 1,
288 BackupExecutionOperationState::Blocked => summary.blocked_operations += 1,
289 BackupExecutionOperationState::Completed => summary.completed_operations += 1,
290 BackupExecutionOperationState::Failed => summary.failed_operations += 1,
291 BackupExecutionOperationState::Skipped => summary.skipped_operations += 1,
292 }
293 }
294 summary
295 }
296
297 fn operation_index(&self, sequence: usize) -> Result<usize, BackupExecutionJournalError> {
298 self.operations
299 .iter()
300 .position(|operation| operation.sequence == sequence)
301 .ok_or(BackupExecutionJournalError::OperationNotFound(sequence))
302 }
303
304 fn refresh_blocked_operations(&mut self) {
305 if self.preflight_accepted {
306 return;
307 }
308 for operation in &mut self.operations {
309 if operation_kind_is_mutating(&operation.kind) {
310 operation.state = BackupExecutionOperationState::Blocked;
311 operation.blocking_reasons = vec![PREFLIGHT_NOT_ACCEPTED.to_string()];
312 }
313 }
314 }
315
316 fn refresh_restart_required(&mut self) {
317 self.restart_required = self.derived_restart_required();
318 }
319
320 fn derived_restart_required(&self) -> bool {
321 let stopped = self.operations.iter().any(|operation| {
322 operation.kind == BackupOperationKind::Stop
323 && operation.state == BackupExecutionOperationState::Completed
324 });
325 let unstarted = self.operations.iter().any(|operation| {
326 operation.kind == BackupOperationKind::Start
327 && !matches!(
328 operation.state,
329 BackupExecutionOperationState::Completed
330 | BackupExecutionOperationState::Skipped
331 )
332 });
333 stopped && unstarted
334 }
335}
336
337#[cfg(test)]
338mod tests;