1use super::{
2 RestoreApplyCommandConfig, RestoreApplyCommandOutputPair, RestoreApplyCommandPreview,
3 RestoreApplyJournal, RestoreApplyJournalError, RestoreApplyJournalOperation,
4 RestoreApplyJournalReport, RestoreApplyOperationKind, RestoreApplyOperationKindCounts,
5 RestoreApplyOperationReceipt, RestoreApplyOperationState, RestoreApplyPendingSummary,
6 RestoreApplyProgressSummary, RestoreApplyReportOperation, RestoreApplyReportOutcome,
7 RestoreApplyRunnerCommand,
8};
9use crate::timestamp::current_timestamp_marker;
10use serde::Serialize;
11use std::{
12 fs,
13 io::{self, Write},
14 path::{Path, PathBuf},
15};
16use thiserror::Error as ThisError;
17
// Run modes reported in `RestoreRunResponse::run_mode`.
const RESTORE_RUN_MODE_DRY_RUN: &str = "dry-run";
const RESTORE_RUN_MODE_EXECUTE: &str = "execute";
const RESTORE_RUN_MODE_UNCLAIM_PENDING: &str = "unclaim-pending";

// Reasons reported in `RestoreRunResponse::stopped_reason`.
const RESTORE_RUN_STOPPED_BLOCKED: &str = "blocked";
const RESTORE_RUN_STOPPED_COMMAND_FAILED: &str = "command-failed";
const RESTORE_RUN_STOPPED_COMPLETE: &str = "complete";
const RESTORE_RUN_STOPPED_MAX_STEPS: &str = "max-steps-reached";
const RESTORE_RUN_STOPPED_PENDING: &str = "pending";
const RESTORE_RUN_STOPPED_PREVIEW: &str = "preview";
const RESTORE_RUN_STOPPED_READY: &str = "ready";
const RESTORE_RUN_STOPPED_RECOVERED_PENDING: &str = "recovered-pending";
// Used both as the failure-reason prefix and as the error status when the
// stopped-canister precondition check does not pass.
const RESTORE_RUN_STOPPED_PRECONDITION_FAILED: &str = "stopped-precondition-failed";

// Operator hints reported in `RestoreRunResponse::next_action`.
const RESTORE_RUN_ACTION_DONE: &str = "done";
const RESTORE_RUN_ACTION_FIX_BLOCKED: &str = "fix-blocked-journal";
const RESTORE_RUN_ACTION_INSPECT_FAILED: &str = "inspect-failed-operation";
const RESTORE_RUN_ACTION_RERUN: &str = "rerun";
const RESTORE_RUN_ACTION_UNCLAIM_PENDING: &str = "unclaim-pending";

// Receipt event names (`RestoreRunOperationReceipt::event`), public for consumers.
pub const RESTORE_RUN_RECEIPT_COMPLETED: &str = "command-completed";
pub const RESTORE_RUN_RECEIPT_FAILED: &str = "command-failed";
pub const RESTORE_RUN_RECEIPT_RECOVERED_PENDING: &str = "pending-recovered";
// State recorded on a receipt for an operation moved back from pending.
const RESTORE_RUN_RECEIPT_STATE_READY: &str = "ready";

// States recorded on executed operations and their receipts.
const RESTORE_RUN_EXECUTED_COMPLETED: &str = "completed";
const RESTORE_RUN_EXECUTED_FAILED: &str = "failed";

// Prefix of the failure reason stored when a runner command exits unsuccessfully.
const RESTORE_RUN_COMMAND_EXIT_PREFIX: &str = "runner-command-exit";
// Schema version of the serialized `RestoreRunResponse`.
const RESTORE_RUN_RESPONSE_VERSION: u16 = 1;
// Byte limit passed to `RestoreApplyCommandOutputPair::from_bytes` for captured output.
const RESTORE_RUN_OUTPUT_RECEIPT_LIMIT: usize = 64 * 1024;
48
49#[derive(Clone, Debug, Eq, PartialEq)]
54pub struct RestoreRunnerConfig {
55 pub journal: PathBuf,
56 pub command: RestoreApplyCommandConfig,
57 pub max_steps: Option<usize>,
58 pub updated_at: Option<String>,
59}
60
61pub trait RestoreRunnerCommandExecutor {
66 fn execute(
68 &mut self,
69 command: &RestoreApplyRunnerCommand,
70 ) -> Result<RestoreRunnerCommandOutput, io::Error>;
71}
72
/// Captured result of one runner command invocation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoreRunnerCommandOutput {
    /// Whether the command is considered to have succeeded.
    pub success: bool,
    /// Human-readable status label; also embedded in failure reasons and receipts.
    pub status: String,
    /// Raw standard output bytes.
    pub stdout: Vec<u8>,
    /// Raw standard error bytes.
    pub stderr: Vec<u8>,
}
84
85#[derive(Debug, ThisError)]
90pub enum RestoreRunnerError {
91 #[error("restore run command failed for operation {sequence}: status={status}")]
92 CommandFailed { sequence: usize, status: String },
93
94 #[error("restore apply journal is locked: {lock_path}")]
95 JournalLocked { lock_path: String },
96
97 #[error(
98 "restore apply journal for backup {backup_id} has pending operations: pending={pending_operations}, next={next_transition_sequence:?}"
99 )]
100 Pending {
101 backup_id: String,
102 pending_operations: usize,
103 next_transition_sequence: Option<usize>,
104 },
105
106 #[error(
107 "restore apply journal for backup {backup_id} has failed operations: failed={failed_operations}"
108 )]
109 Failed {
110 backup_id: String,
111 failed_operations: usize,
112 },
113
114 #[error("restore apply journal for backup {backup_id} is not ready: reasons={reasons:?}")]
115 NotReady {
116 backup_id: String,
117 reasons: Vec<String>,
118 },
119
120 #[error(
121 "restore apply journal for backup {backup_id} has no executable command: operation_available={operation_available}, complete={complete}, blocked_reasons={blocked_reasons:?}"
122 )]
123 CommandUnavailable {
124 backup_id: String,
125 operation_available: bool,
126 complete: bool,
127 blocked_reasons: Vec<String>,
128 },
129
130 #[error(
131 "restore apply journal next operation changed before claim: expected={expected}, actual={actual:?}"
132 )]
133 ClaimSequenceMismatch {
134 expected: usize,
135 actual: Option<usize>,
136 },
137
138 #[error(transparent)]
139 Io(#[from] std::io::Error),
140
141 #[error(transparent)]
142 Json(#[from] serde_json::Error),
143
144 #[error(transparent)]
145 Journal(#[from] RestoreApplyJournalError),
146}
147
148#[derive(Clone, Debug, Serialize)]
153#[expect(
154 clippy::struct_excessive_bools,
155 reason = "Runner response exposes stable JSON status flags for operators and CI"
156)]
157pub struct RestoreRunResponse {
158 pub run_version: u16,
159 pub backup_id: String,
160 pub run_mode: &'static str,
161 pub dry_run: bool,
162 pub execute: bool,
163 pub unclaim_pending: bool,
164 pub stopped_reason: &'static str,
165 pub next_action: &'static str,
166 #[serde(skip_serializing_if = "Option::is_none")]
167 pub requested_state_updated_at: Option<String>,
168 #[serde(skip_serializing_if = "Option::is_none")]
169 pub max_steps_reached: Option<bool>,
170 #[serde(default, skip_serializing_if = "Vec::is_empty")]
171 pub executed_operations: Vec<RestoreRunExecutedOperation>,
172 #[serde(default, skip_serializing_if = "Vec::is_empty")]
173 pub operation_receipts: Vec<RestoreRunOperationReceipt>,
174 #[serde(skip_serializing_if = "Option::is_none")]
175 pub operation_receipt_count: Option<usize>,
176 pub operation_receipt_summary: RestoreRunReceiptSummary,
177 #[serde(skip_serializing_if = "Option::is_none")]
178 pub executed_operation_count: Option<usize>,
179 #[serde(skip_serializing_if = "Option::is_none")]
180 pub recovered_operation: Option<RestoreApplyJournalOperation>,
181 pub ready: bool,
182 pub complete: bool,
183 pub attention_required: bool,
184 pub outcome: RestoreApplyReportOutcome,
185 pub operation_count: usize,
186 pub operation_counts: RestoreApplyOperationKindCounts,
187 pub progress: RestoreApplyProgressSummary,
188 pub pending_summary: RestoreApplyPendingSummary,
189 pub pending_operations: usize,
190 pub ready_operations: usize,
191 pub blocked_operations: usize,
192 pub completed_operations: usize,
193 pub failed_operations: usize,
194 pub blocked_reasons: Vec<String>,
195 pub next_transition: Option<RestoreApplyReportOperation>,
196 #[serde(skip_serializing_if = "Option::is_none")]
197 pub operation_available: Option<bool>,
198 #[serde(skip_serializing_if = "Option::is_none")]
199 pub command_available: Option<bool>,
200 #[serde(skip_serializing_if = "Option::is_none")]
201 pub command: Option<RestoreApplyRunnerCommand>,
202}
203
204impl RestoreRunResponse {
205 fn from_report(
207 backup_id: String,
208 report: RestoreApplyJournalReport,
209 mode: RestoreRunResponseMode,
210 ) -> Self {
211 Self {
212 run_version: RESTORE_RUN_RESPONSE_VERSION,
213 backup_id,
214 run_mode: mode.run_mode,
215 dry_run: mode.dry_run,
216 execute: mode.execute,
217 unclaim_pending: mode.unclaim_pending,
218 stopped_reason: mode.stopped_reason,
219 next_action: mode.next_action,
220 requested_state_updated_at: None,
221 max_steps_reached: None,
222 executed_operations: Vec::new(),
223 operation_receipts: Vec::new(),
224 operation_receipt_count: Some(0),
225 operation_receipt_summary: RestoreRunReceiptSummary::default(),
226 executed_operation_count: None,
227 recovered_operation: None,
228 ready: report.ready,
229 complete: report.complete,
230 attention_required: report.attention_required,
231 outcome: report.outcome,
232 operation_count: report.operation_count,
233 operation_counts: report.operation_counts,
234 progress: report.progress,
235 pending_summary: report.pending_summary,
236 pending_operations: report.pending_operations,
237 ready_operations: report.ready_operations,
238 blocked_operations: report.blocked_operations,
239 completed_operations: report.completed_operations,
240 failed_operations: report.failed_operations,
241 blocked_reasons: report.blocked_reasons,
242 next_transition: report.next_transition,
243 operation_available: None,
244 command_available: None,
245 command: None,
246 }
247 }
248
249 fn set_operation_receipts(&mut self, receipts: Vec<RestoreRunOperationReceipt>) {
251 self.operation_receipt_summary = RestoreRunReceiptSummary::from_receipts(&receipts);
252 self.operation_receipt_count = Some(receipts.len());
253 self.operation_receipts = receipts;
254 }
255
256 fn set_requested_state_updated_at(&mut self, updated_at: Option<&String>) {
258 self.requested_state_updated_at = updated_at.cloned();
259 }
260}
261
262#[derive(Clone, Debug, Default, Serialize)]
267pub struct RestoreRunReceiptSummary {
268 pub total_receipts: usize,
269 pub command_completed: usize,
270 pub command_failed: usize,
271 pub pending_recovered: usize,
272}
273
274#[derive(Clone, Debug, Serialize)]
279pub struct RestoreRunOperationReceipt {
280 pub event: &'static str,
281 pub sequence: usize,
282 pub operation: RestoreApplyOperationKind,
283 pub target_canister: String,
284 pub state: &'static str,
285 #[serde(skip_serializing_if = "Option::is_none")]
286 pub updated_at: Option<String>,
287 #[serde(skip_serializing_if = "Option::is_none")]
288 pub command: Option<RestoreApplyRunnerCommand>,
289 #[serde(skip_serializing_if = "Option::is_none")]
290 pub status: Option<String>,
291}
292
293#[derive(Clone, Debug, Serialize)]
298pub struct RestoreRunExecutedOperation {
299 pub sequence: usize,
300 pub operation: RestoreApplyOperationKind,
301 pub target_canister: String,
302 pub command: RestoreApplyRunnerCommand,
303 pub status: String,
304 pub state: &'static str,
305}
306
307pub struct RestoreRunnerOutcome {
312 pub response: RestoreRunResponse,
313 pub error: Option<RestoreRunnerError>,
314}
315
316impl RestoreRunnerOutcome {
317 const fn ok(response: RestoreRunResponse) -> Self {
319 Self {
320 response,
321 error: None,
322 }
323 }
324}
325
326struct RestoreStoppedPreconditionFailure {
331 command: RestoreApplyRunnerCommand,
332 status_label: String,
333 output: RestoreApplyCommandOutputPair,
334 failure_reason: String,
335}
336
337impl RestoreRunReceiptSummary {
338 fn from_receipts(receipts: &[RestoreRunOperationReceipt]) -> Self {
340 let mut summary = Self {
341 total_receipts: receipts.len(),
342 ..Self::default()
343 };
344
345 for receipt in receipts {
346 match receipt.event {
347 RESTORE_RUN_RECEIPT_COMPLETED => summary.command_completed += 1,
348 RESTORE_RUN_RECEIPT_FAILED => summary.command_failed += 1,
349 RESTORE_RUN_RECEIPT_RECOVERED_PENDING => summary.pending_recovered += 1,
350 _ => {}
351 }
352 }
353
354 summary
355 }
356}
357
358impl RestoreRunOperationReceipt {
359 fn completed(
361 operation: RestoreApplyJournalOperation,
362 command: RestoreApplyRunnerCommand,
363 status: String,
364 updated_at: Option<String>,
365 ) -> Self {
366 Self::from_operation(
367 RESTORE_RUN_RECEIPT_COMPLETED,
368 operation,
369 RESTORE_RUN_EXECUTED_COMPLETED,
370 updated_at,
371 Some(command),
372 Some(status),
373 )
374 }
375
376 fn failed(
378 operation: RestoreApplyJournalOperation,
379 command: RestoreApplyRunnerCommand,
380 status: String,
381 updated_at: Option<String>,
382 ) -> Self {
383 Self::from_operation(
384 RESTORE_RUN_RECEIPT_FAILED,
385 operation,
386 RESTORE_RUN_EXECUTED_FAILED,
387 updated_at,
388 Some(command),
389 Some(status),
390 )
391 }
392
393 fn recovered_pending(
395 operation: RestoreApplyJournalOperation,
396 updated_at: Option<String>,
397 ) -> Self {
398 Self::from_operation(
399 RESTORE_RUN_RECEIPT_RECOVERED_PENDING,
400 operation,
401 RESTORE_RUN_RECEIPT_STATE_READY,
402 updated_at,
403 None,
404 None,
405 )
406 }
407
408 fn from_operation(
410 event: &'static str,
411 operation: RestoreApplyJournalOperation,
412 state: &'static str,
413 updated_at: Option<String>,
414 command: Option<RestoreApplyRunnerCommand>,
415 status: Option<String>,
416 ) -> Self {
417 Self {
418 event,
419 sequence: operation.sequence,
420 operation: operation.operation,
421 target_canister: operation.target_canister,
422 state,
423 updated_at,
424 command,
425 status,
426 }
427 }
428}
429
430impl RestoreRunExecutedOperation {
431 fn completed(
433 operation: RestoreApplyJournalOperation,
434 command: RestoreApplyRunnerCommand,
435 status: String,
436 ) -> Self {
437 Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_COMPLETED)
438 }
439
440 fn failed(
442 operation: RestoreApplyJournalOperation,
443 command: RestoreApplyRunnerCommand,
444 status: String,
445 ) -> Self {
446 Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_FAILED)
447 }
448
449 fn from_operation(
451 operation: RestoreApplyJournalOperation,
452 command: RestoreApplyRunnerCommand,
453 status: String,
454 state: &'static str,
455 ) -> Self {
456 Self {
457 sequence: operation.sequence,
458 operation: operation.operation,
459 target_canister: operation.target_canister,
460 command,
461 status,
462 state,
463 }
464 }
465}
466
/// Mode descriptor copied verbatim into the mode fields of a `RestoreRunResponse`.
struct RestoreRunResponseMode {
    /// One of the `RESTORE_RUN_MODE_*` names.
    run_mode: &'static str,
    /// Exactly one of these three flags is set by the mode constructors.
    dry_run: bool,
    execute: bool,
    unclaim_pending: bool,
    /// One of the `RESTORE_RUN_STOPPED_*` reasons.
    stopped_reason: &'static str,
    /// One of the `RESTORE_RUN_ACTION_*` hints.
    next_action: &'static str,
}
479
480impl RestoreRunResponseMode {
481 const fn new(
483 run_mode: &'static str,
484 dry_run: bool,
485 execute: bool,
486 unclaim_pending: bool,
487 stopped_reason: &'static str,
488 next_action: &'static str,
489 ) -> Self {
490 Self {
491 run_mode,
492 dry_run,
493 execute,
494 unclaim_pending,
495 stopped_reason,
496 next_action,
497 }
498 }
499
500 const fn dry_run(stopped_reason: &'static str, next_action: &'static str) -> Self {
502 Self::new(
503 RESTORE_RUN_MODE_DRY_RUN,
504 true,
505 false,
506 false,
507 stopped_reason,
508 next_action,
509 )
510 }
511
512 const fn execute(stopped_reason: &'static str, next_action: &'static str) -> Self {
514 Self::new(
515 RESTORE_RUN_MODE_EXECUTE,
516 false,
517 true,
518 false,
519 stopped_reason,
520 next_action,
521 )
522 }
523
524 const fn unclaim_pending(next_action: &'static str) -> Self {
526 Self::new(
527 RESTORE_RUN_MODE_UNCLAIM_PENDING,
528 false,
529 false,
530 true,
531 RESTORE_RUN_STOPPED_RECOVERED_PENDING,
532 next_action,
533 )
534 }
535}
536
537struct RestoreRunPreparedOperation {
542 operation: RestoreApplyJournalOperation,
543 command: RestoreApplyRunnerCommand,
544 sequence: usize,
545 attempt: usize,
546}
547
548enum RestoreRunStepOutcome {
553 Completed {
554 executed_operation: RestoreRunExecutedOperation,
555 operation_receipt: RestoreRunOperationReceipt,
556 },
557 Failed {
558 executed_operation: RestoreRunExecutedOperation,
559 operation_receipt: RestoreRunOperationReceipt,
560 status: String,
561 },
562}
563
564pub fn restore_run_dry_run(
566 config: &RestoreRunnerConfig,
567) -> Result<RestoreRunResponse, RestoreRunnerError> {
568 let journal = read_apply_journal_file(&config.journal)?;
569 let report = journal.report();
570 let preview = journal.next_command_preview_with_config(&config.command);
571 let stopped_reason = restore_run_stopped_reason(&report, false, false);
572 let next_action = restore_run_next_action(&report, false);
573
574 let mut response = RestoreRunResponse::from_report(
575 journal.backup_id,
576 report,
577 RestoreRunResponseMode::dry_run(stopped_reason, next_action),
578 );
579 response.set_requested_state_updated_at(config.updated_at.as_ref());
580 response.operation_available = Some(preview.operation_available);
581 response.command_available = Some(preview.command_available);
582 response.command = preview.command;
583 Ok(response)
584}
585
586pub fn restore_run_unclaim_pending(
588 config: &RestoreRunnerConfig,
589) -> Result<RestoreRunResponse, RestoreRunnerError> {
590 let _lock = RestoreJournalLock::acquire(&config.journal)?;
591 let mut journal = read_apply_journal_file(&config.journal)?;
592 let recovered_operation = journal
593 .next_transition_operation()
594 .filter(|operation| operation.state == RestoreApplyOperationState::Pending)
595 .cloned()
596 .ok_or(RestoreApplyJournalError::NoPendingOperation)?;
597
598 let recovered_updated_at = state_updated_at(config.updated_at.as_ref());
599 journal.mark_next_operation_ready_at(Some(recovered_updated_at.clone()))?;
600 write_apply_journal_file(&config.journal, &journal)?;
601
602 let report = journal.report();
603 let next_action = restore_run_next_action(&report, true);
604 let mut response = RestoreRunResponse::from_report(
605 journal.backup_id,
606 report,
607 RestoreRunResponseMode::unclaim_pending(next_action),
608 );
609 response.set_requested_state_updated_at(config.updated_at.as_ref());
610 response.set_operation_receipts(vec![RestoreRunOperationReceipt::recovered_pending(
611 recovered_operation.clone(),
612 Some(recovered_updated_at),
613 )]);
614 response.recovered_operation = Some(recovered_operation);
615 Ok(response)
616}
617
618pub fn restore_run_execute_with_executor(
620 config: &RestoreRunnerConfig,
621 executor: &mut impl RestoreRunnerCommandExecutor,
622) -> Result<RestoreRunResponse, RestoreRunnerError> {
623 let run = restore_run_execute_result_with_executor(config, executor)?;
624 if let Some(error) = run.error {
625 return Err(error);
626 }
627
628 Ok(run.response)
629}
630
631pub fn restore_run_execute_result_with_executor(
633 config: &RestoreRunnerConfig,
634 executor: &mut impl RestoreRunnerCommandExecutor,
635) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
636 let _lock = RestoreJournalLock::acquire(&config.journal)?;
637 let mut journal = read_apply_journal_file(&config.journal)?;
638 let mut executed_operations = Vec::new();
639 let mut operation_receipts = Vec::new();
640
641 loop {
642 let report = journal.report();
643 let max_steps_reached =
644 restore_run_max_steps_reached(config, executed_operations.len(), &report);
645 if report.complete || max_steps_reached {
646 return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
647 &journal,
648 executed_operations,
649 operation_receipts,
650 max_steps_reached,
651 config.updated_at.as_ref(),
652 )));
653 }
654
655 enforce_restore_run_executable(&journal, &report)?;
656 let prepared = restore_run_prepare_next_operation(config, &mut journal)?;
657 let sequence = prepared.sequence;
658 match restore_run_execute_prepared_operation(config, executor, &mut journal, prepared)? {
659 RestoreRunStepOutcome::Completed {
660 executed_operation,
661 operation_receipt,
662 } => {
663 executed_operations.push(executed_operation);
664 operation_receipts.push(operation_receipt);
665 }
666 RestoreRunStepOutcome::Failed {
667 executed_operation,
668 operation_receipt,
669 status,
670 } => {
671 executed_operations.push(executed_operation);
672 operation_receipts.push(operation_receipt);
673 let response = restore_run_execute_summary(
674 &journal,
675 executed_operations,
676 operation_receipts,
677 false,
678 config.updated_at.as_ref(),
679 );
680 return Ok(RestoreRunnerOutcome {
681 response,
682 error: Some(RestoreRunnerError::CommandFailed { sequence, status }),
683 });
684 }
685 }
686 }
687}
688
689fn restore_run_prepare_next_operation(
691 config: &RestoreRunnerConfig,
692 journal: &mut RestoreApplyJournal,
693) -> Result<RestoreRunPreparedOperation, RestoreRunnerError> {
694 let preview = journal.next_command_preview_with_config(&config.command);
695 enforce_restore_run_command_available(&preview)?;
696
697 let operation = preview
698 .operation
699 .clone()
700 .ok_or_else(|| restore_command_unavailable_error(&preview))?;
701 let command = preview
702 .command
703 .clone()
704 .ok_or_else(|| restore_command_unavailable_error(&preview))?;
705 let sequence = operation.sequence;
706 let attempt = journal
707 .operation_receipts
708 .iter()
709 .filter(|receipt| receipt.sequence == sequence)
710 .count()
711 + 1;
712
713 enforce_apply_claim_sequence(sequence, journal)?;
714 journal
715 .mark_operation_pending_at(sequence, Some(state_updated_at(config.updated_at.as_ref())))?;
716 write_apply_journal_file(&config.journal, journal)?;
717
718 Ok(RestoreRunPreparedOperation {
719 operation,
720 command,
721 sequence,
722 attempt,
723 })
724}
725
726fn restore_run_execute_prepared_operation(
728 config: &RestoreRunnerConfig,
729 executor: &mut impl RestoreRunnerCommandExecutor,
730 journal: &mut RestoreApplyJournal,
731 prepared: RestoreRunPreparedOperation,
732) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
733 if prepared.command.requires_stopped_canister
734 && let Some(outcome) = enforce_stopped_canister_precondition(
735 config,
736 executor,
737 &prepared.operation,
738 prepared.attempt,
739 config.updated_at.as_ref(),
740 )?
741 {
742 return restore_run_commit_precondition_failure(config, journal, prepared, outcome);
743 }
744
745 let output = executor.execute(&prepared.command)?;
746 let status_label = output.status;
747 let output_pair = RestoreApplyCommandOutputPair::from_bytes(
748 &output.stdout,
749 &output.stderr,
750 RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
751 );
752
753 if output.success {
754 let uploaded_snapshot_id =
755 parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout));
756 return restore_run_commit_command_success(
757 config,
758 journal,
759 prepared,
760 status_label,
761 output_pair,
762 uploaded_snapshot_id,
763 );
764 }
765
766 restore_run_commit_command_failure(config, journal, prepared, status_label, output_pair)
767}
768
769fn restore_run_commit_precondition_failure(
771 config: &RestoreRunnerConfig,
772 journal: &mut RestoreApplyJournal,
773 prepared: RestoreRunPreparedOperation,
774 outcome: RestoreStoppedPreconditionFailure,
775) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
776 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
777 journal.mark_operation_failed_at(
778 prepared.sequence,
779 outcome.failure_reason.clone(),
780 Some(failed_updated_at.clone()),
781 )?;
782 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
783 &prepared.operation,
784 outcome.command.clone(),
785 outcome.status_label.clone(),
786 Some(failed_updated_at.clone()),
787 outcome.output,
788 prepared.attempt,
789 outcome.failure_reason,
790 ))?;
791 write_apply_journal_file(&config.journal, journal)?;
792
793 Ok(RestoreRunStepOutcome::Failed {
794 executed_operation: RestoreRunExecutedOperation::failed(
795 prepared.operation.clone(),
796 outcome.command.clone(),
797 outcome.status_label.clone(),
798 ),
799 operation_receipt: RestoreRunOperationReceipt::failed(
800 prepared.operation,
801 outcome.command,
802 outcome.status_label,
803 Some(failed_updated_at),
804 ),
805 status: RESTORE_RUN_STOPPED_PRECONDITION_FAILED.to_string(),
806 })
807}
808
809fn restore_run_commit_command_success(
811 config: &RestoreRunnerConfig,
812 journal: &mut RestoreApplyJournal,
813 prepared: RestoreRunPreparedOperation,
814 status_label: String,
815 output_pair: RestoreApplyCommandOutputPair,
816 uploaded_snapshot_id: Option<String>,
817) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
818 let completed_updated_at = state_updated_at(config.updated_at.as_ref());
819 journal.mark_operation_completed_at(prepared.sequence, Some(completed_updated_at.clone()))?;
820 if prepared.operation.operation != RestoreApplyOperationKind::UploadSnapshot
821 || uploaded_snapshot_id.is_some()
822 {
823 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_completed(
824 &prepared.operation,
825 prepared.command.clone(),
826 status_label.clone(),
827 Some(completed_updated_at.clone()),
828 output_pair,
829 prepared.attempt,
830 uploaded_snapshot_id,
831 ))?;
832 }
833 write_apply_journal_file(&config.journal, journal)?;
834
835 Ok(RestoreRunStepOutcome::Completed {
836 executed_operation: RestoreRunExecutedOperation::completed(
837 prepared.operation.clone(),
838 prepared.command.clone(),
839 status_label.clone(),
840 ),
841 operation_receipt: RestoreRunOperationReceipt::completed(
842 prepared.operation,
843 prepared.command,
844 status_label,
845 Some(completed_updated_at),
846 ),
847 })
848}
849
850fn restore_run_commit_command_failure(
852 config: &RestoreRunnerConfig,
853 journal: &mut RestoreApplyJournal,
854 prepared: RestoreRunPreparedOperation,
855 status_label: String,
856 output_pair: RestoreApplyCommandOutputPair,
857) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
858 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
859 let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
860 journal.mark_operation_failed_at(
861 prepared.sequence,
862 failure_reason.clone(),
863 Some(failed_updated_at.clone()),
864 )?;
865 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
866 &prepared.operation,
867 prepared.command.clone(),
868 status_label.clone(),
869 Some(failed_updated_at.clone()),
870 output_pair,
871 prepared.attempt,
872 failure_reason,
873 ))?;
874 write_apply_journal_file(&config.journal, journal)?;
875
876 Ok(RestoreRunStepOutcome::Failed {
877 executed_operation: RestoreRunExecutedOperation::failed(
878 prepared.operation.clone(),
879 prepared.command.clone(),
880 status_label.clone(),
881 ),
882 operation_receipt: RestoreRunOperationReceipt::failed(
883 prepared.operation,
884 prepared.command,
885 status_label.clone(),
886 Some(failed_updated_at),
887 ),
888 status: status_label,
889 })
890}
891
892fn enforce_stopped_canister_precondition(
894 config: &RestoreRunnerConfig,
895 executor: &mut impl RestoreRunnerCommandExecutor,
896 operation: &RestoreApplyJournalOperation,
897 attempt: usize,
898 updated_at: Option<&String>,
899) -> Result<Option<RestoreStoppedPreconditionFailure>, RestoreRunnerError> {
900 let command = stopped_canister_status_command(config, operation);
901 let output = executor.execute(&command)?;
902 let status_label = output.status;
903 let output_pair = RestoreApplyCommandOutputPair::from_bytes(
904 &output.stdout,
905 &output.stderr,
906 RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
907 );
908 if output.success && status_output_reports_stopped(&output_pair) {
909 return Ok(None);
910 }
911
912 Ok(Some(RestoreStoppedPreconditionFailure {
913 command,
914 status_label,
915 output: output_pair,
916 failure_reason: format!(
917 "{RESTORE_RUN_STOPPED_PRECONDITION_FAILED}-attempt-{attempt}-{}",
918 state_updated_at(updated_at)
919 ),
920 }))
921}
922
923fn stopped_canister_status_command(
925 config: &RestoreRunnerConfig,
926 operation: &RestoreApplyJournalOperation,
927) -> RestoreApplyRunnerCommand {
928 let mut args = vec!["canister".to_string()];
929 if let Some(network) = &config.command.network {
930 args.push("--network".to_string());
931 args.push(network.clone());
932 }
933 args.push("status".to_string());
934 args.push(operation.target_canister.clone());
935
936 RestoreApplyRunnerCommand {
937 program: config.command.program.clone(),
938 args,
939 mutates: false,
940 requires_stopped_canister: false,
941 note: "proves the target canister is stopped before snapshot load".to_string(),
942 }
943}
944
945fn status_output_reports_stopped(output: &RestoreApplyCommandOutputPair) -> bool {
947 output.stdout.text.contains("Status: Stopped")
948 || output.stdout.text.contains("status: stopped")
949 || output.stderr.text.contains("Status: Stopped")
950 || output.stderr.text.contains("status: stopped")
951}
952
953fn restore_run_max_steps_reached(
955 config: &RestoreRunnerConfig,
956 executed_operation_count: usize,
957 report: &RestoreApplyJournalReport,
958) -> bool {
959 config.max_steps == Some(executed_operation_count) && !report.complete
960}
961
962fn restore_run_execute_summary(
964 journal: &RestoreApplyJournal,
965 executed_operations: Vec<RestoreRunExecutedOperation>,
966 operation_receipts: Vec<RestoreRunOperationReceipt>,
967 max_steps_reached: bool,
968 requested_state_updated_at: Option<&String>,
969) -> RestoreRunResponse {
970 let report = journal.report();
971 let executed_operation_count = executed_operations.len();
972 let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
973 let next_action = restore_run_next_action(&report, false);
974
975 let mut response = RestoreRunResponse::from_report(
976 journal.backup_id.clone(),
977 report,
978 RestoreRunResponseMode::execute(stopped_reason, next_action),
979 );
980 response.set_requested_state_updated_at(requested_state_updated_at);
981 response.max_steps_reached = Some(max_steps_reached);
982 response.executed_operation_count = Some(executed_operation_count);
983 response.executed_operations = executed_operations;
984 response.set_operation_receipts(operation_receipts);
985 response
986}
987
988const fn restore_run_stopped_reason(
990 report: &RestoreApplyJournalReport,
991 max_steps_reached: bool,
992 executed: bool,
993) -> &'static str {
994 if report.complete {
995 return RESTORE_RUN_STOPPED_COMPLETE;
996 }
997 if report.failed_operations > 0 {
998 return RESTORE_RUN_STOPPED_COMMAND_FAILED;
999 }
1000 if report.pending_operations > 0 {
1001 return RESTORE_RUN_STOPPED_PENDING;
1002 }
1003 if !report.ready || report.blocked_operations > 0 {
1004 return RESTORE_RUN_STOPPED_BLOCKED;
1005 }
1006 if max_steps_reached {
1007 return RESTORE_RUN_STOPPED_MAX_STEPS;
1008 }
1009 if executed {
1010 return RESTORE_RUN_STOPPED_READY;
1011 }
1012 RESTORE_RUN_STOPPED_PREVIEW
1013}
1014
1015const fn restore_run_next_action(
1017 report: &RestoreApplyJournalReport,
1018 recovered_pending: bool,
1019) -> &'static str {
1020 if report.complete {
1021 return RESTORE_RUN_ACTION_DONE;
1022 }
1023 if report.failed_operations > 0 {
1024 return RESTORE_RUN_ACTION_INSPECT_FAILED;
1025 }
1026 if report.pending_operations > 0 {
1027 return RESTORE_RUN_ACTION_UNCLAIM_PENDING;
1028 }
1029 if !report.ready || report.blocked_operations > 0 {
1030 return RESTORE_RUN_ACTION_FIX_BLOCKED;
1031 }
1032 if recovered_pending {
1033 return RESTORE_RUN_ACTION_RERUN;
1034 }
1035 RESTORE_RUN_ACTION_RERUN
1036}
1037
1038fn enforce_restore_run_executable(
1040 journal: &RestoreApplyJournal,
1041 report: &RestoreApplyJournalReport,
1042) -> Result<(), RestoreRunnerError> {
1043 if report.pending_operations > 0 {
1044 return Err(RestoreRunnerError::Pending {
1045 backup_id: report.backup_id.clone(),
1046 pending_operations: report.pending_operations,
1047 next_transition_sequence: report
1048 .next_transition
1049 .as_ref()
1050 .map(|operation| operation.sequence),
1051 });
1052 }
1053
1054 if report.failed_operations > 0 {
1055 return Err(RestoreRunnerError::Failed {
1056 backup_id: report.backup_id.clone(),
1057 failed_operations: report.failed_operations,
1058 });
1059 }
1060
1061 if report.ready {
1062 return Ok(());
1063 }
1064
1065 Err(RestoreRunnerError::NotReady {
1066 backup_id: journal.backup_id.clone(),
1067 reasons: report.blocked_reasons.clone(),
1068 })
1069}
1070
1071fn enforce_restore_run_command_available(
1073 preview: &RestoreApplyCommandPreview,
1074) -> Result<(), RestoreRunnerError> {
1075 if preview.command_available {
1076 return Ok(());
1077 }
1078
1079 Err(restore_command_unavailable_error(preview))
1080}
1081
1082fn restore_command_unavailable_error(preview: &RestoreApplyCommandPreview) -> RestoreRunnerError {
1084 RestoreRunnerError::CommandUnavailable {
1085 backup_id: preview.backup_id.clone(),
1086 operation_available: preview.operation_available,
1087 complete: preview.complete,
1088 blocked_reasons: preview.blocked_reasons.clone(),
1089 }
1090}
1091
/// Extracts an uploaded snapshot id from command output.
///
/// Returns the trimmed text after the first `:` on the first line where that
/// text is non-empty, or `None` when no line qualifies.
pub fn parse_uploaded_snapshot_id(output: &str) -> Option<String> {
    for line in output.lines() {
        let Some((_, value)) = line.split_once(':') else {
            continue;
        };
        let value = value.trim();
        if !value.is_empty() {
            return Some(value.to_string());
        }
    }
    None
}
1100
1101fn enforce_apply_claim_sequence(
1103 expected: usize,
1104 journal: &RestoreApplyJournal,
1105) -> Result<(), RestoreRunnerError> {
1106 let actual = journal
1107 .next_transition_operation()
1108 .map(|operation| operation.sequence);
1109
1110 if actual == Some(expected) {
1111 return Ok(());
1112 }
1113
1114 Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
1115}
1116
1117fn read_apply_journal_file(path: &Path) -> Result<RestoreApplyJournal, RestoreRunnerError> {
1119 let data = fs::read_to_string(path)?;
1120 let journal: RestoreApplyJournal = serde_json::from_str(&data)?;
1121 journal.validate()?;
1122 Ok(journal)
1123}
1124
1125fn state_updated_at(updated_at: Option<&String>) -> String {
1127 updated_at.cloned().unwrap_or_else(current_timestamp_marker)
1128}
1129
1130fn write_apply_journal_file(
1132 path: &Path,
1133 journal: &RestoreApplyJournal,
1134) -> Result<(), RestoreRunnerError> {
1135 let data = serde_json::to_vec_pretty(journal)?;
1136 fs::write(path, data)?;
1137 Ok(())
1138}
1139
1140struct RestoreJournalLock {
1145 path: PathBuf,
1146}
1147
1148impl RestoreJournalLock {
1149 fn acquire(journal_path: &Path) -> Result<Self, RestoreRunnerError> {
1151 let path = journal_lock_path(journal_path);
1152 match fs::OpenOptions::new()
1153 .write(true)
1154 .create_new(true)
1155 .open(&path)
1156 {
1157 Ok(mut file) => {
1158 writeln!(file, "pid={}", std::process::id())?;
1159 Ok(Self { path })
1160 }
1161 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
1162 Err(RestoreRunnerError::JournalLocked {
1163 lock_path: path.to_string_lossy().to_string(),
1164 })
1165 }
1166 Err(error) => Err(error.into()),
1167 }
1168 }
1169}
1170
1171impl Drop for RestoreJournalLock {
1172 fn drop(&mut self) {
1174 let _ = fs::remove_file(&self.path);
1175 }
1176}
1177
/// Returns the lock file path for a journal: the journal path with `.lock` appended.
fn journal_lock_path(path: &Path) -> PathBuf {
    let mut raw = path.as_os_str().to_owned();
    raw.push(".lock");
    raw.into()
}
1184
#[cfg(test)]
mod tests {
    use super::*;

    // The canonical capitalized stopped marker on stdout is accepted.
    #[test]
    fn status_output_reports_stopped_status() {
        let output = RestoreApplyCommandOutputPair::from_bytes(b"Status: Stopped\n", b"", 1024);

        assert!(status_output_reports_stopped(&output));
    }

    // The lowercase marker variant is accepted as well.
    #[test]
    fn status_output_reports_lowercase_stopped_status() {
        let output = RestoreApplyCommandOutputPair::from_bytes(b"status: stopped\n", b"", 1024);

        assert!(status_output_reports_stopped(&output));
    }

    // Tools that print status on stderr are covered too.
    #[test]
    fn status_output_reports_stopped_status_on_stderr() {
        let output = RestoreApplyCommandOutputPair::from_bytes(b"", b"Status: Stopped\n", 1024);

        assert!(status_output_reports_stopped(&output));
    }

    // A running canister must not satisfy the precondition.
    #[test]
    fn status_output_rejects_running_status() {
        let output = RestoreApplyCommandOutputPair::from_bytes(b"Status: Running\n", b"", 1024);

        assert!(!status_output_reports_stopped(&output));
    }

    // A canister still transitioning to stopped must not satisfy the precondition.
    #[test]
    fn status_output_rejects_stopping_status() {
        let output = RestoreApplyCommandOutputPair::from_bytes(b"Status: Stopping\n", b"", 1024);

        assert!(!status_output_reports_stopped(&output));
    }
}