canic_backup/restore/apply/journal/
mod.rs1use super::{RestoreApplyDryRun, RestoreApplyDryRunOperation};
2use serde::{Deserialize, Serialize};
3
4mod commands;
5mod counts;
6mod receipts;
7mod reports;
8mod types;
9
10pub use commands::{
11 RestoreApplyCommandConfig, RestoreApplyCommandPreview, RestoreApplyRunnerCommand,
12};
13use counts::RestoreApplyJournalStateCounts;
14pub use counts::RestoreApplyOperationKindCounts;
15pub(in crate::restore) use receipts::RestoreApplyCommandOutputPair;
16pub use receipts::{
17 RestoreApplyCommandOutput, RestoreApplyOperationReceipt, RestoreApplyOperationReceiptOutcome,
18};
19pub(in crate::restore) use reports::RestoreApplyJournalReport;
20pub use reports::{
21 RestoreApplyPendingSummary, RestoreApplyProgressSummary, RestoreApplyReportOperation,
22 RestoreApplyReportOutcome,
23};
24pub use types::{
25 RestoreApplyJournalError, RestoreApplyJournalOperation, RestoreApplyOperationKind,
26 RestoreApplyOperationState,
27};
28use types::{
29 restore_apply_blocked_reasons, validate_apply_journal_count, validate_apply_journal_nonempty,
30 validate_apply_journal_sequences, validate_apply_journal_version,
31};
32
33#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
38pub struct RestoreApplyJournal {
39 pub journal_version: u16,
40 pub backup_id: String,
41 pub ready: bool,
42 pub blocked_reasons: Vec<String>,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub backup_root: Option<String>,
45 pub operation_count: usize,
46 pub operation_counts: RestoreApplyOperationKindCounts,
47 pub pending_operations: usize,
48 pub ready_operations: usize,
49 pub blocked_operations: usize,
50 pub completed_operations: usize,
51 pub failed_operations: usize,
52 pub operations: Vec<RestoreApplyJournalOperation>,
53 #[serde(default, skip_serializing_if = "Vec::is_empty")]
54 pub operation_receipts: Vec<RestoreApplyOperationReceipt>,
55}
56
57impl RestoreApplyJournal {
58 #[must_use]
60 pub fn from_dry_run(dry_run: &RestoreApplyDryRun) -> Self {
61 let blocked_reasons = restore_apply_blocked_reasons(dry_run);
62 let initial_state = if blocked_reasons.is_empty() {
63 RestoreApplyOperationState::Ready
64 } else {
65 RestoreApplyOperationState::Blocked
66 };
67 let operations = dry_run
68 .operations
69 .iter()
70 .map(|operation| {
71 RestoreApplyJournalOperation::from_dry_run_operation(
72 operation,
73 initial_state.clone(),
74 &blocked_reasons,
75 )
76 })
77 .collect::<Vec<_>>();
78 let ready_operations = operations
79 .iter()
80 .filter(|operation| operation.state == RestoreApplyOperationState::Ready)
81 .count();
82 let blocked_operations = operations
83 .iter()
84 .filter(|operation| operation.state == RestoreApplyOperationState::Blocked)
85 .count();
86 let operation_counts = RestoreApplyOperationKindCounts::from_operations(&operations);
87
88 Self {
89 journal_version: 1,
90 backup_id: dry_run.backup_id.clone(),
91 ready: blocked_reasons.is_empty(),
92 blocked_reasons,
93 backup_root: dry_run
94 .artifact_validation
95 .as_ref()
96 .map(|validation| validation.backup_root.clone()),
97 operation_count: operations.len(),
98 operation_counts,
99 pending_operations: 0,
100 ready_operations,
101 blocked_operations,
102 completed_operations: 0,
103 failed_operations: 0,
104 operations,
105 operation_receipts: Vec::new(),
106 }
107 }
108
109 pub fn validate(&self) -> Result<(), RestoreApplyJournalError> {
111 validate_apply_journal_version(self.journal_version)?;
112 validate_apply_journal_nonempty("backup_id", &self.backup_id)?;
113 if let Some(backup_root) = &self.backup_root {
114 validate_apply_journal_nonempty("backup_root", backup_root)?;
115 }
116 validate_apply_journal_count(
117 "operation_count",
118 self.operation_count,
119 self.operations.len(),
120 )?;
121
122 let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
123 let operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
124 self.operation_counts.validate_matches(&operation_counts)?;
125 validate_apply_journal_count(
126 "pending_operations",
127 self.pending_operations,
128 state_counts.pending,
129 )?;
130 validate_apply_journal_count(
131 "ready_operations",
132 self.ready_operations,
133 state_counts.ready,
134 )?;
135 validate_apply_journal_count(
136 "blocked_operations",
137 self.blocked_operations,
138 state_counts.blocked,
139 )?;
140 validate_apply_journal_count(
141 "completed_operations",
142 self.completed_operations,
143 state_counts.completed,
144 )?;
145 validate_apply_journal_count(
146 "failed_operations",
147 self.failed_operations,
148 state_counts.failed,
149 )?;
150
151 if self.ready && (!self.blocked_reasons.is_empty() || self.blocked_operations > 0) {
152 return Err(RestoreApplyJournalError::ReadyJournalHasBlockingState);
153 }
154
155 validate_apply_journal_sequences(&self.operations)?;
156 for operation in &self.operations {
157 operation.validate()?;
158 }
159 for receipt in &self.operation_receipts {
160 receipt.validate_against(self)?;
161 }
162
163 Ok(())
164 }
165
166 #[must_use]
168 pub(in crate::restore) fn report(&self) -> RestoreApplyJournalReport {
169 RestoreApplyJournalReport::from_journal(self)
170 }
171
172 #[must_use]
174 pub(in crate::restore) fn next_transition_operation(
175 &self,
176 ) -> Option<&RestoreApplyJournalOperation> {
177 self.operations
178 .iter()
179 .filter(|operation| {
180 matches!(
181 operation.state,
182 RestoreApplyOperationState::Ready
183 | RestoreApplyOperationState::Pending
184 | RestoreApplyOperationState::Failed
185 )
186 })
187 .min_by_key(|operation| operation.sequence)
188 }
189
190 #[must_use]
192 pub fn next_command_preview(&self) -> RestoreApplyCommandPreview {
193 RestoreApplyCommandPreview::from_journal(self)
194 }
195
196 #[must_use]
198 pub(in crate::restore) fn next_command_preview_with_config(
199 &self,
200 config: &RestoreApplyCommandConfig,
201 ) -> RestoreApplyCommandPreview {
202 RestoreApplyCommandPreview::from_journal_with_config(self, config)
203 }
204
205 pub(in crate::restore) fn record_operation_receipt(
207 &mut self,
208 receipt: RestoreApplyOperationReceipt,
209 ) -> Result<(), RestoreApplyJournalError> {
210 self.operation_receipts.push(receipt);
211 if let Err(error) = self.validate() {
212 self.operation_receipts.pop();
213 return Err(error);
214 }
215
216 Ok(())
217 }
218
219 pub fn mark_next_operation_pending_at(
221 &mut self,
222 updated_at: Option<String>,
223 ) -> Result<(), RestoreApplyJournalError> {
224 let sequence = self
225 .next_transition_sequence()
226 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
227 self.mark_operation_pending_at(sequence, updated_at)
228 }
229
230 pub(in crate::restore) fn mark_operation_pending_at(
232 &mut self,
233 sequence: usize,
234 updated_at: Option<String>,
235 ) -> Result<(), RestoreApplyJournalError> {
236 self.transition_operation(
237 sequence,
238 RestoreApplyOperationState::Pending,
239 Vec::new(),
240 updated_at,
241 )
242 }
243
244 pub(in crate::restore) fn mark_next_operation_ready_at(
246 &mut self,
247 updated_at: Option<String>,
248 ) -> Result<(), RestoreApplyJournalError> {
249 let operation = self
250 .next_transition_operation()
251 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
252 if operation.state != RestoreApplyOperationState::Pending {
253 return Err(RestoreApplyJournalError::NoPendingOperation);
254 }
255
256 self.mark_operation_ready_at(operation.sequence, updated_at)
257 }
258
259 pub(in crate::restore) fn mark_operation_ready_at(
261 &mut self,
262 sequence: usize,
263 updated_at: Option<String>,
264 ) -> Result<(), RestoreApplyJournalError> {
265 self.transition_operation(
266 sequence,
267 RestoreApplyOperationState::Ready,
268 Vec::new(),
269 updated_at,
270 )
271 }
272
273 pub fn retry_failed_operation_at(
275 &mut self,
276 sequence: usize,
277 updated_at: Option<String>,
278 ) -> Result<(), RestoreApplyJournalError> {
279 self.transition_operation(
280 sequence,
281 RestoreApplyOperationState::Ready,
282 Vec::new(),
283 updated_at,
284 )
285 }
286
287 pub(in crate::restore) fn mark_operation_completed_at(
289 &mut self,
290 sequence: usize,
291 updated_at: Option<String>,
292 ) -> Result<(), RestoreApplyJournalError> {
293 self.transition_operation(
294 sequence,
295 RestoreApplyOperationState::Completed,
296 Vec::new(),
297 updated_at,
298 )
299 }
300
301 pub(in crate::restore) fn mark_operation_failed_at(
303 &mut self,
304 sequence: usize,
305 reason: String,
306 updated_at: Option<String>,
307 ) -> Result<(), RestoreApplyJournalError> {
308 if reason.trim().is_empty() {
309 return Err(RestoreApplyJournalError::FailureReasonRequired(sequence));
310 }
311
312 self.transition_operation(
313 sequence,
314 RestoreApplyOperationState::Failed,
315 vec![reason],
316 updated_at,
317 )
318 }
319
320 fn transition_operation(
322 &mut self,
323 sequence: usize,
324 next_state: RestoreApplyOperationState,
325 blocking_reasons: Vec<String>,
326 updated_at: Option<String>,
327 ) -> Result<(), RestoreApplyJournalError> {
328 let index = self
329 .operations
330 .iter()
331 .position(|operation| operation.sequence == sequence)
332 .ok_or(RestoreApplyJournalError::OperationNotFound(sequence))?;
333 let operation = &self.operations[index];
334
335 if !operation.can_transition_to(&next_state) {
336 return Err(RestoreApplyJournalError::InvalidOperationTransition {
337 sequence,
338 from: operation.state.clone(),
339 to: next_state,
340 });
341 }
342
343 self.validate_operation_transition_order(operation, &next_state)?;
344
345 let operation = &mut self.operations[index];
346 operation.state = next_state;
347 operation.blocking_reasons = blocking_reasons;
348 operation.state_updated_at = updated_at;
349 self.refresh_operation_counts();
350 self.validate()
351 }
352
353 fn validate_operation_transition_order(
355 &self,
356 operation: &RestoreApplyJournalOperation,
357 next_state: &RestoreApplyOperationState,
358 ) -> Result<(), RestoreApplyJournalError> {
359 if operation.state == *next_state {
360 return Ok(());
361 }
362
363 let next_sequence = self
364 .next_transition_sequence()
365 .ok_or(RestoreApplyJournalError::NoTransitionableOperation)?;
366
367 if operation.sequence == next_sequence {
368 return Ok(());
369 }
370
371 Err(RestoreApplyJournalError::OutOfOrderOperationTransition {
372 requested: operation.sequence,
373 next: next_sequence,
374 })
375 }
376
377 fn next_transition_sequence(&self) -> Option<usize> {
379 self.next_transition_operation()
380 .map(|operation| operation.sequence)
381 }
382
383 fn refresh_operation_counts(&mut self) {
385 let state_counts = RestoreApplyJournalStateCounts::from_operations(&self.operations);
386 self.operation_count = self.operations.len();
387 self.operation_counts = RestoreApplyOperationKindCounts::from_operations(&self.operations);
388 self.pending_operations = state_counts.pending;
389 self.ready_operations = state_counts.ready;
390 self.blocked_operations = state_counts.blocked;
391 self.completed_operations = state_counts.completed;
392 self.failed_operations = state_counts.failed;
393 }
394
395 pub(super) const fn is_complete(&self) -> bool {
397 self.operation_count > 0 && self.completed_operations == self.operation_count
398 }
399
400 pub(super) fn operation_kind_counts(&self) -> RestoreApplyOperationKindCounts {
402 RestoreApplyOperationKindCounts::from_operations(&self.operations)
403 }
404
405 pub(super) fn uploaded_snapshot_id_for_load(
407 &self,
408 load: &RestoreApplyJournalOperation,
409 ) -> Option<&str> {
410 self.operation_receipts
411 .iter()
412 .find(|receipt| {
413 receipt.matches_load_operation(load)
414 && self.operations.iter().any(|operation| {
415 operation.sequence == receipt.sequence
416 && operation.operation == RestoreApplyOperationKind::UploadSnapshot
417 && operation.state == RestoreApplyOperationState::Completed
418 })
419 })
420 .and_then(|receipt| receipt.uploaded_snapshot_id.as_deref())
421 }
422}