canic_backup/restore/apply/journal/
mod.rs1use super::{RestoreApplyDryRun, RestoreApplyDryRunOperation};
2use serde::{Deserialize, Serialize};
3use std::collections::BTreeSet;
4
5mod commands;
6mod counts;
7mod receipts;
8mod reports;
9mod types;
10
11pub use commands::{
12 RestoreApplyCommandConfig, RestoreApplyCommandPreview, RestoreApplyRunnerCommand,
13};
14use counts::RestoreApplyJournalStateCounts;
15pub use counts::RestoreApplyOperationKindCounts;
16pub(in crate::restore) use receipts::RestoreApplyCommandOutputPair;
17pub use receipts::{
18 RestoreApplyCommandOutput, RestoreApplyOperationReceipt, RestoreApplyOperationReceiptOutcome,
19};
20pub(in crate::restore) use reports::RestoreApplyJournalReport;
21pub use reports::{
22 RestoreApplyPendingSummary, RestoreApplyProgressSummary, RestoreApplyReportOperation,
23 RestoreApplyReportOutcome,
24};
25pub use types::{
26 RestoreApplyJournalError, RestoreApplyJournalOperation, RestoreApplyOperationKind,
27 RestoreApplyOperationState,
28};
29use types::{
30 restore_apply_blocked_reasons, validate_apply_journal_count, validate_apply_journal_nonempty,
31 validate_apply_journal_sequences, validate_apply_journal_version,
32};
33
34#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
39pub struct RestoreApplyJournal {
40 pub journal_version: u16,
41 pub backup_id: String,
42 pub ready: bool,
43 pub blocked_reasons: Vec<String>,
44 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub backup_root: Option<String>,
46 pub operation_count: usize,
47 pub operation_counts: RestoreApplyOperationKindCounts,
48 pub pending_operations: usize,
49 pub ready_operations: usize,
50 pub blocked_operations: usize,
51 pub completed_operations: usize,
52 pub failed_operations: usize,
53 pub operations: Vec<RestoreApplyJournalOperation>,
54 #[serde(default, skip_serializing_if = "Vec::is_empty")]
55 pub operation_receipts: Vec<RestoreApplyOperationReceipt>,
56}
57
58impl RestoreApplyJournal {
59 #[must_use]
61 pub fn from_dry_run(dry_run: &RestoreApplyDryRun) -> Self {
62 let blocked_reasons = restore_apply_blocked_reasons(dry_run);
63 let initial_state = if blocked_reasons.is_empty() {
64 RestoreApplyOperationState::Ready
65 } else {
66 RestoreApplyOperationState::Blocked
67 };
68 let operations = dry_run
69 .operations
70 .iter()
71 .map(|operation| {
72 RestoreApplyJournalOperation::from_dry_run_operation(
73 operation,
74 initial_state.clone(),
75 &blocked_reasons,
76 )
77 })
78 .collect::<Vec<_>>();
79 let ready_operations = operations
80 .iter()
81 .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
82 .count();
83 let blocked_operations = operations
84 .iter()
85 .filter(|operation| operation.state == RestoreApplyOperationState::Blocked)
86 .count();
87 let operation_counts = RestoreApplyOperationKindCounts::from_operations(&operations);
88
89 Self {
90 journal_version: 1,
91 backup_id: dry_run.backup_id.clone(),
92 ready: blocked_reasons.is_empty(),
93 blocked_reasons,
94 backup_root: dry_run
95 .artifact_validation
96 .as_ref()
97 .map(|validation| validation.backup_root.clone()),
98 operation_count: operations.len(),
99 operation_counts,
100 pending_operations: 0,
101 ready_operations,
102 blocked_operations,
103 completed_operations: 0,
104 failed_operations: 0,
105 operations,
106 operation_receipts: Vec::new(),
107 }
108 }
109
110 pub fn validate(&self) -> Result<(), RestoreApplyJournalError> {
112 validate_apply_journal_version(self.journal_version)?;
113 validate_apply_journal_nonempty("backup_id", &self.backup_id)?;
114 if let Some(backup_root) = &self.backup_root {
115 validate_apply_journal_nonempty("backup_root", backup_root)?;
116 }
117 validate_apply_journal_count(
118 "operation_count",
119 self.operation_count,
120 self.operations.len(),
121 )?;
122
123 let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
124 let operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
125 self.operation_counts.validate_matches(&operation_counts)?;
126 validate_apply_journal_count(
127 "pending_operations",
128 self.pending_operations,
129 state_counts.pending,
130 )?;
131 validate_apply_journal_count(
132 "ready_operations",
133 self.ready_operations,
134 state_counts.ready,
135 )?;
136 validate_apply_journal_count(
137 "blocked_operations",
138 self.blocked_operations,
139 state_counts.blocked,
140 )?;
141 validate_apply_journal_count(
142 "completed_operations",
143 self.completed_operations,
144 state_counts.completed,
145 )?;
146 validate_apply_journal_count(
147 "failed_operations",
148 self.failed_operations,
149 state_counts.failed,
150 )?;
151
152 if self.ready && (!self.blocked_reasons.is_empty() || self.blocked_operations > 0) {
153 return Err(RestoreApplyJournalError::ReadyJournalHasBlockingState);
154 }
155
156 validate_apply_journal_sequences(&self.operations)?;
157 for operation in &self.operations {
158 operation.validate()?;
159 }
160 self.validate_operation_receipt_attempts()?;
161 for receipt in &self.operation_receipts {
162 receipt.validate_against(self)?;
163 }
164
165 Ok(())
166 }
167
168 #[must_use]
170 pub(in crate::restore) fn report(&self) -> RestoreApplyJournalReport {
171 RestoreApplyJournalReport::from_journal(self)
172 }
173
174 #[must_use]
176 pub(in crate::restore) fn next_transition_operation(
177 &self,
178 ) -> Option<&RestoreApplyJournalOperation> {
179 self.operations
180 .iter()
181 .filter(|operation| {
182 matches!(
183 operation.state,
184 RestoreApplyOperationState::Ready
185 | RestoreApplyOperationState::Pending
186 | RestoreApplyOperationState::Failed
187 )
188 })
189 .min_by_key(|operation| operation.sequence)
190 }
191
192 #[must_use]
194 pub fn next_command_preview(&self) -> RestoreApplyCommandPreview {
195 RestoreApplyCommandPreview::from_journal(self)
196 }
197
198 #[must_use]
200 pub(in crate::restore) fn next_command_preview_with_config(
201 &self,
202 config: &RestoreApplyCommandConfig,
203 ) -> RestoreApplyCommandPreview {
204 RestoreApplyCommandPreview::from_journal_with_config(self, config)
205 }
206
207 pub(in crate::restore) fn record_operation_receipt(
209 &mut self,
210 receipt: RestoreApplyOperationReceipt,
211 ) -> Result<(), RestoreApplyJournalError> {
212 self.operation_receipts.push(receipt);
213 if let Err(error) = self.validate() {
214 self.operation_receipts.pop();
215 return Err(error);
216 }
217
218 Ok(())
219 }
220
221 pub fn mark_next_operation_pending_at(
223 &mut self,
224 updated_at: Option<String>,
225 ) -> Result<(), RestoreApplyJournalError> {
226 let sequence = self
227 .next_transition_sequence()
228 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
229 self.mark_operation_pending_at(sequence, updated_at)
230 }
231
232 pub(in crate::restore) fn mark_operation_pending_at(
234 &mut self,
235 sequence: usize,
236 updated_at: Option<String>,
237 ) -> Result<(), RestoreApplyJournalError> {
238 self.transition_operation(
239 sequence,
240 RestoreApplyOperationState::Pending,
241 Vec::new(),
242 updated_at,
243 )
244 }
245
246 pub(in crate::restore) fn mark_next_operation_ready_at(
248 &mut self,
249 updated_at: Option<String>,
250 ) -> Result<(), RestoreApplyJournalError> {
251 let operation = self
252 .next_transition_operation()
253 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
254 if operation.state != RestoreApplyOperationState::Pending {
255 return Err(RestoreApplyJournalError::NoPendingOperation);
256 }
257
258 self.mark_operation_ready_at(operation.sequence, updated_at)
259 }
260
261 pub(in crate::restore) fn mark_operation_ready_at(
263 &mut self,
264 sequence: usize,
265 updated_at: Option<String>,
266 ) -> Result<(), RestoreApplyJournalError> {
267 self.transition_operation(
268 sequence,
269 RestoreApplyOperationState::Ready,
270 Vec::new(),
271 updated_at,
272 )
273 }
274
275 pub fn retry_failed_operation_at(
277 &mut self,
278 sequence: usize,
279 updated_at: Option<String>,
280 ) -> Result<(), RestoreApplyJournalError> {
281 self.transition_operation(
282 sequence,
283 RestoreApplyOperationState::Ready,
284 Vec::new(),
285 updated_at,
286 )
287 }
288
289 pub(in crate::restore) fn mark_operation_completed_at(
291 &mut self,
292 sequence: usize,
293 updated_at: Option<String>,
294 ) -> Result<(), RestoreApplyJournalError> {
295 self.transition_operation(
296 sequence,
297 RestoreApplyOperationState::Completed,
298 Vec::new(),
299 updated_at,
300 )
301 }
302
303 pub(in crate::restore) fn mark_operation_failed_at(
305 &mut self,
306 sequence: usize,
307 reason: String,
308 updated_at: Option<String>,
309 ) -> Result<(), RestoreApplyJournalError> {
310 if reason.trim().is_empty() {
311 return Err(RestoreApplyJournalError::FailureReasonRequired(sequence));
312 }
313
314 self.transition_operation(
315 sequence,
316 RestoreApplyOperationState::Failed,
317 vec![reason],
318 updated_at,
319 )
320 }
321
322 fn transition_operation(
324 &mut self,
325 sequence: usize,
326 next_state: RestoreApplyOperationState,
327 blocking_reasons: Vec<String>,
328 updated_at: Option<String>,
329 ) -> Result<(), RestoreApplyJournalError> {
330 let index = self
331 .operations
332 .iter()
333 .position(|operation| operation.sequence == sequence)
334 .ok_or(RestoreApplyJournalError::OperationNotFound(sequence))?;
335 let operation = &self.operations[index];
336
337 if !operation.can_transition_to(&next_state) {
338 return Err(RestoreApplyJournalError::InvalidOperationTransition {
339 sequence,
340 from: operation.state.clone(),
341 to: next_state,
342 });
343 }
344
345 self.validate_operation_transition_order(operation, &next_state)?;
346
347 let operation = &mut self.operations[index];
348 operation.state = next_state;
349 operation.blocking_reasons = blocking_reasons;
350 operation.state_updated_at = updated_at;
351 self.refresh_operation_counts();
352 self.validate()
353 }
354
355 fn validate_operation_transition_order(
357 &self,
358 operation: &RestoreApplyJournalOperation,
359 next_state: &RestoreApplyOperationState,
360 ) -> Result<(), RestoreApplyJournalError> {
361 if operation.state == *next_state {
362 return Ok(());
363 }
364
365 let next_sequence = self
366 .next_transition_sequence()
367 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
368
369 if operation.sequence == next_sequence {
370 return Ok(());
371 }
372
373 Err(RestoreApplyJournalError::OutOfOrderOperationTransition {
374 requested: operation.sequence,
375 next: next_sequence,
376 })
377 }
378
379 fn next_transition_sequence(&self) -> Option<usize> {
381 self.next_transition_operation()
382 .map(|operation| operation.sequence)
383 }
384
385 fn refresh_operation_counts(&mut self) {
387 let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
388 self.operation_count = self.operations.len();
389 self.operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
390 self.pending_operations = state_counts.pending;
391 self.ready_operations = state_counts.ready;
392 self.blocked_operations = state_counts.blocked;
393 self.completed_operations = state_counts.completed;
394 self.failed_operations = state_counts.failed;
395 }
396
397 pub(super) const fn is_complete(&self) -> bool {
399 self.operation_count > 0 && self.completed_operations == self.operation_count
400 }
401
402 pub(super) fn operation_kind_counts(&self) -> RestoreApplyOperationKindCounts {
404 RestoreApplyOperationKindCounts::from_operations(&self.operations)
405 }
406
407 fn validate_operation_receipt_attempts(&self) -> Result<(), RestoreApplyJournalError> {
409 let mut attempts = BTreeSet::new();
410 for receipt in &self.operation_receipts {
411 if !attempts.insert((receipt.sequence, receipt.attempt)) {
412 return Err(RestoreApplyJournalError::DuplicateOperationReceiptAttempt {
413 sequence: receipt.sequence,
414 attempt: receipt.attempt,
415 });
416 }
417 }
418
419 Ok(())
420 }
421
422 pub(super) fn uploaded_snapshot_id_for_load(
424 &self,
425 load: &RestoreApplyJournalOperation,
426 ) -> Option<&str> {
427 self.operation_receipts
428 .iter()
429 .find(|receipt| {
430 receipt.matches_load_operation(load)
431 && self.operations.iter().any(|operation| {
432 operation.sequence == receipt.sequence
433 && operation.operation == RestoreApplyOperationKind::UploadSnapshot
434 && operation.state == RestoreApplyOperationState::Completed
435 })
436 })
437 .and_then(|receipt| receipt.uploaded_snapshot_id.as_deref())
438 }
439}