1mod operation;
2mod receipt;
3mod types;
4mod validation;
5
6pub use types::*;
7
8use crate::plan::{BackupExecutionPreflightReceipts, BackupOperationKind, BackupPlan};
9use validation::{
10 operation_kind_is_mutating, operation_kind_is_preflight, validate_nonempty,
11 validate_operation_sequences, validate_optional_nonempty,
12};
13
14const BACKUP_EXECUTION_JOURNAL_VERSION: u16 = 1;
15const PREFLIGHT_NOT_ACCEPTED: &str = "preflight-not-accepted";
16
17impl BackupExecutionJournal {
18 pub fn from_plan(plan: &BackupPlan) -> Result<Self, BackupExecutionJournalError> {
20 plan.validate()
21 .map_err(|error| BackupExecutionJournalError::InvalidPlan(error.to_string()))?;
22 let operations = plan
23 .phases
24 .iter()
25 .map(BackupExecutionJournalOperation::from_plan_operation)
26 .collect::<Vec<_>>();
27 let mut journal = Self {
28 journal_version: BACKUP_EXECUTION_JOURNAL_VERSION,
29 plan_id: plan.plan_id.clone(),
30 run_id: plan.run_id.clone(),
31 preflight_id: None,
32 preflight_accepted: false,
33 restart_required: false,
34 operations,
35 operation_receipts: Vec::new(),
36 };
37 journal.refresh_blocked_operations();
38 journal.validate()?;
39 Ok(journal)
40 }
41
42 pub fn validate(&self) -> Result<(), BackupExecutionJournalError> {
44 if self.journal_version != BACKUP_EXECUTION_JOURNAL_VERSION {
45 return Err(BackupExecutionJournalError::UnsupportedVersion(
46 self.journal_version,
47 ));
48 }
49 validate_nonempty("plan_id", &self.plan_id)?;
50 validate_nonempty("run_id", &self.run_id)?;
51 if let Some(preflight_id) = &self.preflight_id {
52 validate_nonempty("preflight_id", preflight_id)?;
53 } else if self.preflight_accepted {
54 return Err(BackupExecutionJournalError::AcceptedPreflightMissingId);
55 }
56 validate_operation_sequences(&self.operations)?;
57 for operation in &self.operations {
58 operation.validate()?;
59 if !self.preflight_accepted && operation_kind_is_mutating(&operation.kind) {
60 match operation.state {
61 BackupExecutionOperationState::Blocked => {}
62 BackupExecutionOperationState::Ready
63 | BackupExecutionOperationState::Pending
64 | BackupExecutionOperationState::Completed
65 | BackupExecutionOperationState::Failed
66 | BackupExecutionOperationState::Skipped => {
67 return Err(BackupExecutionJournalError::MutationReadyBeforePreflight {
68 sequence: operation.sequence,
69 });
70 }
71 }
72 }
73 }
74 for receipt in &self.operation_receipts {
75 receipt.validate_against(self)?;
76 }
77 Ok(())
78 }
79
80 pub fn accept_preflight_bundle_at(
82 &mut self,
83 preflight_id: String,
84 updated_at: Option<String>,
85 ) -> Result<(), BackupExecutionJournalError> {
86 validate_nonempty("preflight_id", &preflight_id)?;
87 validate_optional_nonempty("updated_at", updated_at.as_deref())?;
88 if let Some(existing) = &self.preflight_id
89 && existing != &preflight_id
90 {
91 return Err(BackupExecutionJournalError::PreflightAlreadyAccepted {
92 existing: existing.clone(),
93 attempted: preflight_id,
94 });
95 }
96
97 self.preflight_id = Some(preflight_id);
98 self.preflight_accepted = true;
99 for operation in &mut self.operations {
100 if operation_kind_is_preflight(&operation.kind) {
101 operation.state = BackupExecutionOperationState::Completed;
102 operation.state_updated_at.clone_from(&updated_at);
103 operation.blocking_reasons.clear();
104 } else if operation.state == BackupExecutionOperationState::Blocked {
105 operation.state = BackupExecutionOperationState::Ready;
106 operation.blocking_reasons.clear();
107 }
108 }
109 self.refresh_restart_required();
110 self.validate()
111 }
112
113 pub fn accept_preflight_receipts_at(
115 &mut self,
116 receipts: &BackupExecutionPreflightReceipts,
117 updated_at: Option<String>,
118 ) -> Result<(), BackupExecutionJournalError> {
119 validate_nonempty("preflight_receipts.plan_id", &receipts.plan_id)?;
120 if receipts.plan_id != self.plan_id {
121 return Err(BackupExecutionJournalError::PreflightPlanMismatch {
122 expected: self.plan_id.clone(),
123 actual: receipts.plan_id.clone(),
124 });
125 }
126 self.accept_preflight_bundle_at(receipts.preflight_id.clone(), updated_at)
127 }
128
129 #[must_use]
131 pub fn next_ready_operation(&self) -> Option<&BackupExecutionJournalOperation> {
132 self.operations
133 .iter()
134 .filter(|operation| {
135 matches!(
136 operation.state,
137 BackupExecutionOperationState::Ready
138 | BackupExecutionOperationState::Pending
139 | BackupExecutionOperationState::Failed
140 )
141 })
142 .min_by_key(|operation| operation.sequence)
143 }
144
145 pub fn mark_next_operation_pending_at(
147 &mut self,
148 updated_at: Option<String>,
149 ) -> Result<(), BackupExecutionJournalError> {
150 let sequence = self
151 .next_ready_operation()
152 .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
153 .sequence;
154 self.mark_operation_pending_at(sequence, updated_at)
155 }
156
157 pub fn mark_operation_pending_at(
159 &mut self,
160 sequence: usize,
161 updated_at: Option<String>,
162 ) -> Result<(), BackupExecutionJournalError> {
163 validate_optional_nonempty("updated_at", updated_at.as_deref())?;
164 let expected = self
165 .next_ready_operation()
166 .ok_or(BackupExecutionJournalError::NoTransitionableOperation)?
167 .sequence;
168 if sequence != expected {
169 return Err(BackupExecutionJournalError::OutOfOrderOperationTransition {
170 requested: sequence,
171 next: expected,
172 });
173 }
174 let index = self.operation_index(sequence)?;
175 let operation = &self.operations[index];
176 if operation_kind_is_mutating(&operation.kind) && !self.preflight_accepted {
177 return Err(BackupExecutionJournalError::MutationBeforePreflightAccepted { sequence });
178 }
179 if !matches!(
180 operation.state,
181 BackupExecutionOperationState::Ready | BackupExecutionOperationState::Failed
182 ) {
183 return Err(BackupExecutionJournalError::InvalidOperationTransition {
184 sequence,
185 from: operation.state.clone(),
186 to: BackupExecutionOperationState::Pending,
187 });
188 }
189
190 let operation = &mut self.operations[index];
191 operation.state = BackupExecutionOperationState::Pending;
192 operation.state_updated_at = updated_at;
193 operation.blocking_reasons.clear();
194 self.refresh_restart_required();
195 self.validate()
196 }
197
198 pub fn record_operation_receipt(
200 &mut self,
201 receipt: BackupExecutionOperationReceipt,
202 ) -> Result<(), BackupExecutionJournalError> {
203 receipt.validate_against(self)?;
204 let index = self.operation_index(receipt.sequence)?;
205 let operation = &self.operations[index];
206 if operation.state != BackupExecutionOperationState::Pending {
207 return Err(
208 BackupExecutionJournalError::ReceiptWithoutPendingOperation {
209 sequence: receipt.sequence,
210 },
211 );
212 }
213
214 let next_state = match receipt.outcome {
215 BackupExecutionOperationReceiptOutcome::Completed => {
216 BackupExecutionOperationState::Completed
217 }
218 BackupExecutionOperationReceiptOutcome::Failed => BackupExecutionOperationState::Failed,
219 BackupExecutionOperationReceiptOutcome::Skipped => {
220 BackupExecutionOperationState::Skipped
221 }
222 };
223 let failure_reason = receipt.failure_reason.clone();
224 self.operation_receipts.push(receipt);
225
226 let operation = &mut self.operations[index];
227 operation.state = next_state;
228 operation.state_updated_at = self
229 .operation_receipts
230 .last()
231 .and_then(|receipt| receipt.updated_at.clone());
232 operation.blocking_reasons = failure_reason.into_iter().collect();
233 self.refresh_restart_required();
234 if let Err(error) = self.validate() {
235 self.operation_receipts.pop();
236 return Err(error);
237 }
238 Ok(())
239 }
240
241 pub fn retry_failed_operation_at(
243 &mut self,
244 sequence: usize,
245 updated_at: Option<String>,
246 ) -> Result<(), BackupExecutionJournalError> {
247 validate_optional_nonempty("updated_at", updated_at.as_deref())?;
248 let index = self.operation_index(sequence)?;
249 if self.operations[index].state != BackupExecutionOperationState::Failed {
250 return Err(BackupExecutionJournalError::OperationNotFailed(sequence));
251 }
252 self.operations[index].state = BackupExecutionOperationState::Ready;
253 self.operations[index].state_updated_at = updated_at;
254 self.operations[index].blocking_reasons.clear();
255 self.refresh_restart_required();
256 self.validate()
257 }
258
259 #[must_use]
261 pub fn resume_summary(&self) -> BackupExecutionResumeSummary {
262 let mut summary = BackupExecutionResumeSummary {
263 plan_id: self.plan_id.clone(),
264 run_id: self.run_id.clone(),
265 preflight_id: self.preflight_id.clone(),
266 preflight_accepted: self.preflight_accepted,
267 restart_required: self.restart_required,
268 total_operations: self.operations.len(),
269 ready_operations: 0,
270 pending_operations: 0,
271 blocked_operations: 0,
272 completed_operations: 0,
273 failed_operations: 0,
274 skipped_operations: 0,
275 next_operation: self.next_ready_operation().cloned(),
276 };
277 for operation in &self.operations {
278 match operation.state {
279 BackupExecutionOperationState::Ready => summary.ready_operations += 1,
280 BackupExecutionOperationState::Pending => summary.pending_operations += 1,
281 BackupExecutionOperationState::Blocked => summary.blocked_operations += 1,
282 BackupExecutionOperationState::Completed => summary.completed_operations += 1,
283 BackupExecutionOperationState::Failed => summary.failed_operations += 1,
284 BackupExecutionOperationState::Skipped => summary.skipped_operations += 1,
285 }
286 }
287 summary
288 }
289
290 fn operation_index(&self, sequence: usize) -> Result<usize, BackupExecutionJournalError> {
291 self.operations
292 .iter()
293 .position(|operation| operation.sequence == sequence)
294 .ok_or(BackupExecutionJournalError::OperationNotFound(sequence))
295 }
296
297 fn refresh_blocked_operations(&mut self) {
298 if self.preflight_accepted {
299 return;
300 }
301 for operation in &mut self.operations {
302 if operation_kind_is_mutating(&operation.kind) {
303 operation.state = BackupExecutionOperationState::Blocked;
304 operation.blocking_reasons = vec![PREFLIGHT_NOT_ACCEPTED.to_string()];
305 }
306 }
307 }
308
309 fn refresh_restart_required(&mut self) {
310 let stopped = self.operations.iter().any(|operation| {
311 operation.kind == BackupOperationKind::Stop
312 && operation.state == BackupExecutionOperationState::Completed
313 });
314 let unstarted = self.operations.iter().any(|operation| {
315 operation.kind == BackupOperationKind::Start
316 && !matches!(
317 operation.state,
318 BackupExecutionOperationState::Completed
319 | BackupExecutionOperationState::Skipped
320 )
321 });
322 self.restart_required = stopped && unstarted;
323 }
324}
325
326#[cfg(test)]
327mod tests;