Skip to main content

canic_backup/restore/runner/
mod.rs

1use super::{
2    RestoreApplyCommandConfig, RestoreApplyCommandOutputPair, RestoreApplyCommandPreview,
3    RestoreApplyJournal, RestoreApplyJournalError, RestoreApplyJournalOperation,
4    RestoreApplyJournalReport, RestoreApplyOperationKind, RestoreApplyOperationKindCounts,
5    RestoreApplyOperationReceipt, RestoreApplyOperationState, RestoreApplyPendingSummary,
6    RestoreApplyProgressSummary, RestoreApplyReportOperation, RestoreApplyReportOutcome,
7    RestoreApplyRunnerCommand,
8};
9use crate::timestamp::current_timestamp_marker;
10use serde::Serialize;
11use std::{
12    fs,
13    io::{self, Write},
14    path::{Path, PathBuf},
15};
16use thiserror::Error as ThisError;
17
// Labels reported in the `run_mode` field of a runner response.
const RESTORE_RUN_MODE_DRY_RUN: &str = "dry-run";
const RESTORE_RUN_MODE_EXECUTE: &str = "execute";
const RESTORE_RUN_MODE_UNCLAIM_PENDING: &str = "unclaim-pending";

// Labels reported in the `stopped_reason` field of a runner response.
const RESTORE_RUN_STOPPED_BLOCKED: &str = "blocked";
const RESTORE_RUN_STOPPED_COMMAND_FAILED: &str = "command-failed";
const RESTORE_RUN_STOPPED_COMPLETE: &str = "complete";
const RESTORE_RUN_STOPPED_MAX_STEPS: &str = "max-steps-reached";
const RESTORE_RUN_STOPPED_PENDING: &str = "pending";
const RESTORE_RUN_STOPPED_PREVIEW: &str = "preview";
const RESTORE_RUN_STOPPED_READY: &str = "ready";
const RESTORE_RUN_STOPPED_RECOVERED_PENDING: &str = "recovered-pending";

// Labels reported in the `next_action` field of a runner response.
const RESTORE_RUN_ACTION_DONE: &str = "done";
const RESTORE_RUN_ACTION_FIX_BLOCKED: &str = "fix-blocked-journal";
const RESTORE_RUN_ACTION_INSPECT_FAILED: &str = "inspect-failed-operation";
const RESTORE_RUN_ACTION_RERUN: &str = "rerun";
const RESTORE_RUN_ACTION_UNCLAIM_PENDING: &str = "unclaim-pending";

/// Receipt event label for a runner command that completed.
pub const RESTORE_RUN_RECEIPT_COMPLETED: &str = "command-completed";
/// Receipt event label for a runner command that failed.
pub const RESTORE_RUN_RECEIPT_FAILED: &str = "command-failed";
/// Receipt event label for a pending operation that was recovered to ready.
pub const RESTORE_RUN_RECEIPT_RECOVERED_PENDING: &str = "pending-recovered";

// Operation state labels attached to executed-operation rows and receipts.
const RESTORE_RUN_EXECUTED_COMPLETED: &str = "completed";
const RESTORE_RUN_EXECUTED_FAILED: &str = "failed";
const RESTORE_RUN_RECEIPT_STATE_READY: &str = "ready";

// Labels used to build journal failure reasons; the precondition label is also
// reported as the failure status of a stopped-canister precondition failure.
const RESTORE_RUN_COMMAND_EXIT_PREFIX: &str = "runner-command-exit";
const RESTORE_RUN_STOPPED_PRECONDITION_FAILED: &str = "stopped-precondition-failed";

// Response schema version and the byte limit handed to the command output capture.
const RESTORE_RUN_RESPONSE_VERSION: u16 = 1;
const RESTORE_RUN_OUTPUT_RECEIPT_LIMIT: usize = 64 * 1024;
48
49///
50/// RestoreRunnerConfig
51///
52
53#[derive(Clone, Debug, Eq, PartialEq)]
54pub struct RestoreRunnerConfig {
55    pub journal: PathBuf,
56    pub command: RestoreApplyCommandConfig,
57    pub max_steps: Option<usize>,
58    pub updated_at: Option<String>,
59}
60
61///
62/// RestoreRunnerCommandExecutor
63///
64
65pub trait RestoreRunnerCommandExecutor {
66    /// Execute one rendered restore runner command.
67    fn execute(
68        &mut self,
69        command: &RestoreApplyRunnerCommand,
70    ) -> Result<RestoreRunnerCommandOutput, io::Error>;
71}
72
///
/// RestoreRunnerCommandOutput
///
/// Captured result of one command run by a `RestoreRunnerCommandExecutor`.
///

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoreRunnerCommandOutput {
    /// Whether the command succeeded; the runner commits success or failure based on it.
    pub success: bool,
    /// Status label recorded in receipts and in command-failure reasons.
    pub status: String,
    /// Raw bytes the command wrote to standard output.
    pub stdout: Vec<u8>,
    /// Raw bytes the command wrote to standard error.
    pub stderr: Vec<u8>,
}
84
85///
86/// RestoreRunnerError
87///
88
89#[derive(Debug, ThisError)]
90pub enum RestoreRunnerError {
91    #[error("restore run command failed for operation {sequence}: status={status}")]
92    CommandFailed { sequence: usize, status: String },
93
94    #[error("restore apply journal is locked: {lock_path}")]
95    JournalLocked { lock_path: String },
96
97    #[error(
98        "restore apply journal for backup {backup_id} has pending operations: pending={pending_operations}, next={next_transition_sequence:?}"
99    )]
100    Pending {
101        backup_id: String,
102        pending_operations: usize,
103        next_transition_sequence: Option<usize>,
104    },
105
106    #[error(
107        "restore apply journal for backup {backup_id} has failed operations: failed={failed_operations}"
108    )]
109    Failed {
110        backup_id: String,
111        failed_operations: usize,
112    },
113
114    #[error("restore apply journal for backup {backup_id} is not ready: reasons={reasons:?}")]
115    NotReady {
116        backup_id: String,
117        reasons: Vec<String>,
118    },
119
120    #[error(
121        "restore apply journal for backup {backup_id} has no executable command: operation_available={operation_available}, complete={complete}, blocked_reasons={blocked_reasons:?}"
122    )]
123    CommandUnavailable {
124        backup_id: String,
125        operation_available: bool,
126        complete: bool,
127        blocked_reasons: Vec<String>,
128    },
129
130    #[error(
131        "restore apply journal next operation changed before claim: expected={expected}, actual={actual:?}"
132    )]
133    ClaimSequenceMismatch {
134        expected: usize,
135        actual: Option<usize>,
136    },
137
138    #[error(transparent)]
139    Io(#[from] std::io::Error),
140
141    #[error(transparent)]
142    Json(#[from] serde_json::Error),
143
144    #[error(transparent)]
145    Journal(#[from] RestoreApplyJournalError),
146}
147
148///
149/// RestoreRunResponse
150///
151
152#[derive(Clone, Debug, Serialize)]
153#[expect(
154    clippy::struct_excessive_bools,
155    reason = "Runner response exposes stable JSON status flags for operators and CI"
156)]
157pub struct RestoreRunResponse {
158    pub run_version: u16,
159    pub backup_id: String,
160    pub run_mode: &'static str,
161    pub dry_run: bool,
162    pub execute: bool,
163    pub unclaim_pending: bool,
164    pub stopped_reason: &'static str,
165    pub next_action: &'static str,
166    #[serde(skip_serializing_if = "Option::is_none")]
167    pub requested_state_updated_at: Option<String>,
168    #[serde(skip_serializing_if = "Option::is_none")]
169    pub max_steps_reached: Option<bool>,
170    #[serde(default, skip_serializing_if = "Vec::is_empty")]
171    pub executed_operations: Vec<RestoreRunExecutedOperation>,
172    #[serde(default, skip_serializing_if = "Vec::is_empty")]
173    pub operation_receipts: Vec<RestoreRunOperationReceipt>,
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub operation_receipt_count: Option<usize>,
176    pub operation_receipt_summary: RestoreRunReceiptSummary,
177    #[serde(skip_serializing_if = "Option::is_none")]
178    pub executed_operation_count: Option<usize>,
179    #[serde(skip_serializing_if = "Option::is_none")]
180    pub recovered_operation: Option<RestoreApplyJournalOperation>,
181    pub ready: bool,
182    pub complete: bool,
183    pub attention_required: bool,
184    pub outcome: RestoreApplyReportOutcome,
185    pub operation_count: usize,
186    pub operation_counts: RestoreApplyOperationKindCounts,
187    pub progress: RestoreApplyProgressSummary,
188    pub pending_summary: RestoreApplyPendingSummary,
189    pub pending_operations: usize,
190    pub ready_operations: usize,
191    pub blocked_operations: usize,
192    pub completed_operations: usize,
193    pub failed_operations: usize,
194    pub blocked_reasons: Vec<String>,
195    pub next_transition: Option<RestoreApplyReportOperation>,
196    #[serde(skip_serializing_if = "Option::is_none")]
197    pub operation_available: Option<bool>,
198    #[serde(skip_serializing_if = "Option::is_none")]
199    pub command_available: Option<bool>,
200    #[serde(skip_serializing_if = "Option::is_none")]
201    pub command: Option<RestoreApplyRunnerCommand>,
202}
203
204impl RestoreRunResponse {
205    // Build the shared native runner response fields from an apply journal report.
206    fn from_report(
207        backup_id: String,
208        report: RestoreApplyJournalReport,
209        mode: RestoreRunResponseMode,
210    ) -> Self {
211        Self {
212            run_version: RESTORE_RUN_RESPONSE_VERSION,
213            backup_id,
214            run_mode: mode.run_mode,
215            dry_run: mode.dry_run,
216            execute: mode.execute,
217            unclaim_pending: mode.unclaim_pending,
218            stopped_reason: mode.stopped_reason,
219            next_action: mode.next_action,
220            requested_state_updated_at: None,
221            max_steps_reached: None,
222            executed_operations: Vec::new(),
223            operation_receipts: Vec::new(),
224            operation_receipt_count: Some(0),
225            operation_receipt_summary: RestoreRunReceiptSummary::default(),
226            executed_operation_count: None,
227            recovered_operation: None,
228            ready: report.ready,
229            complete: report.complete,
230            attention_required: report.attention_required,
231            outcome: report.outcome,
232            operation_count: report.operation_count,
233            operation_counts: report.operation_counts,
234            progress: report.progress,
235            pending_summary: report.pending_summary,
236            pending_operations: report.pending_operations,
237            ready_operations: report.ready_operations,
238            blocked_operations: report.blocked_operations,
239            completed_operations: report.completed_operations,
240            failed_operations: report.failed_operations,
241            blocked_reasons: report.blocked_reasons,
242            next_transition: report.next_transition,
243            operation_available: None,
244            command_available: None,
245            command: None,
246        }
247    }
248
249    // Replace the detailed receipt stream and refresh the compact counters.
250    fn set_operation_receipts(&mut self, receipts: Vec<RestoreRunOperationReceipt>) {
251        self.operation_receipt_summary = RestoreRunReceiptSummary::from_receipts(&receipts);
252        self.operation_receipt_count = Some(receipts.len());
253        self.operation_receipts = receipts;
254    }
255
256    // Echo the caller-provided state marker for receipt-free runner summaries.
257    fn set_requested_state_updated_at(&mut self, updated_at: Option<&String>) {
258        self.requested_state_updated_at = updated_at.cloned();
259    }
260}
261
262///
263/// RestoreRunReceiptSummary
264///
265
266#[derive(Clone, Debug, Default, Serialize)]
267pub struct RestoreRunReceiptSummary {
268    pub total_receipts: usize,
269    pub command_completed: usize,
270    pub command_failed: usize,
271    pub pending_recovered: usize,
272}
273
274///
275/// RestoreRunOperationReceipt
276///
277
278#[derive(Clone, Debug, Serialize)]
279pub struct RestoreRunOperationReceipt {
280    pub event: &'static str,
281    pub sequence: usize,
282    pub operation: RestoreApplyOperationKind,
283    pub target_canister: String,
284    pub state: &'static str,
285    #[serde(skip_serializing_if = "Option::is_none")]
286    pub updated_at: Option<String>,
287    #[serde(skip_serializing_if = "Option::is_none")]
288    pub command: Option<RestoreApplyRunnerCommand>,
289    #[serde(skip_serializing_if = "Option::is_none")]
290    pub status: Option<String>,
291}
292
293///
294/// RestoreRunExecutedOperation
295///
296
297#[derive(Clone, Debug, Serialize)]
298pub struct RestoreRunExecutedOperation {
299    pub sequence: usize,
300    pub operation: RestoreApplyOperationKind,
301    pub target_canister: String,
302    pub command: RestoreApplyRunnerCommand,
303    pub status: String,
304    pub state: &'static str,
305}
306
307///
308/// RestoreRunnerOutcome
309///
310
311pub struct RestoreRunnerOutcome {
312    pub response: RestoreRunResponse,
313    pub error: Option<RestoreRunnerError>,
314}
315
316impl RestoreRunnerOutcome {
317    // Build a successful runner response with no deferred error.
318    const fn ok(response: RestoreRunResponse) -> Self {
319        Self {
320            response,
321            error: None,
322        }
323    }
324}
325
326///
327/// RestoreStoppedPreconditionFailure
328///
329
330struct RestoreStoppedPreconditionFailure {
331    command: RestoreApplyRunnerCommand,
332    status_label: String,
333    output: RestoreApplyCommandOutputPair,
334    failure_reason: String,
335}
336
337impl RestoreRunReceiptSummary {
338    // Count restore runner receipt classes for script-friendly summaries.
339    fn from_receipts(receipts: &[RestoreRunOperationReceipt]) -> Self {
340        let mut summary = Self {
341            total_receipts: receipts.len(),
342            ..Self::default()
343        };
344
345        for receipt in receipts {
346            match receipt.event {
347                RESTORE_RUN_RECEIPT_COMPLETED => summary.command_completed += 1,
348                RESTORE_RUN_RECEIPT_FAILED => summary.command_failed += 1,
349                RESTORE_RUN_RECEIPT_RECOVERED_PENDING => summary.pending_recovered += 1,
350                _ => {}
351            }
352        }
353
354        summary
355    }
356}
357
358impl RestoreRunOperationReceipt {
359    // Build a receipt for a completed runner command.
360    fn completed(
361        operation: RestoreApplyJournalOperation,
362        command: RestoreApplyRunnerCommand,
363        status: String,
364        updated_at: Option<String>,
365    ) -> Self {
366        Self::from_operation(
367            RESTORE_RUN_RECEIPT_COMPLETED,
368            operation,
369            RESTORE_RUN_EXECUTED_COMPLETED,
370            updated_at,
371            Some(command),
372            Some(status),
373        )
374    }
375
376    // Build a receipt for a failed runner command.
377    fn failed(
378        operation: RestoreApplyJournalOperation,
379        command: RestoreApplyRunnerCommand,
380        status: String,
381        updated_at: Option<String>,
382    ) -> Self {
383        Self::from_operation(
384            RESTORE_RUN_RECEIPT_FAILED,
385            operation,
386            RESTORE_RUN_EXECUTED_FAILED,
387            updated_at,
388            Some(command),
389            Some(status),
390        )
391    }
392
393    // Build a receipt for a recovered pending operation.
394    fn recovered_pending(
395        operation: RestoreApplyJournalOperation,
396        updated_at: Option<String>,
397    ) -> Self {
398        Self::from_operation(
399            RESTORE_RUN_RECEIPT_RECOVERED_PENDING,
400            operation,
401            RESTORE_RUN_RECEIPT_STATE_READY,
402            updated_at,
403            None,
404            None,
405        )
406    }
407
408    // Map one operation event into a compact audit receipt.
409    fn from_operation(
410        event: &'static str,
411        operation: RestoreApplyJournalOperation,
412        state: &'static str,
413        updated_at: Option<String>,
414        command: Option<RestoreApplyRunnerCommand>,
415        status: Option<String>,
416    ) -> Self {
417        Self {
418            event,
419            sequence: operation.sequence,
420            operation: operation.operation,
421            target_canister: operation.target_canister,
422            state,
423            updated_at,
424            command,
425            status,
426        }
427    }
428}
429
430impl RestoreRunExecutedOperation {
431    // Build a completed executed-operation summary row from a runner operation.
432    fn completed(
433        operation: RestoreApplyJournalOperation,
434        command: RestoreApplyRunnerCommand,
435        status: String,
436    ) -> Self {
437        Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_COMPLETED)
438    }
439
440    // Build a failed executed-operation summary row from a runner operation.
441    fn failed(
442        operation: RestoreApplyJournalOperation,
443        command: RestoreApplyRunnerCommand,
444        status: String,
445    ) -> Self {
446        Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_FAILED)
447    }
448
449    // Map a journal operation into the compact runner execution row.
450    fn from_operation(
451        operation: RestoreApplyJournalOperation,
452        command: RestoreApplyRunnerCommand,
453        status: String,
454        state: &'static str,
455    ) -> Self {
456        Self {
457            sequence: operation.sequence,
458            operation: operation.operation,
459            target_canister: operation.target_canister,
460            command,
461            status,
462            state,
463        }
464    }
465}
466
///
/// RestoreRunResponseMode
///
/// Mode label, mode flags, and stop/action labels that a response is seeded
/// with before run-specific details are filled in.
///

struct RestoreRunResponseMode {
    // Mode label reported as `run_mode`.
    run_mode: &'static str,
    // One flag per runner mode; the constructors in this file set exactly one.
    dry_run: bool,
    execute: bool,
    unclaim_pending: bool,
    // Label describing why the runner stopped.
    stopped_reason: &'static str,
    // Label suggesting the next operator action.
    next_action: &'static str,
}
479
480impl RestoreRunResponseMode {
481    // Build a response mode from the stable JSON mode flags and action labels.
482    const fn new(
483        run_mode: &'static str,
484        dry_run: bool,
485        execute: bool,
486        unclaim_pending: bool,
487        stopped_reason: &'static str,
488        next_action: &'static str,
489    ) -> Self {
490        Self {
491            run_mode,
492            dry_run,
493            execute,
494            unclaim_pending,
495            stopped_reason,
496            next_action,
497        }
498    }
499
500    // Build a dry-run response mode with a computed stop reason and action.
501    const fn dry_run(stopped_reason: &'static str, next_action: &'static str) -> Self {
502        Self::new(
503            RESTORE_RUN_MODE_DRY_RUN,
504            true,
505            false,
506            false,
507            stopped_reason,
508            next_action,
509        )
510    }
511
512    // Build an execute response mode with a computed stop reason and action.
513    const fn execute(stopped_reason: &'static str, next_action: &'static str) -> Self {
514        Self::new(
515            RESTORE_RUN_MODE_EXECUTE,
516            false,
517            true,
518            false,
519            stopped_reason,
520            next_action,
521        )
522    }
523
524    // Build the pending-operation recovery response mode.
525    const fn unclaim_pending(next_action: &'static str) -> Self {
526        Self::new(
527            RESTORE_RUN_MODE_UNCLAIM_PENDING,
528            false,
529            false,
530            true,
531            RESTORE_RUN_STOPPED_RECOVERED_PENDING,
532            next_action,
533        )
534    }
535}
536
537///
538/// RestoreRunPreparedOperation
539///
540
541struct RestoreRunPreparedOperation {
542    operation: RestoreApplyJournalOperation,
543    command: RestoreApplyRunnerCommand,
544    sequence: usize,
545    attempt: usize,
546}
547
548///
549/// RestoreRunStepOutcome
550///
551
552enum RestoreRunStepOutcome {
553    Completed {
554        executed_operation: RestoreRunExecutedOperation,
555        operation_receipt: RestoreRunOperationReceipt,
556    },
557    Failed {
558        executed_operation: RestoreRunExecutedOperation,
559        operation_receipt: RestoreRunOperationReceipt,
560        status: String,
561    },
562}
563
564/// Build a no-mutation native restore runner preview from a journal file.
565pub fn restore_run_dry_run(
566    config: &RestoreRunnerConfig,
567) -> Result<RestoreRunResponse, RestoreRunnerError> {
568    let journal = read_apply_journal_file(&config.journal)?;
569    let report = journal.report();
570    let preview = journal.next_command_preview_with_config(&config.command);
571    let stopped_reason = restore_run_stopped_reason(&report, false, false);
572    let next_action = restore_run_next_action(&report, false);
573
574    let mut response = RestoreRunResponse::from_report(
575        journal.backup_id,
576        report,
577        RestoreRunResponseMode::dry_run(stopped_reason, next_action),
578    );
579    response.set_requested_state_updated_at(config.updated_at.as_ref());
580    response.operation_available = Some(preview.operation_available);
581    response.command_available = Some(preview.command_available);
582    response.command = preview.command;
583    Ok(response)
584}
585
586/// Recover an interrupted restore runner by unclaiming the pending operation.
587pub fn restore_run_unclaim_pending(
588    config: &RestoreRunnerConfig,
589) -> Result<RestoreRunResponse, RestoreRunnerError> {
590    let _lock = RestoreJournalLock::acquire(&config.journal)?;
591    let mut journal = read_apply_journal_file(&config.journal)?;
592    let recovered_operation = journal
593        .next_transition_operation()
594        .filter(|operation| operation.state == RestoreApplyOperationState::Pending)
595        .cloned()
596        .ok_or(RestoreApplyJournalError::NoPendingOperation)?;
597
598    let recovered_updated_at = state_updated_at(config.updated_at.as_ref());
599    journal.mark_next_operation_ready_at(Some(recovered_updated_at.clone()))?;
600    write_apply_journal_file(&config.journal, &journal)?;
601
602    let report = journal.report();
603    let next_action = restore_run_next_action(&report, true);
604    let mut response = RestoreRunResponse::from_report(
605        journal.backup_id,
606        report,
607        RestoreRunResponseMode::unclaim_pending(next_action),
608    );
609    response.set_requested_state_updated_at(config.updated_at.as_ref());
610    response.set_operation_receipts(vec![RestoreRunOperationReceipt::recovered_pending(
611        recovered_operation.clone(),
612        Some(recovered_updated_at),
613    )]);
614    response.recovered_operation = Some(recovered_operation);
615    Ok(response)
616}
617
618/// Execute ready restore apply journal operations through an injected command executor.
619pub fn restore_run_execute_with_executor(
620    config: &RestoreRunnerConfig,
621    executor: &mut impl RestoreRunnerCommandExecutor,
622) -> Result<RestoreRunResponse, RestoreRunnerError> {
623    let run = restore_run_execute_result_with_executor(config, executor)?;
624    if let Some(error) = run.error {
625        return Err(error);
626    }
627
628    Ok(run.response)
629}
630
631// Execute ready restore apply operations and retain any deferred runner error.
632pub fn restore_run_execute_result_with_executor(
633    config: &RestoreRunnerConfig,
634    executor: &mut impl RestoreRunnerCommandExecutor,
635) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
636    let _lock = RestoreJournalLock::acquire(&config.journal)?;
637    let mut journal = read_apply_journal_file(&config.journal)?;
638    let mut executed_operations = Vec::new();
639    let mut operation_receipts = Vec::new();
640
641    loop {
642        let report = journal.report();
643        let max_steps_reached =
644            restore_run_max_steps_reached(config, executed_operations.len(), &report);
645        if report.complete || max_steps_reached {
646            return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
647                &journal,
648                executed_operations,
649                operation_receipts,
650                max_steps_reached,
651                config.updated_at.as_ref(),
652            )));
653        }
654
655        enforce_restore_run_executable(&journal, &report)?;
656        let prepared = restore_run_prepare_next_operation(config, &mut journal)?;
657        let sequence = prepared.sequence;
658        match restore_run_execute_prepared_operation(config, executor, &mut journal, prepared)? {
659            RestoreRunStepOutcome::Completed {
660                executed_operation,
661                operation_receipt,
662            } => {
663                executed_operations.push(executed_operation);
664                operation_receipts.push(operation_receipt);
665            }
666            RestoreRunStepOutcome::Failed {
667                executed_operation,
668                operation_receipt,
669                status,
670            } => {
671                executed_operations.push(executed_operation);
672                operation_receipts.push(operation_receipt);
673                let response = restore_run_execute_summary(
674                    &journal,
675                    executed_operations,
676                    operation_receipts,
677                    false,
678                    config.updated_at.as_ref(),
679                );
680                return Ok(RestoreRunnerOutcome {
681                    response,
682                    error: Some(RestoreRunnerError::CommandFailed { sequence, status }),
683                });
684            }
685        }
686    }
687}
688
689// Claim the next renderable operation and persist the pending state.
690fn restore_run_prepare_next_operation(
691    config: &RestoreRunnerConfig,
692    journal: &mut RestoreApplyJournal,
693) -> Result<RestoreRunPreparedOperation, RestoreRunnerError> {
694    let preview = journal.next_command_preview_with_config(&config.command);
695    enforce_restore_run_command_available(&preview)?;
696
697    let operation = preview
698        .operation
699        .clone()
700        .ok_or_else(|| restore_command_unavailable_error(&preview))?;
701    let command = preview
702        .command
703        .clone()
704        .ok_or_else(|| restore_command_unavailable_error(&preview))?;
705    let sequence = operation.sequence;
706    let attempt = journal
707        .operation_receipts
708        .iter()
709        .filter(|receipt| receipt.sequence == sequence)
710        .count()
711        + 1;
712
713    enforce_apply_claim_sequence(sequence, journal)?;
714    journal
715        .mark_operation_pending_at(sequence, Some(state_updated_at(config.updated_at.as_ref())))?;
716    write_apply_journal_file(&config.journal, journal)?;
717
718    Ok(RestoreRunPreparedOperation {
719        operation,
720        command,
721        sequence,
722        attempt,
723    })
724}
725
726// Execute one claimed operation and commit either success or failure.
727fn restore_run_execute_prepared_operation(
728    config: &RestoreRunnerConfig,
729    executor: &mut impl RestoreRunnerCommandExecutor,
730    journal: &mut RestoreApplyJournal,
731    prepared: RestoreRunPreparedOperation,
732) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
733    if prepared.command.requires_stopped_canister
734        && let Some(outcome) = enforce_stopped_canister_precondition(
735            config,
736            executor,
737            &prepared.operation,
738            prepared.attempt,
739            config.updated_at.as_ref(),
740        )?
741    {
742        return restore_run_commit_precondition_failure(config, journal, prepared, outcome);
743    }
744
745    let output = executor.execute(&prepared.command)?;
746    let status_label = output.status;
747    let output_pair = RestoreApplyCommandOutputPair::from_bytes(
748        &output.stdout,
749        &output.stderr,
750        RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
751    );
752
753    if output.success {
754        let uploaded_snapshot_id =
755            parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout));
756        return restore_run_commit_command_success(
757            config,
758            journal,
759            prepared,
760            status_label,
761            output_pair,
762            uploaded_snapshot_id,
763        );
764    }
765
766    restore_run_commit_command_failure(config, journal, prepared, status_label, output_pair)
767}
768
769// Commit a stopped-canister precondition failure for a claimed operation.
770fn restore_run_commit_precondition_failure(
771    config: &RestoreRunnerConfig,
772    journal: &mut RestoreApplyJournal,
773    prepared: RestoreRunPreparedOperation,
774    outcome: RestoreStoppedPreconditionFailure,
775) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
776    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
777    journal.mark_operation_failed_at(
778        prepared.sequence,
779        outcome.failure_reason.clone(),
780        Some(failed_updated_at.clone()),
781    )?;
782    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
783        &prepared.operation,
784        outcome.command.clone(),
785        outcome.status_label.clone(),
786        Some(failed_updated_at.clone()),
787        outcome.output,
788        prepared.attempt,
789        outcome.failure_reason,
790    ))?;
791    write_apply_journal_file(&config.journal, journal)?;
792
793    Ok(RestoreRunStepOutcome::Failed {
794        executed_operation: RestoreRunExecutedOperation::failed(
795            prepared.operation.clone(),
796            outcome.command.clone(),
797            outcome.status_label.clone(),
798        ),
799        operation_receipt: RestoreRunOperationReceipt::failed(
800            prepared.operation,
801            outcome.command,
802            outcome.status_label,
803            Some(failed_updated_at),
804        ),
805        status: RESTORE_RUN_STOPPED_PRECONDITION_FAILED.to_string(),
806    })
807}
808
809// Commit a successful process command for a claimed operation.
810fn restore_run_commit_command_success(
811    config: &RestoreRunnerConfig,
812    journal: &mut RestoreApplyJournal,
813    prepared: RestoreRunPreparedOperation,
814    status_label: String,
815    output_pair: RestoreApplyCommandOutputPair,
816    uploaded_snapshot_id: Option<String>,
817) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
818    let completed_updated_at = state_updated_at(config.updated_at.as_ref());
819    journal.mark_operation_completed_at(prepared.sequence, Some(completed_updated_at.clone()))?;
820    if prepared.operation.operation != RestoreApplyOperationKind::UploadSnapshot
821        || uploaded_snapshot_id.is_some()
822    {
823        journal.record_operation_receipt(RestoreApplyOperationReceipt::command_completed(
824            &prepared.operation,
825            prepared.command.clone(),
826            status_label.clone(),
827            Some(completed_updated_at.clone()),
828            output_pair,
829            prepared.attempt,
830            uploaded_snapshot_id,
831        ))?;
832    }
833    write_apply_journal_file(&config.journal, journal)?;
834
835    Ok(RestoreRunStepOutcome::Completed {
836        executed_operation: RestoreRunExecutedOperation::completed(
837            prepared.operation.clone(),
838            prepared.command.clone(),
839            status_label.clone(),
840        ),
841        operation_receipt: RestoreRunOperationReceipt::completed(
842            prepared.operation,
843            prepared.command,
844            status_label,
845            Some(completed_updated_at),
846        ),
847    })
848}
849
850// Commit a failed process command for a claimed operation.
851fn restore_run_commit_command_failure(
852    config: &RestoreRunnerConfig,
853    journal: &mut RestoreApplyJournal,
854    prepared: RestoreRunPreparedOperation,
855    status_label: String,
856    output_pair: RestoreApplyCommandOutputPair,
857) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
858    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
859    let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
860    journal.mark_operation_failed_at(
861        prepared.sequence,
862        failure_reason.clone(),
863        Some(failed_updated_at.clone()),
864    )?;
865    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
866        &prepared.operation,
867        prepared.command.clone(),
868        status_label.clone(),
869        Some(failed_updated_at.clone()),
870        output_pair,
871        prepared.attempt,
872        failure_reason,
873    ))?;
874    write_apply_journal_file(&config.journal, journal)?;
875
876    Ok(RestoreRunStepOutcome::Failed {
877        executed_operation: RestoreRunExecutedOperation::failed(
878            prepared.operation.clone(),
879            prepared.command.clone(),
880            status_label.clone(),
881        ),
882        operation_receipt: RestoreRunOperationReceipt::failed(
883            prepared.operation,
884            prepared.command,
885            status_label.clone(),
886            Some(failed_updated_at),
887        ),
888        status: status_label,
889    })
890}
891
892// Verify a stopped-canister command precondition before running a mutating load.
893fn enforce_stopped_canister_precondition(
894    config: &RestoreRunnerConfig,
895    executor: &mut impl RestoreRunnerCommandExecutor,
896    operation: &RestoreApplyJournalOperation,
897    attempt: usize,
898    updated_at: Option<&String>,
899) -> Result<Option<RestoreStoppedPreconditionFailure>, RestoreRunnerError> {
900    let command = stopped_canister_status_command(config, operation);
901    let output = executor.execute(&command)?;
902    let status_label = output.status;
903    let output_pair = RestoreApplyCommandOutputPair::from_bytes(
904        &output.stdout,
905        &output.stderr,
906        RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
907    );
908    if output.success && status_output_reports_stopped(&output_pair) {
909        return Ok(None);
910    }
911
912    Ok(Some(RestoreStoppedPreconditionFailure {
913        command,
914        status_label,
915        output: output_pair,
916        failure_reason: format!(
917            "{RESTORE_RUN_STOPPED_PRECONDITION_FAILED}-attempt-{attempt}-{}",
918            state_updated_at(updated_at)
919        ),
920    }))
921}
922
923// Build the non-mutating status command used to prove stopped-canister state.
924fn stopped_canister_status_command(
925    config: &RestoreRunnerConfig,
926    operation: &RestoreApplyJournalOperation,
927) -> RestoreApplyRunnerCommand {
928    let mut args = vec!["canister".to_string()];
929    if let Some(network) = &config.command.network {
930        args.push("--network".to_string());
931        args.push(network.clone());
932    }
933    args.push("status".to_string());
934    args.push(operation.target_canister.clone());
935
936    RestoreApplyRunnerCommand {
937        program: config.command.program.clone(),
938        args,
939        mutates: false,
940        requires_stopped_canister: false,
941        note: "proves the target canister is stopped before snapshot load".to_string(),
942    }
943}
944
945// Detect stopped status from bounded dfx status output.
946fn status_output_reports_stopped(output: &RestoreApplyCommandOutputPair) -> bool {
947    output.stdout.text.contains("Status: Stopped")
948        || output.stdout.text.contains("status: stopped")
949        || output.stderr.text.contains("Status: Stopped")
950        || output.stderr.text.contains("status: stopped")
951}
952
953// Check whether execute mode has reached its requested operation batch size.
954fn restore_run_max_steps_reached(
955    config: &RestoreRunnerConfig,
956    executed_operation_count: usize,
957    report: &RestoreApplyJournalReport,
958) -> bool {
959    config.max_steps == Some(executed_operation_count) && !report.complete
960}
961
962// Build the final native runner execution summary.
963fn restore_run_execute_summary(
964    journal: &RestoreApplyJournal,
965    executed_operations: Vec<RestoreRunExecutedOperation>,
966    operation_receipts: Vec<RestoreRunOperationReceipt>,
967    max_steps_reached: bool,
968    requested_state_updated_at: Option<&String>,
969) -> RestoreRunResponse {
970    let report = journal.report();
971    let executed_operation_count = executed_operations.len();
972    let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
973    let next_action = restore_run_next_action(&report, false);
974
975    let mut response = RestoreRunResponse::from_report(
976        journal.backup_id.clone(),
977        report,
978        RestoreRunResponseMode::execute(stopped_reason, next_action),
979    );
980    response.set_requested_state_updated_at(requested_state_updated_at);
981    response.max_steps_reached = Some(max_steps_reached);
982    response.executed_operation_count = Some(executed_operation_count);
983    response.executed_operations = executed_operations;
984    response.set_operation_receipts(operation_receipts);
985    response
986}
987
988// Classify why the native runner stopped for operator summaries.
989const fn restore_run_stopped_reason(
990    report: &RestoreApplyJournalReport,
991    max_steps_reached: bool,
992    executed: bool,
993) -> &'static str {
994    if report.complete {
995        return RESTORE_RUN_STOPPED_COMPLETE;
996    }
997    if report.failed_operations > 0 {
998        return RESTORE_RUN_STOPPED_COMMAND_FAILED;
999    }
1000    if report.pending_operations > 0 {
1001        return RESTORE_RUN_STOPPED_PENDING;
1002    }
1003    if !report.ready || report.blocked_operations > 0 {
1004        return RESTORE_RUN_STOPPED_BLOCKED;
1005    }
1006    if max_steps_reached {
1007        return RESTORE_RUN_STOPPED_MAX_STEPS;
1008    }
1009    if executed {
1010        return RESTORE_RUN_STOPPED_READY;
1011    }
1012    RESTORE_RUN_STOPPED_PREVIEW
1013}
1014
1015// Recommend the next operator action for the native runner summary.
1016const fn restore_run_next_action(
1017    report: &RestoreApplyJournalReport,
1018    recovered_pending: bool,
1019) -> &'static str {
1020    if report.complete {
1021        return RESTORE_RUN_ACTION_DONE;
1022    }
1023    if report.failed_operations > 0 {
1024        return RESTORE_RUN_ACTION_INSPECT_FAILED;
1025    }
1026    if report.pending_operations > 0 {
1027        return RESTORE_RUN_ACTION_UNCLAIM_PENDING;
1028    }
1029    if !report.ready || report.blocked_operations > 0 {
1030        return RESTORE_RUN_ACTION_FIX_BLOCKED;
1031    }
1032    if recovered_pending {
1033        return RESTORE_RUN_ACTION_RERUN;
1034    }
1035    RESTORE_RUN_ACTION_RERUN
1036}
1037
1038// Ensure the journal can be advanced by the native restore runner.
1039fn enforce_restore_run_executable(
1040    journal: &RestoreApplyJournal,
1041    report: &RestoreApplyJournalReport,
1042) -> Result<(), RestoreRunnerError> {
1043    if report.pending_operations > 0 {
1044        return Err(RestoreRunnerError::Pending {
1045            backup_id: report.backup_id.clone(),
1046            pending_operations: report.pending_operations,
1047            next_transition_sequence: report
1048                .next_transition
1049                .as_ref()
1050                .map(|operation| operation.sequence),
1051        });
1052    }
1053
1054    if report.failed_operations > 0 {
1055        return Err(RestoreRunnerError::Failed {
1056            backup_id: report.backup_id.clone(),
1057            failed_operations: report.failed_operations,
1058        });
1059    }
1060
1061    if report.ready {
1062        return Ok(());
1063    }
1064
1065    Err(RestoreRunnerError::NotReady {
1066        backup_id: journal.backup_id.clone(),
1067        reasons: report.blocked_reasons.clone(),
1068    })
1069}
1070
1071// Convert an unavailable native runner command into the shared fail-closed error.
1072fn enforce_restore_run_command_available(
1073    preview: &RestoreApplyCommandPreview,
1074) -> Result<(), RestoreRunnerError> {
1075    if preview.command_available {
1076        return Ok(());
1077    }
1078
1079    Err(restore_command_unavailable_error(preview))
1080}
1081
1082// Build a shared command-unavailable error from a preview.
1083fn restore_command_unavailable_error(preview: &RestoreApplyCommandPreview) -> RestoreRunnerError {
1084    RestoreRunnerError::CommandUnavailable {
1085        backup_id: preview.backup_id.clone(),
1086        operation_available: preview.operation_available,
1087        complete: preview.complete,
1088        blocked_reasons: preview.blocked_reasons.clone(),
1089    }
1090}
1091
// Pull the uploaded target snapshot ID out of upload command output.
//
// Scans the output line by line and returns the trimmed text after the first
// `:` of the first line where that text is non-empty. Lines without a `:` or
// with nothing after it are skipped; `None` means no line qualified.
pub fn parse_uploaded_snapshot_id(output: &str) -> Option<String> {
    for line in output.lines() {
        if let Some((_, raw_value)) = line.split_once(':') {
            let candidate = raw_value.trim();
            if !candidate.is_empty() {
                return Some(candidate.to_owned());
            }
        }
    }
    None
}
1100
1101// Ensure a runner claim still matches the operation it previewed.
1102fn enforce_apply_claim_sequence(
1103    expected: usize,
1104    journal: &RestoreApplyJournal,
1105) -> Result<(), RestoreRunnerError> {
1106    let actual = journal
1107        .next_transition_operation()
1108        .map(|operation| operation.sequence);
1109
1110    if actual == Some(expected) {
1111        return Ok(());
1112    }
1113
1114    Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
1115}
1116
1117// Read and validate a restore apply journal from disk.
1118fn read_apply_journal_file(path: &Path) -> Result<RestoreApplyJournal, RestoreRunnerError> {
1119    let data = fs::read_to_string(path)?;
1120    let journal: RestoreApplyJournal = serde_json::from_str(&data)?;
1121    journal.validate()?;
1122    Ok(journal)
1123}
1124
1125// Return the caller-supplied journal update marker or the current timestamp.
1126fn state_updated_at(updated_at: Option<&String>) -> String {
1127    updated_at.cloned().unwrap_or_else(current_timestamp_marker)
1128}
1129
1130// Persist the restore apply journal to its canonical runner path.
1131fn write_apply_journal_file(
1132    path: &Path,
1133    journal: &RestoreApplyJournal,
1134) -> Result<(), RestoreRunnerError> {
1135    let data = serde_json::to_vec_pretty(journal)?;
1136    fs::write(path, data)?;
1137    Ok(())
1138}
1139
1140///
1141/// RestoreJournalLock
1142///
1143
1144struct RestoreJournalLock {
1145    path: PathBuf,
1146}
1147
1148impl RestoreJournalLock {
1149    // Acquire an atomic sidecar lock for mutating restore runner operations.
1150    fn acquire(journal_path: &Path) -> Result<Self, RestoreRunnerError> {
1151        let path = journal_lock_path(journal_path);
1152        match fs::OpenOptions::new()
1153            .write(true)
1154            .create_new(true)
1155            .open(&path)
1156        {
1157            Ok(mut file) => {
1158                writeln!(file, "pid={}", std::process::id())?;
1159                Ok(Self { path })
1160            }
1161            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
1162                Err(RestoreRunnerError::JournalLocked {
1163                    lock_path: path.to_string_lossy().to_string(),
1164                })
1165            }
1166            Err(error) => Err(error.into()),
1167        }
1168    }
1169}
1170
1171impl Drop for RestoreJournalLock {
1172    // Release the sidecar lock when the mutating command completes or fails.
1173    fn drop(&mut self) {
1174        let _ = fs::remove_file(&self.path);
1175    }
1176}
1177
// Compute the sidecar lock path that belongs to one apply journal.
//
// The lock path is the journal path with `.lock` appended to the full path
// string; any existing extension is kept rather than replaced.
fn journal_lock_path(path: &Path) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(".lock");
    raw.into()
}
1184
#[cfg(test)]
mod tests {
    use super::*;

    // Build a captured output pair with the given stdout bytes and empty stderr.
    fn stdout_only(stdout: &[u8]) -> RestoreApplyCommandOutputPair {
        RestoreApplyCommandOutputPair::from_bytes(stdout, b"", 1024)
    }

    // A `Status: Stopped` line on stdout satisfies the stopped-canister check.
    #[test]
    fn status_output_reports_stopped_status() {
        let stopped = stdout_only(b"Status: Stopped\n");

        assert!(status_output_reports_stopped(&stopped));
    }

    // A running canister must not satisfy the snapshot-load precondition.
    #[test]
    fn status_output_rejects_running_status() {
        let running = stdout_only(b"Status: Running\n");

        assert!(!status_output_reports_stopped(&running));
    }
}