1use super::{
2 RestoreApplyCommandConfig, RestoreApplyCommandOutputPair, RestoreApplyCommandPreview,
3 RestoreApplyJournal, RestoreApplyJournalError, RestoreApplyJournalOperation,
4 RestoreApplyJournalReport, RestoreApplyOperationKind, RestoreApplyOperationKindCounts,
5 RestoreApplyOperationReceipt, RestoreApplyOperationState, RestoreApplyPendingSummary,
6 RestoreApplyProgressSummary, RestoreApplyReportOperation, RestoreApplyReportOutcome,
7 RestoreApplyRunnerCommand,
8};
9use crate::timestamp::current_timestamp_marker;
10use serde::Serialize;
11use std::{
12 fs,
13 io::{self, Write},
14 path::{Path, PathBuf},
15 process::Command as ProcessCommand,
16};
17use thiserror::Error as ThisError;
18
// Values reported in `RestoreRunResponse::run_mode`, one per runner entry point.
const RESTORE_RUN_MODE_DRY_RUN: &str = "dry-run";
const RESTORE_RUN_MODE_EXECUTE: &str = "execute";
const RESTORE_RUN_MODE_UNCLAIM_PENDING: &str = "unclaim-pending";

// Values reported in `RestoreRunResponse::stopped_reason`.
const RESTORE_RUN_STOPPED_BLOCKED: &str = "blocked";
const RESTORE_RUN_STOPPED_COMMAND_FAILED: &str = "command-failed";
const RESTORE_RUN_STOPPED_COMPLETE: &str = "complete";
const RESTORE_RUN_STOPPED_MAX_STEPS: &str = "max-steps-reached";
const RESTORE_RUN_STOPPED_PENDING: &str = "pending";
const RESTORE_RUN_STOPPED_PREVIEW: &str = "preview";
const RESTORE_RUN_STOPPED_READY: &str = "ready";
const RESTORE_RUN_STOPPED_RECOVERED_PENDING: &str = "recovered-pending";

// Values reported in `RestoreRunResponse::next_action`.
const RESTORE_RUN_ACTION_DONE: &str = "done";
const RESTORE_RUN_ACTION_FIX_BLOCKED: &str = "fix-blocked-journal";
const RESTORE_RUN_ACTION_INSPECT_FAILED: &str = "inspect-failed-operation";
const RESTORE_RUN_ACTION_RERUN: &str = "rerun";
const RESTORE_RUN_ACTION_UNCLAIM_PENDING: &str = "unclaim-pending";

// Event labels stored in `RestoreRunOperationReceipt::event`; exported as `pub`.
pub const RESTORE_RUN_RECEIPT_COMPLETED: &str = "command-completed";
pub const RESTORE_RUN_RECEIPT_FAILED: &str = "command-failed";
pub const RESTORE_RUN_RECEIPT_RECOVERED_PENDING: &str = "pending-recovered";

// State labels attached to executed operations and their receipts.
const RESTORE_RUN_EXECUTED_COMPLETED: &str = "completed";
const RESTORE_RUN_EXECUTED_FAILED: &str = "failed";
// State label of the receipt emitted when a pending operation is moved back to ready.
const RESTORE_RUN_RECEIPT_STATE_READY: &str = "ready";
// Prefix of the journal failure reason recorded when a command exits unsuccessfully.
const RESTORE_RUN_COMMAND_EXIT_PREFIX: &str = "runner-command-exit";
// Failure-reason prefix and failed-step status used when the stopped-canister check fails.
const RESTORE_RUN_STOPPED_PRECONDITION_FAILED: &str = "stopped-precondition-failed";
// Value reported in `RestoreRunResponse::run_version`.
const RESTORE_RUN_RESPONSE_VERSION: u16 = 1;
// Limit handed to `RestoreApplyCommandOutputPair::from_bytes` when capturing command output
// (presumably a per-stream byte cap; confirm against that type).
const RESTORE_RUN_OUTPUT_RECEIPT_LIMIT: usize = 64 * 1024;
49
/// Inputs shared by the dry-run, execute, and unclaim-pending runner entry points.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RestoreRunnerConfig {
    /// Path of the restore apply journal file that the runner reads and rewrites.
    pub journal: PathBuf,
    /// Command settings used to build the per-operation runner commands.
    pub command: RestoreApplyCommandConfig,
    /// Execute mode stops once exactly this many operations ran in one invocation;
    /// `None` leaves the run uncapped.
    pub max_steps: Option<usize>,
    /// Explicit state timestamp marker; when `None`, the current timestamp marker is used.
    pub updated_at: Option<String>,
}
61
/// Errors returned by the restore runner entry points.
#[derive(Debug, ThisError)]
pub enum RestoreRunnerError {
    /// A runner command (or the stopped-canister check before it) failed for `sequence`.
    #[error("restore run command failed for operation {sequence}: status={status}")]
    CommandFailed { sequence: usize, status: String },

    /// The journal lock file already exists, so the lock could not be acquired.
    #[error("restore apply journal is locked: {lock_path}")]
    JournalLocked { lock_path: String },

    /// Execute mode refuses to run while the journal reports pending operations.
    #[error(
        "restore apply journal for backup {backup_id} has pending operations: pending={pending_operations}, next={next_transition_sequence:?}"
    )]
    Pending {
        backup_id: String,
        pending_operations: usize,
        next_transition_sequence: Option<usize>,
    },

    /// Execute mode refuses to run while the journal reports failed operations.
    #[error(
        "restore apply journal for backup {backup_id} has failed operations: failed={failed_operations}"
    )]
    Failed {
        backup_id: String,
        failed_operations: usize,
    },

    /// The journal report is not ready; `reasons` carries its blocked reasons.
    #[error("restore apply journal for backup {backup_id} is not ready: reasons={reasons:?}")]
    NotReady {
        backup_id: String,
        reasons: Vec<String>,
    },

    /// The next-command preview did not yield an executable operation and command.
    #[error(
        "restore apply journal for backup {backup_id} has no executable command: operation_available={operation_available}, complete={complete}, blocked_reasons={blocked_reasons:?}"
    )]
    CommandUnavailable {
        backup_id: String,
        operation_available: bool,
        complete: bool,
        blocked_reasons: Vec<String>,
    },

    /// The journal's next transition operation is not the one that was about to be claimed.
    #[error(
        "restore apply journal next operation changed before claim: expected={expected}, actual={actual:?}"
    )]
    ClaimSequenceMismatch {
        expected: usize,
        actual: Option<usize>,
    },

    /// Filesystem or process-spawn failure.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Journal JSON could not be parsed or serialized.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// Error raised by the journal itself (validation or state transitions).
    #[error(transparent)]
    Journal(#[from] RestoreApplyJournalError),
}
124
/// Serializable result of one runner invocation.
///
/// Combines the mode of the invocation, what it executed or recovered, and the
/// journal report values taken when the response was built.
#[derive(Clone, Debug, Serialize)]
#[expect(
    clippy::struct_excessive_bools,
    reason = "Runner response exposes stable JSON status flags for operators and CI"
)]
pub struct RestoreRunResponse {
    /// Response format version (`RESTORE_RUN_RESPONSE_VERSION`).
    pub run_version: u16,
    pub backup_id: String,
    /// One of the `RESTORE_RUN_MODE_*` labels; mirrored by the three flags below.
    pub run_mode: &'static str,
    pub dry_run: bool,
    pub execute: bool,
    pub unclaim_pending: bool,
    /// One of the `RESTORE_RUN_STOPPED_*` labels.
    pub stopped_reason: &'static str,
    /// One of the `RESTORE_RUN_ACTION_*` labels.
    pub next_action: &'static str,
    /// Copy of the configured `updated_at`, omitted from JSON when not supplied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub requested_state_updated_at: Option<String>,
    /// Set only by execute mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_steps_reached: Option<bool>,
    /// Operations run by this invocation, in execution order.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub executed_operations: Vec<RestoreRunExecutedOperation>,
    /// Receipts produced by this invocation.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub operation_receipts: Vec<RestoreRunOperationReceipt>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_receipt_count: Option<usize>,
    /// Per-event counts over `operation_receipts`.
    pub operation_receipt_summary: RestoreRunReceiptSummary,
    /// Set only by execute mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub executed_operation_count: Option<usize>,
    /// Set only by unclaim-pending mode: the operation that was moved back to ready.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recovered_operation: Option<RestoreApplyJournalOperation>,
    // The fields from `ready` through `next_transition` are copied from the journal report.
    pub ready: bool,
    pub complete: bool,
    pub attention_required: bool,
    pub outcome: RestoreApplyReportOutcome,
    pub operation_count: usize,
    pub operation_counts: RestoreApplyOperationKindCounts,
    pub progress: RestoreApplyProgressSummary,
    pub pending_summary: RestoreApplyPendingSummary,
    pub pending_operations: usize,
    pub ready_operations: usize,
    pub blocked_operations: usize,
    pub completed_operations: usize,
    pub failed_operations: usize,
    pub blocked_reasons: Vec<String>,
    pub next_transition: Option<RestoreApplyReportOperation>,
    // The remaining fields are filled only by dry-run mode, from the command preview.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_available: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command_available: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<RestoreApplyRunnerCommand>,
}
180
181impl RestoreRunResponse {
182 fn from_report(
184 backup_id: String,
185 report: RestoreApplyJournalReport,
186 mode: RestoreRunResponseMode,
187 ) -> Self {
188 Self {
189 run_version: RESTORE_RUN_RESPONSE_VERSION,
190 backup_id,
191 run_mode: mode.run_mode,
192 dry_run: mode.dry_run,
193 execute: mode.execute,
194 unclaim_pending: mode.unclaim_pending,
195 stopped_reason: mode.stopped_reason,
196 next_action: mode.next_action,
197 requested_state_updated_at: None,
198 max_steps_reached: None,
199 executed_operations: Vec::new(),
200 operation_receipts: Vec::new(),
201 operation_receipt_count: Some(0),
202 operation_receipt_summary: RestoreRunReceiptSummary::default(),
203 executed_operation_count: None,
204 recovered_operation: None,
205 ready: report.ready,
206 complete: report.complete,
207 attention_required: report.attention_required,
208 outcome: report.outcome,
209 operation_count: report.operation_count,
210 operation_counts: report.operation_counts,
211 progress: report.progress,
212 pending_summary: report.pending_summary,
213 pending_operations: report.pending_operations,
214 ready_operations: report.ready_operations,
215 blocked_operations: report.blocked_operations,
216 completed_operations: report.completed_operations,
217 failed_operations: report.failed_operations,
218 blocked_reasons: report.blocked_reasons,
219 next_transition: report.next_transition,
220 operation_available: None,
221 command_available: None,
222 command: None,
223 }
224 }
225
226 fn set_operation_receipts(&mut self, receipts: Vec<RestoreRunOperationReceipt>) {
228 self.operation_receipt_summary = RestoreRunReceiptSummary::from_receipts(&receipts);
229 self.operation_receipt_count = Some(receipts.len());
230 self.operation_receipts = receipts;
231 }
232
233 fn set_requested_state_updated_at(&mut self, updated_at: Option<&String>) {
235 self.requested_state_updated_at = updated_at.cloned();
236 }
237}
238
/// Per-event counts over the receipts of one runner invocation.
#[derive(Clone, Debug, Default, Serialize)]
pub struct RestoreRunReceiptSummary {
    /// Number of receipts, including any whose event matches none of the counters below.
    pub total_receipts: usize,
    /// Receipts whose event is `RESTORE_RUN_RECEIPT_COMPLETED`.
    pub command_completed: usize,
    /// Receipts whose event is `RESTORE_RUN_RECEIPT_FAILED`.
    pub command_failed: usize,
    /// Receipts whose event is `RESTORE_RUN_RECEIPT_RECOVERED_PENDING`.
    pub pending_recovered: usize,
}
250
/// Receipt for one state change made by a runner invocation.
#[derive(Clone, Debug, Serialize)]
pub struct RestoreRunOperationReceipt {
    /// One of the `RESTORE_RUN_RECEIPT_*` event labels.
    pub event: &'static str,
    /// Sequence of the journal operation this receipt refers to.
    pub sequence: usize,
    pub operation: RestoreApplyOperationKind,
    pub target_canister: String,
    /// State label the operation ended in (`completed`, `failed`, or `ready`).
    pub state: &'static str,
    /// Timestamp marker applied to the state change.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub updated_at: Option<String>,
    /// Command that was run; absent for recovered-pending receipts.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<RestoreApplyRunnerCommand>,
    /// Exit status label of the command; absent for recovered-pending receipts.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
}
269
/// One operation that execute mode ran, with the command used and how it ended.
#[derive(Clone, Debug, Serialize)]
pub struct RestoreRunExecutedOperation {
    /// Sequence of the journal operation that was run.
    pub sequence: usize,
    pub operation: RestoreApplyOperationKind,
    pub target_canister: String,
    /// Command that was run for this operation.
    pub command: RestoreApplyRunnerCommand,
    /// Exit status label of that command.
    pub status: String,
    /// `completed` or `failed`.
    pub state: &'static str,
}
283
/// Execute-mode result that keeps the response even when a command failed.
pub struct RestoreRunnerOutcome {
    /// Summary of everything the run did, including a failed step if any.
    pub response: RestoreRunResponse,
    /// Set when the run stopped because a step failed; `None` otherwise.
    pub error: Option<RestoreRunnerError>,
}
292
impl RestoreRunnerOutcome {
    /// Wraps a response for a run that stopped without a failed step.
    const fn ok(response: RestoreRunResponse) -> Self {
        Self {
            response,
            error: None,
        }
    }
}
302
/// Details of a stopped-canister check that did not confirm the canister is stopped.
struct RestoreStoppedPreconditionFailure {
    /// The status command that was run.
    command: RestoreApplyRunnerCommand,
    /// Exit status label of the status command.
    status_label: String,
    /// Captured output of the status command.
    output: RestoreApplyCommandOutputPair,
    /// Failure reason to record in the journal.
    failure_reason: String,
}
313
314impl RestoreRunReceiptSummary {
315 fn from_receipts(receipts: &[RestoreRunOperationReceipt]) -> Self {
317 let mut summary = Self {
318 total_receipts: receipts.len(),
319 ..Self::default()
320 };
321
322 for receipt in receipts {
323 match receipt.event {
324 RESTORE_RUN_RECEIPT_COMPLETED => summary.command_completed += 1,
325 RESTORE_RUN_RECEIPT_FAILED => summary.command_failed += 1,
326 RESTORE_RUN_RECEIPT_RECOVERED_PENDING => summary.pending_recovered += 1,
327 _ => {}
328 }
329 }
330
331 summary
332 }
333}
334
335impl RestoreRunOperationReceipt {
336 fn completed(
338 operation: RestoreApplyJournalOperation,
339 command: RestoreApplyRunnerCommand,
340 status: String,
341 updated_at: Option<String>,
342 ) -> Self {
343 Self::from_operation(
344 RESTORE_RUN_RECEIPT_COMPLETED,
345 operation,
346 RESTORE_RUN_EXECUTED_COMPLETED,
347 updated_at,
348 Some(command),
349 Some(status),
350 )
351 }
352
353 fn failed(
355 operation: RestoreApplyJournalOperation,
356 command: RestoreApplyRunnerCommand,
357 status: String,
358 updated_at: Option<String>,
359 ) -> Self {
360 Self::from_operation(
361 RESTORE_RUN_RECEIPT_FAILED,
362 operation,
363 RESTORE_RUN_EXECUTED_FAILED,
364 updated_at,
365 Some(command),
366 Some(status),
367 )
368 }
369
370 fn recovered_pending(
372 operation: RestoreApplyJournalOperation,
373 updated_at: Option<String>,
374 ) -> Self {
375 Self::from_operation(
376 RESTORE_RUN_RECEIPT_RECOVERED_PENDING,
377 operation,
378 RESTORE_RUN_RECEIPT_STATE_READY,
379 updated_at,
380 None,
381 None,
382 )
383 }
384
385 fn from_operation(
387 event: &'static str,
388 operation: RestoreApplyJournalOperation,
389 state: &'static str,
390 updated_at: Option<String>,
391 command: Option<RestoreApplyRunnerCommand>,
392 status: Option<String>,
393 ) -> Self {
394 Self {
395 event,
396 sequence: operation.sequence,
397 operation: operation.operation,
398 target_canister: operation.target_canister,
399 state,
400 updated_at,
401 command,
402 status,
403 }
404 }
405}
406
407impl RestoreRunExecutedOperation {
408 fn completed(
410 operation: RestoreApplyJournalOperation,
411 command: RestoreApplyRunnerCommand,
412 status: String,
413 ) -> Self {
414 Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_COMPLETED)
415 }
416
417 fn failed(
419 operation: RestoreApplyJournalOperation,
420 command: RestoreApplyRunnerCommand,
421 status: String,
422 ) -> Self {
423 Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_FAILED)
424 }
425
426 fn from_operation(
428 operation: RestoreApplyJournalOperation,
429 command: RestoreApplyRunnerCommand,
430 status: String,
431 state: &'static str,
432 ) -> Self {
433 Self {
434 sequence: operation.sequence,
435 operation: operation.operation,
436 target_canister: operation.target_canister,
437 command,
438 status,
439 state,
440 }
441 }
442}
443
/// Mode-related values copied into a `RestoreRunResponse` when it is built.
struct RestoreRunResponseMode {
    /// One of the `RESTORE_RUN_MODE_*` labels.
    run_mode: &'static str,
    dry_run: bool,
    execute: bool,
    unclaim_pending: bool,
    /// One of the `RESTORE_RUN_STOPPED_*` labels.
    stopped_reason: &'static str,
    /// One of the `RESTORE_RUN_ACTION_*` labels.
    next_action: &'static str,
}
456
457impl RestoreRunResponseMode {
458 const fn new(
460 run_mode: &'static str,
461 dry_run: bool,
462 execute: bool,
463 unclaim_pending: bool,
464 stopped_reason: &'static str,
465 next_action: &'static str,
466 ) -> Self {
467 Self {
468 run_mode,
469 dry_run,
470 execute,
471 unclaim_pending,
472 stopped_reason,
473 next_action,
474 }
475 }
476
477 const fn dry_run(stopped_reason: &'static str, next_action: &'static str) -> Self {
479 Self::new(
480 RESTORE_RUN_MODE_DRY_RUN,
481 true,
482 false,
483 false,
484 stopped_reason,
485 next_action,
486 )
487 }
488
489 const fn execute(stopped_reason: &'static str, next_action: &'static str) -> Self {
491 Self::new(
492 RESTORE_RUN_MODE_EXECUTE,
493 false,
494 true,
495 false,
496 stopped_reason,
497 next_action,
498 )
499 }
500
501 const fn unclaim_pending(next_action: &'static str) -> Self {
503 Self::new(
504 RESTORE_RUN_MODE_UNCLAIM_PENDING,
505 false,
506 false,
507 true,
508 RESTORE_RUN_STOPPED_RECOVERED_PENDING,
509 next_action,
510 )
511 }
512}
513
/// An operation that has been claimed (marked pending in the journal) and is ready to run.
struct RestoreRunPreparedOperation {
    /// Journal operation as previewed before it was claimed.
    operation: RestoreApplyJournalOperation,
    /// Command previewed for this operation.
    command: RestoreApplyRunnerCommand,
    /// Sequence of `operation`, kept separately for convenience.
    sequence: usize,
    /// One more than the number of journal receipts already recorded for this sequence.
    attempt: usize,
}
524
/// Result of running one prepared operation after the journal has been updated.
enum RestoreRunStepOutcome {
    /// The command succeeded and the operation was marked completed.
    Completed {
        executed_operation: RestoreRunExecutedOperation,
        operation_receipt: RestoreRunOperationReceipt,
    },
    /// The command or its precondition check failed and the operation was marked failed.
    Failed {
        executed_operation: RestoreRunExecutedOperation,
        operation_receipt: RestoreRunOperationReceipt,
        /// Status reported in `RestoreRunnerError::CommandFailed`.
        status: String,
    },
}
540
541pub fn restore_run_dry_run(
543 config: &RestoreRunnerConfig,
544) -> Result<RestoreRunResponse, RestoreRunnerError> {
545 let journal = read_apply_journal_file(&config.journal)?;
546 let report = journal.report();
547 let preview = journal.next_command_preview_with_config(&config.command);
548 let stopped_reason = restore_run_stopped_reason(&report, false, false);
549 let next_action = restore_run_next_action(&report, false);
550
551 let mut response = RestoreRunResponse::from_report(
552 journal.backup_id,
553 report,
554 RestoreRunResponseMode::dry_run(stopped_reason, next_action),
555 );
556 response.set_requested_state_updated_at(config.updated_at.as_ref());
557 response.operation_available = Some(preview.operation_available);
558 response.command_available = Some(preview.command_available);
559 response.command = preview.command;
560 Ok(response)
561}
562
563pub fn restore_run_unclaim_pending(
565 config: &RestoreRunnerConfig,
566) -> Result<RestoreRunResponse, RestoreRunnerError> {
567 let _lock = RestoreJournalLock::acquire(&config.journal)?;
568 let mut journal = read_apply_journal_file(&config.journal)?;
569 let recovered_operation = journal
570 .next_transition_operation()
571 .filter(|operation| operation.state == RestoreApplyOperationState::Pending)
572 .cloned()
573 .ok_or(RestoreApplyJournalError::NoPendingOperation)?;
574
575 let recovered_updated_at = state_updated_at(config.updated_at.as_ref());
576 journal.mark_next_operation_ready_at(Some(recovered_updated_at.clone()))?;
577 write_apply_journal_file(&config.journal, &journal)?;
578
579 let report = journal.report();
580 let next_action = restore_run_next_action(&report, true);
581 let mut response = RestoreRunResponse::from_report(
582 journal.backup_id,
583 report,
584 RestoreRunResponseMode::unclaim_pending(next_action),
585 );
586 response.set_requested_state_updated_at(config.updated_at.as_ref());
587 response.set_operation_receipts(vec![RestoreRunOperationReceipt::recovered_pending(
588 recovered_operation.clone(),
589 Some(recovered_updated_at),
590 )]);
591 response.recovered_operation = Some(recovered_operation);
592 Ok(response)
593}
594
595pub fn restore_run_execute(
597 config: &RestoreRunnerConfig,
598) -> Result<RestoreRunResponse, RestoreRunnerError> {
599 let run = restore_run_execute_result(config)?;
600 if let Some(error) = run.error {
601 return Err(error);
602 }
603
604 Ok(run.response)
605}
606
607pub fn restore_run_execute_result(
609 config: &RestoreRunnerConfig,
610) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
611 let _lock = RestoreJournalLock::acquire(&config.journal)?;
612 let mut journal = read_apply_journal_file(&config.journal)?;
613 let mut executed_operations = Vec::new();
614 let mut operation_receipts = Vec::new();
615
616 loop {
617 let report = journal.report();
618 let max_steps_reached =
619 restore_run_max_steps_reached(config, executed_operations.len(), &report);
620 if report.complete || max_steps_reached {
621 return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
622 &journal,
623 executed_operations,
624 operation_receipts,
625 max_steps_reached,
626 config.updated_at.as_ref(),
627 )));
628 }
629
630 enforce_restore_run_executable(&journal, &report)?;
631 let prepared = restore_run_prepare_next_operation(config, &mut journal)?;
632 let sequence = prepared.sequence;
633 match restore_run_execute_prepared_operation(config, &mut journal, prepared)? {
634 RestoreRunStepOutcome::Completed {
635 executed_operation,
636 operation_receipt,
637 } => {
638 executed_operations.push(executed_operation);
639 operation_receipts.push(operation_receipt);
640 }
641 RestoreRunStepOutcome::Failed {
642 executed_operation,
643 operation_receipt,
644 status,
645 } => {
646 executed_operations.push(executed_operation);
647 operation_receipts.push(operation_receipt);
648 let response = restore_run_execute_summary(
649 &journal,
650 executed_operations,
651 operation_receipts,
652 false,
653 config.updated_at.as_ref(),
654 );
655 return Ok(RestoreRunnerOutcome {
656 response,
657 error: Some(RestoreRunnerError::CommandFailed { sequence, status }),
658 });
659 }
660 }
661 }
662}
663
664fn restore_run_prepare_next_operation(
666 config: &RestoreRunnerConfig,
667 journal: &mut RestoreApplyJournal,
668) -> Result<RestoreRunPreparedOperation, RestoreRunnerError> {
669 let preview = journal.next_command_preview_with_config(&config.command);
670 enforce_restore_run_command_available(&preview)?;
671
672 let operation = preview
673 .operation
674 .clone()
675 .ok_or_else(|| restore_command_unavailable_error(&preview))?;
676 let command = preview
677 .command
678 .clone()
679 .ok_or_else(|| restore_command_unavailable_error(&preview))?;
680 let sequence = operation.sequence;
681 let attempt = journal
682 .operation_receipts
683 .iter()
684 .filter(|receipt| receipt.sequence == sequence)
685 .count()
686 + 1;
687
688 enforce_apply_claim_sequence(sequence, journal)?;
689 journal
690 .mark_operation_pending_at(sequence, Some(state_updated_at(config.updated_at.as_ref())))?;
691 write_apply_journal_file(&config.journal, journal)?;
692
693 Ok(RestoreRunPreparedOperation {
694 operation,
695 command,
696 sequence,
697 attempt,
698 })
699}
700
701fn restore_run_execute_prepared_operation(
703 config: &RestoreRunnerConfig,
704 journal: &mut RestoreApplyJournal,
705 prepared: RestoreRunPreparedOperation,
706) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
707 if prepared.command.requires_stopped_canister
708 && let Some(outcome) = enforce_stopped_canister_precondition(
709 config,
710 &prepared.operation,
711 prepared.attempt,
712 config.updated_at.as_ref(),
713 )?
714 {
715 return restore_run_commit_precondition_failure(config, journal, prepared, outcome);
716 }
717
718 let output = ProcessCommand::new(&prepared.command.program)
719 .args(&prepared.command.args)
720 .output()?;
721 let status_label = exit_status_label(output.status);
722 let output_pair = RestoreApplyCommandOutputPair::from_bytes(
723 &output.stdout,
724 &output.stderr,
725 RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
726 );
727
728 if output.status.success() {
729 let uploaded_snapshot_id =
730 parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout));
731 return restore_run_commit_command_success(
732 config,
733 journal,
734 prepared,
735 status_label,
736 output_pair,
737 uploaded_snapshot_id,
738 );
739 }
740
741 restore_run_commit_command_failure(config, journal, prepared, status_label, output_pair)
742}
743
744fn restore_run_commit_precondition_failure(
746 config: &RestoreRunnerConfig,
747 journal: &mut RestoreApplyJournal,
748 prepared: RestoreRunPreparedOperation,
749 outcome: RestoreStoppedPreconditionFailure,
750) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
751 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
752 journal.mark_operation_failed_at(
753 prepared.sequence,
754 outcome.failure_reason.clone(),
755 Some(failed_updated_at.clone()),
756 )?;
757 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
758 &prepared.operation,
759 outcome.command.clone(),
760 outcome.status_label.clone(),
761 Some(failed_updated_at.clone()),
762 outcome.output,
763 prepared.attempt,
764 outcome.failure_reason,
765 ))?;
766 write_apply_journal_file(&config.journal, journal)?;
767
768 Ok(RestoreRunStepOutcome::Failed {
769 executed_operation: RestoreRunExecutedOperation::failed(
770 prepared.operation.clone(),
771 outcome.command.clone(),
772 outcome.status_label.clone(),
773 ),
774 operation_receipt: RestoreRunOperationReceipt::failed(
775 prepared.operation,
776 outcome.command,
777 outcome.status_label,
778 Some(failed_updated_at),
779 ),
780 status: RESTORE_RUN_STOPPED_PRECONDITION_FAILED.to_string(),
781 })
782}
783
784fn restore_run_commit_command_success(
786 config: &RestoreRunnerConfig,
787 journal: &mut RestoreApplyJournal,
788 prepared: RestoreRunPreparedOperation,
789 status_label: String,
790 output_pair: RestoreApplyCommandOutputPair,
791 uploaded_snapshot_id: Option<String>,
792) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
793 let completed_updated_at = state_updated_at(config.updated_at.as_ref());
794 journal.mark_operation_completed_at(prepared.sequence, Some(completed_updated_at.clone()))?;
795 if prepared.operation.operation != RestoreApplyOperationKind::UploadSnapshot
796 || uploaded_snapshot_id.is_some()
797 {
798 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_completed(
799 &prepared.operation,
800 prepared.command.clone(),
801 status_label.clone(),
802 Some(completed_updated_at.clone()),
803 output_pair,
804 prepared.attempt,
805 uploaded_snapshot_id,
806 ))?;
807 }
808 write_apply_journal_file(&config.journal, journal)?;
809
810 Ok(RestoreRunStepOutcome::Completed {
811 executed_operation: RestoreRunExecutedOperation::completed(
812 prepared.operation.clone(),
813 prepared.command.clone(),
814 status_label.clone(),
815 ),
816 operation_receipt: RestoreRunOperationReceipt::completed(
817 prepared.operation,
818 prepared.command,
819 status_label,
820 Some(completed_updated_at),
821 ),
822 })
823}
824
825fn restore_run_commit_command_failure(
827 config: &RestoreRunnerConfig,
828 journal: &mut RestoreApplyJournal,
829 prepared: RestoreRunPreparedOperation,
830 status_label: String,
831 output_pair: RestoreApplyCommandOutputPair,
832) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
833 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
834 let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
835 journal.mark_operation_failed_at(
836 prepared.sequence,
837 failure_reason.clone(),
838 Some(failed_updated_at.clone()),
839 )?;
840 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
841 &prepared.operation,
842 prepared.command.clone(),
843 status_label.clone(),
844 Some(failed_updated_at.clone()),
845 output_pair,
846 prepared.attempt,
847 failure_reason,
848 ))?;
849 write_apply_journal_file(&config.journal, journal)?;
850
851 Ok(RestoreRunStepOutcome::Failed {
852 executed_operation: RestoreRunExecutedOperation::failed(
853 prepared.operation.clone(),
854 prepared.command.clone(),
855 status_label.clone(),
856 ),
857 operation_receipt: RestoreRunOperationReceipt::failed(
858 prepared.operation,
859 prepared.command,
860 status_label.clone(),
861 Some(failed_updated_at),
862 ),
863 status: status_label,
864 })
865}
866
867fn enforce_stopped_canister_precondition(
869 config: &RestoreRunnerConfig,
870 operation: &RestoreApplyJournalOperation,
871 attempt: usize,
872 updated_at: Option<&String>,
873) -> Result<Option<RestoreStoppedPreconditionFailure>, RestoreRunnerError> {
874 let command = stopped_canister_status_command(config, operation);
875 let output = ProcessCommand::new(&command.program)
876 .args(&command.args)
877 .output()?;
878 let status_label = exit_status_label(output.status);
879 let output_pair = RestoreApplyCommandOutputPair::from_bytes(
880 &output.stdout,
881 &output.stderr,
882 RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
883 );
884 if output.status.success() && status_output_reports_stopped(&output_pair) {
885 return Ok(None);
886 }
887
888 Ok(Some(RestoreStoppedPreconditionFailure {
889 command,
890 status_label,
891 output: output_pair,
892 failure_reason: format!(
893 "{RESTORE_RUN_STOPPED_PRECONDITION_FAILED}-attempt-{attempt}-{}",
894 state_updated_at(updated_at)
895 ),
896 }))
897}
898
899fn stopped_canister_status_command(
901 config: &RestoreRunnerConfig,
902 operation: &RestoreApplyJournalOperation,
903) -> RestoreApplyRunnerCommand {
904 let mut args = vec!["canister".to_string()];
905 if let Some(network) = &config.command.network {
906 args.push("--network".to_string());
907 args.push(network.clone());
908 }
909 args.push("status".to_string());
910 args.push(operation.target_canister.clone());
911
912 RestoreApplyRunnerCommand {
913 program: config.command.program.clone(),
914 args,
915 mutates: false,
916 requires_stopped_canister: false,
917 note: "proves the target canister is stopped before snapshot load".to_string(),
918 }
919}
920
921fn status_output_reports_stopped(output: &RestoreApplyCommandOutputPair) -> bool {
923 output.stdout.text.contains("Status: Stopped")
924 || output.stdout.text.contains("status: stopped")
925 || output.stderr.text.contains("Status: Stopped")
926 || output.stderr.text.contains("status: stopped")
927}
928
929fn restore_run_max_steps_reached(
931 config: &RestoreRunnerConfig,
932 executed_operation_count: usize,
933 report: &RestoreApplyJournalReport,
934) -> bool {
935 config.max_steps == Some(executed_operation_count) && !report.complete
936}
937
938fn restore_run_execute_summary(
940 journal: &RestoreApplyJournal,
941 executed_operations: Vec<RestoreRunExecutedOperation>,
942 operation_receipts: Vec<RestoreRunOperationReceipt>,
943 max_steps_reached: bool,
944 requested_state_updated_at: Option<&String>,
945) -> RestoreRunResponse {
946 let report = journal.report();
947 let executed_operation_count = executed_operations.len();
948 let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
949 let next_action = restore_run_next_action(&report, false);
950
951 let mut response = RestoreRunResponse::from_report(
952 journal.backup_id.clone(),
953 report,
954 RestoreRunResponseMode::execute(stopped_reason, next_action),
955 );
956 response.set_requested_state_updated_at(requested_state_updated_at);
957 response.max_steps_reached = Some(max_steps_reached);
958 response.executed_operation_count = Some(executed_operation_count);
959 response.executed_operations = executed_operations;
960 response.set_operation_receipts(operation_receipts);
961 response
962}
963
964const fn restore_run_stopped_reason(
966 report: &RestoreApplyJournalReport,
967 max_steps_reached: bool,
968 executed: bool,
969) -> &'static str {
970 if report.complete {
971 return RESTORE_RUN_STOPPED_COMPLETE;
972 }
973 if report.failed_operations > 0 {
974 return RESTORE_RUN_STOPPED_COMMAND_FAILED;
975 }
976 if report.pending_operations > 0 {
977 return RESTORE_RUN_STOPPED_PENDING;
978 }
979 if !report.ready || report.blocked_operations > 0 {
980 return RESTORE_RUN_STOPPED_BLOCKED;
981 }
982 if max_steps_reached {
983 return RESTORE_RUN_STOPPED_MAX_STEPS;
984 }
985 if executed {
986 return RESTORE_RUN_STOPPED_READY;
987 }
988 RESTORE_RUN_STOPPED_PREVIEW
989}
990
991const fn restore_run_next_action(
993 report: &RestoreApplyJournalReport,
994 recovered_pending: bool,
995) -> &'static str {
996 if report.complete {
997 return RESTORE_RUN_ACTION_DONE;
998 }
999 if report.failed_operations > 0 {
1000 return RESTORE_RUN_ACTION_INSPECT_FAILED;
1001 }
1002 if report.pending_operations > 0 {
1003 return RESTORE_RUN_ACTION_UNCLAIM_PENDING;
1004 }
1005 if !report.ready || report.blocked_operations > 0 {
1006 return RESTORE_RUN_ACTION_FIX_BLOCKED;
1007 }
1008 if recovered_pending {
1009 return RESTORE_RUN_ACTION_RERUN;
1010 }
1011 RESTORE_RUN_ACTION_RERUN
1012}
1013
1014fn enforce_restore_run_executable(
1016 journal: &RestoreApplyJournal,
1017 report: &RestoreApplyJournalReport,
1018) -> Result<(), RestoreRunnerError> {
1019 if report.pending_operations > 0 {
1020 return Err(RestoreRunnerError::Pending {
1021 backup_id: report.backup_id.clone(),
1022 pending_operations: report.pending_operations,
1023 next_transition_sequence: report
1024 .next_transition
1025 .as_ref()
1026 .map(|operation| operation.sequence),
1027 });
1028 }
1029
1030 if report.failed_operations > 0 {
1031 return Err(RestoreRunnerError::Failed {
1032 backup_id: report.backup_id.clone(),
1033 failed_operations: report.failed_operations,
1034 });
1035 }
1036
1037 if report.ready {
1038 return Ok(());
1039 }
1040
1041 Err(RestoreRunnerError::NotReady {
1042 backup_id: journal.backup_id.clone(),
1043 reasons: report.blocked_reasons.clone(),
1044 })
1045}
1046
1047fn enforce_restore_run_command_available(
1049 preview: &RestoreApplyCommandPreview,
1050) -> Result<(), RestoreRunnerError> {
1051 if preview.command_available {
1052 return Ok(());
1053 }
1054
1055 Err(restore_command_unavailable_error(preview))
1056}
1057
1058fn restore_command_unavailable_error(preview: &RestoreApplyCommandPreview) -> RestoreRunnerError {
1060 RestoreRunnerError::CommandUnavailable {
1061 backup_id: preview.backup_id.clone(),
1062 operation_available: preview.operation_available,
1063 complete: preview.complete,
1064 blocked_reasons: preview.blocked_reasons.clone(),
1065 }
1066}
1067
/// Renders an exit status as its decimal exit code, or `signal` when the
/// process ended without an exit code.
fn exit_status_label(status: std::process::ExitStatus) -> String {
    let Some(code) = status.code() else {
        return "signal".to_string();
    };
    code.to_string()
}
1074
/// Extracts a snapshot id from command output.
///
/// Returns the trimmed text after the first `:` on the first line where that
/// text is non-empty. Lines without a `:` or with nothing after it are skipped;
/// returns `None` when no line qualifies.
pub fn parse_uploaded_snapshot_id(output: &str) -> Option<String> {
    output.lines().find_map(|line| {
        let (_, raw_value) = line.split_once(':')?;
        let value = raw_value.trim();
        (!value.is_empty()).then(|| value.to_string())
    })
}
1083
1084fn enforce_apply_claim_sequence(
1086 expected: usize,
1087 journal: &RestoreApplyJournal,
1088) -> Result<(), RestoreRunnerError> {
1089 let actual = journal
1090 .next_transition_operation()
1091 .map(|operation| operation.sequence);
1092
1093 if actual == Some(expected) {
1094 return Ok(());
1095 }
1096
1097 Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
1098}
1099
1100fn read_apply_journal_file(path: &Path) -> Result<RestoreApplyJournal, RestoreRunnerError> {
1102 let data = fs::read_to_string(path)?;
1103 let journal: RestoreApplyJournal = serde_json::from_str(&data)?;
1104 journal.validate()?;
1105 Ok(journal)
1106}
1107
1108fn state_updated_at(updated_at: Option<&String>) -> String {
1110 updated_at.cloned().unwrap_or_else(current_timestamp_marker)
1111}
1112
1113fn write_apply_journal_file(
1115 path: &Path,
1116 journal: &RestoreApplyJournal,
1117) -> Result<(), RestoreRunnerError> {
1118 let data = serde_json::to_vec_pretty(journal)?;
1119 fs::write(path, data)?;
1120 Ok(())
1121}
1122
/// Guard for the journal lock file; the file is removed when the guard is dropped.
struct RestoreJournalLock {
    /// Path of the lock file that this guard created.
    path: PathBuf,
}
1130
1131impl RestoreJournalLock {
1132 fn acquire(journal_path: &Path) -> Result<Self, RestoreRunnerError> {
1134 let path = journal_lock_path(journal_path);
1135 match fs::OpenOptions::new()
1136 .write(true)
1137 .create_new(true)
1138 .open(&path)
1139 {
1140 Ok(mut file) => {
1141 writeln!(file, "pid={}", std::process::id())?;
1142 Ok(Self { path })
1143 }
1144 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
1145 Err(RestoreRunnerError::JournalLocked {
1146 lock_path: path.to_string_lossy().to_string(),
1147 })
1148 }
1149 Err(error) => Err(error.into()),
1150 }
1151 }
1152}
1153
1154impl Drop for RestoreJournalLock {
1155 fn drop(&mut self) {
1157 let _ = fs::remove_file(&self.path);
1158 }
1159}
1160
/// Returns the lock file path for a journal: the journal path with `.lock` appended.
///
/// The suffix is appended to the whole path, so an existing extension is kept.
fn journal_lock_path(path: &Path) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(".lock");
    raw.into()
}
1167
// Unit tests for the stopped-canister status output check.
#[cfg(test)]
mod tests {
    use super::*;

    // A `Status: Stopped` line on stdout satisfies the stopped check.
    #[test]
    fn status_output_reports_stopped_status() {
        let output = RestoreApplyCommandOutputPair::from_bytes(b"Status: Stopped\n", b"", 1024);

        assert!(status_output_reports_stopped(&output));
    }

    // A `Status: Running` line must not be accepted as stopped.
    #[test]
    fn status_output_rejects_running_status() {
        let output = RestoreApplyCommandOutputPair::from_bytes(b"Status: Running\n", b"", 1024);

        assert!(!status_output_reports_stopped(&output));
    }
}