canic_backup/restore/apply/journal/
mod.rs1mod commands;
8mod counts;
9mod receipts;
10mod reports;
11mod types;
12
13pub use commands::{
14 RestoreApplyCommandConfig, RestoreApplyCommandPreview, RestoreApplyRunnerCommand,
15};
16use counts::RestoreApplyJournalStateCounts;
17pub use counts::RestoreApplyOperationKindCounts;
18pub(in crate::restore) use receipts::RestoreApplyCommandOutputPair;
19pub use receipts::{
20 RestoreApplyCommandOutput, RestoreApplyOperationReceipt, RestoreApplyOperationReceiptOutcome,
21};
22pub(in crate::restore) use reports::RestoreApplyJournalReport;
23pub use reports::{
24 RestoreApplyPendingSummary, RestoreApplyProgressSummary, RestoreApplyReportOperation,
25 RestoreApplyReportOutcome,
26};
27pub use types::{
28 RestoreApplyJournalError, RestoreApplyJournalOperation, RestoreApplyOperationKind,
29 RestoreApplyOperationState,
30};
31
32use crate::restore::{RestoreApplyDryRun, RestoreApplyDryRunOperation};
33
34use std::collections::BTreeSet;
35
36use serde::{Deserialize, Serialize};
37
38use types::{
39 restore_apply_blocked_reasons, validate_apply_journal_count, validate_apply_journal_nonempty,
40 validate_apply_journal_sequences, validate_apply_journal_version,
41};
42
43#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
51pub struct RestoreApplyJournal {
52 pub journal_version: u16,
53 pub backup_id: String,
54 pub ready: bool,
55 pub blocked_reasons: Vec<String>,
56 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub backup_root: Option<String>,
58 pub operation_count: usize,
59 pub operation_counts: RestoreApplyOperationKindCounts,
60 pub pending_operations: usize,
61 pub ready_operations: usize,
62 pub blocked_operations: usize,
63 pub completed_operations: usize,
64 pub failed_operations: usize,
65 pub operations: Vec<RestoreApplyJournalOperation>,
66 #[serde(default, skip_serializing_if = "Vec::is_empty")]
67 pub operation_receipts: Vec<RestoreApplyOperationReceipt>,
68}
69
70impl RestoreApplyJournal {
71 #[must_use]
73 pub fn from_dry_run(dry_run: &RestoreApplyDryRun) -> Self {
74 let blocked_reasons = restore_apply_blocked_reasons(dry_run);
75 let initial_state = if blocked_reasons.is_empty() {
76 RestoreApplyOperationState::Ready
77 } else {
78 RestoreApplyOperationState::Blocked
79 };
80 let operations = dry_run
81 .operations
82 .iter()
83 .map(|operation| {
84 RestoreApplyJournalOperation::from_dry_run_operation(
85 operation,
86 initial_state.clone(),
87 &blocked_reasons,
88 )
89 })
90 .collect::<Vec<_>>();
91 let ready_operations = operations
92 .iter()
93 .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
94 .count();
95 let blocked_operations = operations
96 .iter()
97 .filter(|operation| operation.state == RestoreApplyOperationState::Blocked)
98 .count();
99 let operation_counts = RestoreApplyOperationKindCounts::from_operations(&operations);
100
101 Self {
102 journal_version: 1,
103 backup_id: dry_run.backup_id.clone(),
104 ready: blocked_reasons.is_empty(),
105 blocked_reasons,
106 backup_root: dry_run
107 .artifact_validation
108 .as_ref()
109 .map(|validation| validation.backup_root.clone()),
110 operation_count: operations.len(),
111 operation_counts,
112 pending_operations: 0,
113 ready_operations,
114 blocked_operations,
115 completed_operations: 0,
116 failed_operations: 0,
117 operations,
118 operation_receipts: Vec::new(),
119 }
120 }
121
122 pub fn validate(&self) -> Result<(), RestoreApplyJournalError> {
124 validate_apply_journal_version(self.journal_version)?;
125 validate_apply_journal_nonempty("backup_id", &self.backup_id)?;
126 if let Some(backup_root) = &self.backup_root {
127 validate_apply_journal_nonempty("backup_root", backup_root)?;
128 }
129 validate_apply_journal_count(
130 "operation_count",
131 self.operation_count,
132 self.operations.len(),
133 )?;
134
135 let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
136 let operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
137 self.operation_counts.validate_matches(&operation_counts)?;
138 validate_apply_journal_count(
139 "pending_operations",
140 self.pending_operations,
141 state_counts.pending,
142 )?;
143 validate_apply_journal_count(
144 "ready_operations",
145 self.ready_operations,
146 state_counts.ready,
147 )?;
148 validate_apply_journal_count(
149 "blocked_operations",
150 self.blocked_operations,
151 state_counts.blocked,
152 )?;
153 validate_apply_journal_count(
154 "completed_operations",
155 self.completed_operations,
156 state_counts.completed,
157 )?;
158 validate_apply_journal_count(
159 "failed_operations",
160 self.failed_operations,
161 state_counts.failed,
162 )?;
163
164 if self.ready && (!self.blocked_reasons.is_empty() || self.blocked_operations > 0) {
165 return Err(RestoreApplyJournalError::ReadyJournalHasBlockingState);
166 }
167
168 validate_apply_journal_sequences(&self.operations)?;
169 for operation in &self.operations {
170 operation.validate()?;
171 }
172 self.validate_operation_receipt_attempts()?;
173 for receipt in &self.operation_receipts {
174 receipt.validate_against(self)?;
175 }
176
177 Ok(())
178 }
179
180 #[must_use]
182 pub(in crate::restore) fn report(&self) -> RestoreApplyJournalReport {
183 RestoreApplyJournalReport::from_journal(self)
184 }
185
186 #[must_use]
188 pub(in crate::restore) fn next_transition_operation(
189 &self,
190 ) -> Option<&RestoreApplyJournalOperation> {
191 self.operations
192 .iter()
193 .filter(|operation| {
194 matches!(
195 operation.state,
196 RestoreApplyOperationState::Ready
197 | RestoreApplyOperationState::Pending
198 | RestoreApplyOperationState::Failed
199 )
200 })
201 .min_by_key(|operation| operation.sequence)
202 }
203
204 #[must_use]
206 pub fn next_command_preview(&self) -> RestoreApplyCommandPreview {
207 RestoreApplyCommandPreview::from_journal(self)
208 }
209
210 #[must_use]
212 pub(in crate::restore) fn next_command_preview_with_config(
213 &self,
214 config: &RestoreApplyCommandConfig,
215 ) -> RestoreApplyCommandPreview {
216 RestoreApplyCommandPreview::from_journal_with_config(self, config)
217 }
218
219 pub(in crate::restore) fn record_operation_receipt(
221 &mut self,
222 receipt: RestoreApplyOperationReceipt,
223 ) -> Result<(), RestoreApplyJournalError> {
224 self.operation_receipts.push(receipt);
225 if let Err(error) = self.validate() {
226 self.operation_receipts.pop();
227 return Err(error);
228 }
229
230 Ok(())
231 }
232
233 pub fn mark_next_operation_pending_at(
235 &mut self,
236 updated_at: Option<String>,
237 ) -> Result<(), RestoreApplyJournalError> {
238 let sequence = self
239 .next_transition_sequence()
240 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
241 self.mark_operation_pending_at(sequence, updated_at)
242 }
243
244 pub(in crate::restore) fn mark_operation_pending_at(
246 &mut self,
247 sequence: usize,
248 updated_at: Option<String>,
249 ) -> Result<(), RestoreApplyJournalError> {
250 self.transition_operation(
251 sequence,
252 RestoreApplyOperationState::Pending,
253 Vec::new(),
254 updated_at,
255 )
256 }
257
258 pub(in crate::restore) fn mark_next_operation_ready_at(
260 &mut self,
261 updated_at: Option<String>,
262 ) -> Result<(), RestoreApplyJournalError> {
263 let operation = self
264 .next_transition_operation()
265 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
266 if operation.state != RestoreApplyOperationState::Pending {
267 return Err(RestoreApplyJournalError::NoPendingOperation);
268 }
269
270 self.mark_operation_ready_at(operation.sequence, updated_at)
271 }
272
273 pub(in crate::restore) fn mark_operation_ready_at(
275 &mut self,
276 sequence: usize,
277 updated_at: Option<String>,
278 ) -> Result<(), RestoreApplyJournalError> {
279 self.transition_operation(
280 sequence,
281 RestoreApplyOperationState::Ready,
282 Vec::new(),
283 updated_at,
284 )
285 }
286
287 pub fn retry_failed_operation_at(
289 &mut self,
290 sequence: usize,
291 updated_at: Option<String>,
292 ) -> Result<(), RestoreApplyJournalError> {
293 self.transition_operation(
294 sequence,
295 RestoreApplyOperationState::Ready,
296 Vec::new(),
297 updated_at,
298 )
299 }
300
301 pub(in crate::restore) fn mark_operation_completed_at(
303 &mut self,
304 sequence: usize,
305 updated_at: Option<String>,
306 ) -> Result<(), RestoreApplyJournalError> {
307 self.transition_operation(
308 sequence,
309 RestoreApplyOperationState::Completed,
310 Vec::new(),
311 updated_at,
312 )
313 }
314
315 pub(in crate::restore) fn mark_operation_failed_at(
317 &mut self,
318 sequence: usize,
319 reason: String,
320 updated_at: Option<String>,
321 ) -> Result<(), RestoreApplyJournalError> {
322 if reason.trim().is_empty() {
323 return Err(RestoreApplyJournalError::FailureReasonRequired(sequence));
324 }
325
326 self.transition_operation(
327 sequence,
328 RestoreApplyOperationState::Failed,
329 vec![reason],
330 updated_at,
331 )
332 }
333
334 fn transition_operation(
336 &mut self,
337 sequence: usize,
338 next_state: RestoreApplyOperationState,
339 blocking_reasons: Vec<String>,
340 updated_at: Option<String>,
341 ) -> Result<(), RestoreApplyJournalError> {
342 let index = self
343 .operations
344 .iter()
345 .position(|operation| operation.sequence == sequence)
346 .ok_or(RestoreApplyJournalError::OperationNotFound(sequence))?;
347 let operation = &self.operations[index];
348
349 if !operation.can_transition_to(&next_state) {
350 return Err(RestoreApplyJournalError::InvalidOperationTransition {
351 sequence,
352 from: operation.state.clone(),
353 to: next_state,
354 });
355 }
356
357 self.validate_operation_transition_order(operation, &next_state)?;
358
359 let operation = &mut self.operations[index];
360 operation.state = next_state;
361 operation.blocking_reasons = blocking_reasons;
362 operation.state_updated_at = updated_at;
363 self.refresh_operation_counts();
364 self.validate()
365 }
366
367 fn validate_operation_transition_order(
369 &self,
370 operation: &RestoreApplyJournalOperation,
371 next_state: &RestoreApplyOperationState,
372 ) -> Result<(), RestoreApplyJournalError> {
373 if operation.state == *next_state {
374 return Ok(());
375 }
376
377 let next_sequence = self
378 .next_transition_sequence()
379 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
380
381 if operation.sequence == next_sequence {
382 return Ok(());
383 }
384
385 Err(RestoreApplyJournalError::OutOfOrderOperationTransition {
386 requested: operation.sequence,
387 next: next_sequence,
388 })
389 }
390
391 fn next_transition_sequence(&self) -> Option<usize> {
393 self.next_transition_operation()
394 .map(|operation| operation.sequence)
395 }
396
397 fn refresh_operation_counts(&mut self) {
399 let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
400 self.operation_count = self.operations.len();
401 self.operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
402 self.pending_operations = state_counts.pending;
403 self.ready_operations = state_counts.ready;
404 self.blocked_operations = state_counts.blocked;
405 self.completed_operations = state_counts.completed;
406 self.failed_operations = state_counts.failed;
407 }
408
409 pub(super) const fn is_complete(&self) -> bool {
411 self.operation_count > 0 && self.completed_operations == self.operation_count
412 }
413
414 pub(super) fn operation_kind_counts(&self) -> RestoreApplyOperationKindCounts {
416 RestoreApplyOperationKindCounts::from_operations(&self.operations)
417 }
418
419 fn validate_operation_receipt_attempts(&self) -> Result<(), RestoreApplyJournalError> {
421 let mut attempts = BTreeSet::new();
422 for receipt in &self.operation_receipts {
423 if !attempts.insert((receipt.sequence, receipt.attempt)) {
424 return Err(RestoreApplyJournalError::DuplicateOperationReceiptAttempt {
425 sequence: receipt.sequence,
426 attempt: receipt.attempt,
427 });
428 }
429 }
430
431 Ok(())
432 }
433
434 pub(super) fn uploaded_snapshot_id_for_load(
436 &self,
437 load: &RestoreApplyJournalOperation,
438 ) -> Option<&str> {
439 self.operation_receipts
440 .iter()
441 .find(|receipt| {
442 receipt.matches_load_operation(load)
443 && self.operations.iter().any(|operation| {
444 operation.sequence == receipt.sequence
445 && operation.operation == RestoreApplyOperationKind::UploadSnapshot
446 && operation.state == RestoreApplyOperationState::Completed
447 })
448 })
449 .and_then(|receipt| receipt.uploaded_snapshot_id.as_deref())
450 }
451}