1use super::{
2 RestoreApplyCommandConfig, RestoreApplyCommandOutputPair, RestoreApplyCommandPreview,
3 RestoreApplyJournal, RestoreApplyJournalError, RestoreApplyJournalOperation,
4 RestoreApplyJournalReport, RestoreApplyOperationKind, RestoreApplyOperationKindCounts,
5 RestoreApplyOperationReceipt, RestoreApplyOperationState, RestoreApplyPendingSummary,
6 RestoreApplyProgressSummary, RestoreApplyReportOperation, RestoreApplyReportOutcome,
7 RestoreApplyRunnerCommand,
8};
9use serde::Serialize;
10use std::{
11 fs,
12 io::{self, Write},
13 path::{Path, PathBuf},
14 process::Command as ProcessCommand,
15};
16use thiserror::Error as ThisError;
17
// Values reported in `RestoreRunResponse::run_mode`.
const RESTORE_RUN_MODE_DRY_RUN: &str = "dry-run";
const RESTORE_RUN_MODE_EXECUTE: &str = "execute";
const RESTORE_RUN_MODE_UNCLAIM_PENDING: &str = "unclaim-pending";

// Values reported in `RestoreRunResponse::stopped_reason`
// (chosen by `restore_run_stopped_reason`, or fixed for unclaim-pending).
const RESTORE_RUN_STOPPED_BLOCKED: &str = "blocked";
const RESTORE_RUN_STOPPED_COMMAND_FAILED: &str = "command-failed";
const RESTORE_RUN_STOPPED_COMPLETE: &str = "complete";
const RESTORE_RUN_STOPPED_MAX_STEPS: &str = "max-steps-reached";
const RESTORE_RUN_STOPPED_PENDING: &str = "pending";
const RESTORE_RUN_STOPPED_PREVIEW: &str = "preview";
const RESTORE_RUN_STOPPED_READY: &str = "ready";
const RESTORE_RUN_STOPPED_RECOVERED_PENDING: &str = "recovered-pending";

// Values reported in `RestoreRunResponse::next_action`
// (chosen by `restore_run_next_action`).
const RESTORE_RUN_ACTION_DONE: &str = "done";
const RESTORE_RUN_ACTION_FIX_BLOCKED: &str = "fix-blocked-journal";
const RESTORE_RUN_ACTION_INSPECT_FAILED: &str = "inspect-failed-operation";
const RESTORE_RUN_ACTION_RERUN: &str = "rerun";
const RESTORE_RUN_ACTION_UNCLAIM_PENDING: &str = "unclaim-pending";

/// `RestoreRunOperationReceipt::event` for a runner command that exited successfully.
pub const RESTORE_RUN_RECEIPT_COMPLETED: &str = "command-completed";
/// `RestoreRunOperationReceipt::event` for a runner command that exited unsuccessfully.
pub const RESTORE_RUN_RECEIPT_FAILED: &str = "command-failed";
/// `RestoreRunOperationReceipt::event` for a pending claim reset to ready by unclaim-pending.
pub const RESTORE_RUN_RECEIPT_RECOVERED_PENDING: &str = "pending-recovered";

// `state` values attached to executed operations and run receipts.
const RESTORE_RUN_EXECUTED_COMPLETED: &str = "completed";
const RESTORE_RUN_EXECUTED_FAILED: &str = "failed";
const RESTORE_RUN_RECEIPT_STATE_READY: &str = "ready";
// Prefix of the journal failure reason: `runner-command-exit-<status>`.
const RESTORE_RUN_COMMAND_EXIT_PREFIX: &str = "runner-command-exit";
// Emitted as `RestoreRunResponse::run_version`.
const RESTORE_RUN_RESPONSE_VERSION: u16 = 1;
// Limit passed to `RestoreApplyCommandOutputPair::from_bytes` when capturing
// command stdout/stderr into journal receipts (presumably a byte count — confirm).
const RESTORE_RUN_OUTPUT_RECEIPT_LIMIT: usize = 64 * 1024;
47
/// Inputs shared by the dry-run, execute and unclaim-pending entry points.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RestoreRunnerConfig {
    /// Path of the restore apply journal JSON file. The mutating entry points
    /// also create `<journal>.lock` next to it as an exclusive lock file.
    pub journal: PathBuf,
    /// Configuration used to build the runner command preview for the next
    /// operation.
    pub command: RestoreApplyCommandConfig,
    /// In execute mode, stop once this many operations have run in the current
    /// call; also echoed as `batch_summary.requested_max_steps`.
    pub max_steps: Option<usize>,
    /// Timestamp stamped on journal state transitions and receipts; the literal
    /// `"unknown"` is used when this is `None`.
    pub updated_at: Option<String>,
}
59
/// Errors produced by the restore runner entry points.
#[derive(Debug, ThisError)]
pub enum RestoreRunnerError {
    /// A runner command exited unsuccessfully. `status` is the process exit
    /// code, or `signal` when no exit code was available.
    #[error("restore run command failed for operation {sequence}: status={status}")]
    CommandFailed { sequence: usize, status: String },

    /// The journal's `<journal>.lock` file already exists.
    #[error("restore apply journal is locked: {lock_path}")]
    JournalLocked { lock_path: String },

    /// The journal still has a claimed (pending) operation, so execution is
    /// refused; the reported next action in this state is `unclaim-pending`.
    #[error(
        "restore apply journal for backup {backup_id} has pending operations: pending={pending_operations}, next={next_transition_sequence:?}"
    )]
    Pending {
        backup_id: String,
        pending_operations: usize,
        next_transition_sequence: Option<usize>,
    },

    /// The journal has failed operations, so execution is refused.
    #[error(
        "restore apply journal for backup {backup_id} has failed operations: failed={failed_operations}"
    )]
    Failed {
        backup_id: String,
        failed_operations: usize,
    },

    /// The journal report is not ready; `reasons` are its blocked reasons.
    #[error("restore apply journal for backup {backup_id} is not ready: reasons={reasons:?}")]
    NotReady {
        backup_id: String,
        reasons: Vec<String>,
    },

    /// The command preview has no runnable operation/command for the next step.
    #[error(
        "restore apply journal for backup {backup_id} has no executable command: operation_available={operation_available}, complete={complete}, blocked_reasons={blocked_reasons:?}"
    )]
    CommandUnavailable {
        backup_id: String,
        operation_available: bool,
        complete: bool,
        blocked_reasons: Vec<String>,
    },

    /// The journal's next transition no longer matches the operation selected
    /// by the preview, so it is not claimed.
    #[error(
        "restore apply journal next operation changed before claim: expected={expected}, actual={actual:?}"
    )]
    ClaimSequenceMismatch {
        expected: usize,
        actual: Option<usize>,
    },

    /// Filesystem or process-spawn failure.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Journal (de)serialization failure.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// Journal validation or state-transition failure.
    #[error(transparent)]
    Journal(#[from] RestoreApplyJournalError),
}
122
/// JSON response emitted by every restore runner entry point.
///
/// Built by `RestoreRunResponse::from_report` from a journal report and a run
/// mode, then refined by the entry point. Dry runs fill in the preview fields
/// (`operation_available`, `command_available`, `command`). Execute runs fill
/// in `max_steps_reached`, `executed_operations` and `executed_operation_count`.
/// Unclaim-pending fills in `recovered_operation`.
#[derive(Clone, Debug, Serialize)]
#[expect(
    clippy::struct_excessive_bools,
    reason = "Runner response exposes stable JSON status flags for operators and CI"
)]
pub struct RestoreRunResponse {
    /// Response schema version.
    pub run_version: u16,
    pub backup_id: String,
    /// One of `dry-run`, `execute` or `unclaim-pending`.
    pub run_mode: &'static str,
    // Exactly one of the next three flags is true, matching `run_mode`.
    pub dry_run: bool,
    pub execute: bool,
    pub unclaim_pending: bool,
    /// Why the run stopped.
    pub stopped_reason: &'static str,
    /// Suggested operator follow-up.
    pub next_action: &'static str,
    /// The `updated_at` value requested by the caller, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub requested_state_updated_at: Option<String>,
    /// Execute mode only: whether the `max_steps` budget stopped the run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_steps_reached: Option<bool>,
    /// Execute mode only: operations whose command ran during this call.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub executed_operations: Vec<RestoreRunExecutedOperation>,
    /// Receipts produced during this call (command results or a recovered claim).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub operation_receipts: Vec<RestoreRunOperationReceipt>,
    /// Number of entries in `operation_receipts`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_receipt_count: Option<usize>,
    /// Per-event counts of `operation_receipts`.
    pub operation_receipt_summary: RestoreRunReceiptSummary,
    /// Execute mode only: number of entries in `executed_operations`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub executed_operation_count: Option<usize>,
    /// Unclaim-pending only: the operation that was reset from pending to ready.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recovered_operation: Option<RestoreApplyJournalOperation>,
    /// Before/after counts for this invocation.
    pub batch_summary: RestoreRunBatchSummary,
    // The following fields are copied from the journal report taken after the run.
    pub ready: bool,
    pub complete: bool,
    pub attention_required: bool,
    pub outcome: RestoreApplyReportOutcome,
    pub operation_count: usize,
    pub operation_counts: RestoreApplyOperationKindCounts,
    pub operation_counts_supplied: bool,
    pub progress: RestoreApplyProgressSummary,
    pub pending_summary: RestoreApplyPendingSummary,
    pub pending_operations: usize,
    pub ready_operations: usize,
    pub blocked_operations: usize,
    pub completed_operations: usize,
    pub failed_operations: usize,
    pub blocked_reasons: Vec<String>,
    pub next_transition: Option<RestoreApplyReportOperation>,
    /// Dry-run only: whether the preview found a next operation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_available: Option<bool>,
    /// Dry-run only: whether the preview could build a runner command.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command_available: Option<bool>,
    /// Dry-run only: the previewed runner command.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<RestoreApplyRunnerCommand>,
}
180
181impl RestoreRunResponse {
182 fn from_report(
184 backup_id: String,
185 report: RestoreApplyJournalReport,
186 mode: RestoreRunResponseMode,
187 ) -> Self {
188 Self {
189 run_version: RESTORE_RUN_RESPONSE_VERSION,
190 backup_id,
191 run_mode: mode.run_mode,
192 dry_run: mode.dry_run,
193 execute: mode.execute,
194 unclaim_pending: mode.unclaim_pending,
195 stopped_reason: mode.stopped_reason,
196 next_action: mode.next_action,
197 requested_state_updated_at: None,
198 max_steps_reached: None,
199 executed_operations: Vec::new(),
200 operation_receipts: Vec::new(),
201 operation_receipt_count: Some(0),
202 operation_receipt_summary: RestoreRunReceiptSummary::default(),
203 executed_operation_count: None,
204 recovered_operation: None,
205 batch_summary: RestoreRunBatchSummary::from_counts(
206 RestoreRunBatchStart::new(
207 None,
208 report.ready_operations,
209 report.progress.remaining_operations,
210 ),
211 0,
212 report.ready_operations,
213 report.progress.remaining_operations,
214 false,
215 report.complete,
216 ),
217 ready: report.ready,
218 complete: report.complete,
219 attention_required: report.attention_required,
220 outcome: report.outcome,
221 operation_count: report.operation_count,
222 operation_counts: report.operation_counts,
223 operation_counts_supplied: report.operation_counts_supplied,
224 progress: report.progress,
225 pending_summary: report.pending_summary,
226 pending_operations: report.pending_operations,
227 ready_operations: report.ready_operations,
228 blocked_operations: report.blocked_operations,
229 completed_operations: report.completed_operations,
230 failed_operations: report.failed_operations,
231 blocked_reasons: report.blocked_reasons,
232 next_transition: report.next_transition,
233 operation_available: None,
234 command_available: None,
235 command: None,
236 }
237 }
238
239 fn set_operation_receipts(&mut self, receipts: Vec<RestoreRunOperationReceipt>) {
241 self.operation_receipt_summary = RestoreRunReceiptSummary::from_receipts(&receipts);
242 self.operation_receipt_count = Some(receipts.len());
243 self.operation_receipts = receipts;
244 }
245
246 fn set_requested_state_updated_at(&mut self, updated_at: Option<&String>) {
248 self.requested_state_updated_at = updated_at.cloned();
249 }
250
251 const fn set_batch_summary(
253 &mut self,
254 batch_start: RestoreRunBatchStart,
255 executed_operations: usize,
256 stopped_by_max_steps: bool,
257 ) {
258 self.batch_summary = RestoreRunBatchSummary::from_counts(
259 batch_start,
260 executed_operations,
261 self.ready_operations,
262 self.progress.remaining_operations,
263 stopped_by_max_steps,
264 self.complete,
265 );
266 }
267}
268
/// Before/after journal counts for one runner invocation.
#[derive(Clone, Debug, Serialize)]
pub struct RestoreRunBatchSummary {
    /// The `max_steps` value supplied by the caller, if any.
    pub requested_max_steps: Option<usize>,
    /// Ready operations when the invocation started.
    pub initial_ready_operations: usize,
    /// Remaining operations when the invocation started.
    pub initial_remaining_operations: usize,
    /// Operations whose command ran during the invocation.
    pub executed_operations: usize,
    /// Ready operations after the invocation.
    pub remaining_ready_operations: usize,
    /// Remaining operations after the invocation.
    pub remaining_operations: usize,
    /// `remaining_ready_operations - initial_ready_operations`.
    pub ready_operations_delta: isize,
    /// `remaining_operations - initial_remaining_operations` (negative means progress).
    pub remaining_operations_delta: isize,
    /// Whether the `max_steps` budget stopped the invocation.
    pub stopped_by_max_steps: bool,
    /// Whether the journal was complete after the invocation.
    pub complete: bool,
}
286
/// Counts of run receipts grouped by their `event` value.
#[derive(Clone, Debug, Default, Serialize)]
pub struct RestoreRunReceiptSummary {
    /// Total number of receipts, including any with an unrecognised event.
    pub total_receipts: usize,
    /// Receipts with event `command-completed`.
    pub command_completed: usize,
    /// Receipts with event `command-failed`.
    pub command_failed: usize,
    /// Receipts with event `pending-recovered`.
    pub pending_recovered: usize,
}
298
/// Per-operation receipt included in a run response.
#[derive(Clone, Debug, Serialize)]
pub struct RestoreRunOperationReceipt {
    /// `command-completed`, `command-failed` or `pending-recovered`.
    pub event: &'static str,
    pub sequence: usize,
    pub operation: RestoreApplyOperationKind,
    pub target_canister: String,
    /// `completed`, `failed`, or `ready` for a recovered pending claim.
    pub state: &'static str,
    /// Timestamp recorded on the matching journal transition.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub updated_at: Option<String>,
    /// The command that ran; absent for recovered pending claims.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<RestoreApplyRunnerCommand>,
    /// Exit status label (`<code>` or `signal`); absent for recovered pending claims.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
}
317
/// An operation whose runner command was executed during an execute run.
#[derive(Clone, Debug, Serialize)]
pub struct RestoreRunExecutedOperation {
    pub sequence: usize,
    pub operation: RestoreApplyOperationKind,
    pub target_canister: String,
    /// The command that was run.
    pub command: RestoreApplyRunnerCommand,
    /// Exit status label: the exit code, or `signal`.
    pub status: String,
    /// `completed` or `failed`.
    pub state: &'static str,
}
331
332pub struct RestoreRunnerOutcome {
337 pub response: RestoreRunResponse,
338 pub error: Option<RestoreRunnerError>,
339}
340
341impl RestoreRunnerOutcome {
342 const fn ok(response: RestoreRunResponse) -> Self {
344 Self {
345 response,
346 error: None,
347 }
348 }
349}
350
/// Journal counts captured before a batch starts; the baseline for
/// `RestoreRunBatchSummary`.
#[derive(Clone, Copy, Debug)]
struct RestoreRunBatchStart {
    requested_max_steps: Option<usize>,
    initial_ready_operations: usize,
    initial_remaining_operations: usize,
}
361
362impl RestoreRunBatchStart {
363 const fn new(
365 requested_max_steps: Option<usize>,
366 initial_ready_operations: usize,
367 initial_remaining_operations: usize,
368 ) -> Self {
369 Self {
370 requested_max_steps,
371 initial_ready_operations,
372 initial_remaining_operations,
373 }
374 }
375}
376
377impl RestoreRunBatchSummary {
378 const fn from_counts(
380 batch_start: RestoreRunBatchStart,
381 executed_operations: usize,
382 remaining_ready_operations: usize,
383 remaining_operations: usize,
384 stopped_by_max_steps: bool,
385 complete: bool,
386 ) -> Self {
387 Self {
388 requested_max_steps: batch_start.requested_max_steps,
389 initial_ready_operations: batch_start.initial_ready_operations,
390 initial_remaining_operations: batch_start.initial_remaining_operations,
391 executed_operations,
392 remaining_ready_operations,
393 remaining_operations,
394 ready_operations_delta: remaining_ready_operations.cast_signed()
395 - batch_start.initial_ready_operations.cast_signed(),
396 remaining_operations_delta: remaining_operations.cast_signed()
397 - batch_start.initial_remaining_operations.cast_signed(),
398 stopped_by_max_steps,
399 complete,
400 }
401 }
402}
403
404impl RestoreRunReceiptSummary {
405 fn from_receipts(receipts: &[RestoreRunOperationReceipt]) -> Self {
407 let mut summary = Self {
408 total_receipts: receipts.len(),
409 ..Self::default()
410 };
411
412 for receipt in receipts {
413 match receipt.event {
414 RESTORE_RUN_RECEIPT_COMPLETED => summary.command_completed += 1,
415 RESTORE_RUN_RECEIPT_FAILED => summary.command_failed += 1,
416 RESTORE_RUN_RECEIPT_RECOVERED_PENDING => summary.pending_recovered += 1,
417 _ => {}
418 }
419 }
420
421 summary
422 }
423}
424
425impl RestoreRunOperationReceipt {
426 fn completed(
428 operation: RestoreApplyJournalOperation,
429 command: RestoreApplyRunnerCommand,
430 status: String,
431 updated_at: Option<String>,
432 ) -> Self {
433 Self::from_operation(
434 RESTORE_RUN_RECEIPT_COMPLETED,
435 operation,
436 RESTORE_RUN_EXECUTED_COMPLETED,
437 updated_at,
438 Some(command),
439 Some(status),
440 )
441 }
442
443 fn failed(
445 operation: RestoreApplyJournalOperation,
446 command: RestoreApplyRunnerCommand,
447 status: String,
448 updated_at: Option<String>,
449 ) -> Self {
450 Self::from_operation(
451 RESTORE_RUN_RECEIPT_FAILED,
452 operation,
453 RESTORE_RUN_EXECUTED_FAILED,
454 updated_at,
455 Some(command),
456 Some(status),
457 )
458 }
459
460 fn recovered_pending(
462 operation: RestoreApplyJournalOperation,
463 updated_at: Option<String>,
464 ) -> Self {
465 Self::from_operation(
466 RESTORE_RUN_RECEIPT_RECOVERED_PENDING,
467 operation,
468 RESTORE_RUN_RECEIPT_STATE_READY,
469 updated_at,
470 None,
471 None,
472 )
473 }
474
475 fn from_operation(
477 event: &'static str,
478 operation: RestoreApplyJournalOperation,
479 state: &'static str,
480 updated_at: Option<String>,
481 command: Option<RestoreApplyRunnerCommand>,
482 status: Option<String>,
483 ) -> Self {
484 Self {
485 event,
486 sequence: operation.sequence,
487 operation: operation.operation,
488 target_canister: operation.target_canister,
489 state,
490 updated_at,
491 command,
492 status,
493 }
494 }
495}
496
497impl RestoreRunExecutedOperation {
498 fn completed(
500 operation: RestoreApplyJournalOperation,
501 command: RestoreApplyRunnerCommand,
502 status: String,
503 ) -> Self {
504 Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_COMPLETED)
505 }
506
507 fn failed(
509 operation: RestoreApplyJournalOperation,
510 command: RestoreApplyRunnerCommand,
511 status: String,
512 ) -> Self {
513 Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_FAILED)
514 }
515
516 fn from_operation(
518 operation: RestoreApplyJournalOperation,
519 command: RestoreApplyRunnerCommand,
520 status: String,
521 state: &'static str,
522 ) -> Self {
523 Self {
524 sequence: operation.sequence,
525 operation: operation.operation,
526 target_canister: operation.target_canister,
527 command,
528 status,
529 state,
530 }
531 }
532}
533
/// Mode-specific header fields copied into a `RestoreRunResponse`.
struct RestoreRunResponseMode {
    run_mode: &'static str,
    dry_run: bool,
    execute: bool,
    unclaim_pending: bool,
    stopped_reason: &'static str,
    next_action: &'static str,
}
546
547impl RestoreRunResponseMode {
548 const fn new(
550 run_mode: &'static str,
551 dry_run: bool,
552 execute: bool,
553 unclaim_pending: bool,
554 stopped_reason: &'static str,
555 next_action: &'static str,
556 ) -> Self {
557 Self {
558 run_mode,
559 dry_run,
560 execute,
561 unclaim_pending,
562 stopped_reason,
563 next_action,
564 }
565 }
566
567 const fn dry_run(stopped_reason: &'static str, next_action: &'static str) -> Self {
569 Self::new(
570 RESTORE_RUN_MODE_DRY_RUN,
571 true,
572 false,
573 false,
574 stopped_reason,
575 next_action,
576 )
577 }
578
579 const fn execute(stopped_reason: &'static str, next_action: &'static str) -> Self {
581 Self::new(
582 RESTORE_RUN_MODE_EXECUTE,
583 false,
584 true,
585 false,
586 stopped_reason,
587 next_action,
588 )
589 }
590
591 const fn unclaim_pending(next_action: &'static str) -> Self {
593 Self::new(
594 RESTORE_RUN_MODE_UNCLAIM_PENDING,
595 false,
596 false,
597 true,
598 RESTORE_RUN_STOPPED_RECOVERED_PENDING,
599 next_action,
600 )
601 }
602}
603
604pub fn restore_run_dry_run(
606 config: &RestoreRunnerConfig,
607) -> Result<RestoreRunResponse, RestoreRunnerError> {
608 let journal = read_apply_journal_file(&config.journal)?;
609 let report = journal.report();
610 let initial_ready_operations = report.ready_operations;
611 let initial_remaining_operations = report.progress.remaining_operations;
612 let preview = journal.next_command_preview_with_config(&config.command);
613 let stopped_reason = restore_run_stopped_reason(&report, false, false);
614 let next_action = restore_run_next_action(&report, false);
615
616 let mut response = RestoreRunResponse::from_report(
617 journal.backup_id,
618 report,
619 RestoreRunResponseMode::dry_run(stopped_reason, next_action),
620 );
621 response.set_requested_state_updated_at(config.updated_at.as_ref());
622 response.set_batch_summary(
623 RestoreRunBatchStart::new(
624 config.max_steps,
625 initial_ready_operations,
626 initial_remaining_operations,
627 ),
628 0,
629 false,
630 );
631 response.operation_available = Some(preview.operation_available);
632 response.command_available = Some(preview.command_available);
633 response.command = preview.command;
634 Ok(response)
635}
636
637pub fn restore_run_unclaim_pending(
639 config: &RestoreRunnerConfig,
640) -> Result<RestoreRunResponse, RestoreRunnerError> {
641 let _lock = RestoreJournalLock::acquire(&config.journal)?;
642 let mut journal = read_apply_journal_file(&config.journal)?;
643 let initial_report = journal.report();
644 let initial_ready_operations = initial_report.ready_operations;
645 let initial_remaining_operations = initial_report.progress.remaining_operations;
646 let recovered_operation = journal
647 .next_transition_operation()
648 .filter(|operation| operation.state == RestoreApplyOperationState::Pending)
649 .cloned()
650 .ok_or(RestoreApplyJournalError::NoPendingOperation)?;
651
652 let recovered_updated_at = state_updated_at(config.updated_at.as_ref());
653 journal.mark_next_operation_ready_at(Some(recovered_updated_at.clone()))?;
654 write_apply_journal_file(&config.journal, &journal)?;
655
656 let report = journal.report();
657 let next_action = restore_run_next_action(&report, true);
658 let mut response = RestoreRunResponse::from_report(
659 journal.backup_id,
660 report,
661 RestoreRunResponseMode::unclaim_pending(next_action),
662 );
663 response.set_requested_state_updated_at(config.updated_at.as_ref());
664 response.set_batch_summary(
665 RestoreRunBatchStart::new(
666 config.max_steps,
667 initial_ready_operations,
668 initial_remaining_operations,
669 ),
670 0,
671 false,
672 );
673 response.set_operation_receipts(vec![RestoreRunOperationReceipt::recovered_pending(
674 recovered_operation.clone(),
675 Some(recovered_updated_at),
676 )]);
677 response.recovered_operation = Some(recovered_operation);
678 Ok(response)
679}
680
681pub fn restore_run_execute(
683 config: &RestoreRunnerConfig,
684) -> Result<RestoreRunResponse, RestoreRunnerError> {
685 let run = restore_run_execute_result(config)?;
686 if let Some(error) = run.error {
687 return Err(error);
688 }
689
690 Ok(run.response)
691}
692
/// Executes ready journal operations one at a time until the journal is
/// complete, `config.max_steps` operations have run, or a command fails.
///
/// The journal lock file is held for the whole call. Each iteration proceeds in
/// three steps:
///
/// 1. Inspect the in-memory journal report.
/// 2. Claim the next operation: mark it pending and write the journal to disk
///    *before* the command is spawned.
/// 3. Record the command result (completed or failed) and write the journal
///    again.
///
/// A failing command does not produce `Err`. Instead it returns a
/// [`RestoreRunnerOutcome`] whose `response` reflects the journal after the
/// failure and whose `error` is [`RestoreRunnerError::CommandFailed`].
///
/// # Errors
///
/// Returns `Err` in any of these cases:
///
/// - the lock cannot be acquired;
/// - the journal cannot be read, validated or written;
/// - the journal is not executable (pending or failed operations, or not ready);
/// - no command is available for the next operation;
/// - the claimed sequence no longer matches the next transition;
/// - a journal state transition fails;
/// - the command process cannot be spawned.
///
/// In the spawn case the operation has already been written to disk as pending.
#[expect(
    clippy::too_many_lines,
    reason = "runner execution keeps claim, command, receipt, and journal commit steps together"
)]
pub fn restore_run_execute_result(
    config: &RestoreRunnerConfig,
) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
    let _lock = RestoreJournalLock::acquire(&config.journal)?;
    let mut journal = read_apply_journal_file(&config.journal)?;
    let initial_report = journal.report();
    // Baseline counts for the batch summary, taken before anything runs.
    let batch_start = RestoreRunBatchStart::new(
        config.max_steps,
        initial_report.ready_operations,
        initial_report.progress.remaining_operations,
    );
    let mut executed_operations = Vec::new();
    let mut operation_receipts = Vec::new();

    loop {
        let report = journal.report();
        let max_steps_reached =
            restore_run_max_steps_reached(config, executed_operations.len(), &report);
        // Normal exit: nothing left to do, or the step budget is spent.
        if report.complete || max_steps_reached {
            return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
                &journal,
                executed_operations,
                operation_receipts,
                max_steps_reached,
                config.updated_at.as_ref(),
                batch_start,
            )));
        }

        // Refuse to run while operations are pending/failed or the journal is
        // not ready, and require a concrete command for the next operation.
        enforce_restore_run_executable(&journal, &report)?;
        let preview = journal.next_command_preview_with_config(&config.command);
        enforce_restore_run_command_available(&preview)?;

        let operation = preview
            .operation
            .clone()
            .ok_or_else(|| restore_command_unavailable_error(&preview))?;
        let command = preview
            .command
            .clone()
            .ok_or_else(|| restore_command_unavailable_error(&preview))?;
        let sequence = operation.sequence;
        // Attempt number = existing journal receipts for this sequence + 1.
        let attempt = journal
            .operation_receipts
            .iter()
            .filter(|receipt| receipt.sequence == sequence)
            .count()
            + 1;

        // Claim: persist the pending state before running the command, so an
        // interrupted run leaves a pending operation for unclaim-pending.
        enforce_apply_claim_sequence(sequence, &journal)?;
        journal.mark_operation_pending_at(
            sequence,
            Some(state_updated_at(config.updated_at.as_ref())),
        )?;
        write_apply_journal_file(&config.journal, &journal)?;

        // The program is spawned directly with its argument list (no shell);
        // stdout and stderr are captured.
        let output = ProcessCommand::new(&command.program)
            .args(&command.args)
            .output()?;
        let status_label = exit_status_label(output.status);
        let output_pair = RestoreApplyCommandOutputPair::from_bytes(
            &output.stdout,
            &output.stderr,
            RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
        );
        if output.status.success() {
            let uploaded_snapshot_id =
                parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout));
            let completed_updated_at = state_updated_at(config.updated_at.as_ref());
            journal.mark_operation_completed_at(sequence, Some(completed_updated_at.clone()))?;
            // Upload operations only get a journal receipt when a snapshot id
            // could be parsed from stdout; other operations always get one.
            if operation.operation != RestoreApplyOperationKind::UploadSnapshot
                || uploaded_snapshot_id.is_some()
            {
                journal.record_operation_receipt(
                    RestoreApplyOperationReceipt::command_completed(
                        &operation,
                        command.clone(),
                        status_label.clone(),
                        Some(completed_updated_at.clone()),
                        output_pair.clone(),
                        attempt,
                        uploaded_snapshot_id,
                    ),
                )?;
            }
            write_apply_journal_file(&config.journal, &journal)?;
            executed_operations.push(RestoreRunExecutedOperation::completed(
                operation.clone(),
                command.clone(),
                status_label.clone(),
            ));
            operation_receipts.push(RestoreRunOperationReceipt::completed(
                operation,
                command,
                status_label,
                Some(completed_updated_at),
            ));
            continue;
        }

        // Failure path: record `runner-command-exit-<status>`, persist, and
        // stop with a CommandFailed error attached to the outcome.
        let failed_updated_at = state_updated_at(config.updated_at.as_ref());
        let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
        journal.mark_operation_failed_at(
            sequence,
            failure_reason.clone(),
            Some(failed_updated_at.clone()),
        )?;
        journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
            &operation,
            command.clone(),
            status_label.clone(),
            Some(failed_updated_at.clone()),
            output_pair,
            attempt,
            failure_reason,
        ))?;
        write_apply_journal_file(&config.journal, &journal)?;
        executed_operations.push(RestoreRunExecutedOperation::failed(
            operation.clone(),
            command.clone(),
            status_label.clone(),
        ));
        operation_receipts.push(RestoreRunOperationReceipt::failed(
            operation,
            command,
            status_label.clone(),
            Some(failed_updated_at),
        ));
        let response = restore_run_execute_summary(
            &journal,
            executed_operations,
            operation_receipts,
            false,
            config.updated_at.as_ref(),
            batch_start,
        );
        return Ok(RestoreRunnerOutcome {
            response,
            error: Some(RestoreRunnerError::CommandFailed {
                sequence,
                status: status_label,
            }),
        });
    }
}
843
844fn restore_run_max_steps_reached(
846 config: &RestoreRunnerConfig,
847 executed_operation_count: usize,
848 report: &RestoreApplyJournalReport,
849) -> bool {
850 config.max_steps == Some(executed_operation_count) && !report.complete
851}
852
853fn restore_run_execute_summary(
855 journal: &RestoreApplyJournal,
856 executed_operations: Vec<RestoreRunExecutedOperation>,
857 operation_receipts: Vec<RestoreRunOperationReceipt>,
858 max_steps_reached: bool,
859 requested_state_updated_at: Option<&String>,
860 batch_start: RestoreRunBatchStart,
861) -> RestoreRunResponse {
862 let report = journal.report();
863 let executed_operation_count = executed_operations.len();
864 let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
865 let next_action = restore_run_next_action(&report, false);
866
867 let mut response = RestoreRunResponse::from_report(
868 journal.backup_id.clone(),
869 report,
870 RestoreRunResponseMode::execute(stopped_reason, next_action),
871 );
872 response.set_requested_state_updated_at(requested_state_updated_at);
873 response.set_batch_summary(batch_start, executed_operation_count, max_steps_reached);
874 response.max_steps_reached = Some(max_steps_reached);
875 response.executed_operation_count = Some(executed_operation_count);
876 response.executed_operations = executed_operations;
877 response.set_operation_receipts(operation_receipts);
878 response
879}
880
881const fn restore_run_stopped_reason(
883 report: &RestoreApplyJournalReport,
884 max_steps_reached: bool,
885 executed: bool,
886) -> &'static str {
887 if report.complete {
888 return RESTORE_RUN_STOPPED_COMPLETE;
889 }
890 if report.failed_operations > 0 {
891 return RESTORE_RUN_STOPPED_COMMAND_FAILED;
892 }
893 if report.pending_operations > 0 {
894 return RESTORE_RUN_STOPPED_PENDING;
895 }
896 if !report.ready || report.blocked_operations > 0 {
897 return RESTORE_RUN_STOPPED_BLOCKED;
898 }
899 if max_steps_reached {
900 return RESTORE_RUN_STOPPED_MAX_STEPS;
901 }
902 if executed {
903 return RESTORE_RUN_STOPPED_READY;
904 }
905 RESTORE_RUN_STOPPED_PREVIEW
906}
907
908const fn restore_run_next_action(
910 report: &RestoreApplyJournalReport,
911 recovered_pending: bool,
912) -> &'static str {
913 if report.complete {
914 return RESTORE_RUN_ACTION_DONE;
915 }
916 if report.failed_operations > 0 {
917 return RESTORE_RUN_ACTION_INSPECT_FAILED;
918 }
919 if report.pending_operations > 0 {
920 return RESTORE_RUN_ACTION_UNCLAIM_PENDING;
921 }
922 if !report.ready || report.blocked_operations > 0 {
923 return RESTORE_RUN_ACTION_FIX_BLOCKED;
924 }
925 if recovered_pending {
926 return RESTORE_RUN_ACTION_RERUN;
927 }
928 RESTORE_RUN_ACTION_RERUN
929}
930
931fn enforce_restore_run_executable(
933 journal: &RestoreApplyJournal,
934 report: &RestoreApplyJournalReport,
935) -> Result<(), RestoreRunnerError> {
936 if report.pending_operations > 0 {
937 return Err(RestoreRunnerError::Pending {
938 backup_id: report.backup_id.clone(),
939 pending_operations: report.pending_operations,
940 next_transition_sequence: report
941 .next_transition
942 .as_ref()
943 .map(|operation| operation.sequence),
944 });
945 }
946
947 if report.failed_operations > 0 {
948 return Err(RestoreRunnerError::Failed {
949 backup_id: report.backup_id.clone(),
950 failed_operations: report.failed_operations,
951 });
952 }
953
954 if report.ready {
955 return Ok(());
956 }
957
958 Err(RestoreRunnerError::NotReady {
959 backup_id: journal.backup_id.clone(),
960 reasons: report.blocked_reasons.clone(),
961 })
962}
963
964fn enforce_restore_run_command_available(
966 preview: &RestoreApplyCommandPreview,
967) -> Result<(), RestoreRunnerError> {
968 if preview.command_available {
969 return Ok(());
970 }
971
972 Err(restore_command_unavailable_error(preview))
973}
974
975fn restore_command_unavailable_error(preview: &RestoreApplyCommandPreview) -> RestoreRunnerError {
977 RestoreRunnerError::CommandUnavailable {
978 backup_id: preview.backup_id.clone(),
979 operation_available: preview.operation_available,
980 complete: preview.complete,
981 blocked_reasons: preview.blocked_reasons.clone(),
982 }
983}
984
/// Renders a process exit status as its exit code, or `signal` when the
/// process produced no exit code.
fn exit_status_label(status: std::process::ExitStatus) -> String {
    match status.code() {
        Some(code) => code.to_string(),
        None => String::from("signal"),
    }
}
/// Extracts an uploaded snapshot id from command output.
///
/// Returns the trimmed text after the first `:` on the first line where that
/// text is non-empty. Lines without a colon are skipped.
pub fn parse_uploaded_snapshot_id(output: &str) -> Option<String> {
    for line in output.lines() {
        let Some((_, raw_value)) = line.split_once(':') else {
            continue;
        };
        let value = raw_value.trim();
        if !value.is_empty() {
            return Some(value.to_owned());
        }
    }
    None
}
1000
1001fn enforce_apply_claim_sequence(
1003 expected: usize,
1004 journal: &RestoreApplyJournal,
1005) -> Result<(), RestoreRunnerError> {
1006 let actual = journal
1007 .next_transition_operation()
1008 .map(|operation| operation.sequence);
1009
1010 if actual == Some(expected) {
1011 return Ok(());
1012 }
1013
1014 Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
1015}
1016
1017fn read_apply_journal_file(path: &Path) -> Result<RestoreApplyJournal, RestoreRunnerError> {
1019 let data = fs::read_to_string(path)?;
1020 let journal: RestoreApplyJournal = serde_json::from_str(&data)?;
1021 journal.validate()?;
1022 Ok(journal)
1023}
1024
/// Returns the requested timestamp, or the placeholder when none was given.
fn state_updated_at(updated_at: Option<&String>) -> String {
    match updated_at {
        Some(value) => value.clone(),
        None => timestamp_placeholder(),
    }
}

/// Timestamp recorded when the caller supplied no `updated_at`.
fn timestamp_placeholder() -> String {
    String::from("unknown")
}
1034
1035fn write_apply_journal_file(
1037 path: &Path,
1038 journal: &RestoreApplyJournal,
1039) -> Result<(), RestoreRunnerError> {
1040 let data = serde_json::to_vec_pretty(journal)?;
1041 fs::write(path, data)?;
1042 Ok(())
1043}
1044
/// Guard for a journal's `<journal>.lock` file; the lock file is removed
/// (best effort) when the guard is dropped.
struct RestoreJournalLock {
    /// Path of the lock file created by `acquire`.
    path: PathBuf,
}
1052
1053impl RestoreJournalLock {
1054 fn acquire(journal_path: &Path) -> Result<Self, RestoreRunnerError> {
1056 let path = journal_lock_path(journal_path);
1057 match fs::OpenOptions::new()
1058 .write(true)
1059 .create_new(true)
1060 .open(&path)
1061 {
1062 Ok(mut file) => {
1063 writeln!(file, "pid={}", std::process::id())?;
1064 Ok(Self { path })
1065 }
1066 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
1067 Err(RestoreRunnerError::JournalLocked {
1068 lock_path: path.to_string_lossy().to_string(),
1069 })
1070 }
1071 Err(error) => Err(error.into()),
1072 }
1073 }
1074}
1075
1076impl Drop for RestoreJournalLock {
1077 fn drop(&mut self) {
1079 let _ = fs::remove_file(&self.path);
1080 }
1081}
1082
/// Lock file path for a journal: the journal path with `.lock` appended
/// (e.g. `journal.json` → `journal.json.lock`).
fn journal_lock_path(path: &Path) -> PathBuf {
    let mut raw = path.as_os_str().to_owned();
    raw.push(".lock");
    raw.into()
}