Skip to main content

canic_backup/restore/runner/
mod.rs

1use super::{
2    RestoreApplyCommandConfig, RestoreApplyCommandOutputPair, RestoreApplyCommandPreview,
3    RestoreApplyJournal, RestoreApplyJournalError, RestoreApplyJournalOperation,
4    RestoreApplyJournalReport, RestoreApplyOperationKind, RestoreApplyOperationKindCounts,
5    RestoreApplyOperationReceipt, RestoreApplyOperationState, RestoreApplyPendingSummary,
6    RestoreApplyProgressSummary, RestoreApplyReportOperation, RestoreApplyReportOutcome,
7    RestoreApplyRunnerCommand,
8};
9use crate::timestamp::current_timestamp_marker;
10use serde::Serialize;
11use std::{
12    fs,
13    io::{self, Write},
14    path::{Path, PathBuf},
15    process::Command as ProcessCommand,
16};
17use thiserror::Error as ThisError;
18
19const RESTORE_RUN_MODE_DRY_RUN: &str = "dry-run";
20const RESTORE_RUN_MODE_EXECUTE: &str = "execute";
21const RESTORE_RUN_MODE_UNCLAIM_PENDING: &str = "unclaim-pending";
22
23const RESTORE_RUN_STOPPED_BLOCKED: &str = "blocked";
24const RESTORE_RUN_STOPPED_COMMAND_FAILED: &str = "command-failed";
25const RESTORE_RUN_STOPPED_COMPLETE: &str = "complete";
26const RESTORE_RUN_STOPPED_MAX_STEPS: &str = "max-steps-reached";
27const RESTORE_RUN_STOPPED_PENDING: &str = "pending";
28const RESTORE_RUN_STOPPED_PREVIEW: &str = "preview";
29const RESTORE_RUN_STOPPED_READY: &str = "ready";
30const RESTORE_RUN_STOPPED_RECOVERED_PENDING: &str = "recovered-pending";
31
32const RESTORE_RUN_ACTION_DONE: &str = "done";
33const RESTORE_RUN_ACTION_FIX_BLOCKED: &str = "fix-blocked-journal";
34const RESTORE_RUN_ACTION_INSPECT_FAILED: &str = "inspect-failed-operation";
35const RESTORE_RUN_ACTION_RERUN: &str = "rerun";
36const RESTORE_RUN_ACTION_UNCLAIM_PENDING: &str = "unclaim-pending";
37
38pub const RESTORE_RUN_RECEIPT_COMPLETED: &str = "command-completed";
39pub const RESTORE_RUN_RECEIPT_FAILED: &str = "command-failed";
40pub const RESTORE_RUN_RECEIPT_RECOVERED_PENDING: &str = "pending-recovered";
41
42const RESTORE_RUN_EXECUTED_COMPLETED: &str = "completed";
43const RESTORE_RUN_EXECUTED_FAILED: &str = "failed";
44const RESTORE_RUN_RECEIPT_STATE_READY: &str = "ready";
45const RESTORE_RUN_COMMAND_EXIT_PREFIX: &str = "runner-command-exit";
46const RESTORE_RUN_STOPPED_PRECONDITION_FAILED: &str = "stopped-precondition-failed";
47const RESTORE_RUN_RESPONSE_VERSION: u16 = 1;
48const RESTORE_RUN_OUTPUT_RECEIPT_LIMIT: usize = 64 * 1024;
49
50///
51/// RestoreRunnerConfig
52///
53
54#[derive(Clone, Debug, Eq, PartialEq)]
55pub struct RestoreRunnerConfig {
56    pub journal: PathBuf,
57    pub command: RestoreApplyCommandConfig,
58    pub max_steps: Option<usize>,
59    pub updated_at: Option<String>,
60}
61
62///
63/// RestoreRunnerError
64///
65
66#[derive(Debug, ThisError)]
67pub enum RestoreRunnerError {
68    #[error("restore run command failed for operation {sequence}: status={status}")]
69    CommandFailed { sequence: usize, status: String },
70
71    #[error("restore apply journal is locked: {lock_path}")]
72    JournalLocked { lock_path: String },
73
74    #[error(
75        "restore apply journal for backup {backup_id} has pending operations: pending={pending_operations}, next={next_transition_sequence:?}"
76    )]
77    Pending {
78        backup_id: String,
79        pending_operations: usize,
80        next_transition_sequence: Option<usize>,
81    },
82
83    #[error(
84        "restore apply journal for backup {backup_id} has failed operations: failed={failed_operations}"
85    )]
86    Failed {
87        backup_id: String,
88        failed_operations: usize,
89    },
90
91    #[error("restore apply journal for backup {backup_id} is not ready: reasons={reasons:?}")]
92    NotReady {
93        backup_id: String,
94        reasons: Vec<String>,
95    },
96
97    #[error(
98        "restore apply journal for backup {backup_id} has no executable command: operation_available={operation_available}, complete={complete}, blocked_reasons={blocked_reasons:?}"
99    )]
100    CommandUnavailable {
101        backup_id: String,
102        operation_available: bool,
103        complete: bool,
104        blocked_reasons: Vec<String>,
105    },
106
107    #[error(
108        "restore apply journal next operation changed before claim: expected={expected}, actual={actual:?}"
109    )]
110    ClaimSequenceMismatch {
111        expected: usize,
112        actual: Option<usize>,
113    },
114
115    #[error(transparent)]
116    Io(#[from] std::io::Error),
117
118    #[error(transparent)]
119    Json(#[from] serde_json::Error),
120
121    #[error(transparent)]
122    Journal(#[from] RestoreApplyJournalError),
123}
124
125///
126/// RestoreRunResponse
127///
128
129#[derive(Clone, Debug, Serialize)]
130#[expect(
131    clippy::struct_excessive_bools,
132    reason = "Runner response exposes stable JSON status flags for operators and CI"
133)]
134pub struct RestoreRunResponse {
135    pub run_version: u16,
136    pub backup_id: String,
137    pub run_mode: &'static str,
138    pub dry_run: bool,
139    pub execute: bool,
140    pub unclaim_pending: bool,
141    pub stopped_reason: &'static str,
142    pub next_action: &'static str,
143    #[serde(skip_serializing_if = "Option::is_none")]
144    pub requested_state_updated_at: Option<String>,
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub max_steps_reached: Option<bool>,
147    #[serde(default, skip_serializing_if = "Vec::is_empty")]
148    pub executed_operations: Vec<RestoreRunExecutedOperation>,
149    #[serde(default, skip_serializing_if = "Vec::is_empty")]
150    pub operation_receipts: Vec<RestoreRunOperationReceipt>,
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub operation_receipt_count: Option<usize>,
153    pub operation_receipt_summary: RestoreRunReceiptSummary,
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub executed_operation_count: Option<usize>,
156    #[serde(skip_serializing_if = "Option::is_none")]
157    pub recovered_operation: Option<RestoreApplyJournalOperation>,
158    pub ready: bool,
159    pub complete: bool,
160    pub attention_required: bool,
161    pub outcome: RestoreApplyReportOutcome,
162    pub operation_count: usize,
163    pub operation_counts: RestoreApplyOperationKindCounts,
164    pub progress: RestoreApplyProgressSummary,
165    pub pending_summary: RestoreApplyPendingSummary,
166    pub pending_operations: usize,
167    pub ready_operations: usize,
168    pub blocked_operations: usize,
169    pub completed_operations: usize,
170    pub failed_operations: usize,
171    pub blocked_reasons: Vec<String>,
172    pub next_transition: Option<RestoreApplyReportOperation>,
173    #[serde(skip_serializing_if = "Option::is_none")]
174    pub operation_available: Option<bool>,
175    #[serde(skip_serializing_if = "Option::is_none")]
176    pub command_available: Option<bool>,
177    #[serde(skip_serializing_if = "Option::is_none")]
178    pub command: Option<RestoreApplyRunnerCommand>,
179}
180
181impl RestoreRunResponse {
182    // Build the shared native runner response fields from an apply journal report.
183    fn from_report(
184        backup_id: String,
185        report: RestoreApplyJournalReport,
186        mode: RestoreRunResponseMode,
187    ) -> Self {
188        Self {
189            run_version: RESTORE_RUN_RESPONSE_VERSION,
190            backup_id,
191            run_mode: mode.run_mode,
192            dry_run: mode.dry_run,
193            execute: mode.execute,
194            unclaim_pending: mode.unclaim_pending,
195            stopped_reason: mode.stopped_reason,
196            next_action: mode.next_action,
197            requested_state_updated_at: None,
198            max_steps_reached: None,
199            executed_operations: Vec::new(),
200            operation_receipts: Vec::new(),
201            operation_receipt_count: Some(0),
202            operation_receipt_summary: RestoreRunReceiptSummary::default(),
203            executed_operation_count: None,
204            recovered_operation: None,
205            ready: report.ready,
206            complete: report.complete,
207            attention_required: report.attention_required,
208            outcome: report.outcome,
209            operation_count: report.operation_count,
210            operation_counts: report.operation_counts,
211            progress: report.progress,
212            pending_summary: report.pending_summary,
213            pending_operations: report.pending_operations,
214            ready_operations: report.ready_operations,
215            blocked_operations: report.blocked_operations,
216            completed_operations: report.completed_operations,
217            failed_operations: report.failed_operations,
218            blocked_reasons: report.blocked_reasons,
219            next_transition: report.next_transition,
220            operation_available: None,
221            command_available: None,
222            command: None,
223        }
224    }
225
226    // Replace the detailed receipt stream and refresh the compact counters.
227    fn set_operation_receipts(&mut self, receipts: Vec<RestoreRunOperationReceipt>) {
228        self.operation_receipt_summary = RestoreRunReceiptSummary::from_receipts(&receipts);
229        self.operation_receipt_count = Some(receipts.len());
230        self.operation_receipts = receipts;
231    }
232
233    // Echo the caller-provided state marker for receipt-free runner summaries.
234    fn set_requested_state_updated_at(&mut self, updated_at: Option<&String>) {
235        self.requested_state_updated_at = updated_at.cloned();
236    }
237}
238
239///
240/// RestoreRunReceiptSummary
241///
242
243#[derive(Clone, Debug, Default, Serialize)]
244pub struct RestoreRunReceiptSummary {
245    pub total_receipts: usize,
246    pub command_completed: usize,
247    pub command_failed: usize,
248    pub pending_recovered: usize,
249}
250
251///
252/// RestoreRunOperationReceipt
253///
254
255#[derive(Clone, Debug, Serialize)]
256pub struct RestoreRunOperationReceipt {
257    pub event: &'static str,
258    pub sequence: usize,
259    pub operation: RestoreApplyOperationKind,
260    pub target_canister: String,
261    pub state: &'static str,
262    #[serde(skip_serializing_if = "Option::is_none")]
263    pub updated_at: Option<String>,
264    #[serde(skip_serializing_if = "Option::is_none")]
265    pub command: Option<RestoreApplyRunnerCommand>,
266    #[serde(skip_serializing_if = "Option::is_none")]
267    pub status: Option<String>,
268}
269
270///
271/// RestoreRunExecutedOperation
272///
273
274#[derive(Clone, Debug, Serialize)]
275pub struct RestoreRunExecutedOperation {
276    pub sequence: usize,
277    pub operation: RestoreApplyOperationKind,
278    pub target_canister: String,
279    pub command: RestoreApplyRunnerCommand,
280    pub status: String,
281    pub state: &'static str,
282}
283
284///
285/// RestoreRunnerOutcome
286///
287
288pub struct RestoreRunnerOutcome {
289    pub response: RestoreRunResponse,
290    pub error: Option<RestoreRunnerError>,
291}
292
293impl RestoreRunnerOutcome {
294    // Build a successful runner response with no deferred error.
295    const fn ok(response: RestoreRunResponse) -> Self {
296        Self {
297            response,
298            error: None,
299        }
300    }
301}
302
303///
304/// RestoreStoppedPreconditionFailure
305///
306
307struct RestoreStoppedPreconditionFailure {
308    command: RestoreApplyRunnerCommand,
309    status_label: String,
310    output: RestoreApplyCommandOutputPair,
311    failure_reason: String,
312}
313
314impl RestoreRunReceiptSummary {
315    // Count restore runner receipt classes for script-friendly summaries.
316    fn from_receipts(receipts: &[RestoreRunOperationReceipt]) -> Self {
317        let mut summary = Self {
318            total_receipts: receipts.len(),
319            ..Self::default()
320        };
321
322        for receipt in receipts {
323            match receipt.event {
324                RESTORE_RUN_RECEIPT_COMPLETED => summary.command_completed += 1,
325                RESTORE_RUN_RECEIPT_FAILED => summary.command_failed += 1,
326                RESTORE_RUN_RECEIPT_RECOVERED_PENDING => summary.pending_recovered += 1,
327                _ => {}
328            }
329        }
330
331        summary
332    }
333}
334
335impl RestoreRunOperationReceipt {
336    // Build a receipt for a completed runner command.
337    fn completed(
338        operation: RestoreApplyJournalOperation,
339        command: RestoreApplyRunnerCommand,
340        status: String,
341        updated_at: Option<String>,
342    ) -> Self {
343        Self::from_operation(
344            RESTORE_RUN_RECEIPT_COMPLETED,
345            operation,
346            RESTORE_RUN_EXECUTED_COMPLETED,
347            updated_at,
348            Some(command),
349            Some(status),
350        )
351    }
352
353    // Build a receipt for a failed runner command.
354    fn failed(
355        operation: RestoreApplyJournalOperation,
356        command: RestoreApplyRunnerCommand,
357        status: String,
358        updated_at: Option<String>,
359    ) -> Self {
360        Self::from_operation(
361            RESTORE_RUN_RECEIPT_FAILED,
362            operation,
363            RESTORE_RUN_EXECUTED_FAILED,
364            updated_at,
365            Some(command),
366            Some(status),
367        )
368    }
369
370    // Build a receipt for a recovered pending operation.
371    fn recovered_pending(
372        operation: RestoreApplyJournalOperation,
373        updated_at: Option<String>,
374    ) -> Self {
375        Self::from_operation(
376            RESTORE_RUN_RECEIPT_RECOVERED_PENDING,
377            operation,
378            RESTORE_RUN_RECEIPT_STATE_READY,
379            updated_at,
380            None,
381            None,
382        )
383    }
384
385    // Map one operation event into a compact audit receipt.
386    fn from_operation(
387        event: &'static str,
388        operation: RestoreApplyJournalOperation,
389        state: &'static str,
390        updated_at: Option<String>,
391        command: Option<RestoreApplyRunnerCommand>,
392        status: Option<String>,
393    ) -> Self {
394        Self {
395            event,
396            sequence: operation.sequence,
397            operation: operation.operation,
398            target_canister: operation.target_canister,
399            state,
400            updated_at,
401            command,
402            status,
403        }
404    }
405}
406
407impl RestoreRunExecutedOperation {
408    // Build a completed executed-operation summary row from a runner operation.
409    fn completed(
410        operation: RestoreApplyJournalOperation,
411        command: RestoreApplyRunnerCommand,
412        status: String,
413    ) -> Self {
414        Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_COMPLETED)
415    }
416
417    // Build a failed executed-operation summary row from a runner operation.
418    fn failed(
419        operation: RestoreApplyJournalOperation,
420        command: RestoreApplyRunnerCommand,
421        status: String,
422    ) -> Self {
423        Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_FAILED)
424    }
425
426    // Map a journal operation into the compact runner execution row.
427    fn from_operation(
428        operation: RestoreApplyJournalOperation,
429        command: RestoreApplyRunnerCommand,
430        status: String,
431        state: &'static str,
432    ) -> Self {
433        Self {
434            sequence: operation.sequence,
435            operation: operation.operation,
436            target_canister: operation.target_canister,
437            command,
438            status,
439            state,
440        }
441    }
442}
443
444///
445/// RestoreRunResponseMode
446///
447
448struct RestoreRunResponseMode {
449    run_mode: &'static str,
450    dry_run: bool,
451    execute: bool,
452    unclaim_pending: bool,
453    stopped_reason: &'static str,
454    next_action: &'static str,
455}
456
457impl RestoreRunResponseMode {
458    // Build a response mode from the stable JSON mode flags and action labels.
459    const fn new(
460        run_mode: &'static str,
461        dry_run: bool,
462        execute: bool,
463        unclaim_pending: bool,
464        stopped_reason: &'static str,
465        next_action: &'static str,
466    ) -> Self {
467        Self {
468            run_mode,
469            dry_run,
470            execute,
471            unclaim_pending,
472            stopped_reason,
473            next_action,
474        }
475    }
476
477    // Build a dry-run response mode with a computed stop reason and action.
478    const fn dry_run(stopped_reason: &'static str, next_action: &'static str) -> Self {
479        Self::new(
480            RESTORE_RUN_MODE_DRY_RUN,
481            true,
482            false,
483            false,
484            stopped_reason,
485            next_action,
486        )
487    }
488
489    // Build an execute response mode with a computed stop reason and action.
490    const fn execute(stopped_reason: &'static str, next_action: &'static str) -> Self {
491        Self::new(
492            RESTORE_RUN_MODE_EXECUTE,
493            false,
494            true,
495            false,
496            stopped_reason,
497            next_action,
498        )
499    }
500
501    // Build the pending-operation recovery response mode.
502    const fn unclaim_pending(next_action: &'static str) -> Self {
503        Self::new(
504            RESTORE_RUN_MODE_UNCLAIM_PENDING,
505            false,
506            false,
507            true,
508            RESTORE_RUN_STOPPED_RECOVERED_PENDING,
509            next_action,
510        )
511    }
512}
513
514///
515/// RestoreRunPreparedOperation
516///
517
518struct RestoreRunPreparedOperation {
519    operation: RestoreApplyJournalOperation,
520    command: RestoreApplyRunnerCommand,
521    sequence: usize,
522    attempt: usize,
523}
524
525///
526/// RestoreRunStepOutcome
527///
528
529enum RestoreRunStepOutcome {
530    Completed {
531        executed_operation: RestoreRunExecutedOperation,
532        operation_receipt: RestoreRunOperationReceipt,
533    },
534    Failed {
535        executed_operation: RestoreRunExecutedOperation,
536        operation_receipt: RestoreRunOperationReceipt,
537        status: String,
538    },
539}
540
541/// Build a no-mutation native restore runner preview from a journal file.
542pub fn restore_run_dry_run(
543    config: &RestoreRunnerConfig,
544) -> Result<RestoreRunResponse, RestoreRunnerError> {
545    let journal = read_apply_journal_file(&config.journal)?;
546    let report = journal.report();
547    let preview = journal.next_command_preview_with_config(&config.command);
548    let stopped_reason = restore_run_stopped_reason(&report, false, false);
549    let next_action = restore_run_next_action(&report, false);
550
551    let mut response = RestoreRunResponse::from_report(
552        journal.backup_id,
553        report,
554        RestoreRunResponseMode::dry_run(stopped_reason, next_action),
555    );
556    response.set_requested_state_updated_at(config.updated_at.as_ref());
557    response.operation_available = Some(preview.operation_available);
558    response.command_available = Some(preview.command_available);
559    response.command = preview.command;
560    Ok(response)
561}
562
563/// Recover an interrupted restore runner by unclaiming the pending operation.
564pub fn restore_run_unclaim_pending(
565    config: &RestoreRunnerConfig,
566) -> Result<RestoreRunResponse, RestoreRunnerError> {
567    let _lock = RestoreJournalLock::acquire(&config.journal)?;
568    let mut journal = read_apply_journal_file(&config.journal)?;
569    let recovered_operation = journal
570        .next_transition_operation()
571        .filter(|operation| operation.state == RestoreApplyOperationState::Pending)
572        .cloned()
573        .ok_or(RestoreApplyJournalError::NoPendingOperation)?;
574
575    let recovered_updated_at = state_updated_at(config.updated_at.as_ref());
576    journal.mark_next_operation_ready_at(Some(recovered_updated_at.clone()))?;
577    write_apply_journal_file(&config.journal, &journal)?;
578
579    let report = journal.report();
580    let next_action = restore_run_next_action(&report, true);
581    let mut response = RestoreRunResponse::from_report(
582        journal.backup_id,
583        report,
584        RestoreRunResponseMode::unclaim_pending(next_action),
585    );
586    response.set_requested_state_updated_at(config.updated_at.as_ref());
587    response.set_operation_receipts(vec![RestoreRunOperationReceipt::recovered_pending(
588        recovered_operation.clone(),
589        Some(recovered_updated_at),
590    )]);
591    response.recovered_operation = Some(recovered_operation);
592    Ok(response)
593}
594
595/// Execute ready restore apply journal operations through generated runner commands.
596pub fn restore_run_execute(
597    config: &RestoreRunnerConfig,
598) -> Result<RestoreRunResponse, RestoreRunnerError> {
599    let run = restore_run_execute_result(config)?;
600    if let Some(error) = run.error {
601        return Err(error);
602    }
603
604    Ok(run.response)
605}
606
607// Execute ready restore apply operations and retain any deferred runner error.
608pub fn restore_run_execute_result(
609    config: &RestoreRunnerConfig,
610) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
611    let _lock = RestoreJournalLock::acquire(&config.journal)?;
612    let mut journal = read_apply_journal_file(&config.journal)?;
613    let mut executed_operations = Vec::new();
614    let mut operation_receipts = Vec::new();
615
616    loop {
617        let report = journal.report();
618        let max_steps_reached =
619            restore_run_max_steps_reached(config, executed_operations.len(), &report);
620        if report.complete || max_steps_reached {
621            return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
622                &journal,
623                executed_operations,
624                operation_receipts,
625                max_steps_reached,
626                config.updated_at.as_ref(),
627            )));
628        }
629
630        enforce_restore_run_executable(&journal, &report)?;
631        let prepared = restore_run_prepare_next_operation(config, &mut journal)?;
632        let sequence = prepared.sequence;
633        match restore_run_execute_prepared_operation(config, &mut journal, prepared)? {
634            RestoreRunStepOutcome::Completed {
635                executed_operation,
636                operation_receipt,
637            } => {
638                executed_operations.push(executed_operation);
639                operation_receipts.push(operation_receipt);
640            }
641            RestoreRunStepOutcome::Failed {
642                executed_operation,
643                operation_receipt,
644                status,
645            } => {
646                executed_operations.push(executed_operation);
647                operation_receipts.push(operation_receipt);
648                let response = restore_run_execute_summary(
649                    &journal,
650                    executed_operations,
651                    operation_receipts,
652                    false,
653                    config.updated_at.as_ref(),
654                );
655                return Ok(RestoreRunnerOutcome {
656                    response,
657                    error: Some(RestoreRunnerError::CommandFailed { sequence, status }),
658                });
659            }
660        }
661    }
662}
663
664// Claim the next renderable operation and persist the pending state.
665fn restore_run_prepare_next_operation(
666    config: &RestoreRunnerConfig,
667    journal: &mut RestoreApplyJournal,
668) -> Result<RestoreRunPreparedOperation, RestoreRunnerError> {
669    let preview = journal.next_command_preview_with_config(&config.command);
670    enforce_restore_run_command_available(&preview)?;
671
672    let operation = preview
673        .operation
674        .clone()
675        .ok_or_else(|| restore_command_unavailable_error(&preview))?;
676    let command = preview
677        .command
678        .clone()
679        .ok_or_else(|| restore_command_unavailable_error(&preview))?;
680    let sequence = operation.sequence;
681    let attempt = journal
682        .operation_receipts
683        .iter()
684        .filter(|receipt| receipt.sequence == sequence)
685        .count()
686        + 1;
687
688    enforce_apply_claim_sequence(sequence, journal)?;
689    journal
690        .mark_operation_pending_at(sequence, Some(state_updated_at(config.updated_at.as_ref())))?;
691    write_apply_journal_file(&config.journal, journal)?;
692
693    Ok(RestoreRunPreparedOperation {
694        operation,
695        command,
696        sequence,
697        attempt,
698    })
699}
700
701// Execute one claimed operation and commit either success or failure.
702fn restore_run_execute_prepared_operation(
703    config: &RestoreRunnerConfig,
704    journal: &mut RestoreApplyJournal,
705    prepared: RestoreRunPreparedOperation,
706) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
707    if prepared.command.requires_stopped_canister
708        && let Some(outcome) = enforce_stopped_canister_precondition(
709            config,
710            &prepared.operation,
711            prepared.attempt,
712            config.updated_at.as_ref(),
713        )?
714    {
715        return restore_run_commit_precondition_failure(config, journal, prepared, outcome);
716    }
717
718    let output = ProcessCommand::new(&prepared.command.program)
719        .args(&prepared.command.args)
720        .output()?;
721    let status_label = exit_status_label(output.status);
722    let output_pair = RestoreApplyCommandOutputPair::from_bytes(
723        &output.stdout,
724        &output.stderr,
725        RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
726    );
727
728    if output.status.success() {
729        let uploaded_snapshot_id =
730            parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout));
731        return restore_run_commit_command_success(
732            config,
733            journal,
734            prepared,
735            status_label,
736            output_pair,
737            uploaded_snapshot_id,
738        );
739    }
740
741    restore_run_commit_command_failure(config, journal, prepared, status_label, output_pair)
742}
743
744// Commit a stopped-canister precondition failure for a claimed operation.
745fn restore_run_commit_precondition_failure(
746    config: &RestoreRunnerConfig,
747    journal: &mut RestoreApplyJournal,
748    prepared: RestoreRunPreparedOperation,
749    outcome: RestoreStoppedPreconditionFailure,
750) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
751    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
752    journal.mark_operation_failed_at(
753        prepared.sequence,
754        outcome.failure_reason.clone(),
755        Some(failed_updated_at.clone()),
756    )?;
757    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
758        &prepared.operation,
759        outcome.command.clone(),
760        outcome.status_label.clone(),
761        Some(failed_updated_at.clone()),
762        outcome.output,
763        prepared.attempt,
764        outcome.failure_reason,
765    ))?;
766    write_apply_journal_file(&config.journal, journal)?;
767
768    Ok(RestoreRunStepOutcome::Failed {
769        executed_operation: RestoreRunExecutedOperation::failed(
770            prepared.operation.clone(),
771            outcome.command.clone(),
772            outcome.status_label.clone(),
773        ),
774        operation_receipt: RestoreRunOperationReceipt::failed(
775            prepared.operation,
776            outcome.command,
777            outcome.status_label,
778            Some(failed_updated_at),
779        ),
780        status: RESTORE_RUN_STOPPED_PRECONDITION_FAILED.to_string(),
781    })
782}
783
784// Commit a successful process command for a claimed operation.
785fn restore_run_commit_command_success(
786    config: &RestoreRunnerConfig,
787    journal: &mut RestoreApplyJournal,
788    prepared: RestoreRunPreparedOperation,
789    status_label: String,
790    output_pair: RestoreApplyCommandOutputPair,
791    uploaded_snapshot_id: Option<String>,
792) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
793    let completed_updated_at = state_updated_at(config.updated_at.as_ref());
794    journal.mark_operation_completed_at(prepared.sequence, Some(completed_updated_at.clone()))?;
795    if prepared.operation.operation != RestoreApplyOperationKind::UploadSnapshot
796        || uploaded_snapshot_id.is_some()
797    {
798        journal.record_operation_receipt(RestoreApplyOperationReceipt::command_completed(
799            &prepared.operation,
800            prepared.command.clone(),
801            status_label.clone(),
802            Some(completed_updated_at.clone()),
803            output_pair,
804            prepared.attempt,
805            uploaded_snapshot_id,
806        ))?;
807    }
808    write_apply_journal_file(&config.journal, journal)?;
809
810    Ok(RestoreRunStepOutcome::Completed {
811        executed_operation: RestoreRunExecutedOperation::completed(
812            prepared.operation.clone(),
813            prepared.command.clone(),
814            status_label.clone(),
815        ),
816        operation_receipt: RestoreRunOperationReceipt::completed(
817            prepared.operation,
818            prepared.command,
819            status_label,
820            Some(completed_updated_at),
821        ),
822    })
823}
824
825// Commit a failed process command for a claimed operation.
826fn restore_run_commit_command_failure(
827    config: &RestoreRunnerConfig,
828    journal: &mut RestoreApplyJournal,
829    prepared: RestoreRunPreparedOperation,
830    status_label: String,
831    output_pair: RestoreApplyCommandOutputPair,
832) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
833    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
834    let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
835    journal.mark_operation_failed_at(
836        prepared.sequence,
837        failure_reason.clone(),
838        Some(failed_updated_at.clone()),
839    )?;
840    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
841        &prepared.operation,
842        prepared.command.clone(),
843        status_label.clone(),
844        Some(failed_updated_at.clone()),
845        output_pair,
846        prepared.attempt,
847        failure_reason,
848    ))?;
849    write_apply_journal_file(&config.journal, journal)?;
850
851    Ok(RestoreRunStepOutcome::Failed {
852        executed_operation: RestoreRunExecutedOperation::failed(
853            prepared.operation.clone(),
854            prepared.command.clone(),
855            status_label.clone(),
856        ),
857        operation_receipt: RestoreRunOperationReceipt::failed(
858            prepared.operation,
859            prepared.command,
860            status_label.clone(),
861            Some(failed_updated_at),
862        ),
863        status: status_label,
864    })
865}
866
867// Verify a stopped-canister command precondition before running a mutating load.
868fn enforce_stopped_canister_precondition(
869    config: &RestoreRunnerConfig,
870    operation: &RestoreApplyJournalOperation,
871    attempt: usize,
872    updated_at: Option<&String>,
873) -> Result<Option<RestoreStoppedPreconditionFailure>, RestoreRunnerError> {
874    let command = stopped_canister_status_command(config, operation);
875    let output = ProcessCommand::new(&command.program)
876        .args(&command.args)
877        .output()?;
878    let status_label = exit_status_label(output.status);
879    let output_pair = RestoreApplyCommandOutputPair::from_bytes(
880        &output.stdout,
881        &output.stderr,
882        RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
883    );
884    if output.status.success() && status_output_reports_stopped(&output_pair) {
885        return Ok(None);
886    }
887
888    Ok(Some(RestoreStoppedPreconditionFailure {
889        command,
890        status_label,
891        output: output_pair,
892        failure_reason: format!(
893            "{RESTORE_RUN_STOPPED_PRECONDITION_FAILED}-attempt-{attempt}-{}",
894            state_updated_at(updated_at)
895        ),
896    }))
897}
898
899// Build the non-mutating status command used to prove stopped-canister state.
900fn stopped_canister_status_command(
901    config: &RestoreRunnerConfig,
902    operation: &RestoreApplyJournalOperation,
903) -> RestoreApplyRunnerCommand {
904    let mut args = vec!["canister".to_string()];
905    if let Some(network) = &config.command.network {
906        args.push("--network".to_string());
907        args.push(network.clone());
908    }
909    args.push("status".to_string());
910    args.push(operation.target_canister.clone());
911
912    RestoreApplyRunnerCommand {
913        program: config.command.program.clone(),
914        args,
915        mutates: false,
916        requires_stopped_canister: false,
917        note: "proves the target canister is stopped before snapshot load".to_string(),
918    }
919}
920
921// Detect stopped status from bounded dfx status output.
922fn status_output_reports_stopped(output: &RestoreApplyCommandOutputPair) -> bool {
923    output.stdout.text.contains("Status: Stopped")
924        || output.stdout.text.contains("status: stopped")
925        || output.stderr.text.contains("Status: Stopped")
926        || output.stderr.text.contains("status: stopped")
927}
928
929// Check whether execute mode has reached its requested operation batch size.
930fn restore_run_max_steps_reached(
931    config: &RestoreRunnerConfig,
932    executed_operation_count: usize,
933    report: &RestoreApplyJournalReport,
934) -> bool {
935    config.max_steps == Some(executed_operation_count) && !report.complete
936}
937
938// Build the final native runner execution summary.
939fn restore_run_execute_summary(
940    journal: &RestoreApplyJournal,
941    executed_operations: Vec<RestoreRunExecutedOperation>,
942    operation_receipts: Vec<RestoreRunOperationReceipt>,
943    max_steps_reached: bool,
944    requested_state_updated_at: Option<&String>,
945) -> RestoreRunResponse {
946    let report = journal.report();
947    let executed_operation_count = executed_operations.len();
948    let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
949    let next_action = restore_run_next_action(&report, false);
950
951    let mut response = RestoreRunResponse::from_report(
952        journal.backup_id.clone(),
953        report,
954        RestoreRunResponseMode::execute(stopped_reason, next_action),
955    );
956    response.set_requested_state_updated_at(requested_state_updated_at);
957    response.max_steps_reached = Some(max_steps_reached);
958    response.executed_operation_count = Some(executed_operation_count);
959    response.executed_operations = executed_operations;
960    response.set_operation_receipts(operation_receipts);
961    response
962}
963
964// Classify why the native runner stopped for operator summaries.
965const fn restore_run_stopped_reason(
966    report: &RestoreApplyJournalReport,
967    max_steps_reached: bool,
968    executed: bool,
969) -> &'static str {
970    if report.complete {
971        return RESTORE_RUN_STOPPED_COMPLETE;
972    }
973    if report.failed_operations > 0 {
974        return RESTORE_RUN_STOPPED_COMMAND_FAILED;
975    }
976    if report.pending_operations > 0 {
977        return RESTORE_RUN_STOPPED_PENDING;
978    }
979    if !report.ready || report.blocked_operations > 0 {
980        return RESTORE_RUN_STOPPED_BLOCKED;
981    }
982    if max_steps_reached {
983        return RESTORE_RUN_STOPPED_MAX_STEPS;
984    }
985    if executed {
986        return RESTORE_RUN_STOPPED_READY;
987    }
988    RESTORE_RUN_STOPPED_PREVIEW
989}
990
991// Recommend the next operator action for the native runner summary.
992const fn restore_run_next_action(
993    report: &RestoreApplyJournalReport,
994    recovered_pending: bool,
995) -> &'static str {
996    if report.complete {
997        return RESTORE_RUN_ACTION_DONE;
998    }
999    if report.failed_operations > 0 {
1000        return RESTORE_RUN_ACTION_INSPECT_FAILED;
1001    }
1002    if report.pending_operations > 0 {
1003        return RESTORE_RUN_ACTION_UNCLAIM_PENDING;
1004    }
1005    if !report.ready || report.blocked_operations > 0 {
1006        return RESTORE_RUN_ACTION_FIX_BLOCKED;
1007    }
1008    if recovered_pending {
1009        return RESTORE_RUN_ACTION_RERUN;
1010    }
1011    RESTORE_RUN_ACTION_RERUN
1012}
1013
1014// Ensure the journal can be advanced by the native restore runner.
1015fn enforce_restore_run_executable(
1016    journal: &RestoreApplyJournal,
1017    report: &RestoreApplyJournalReport,
1018) -> Result<(), RestoreRunnerError> {
1019    if report.pending_operations > 0 {
1020        return Err(RestoreRunnerError::Pending {
1021            backup_id: report.backup_id.clone(),
1022            pending_operations: report.pending_operations,
1023            next_transition_sequence: report
1024                .next_transition
1025                .as_ref()
1026                .map(|operation| operation.sequence),
1027        });
1028    }
1029
1030    if report.failed_operations > 0 {
1031        return Err(RestoreRunnerError::Failed {
1032            backup_id: report.backup_id.clone(),
1033            failed_operations: report.failed_operations,
1034        });
1035    }
1036
1037    if report.ready {
1038        return Ok(());
1039    }
1040
1041    Err(RestoreRunnerError::NotReady {
1042        backup_id: journal.backup_id.clone(),
1043        reasons: report.blocked_reasons.clone(),
1044    })
1045}
1046
1047// Convert an unavailable native runner command into the shared fail-closed error.
1048fn enforce_restore_run_command_available(
1049    preview: &RestoreApplyCommandPreview,
1050) -> Result<(), RestoreRunnerError> {
1051    if preview.command_available {
1052        return Ok(());
1053    }
1054
1055    Err(restore_command_unavailable_error(preview))
1056}
1057
1058// Build a shared command-unavailable error from a preview.
1059fn restore_command_unavailable_error(preview: &RestoreApplyCommandPreview) -> RestoreRunnerError {
1060    RestoreRunnerError::CommandUnavailable {
1061        backup_id: preview.backup_id.clone(),
1062        operation_available: preview.operation_available,
1063        complete: preview.complete,
1064        blocked_reasons: preview.blocked_reasons.clone(),
1065    }
1066}
1067
1068// Render process exit status without relying on platform-specific internals.
1069fn exit_status_label(status: std::process::ExitStatus) -> String {
1070    status
1071        .code()
1072        .map_or_else(|| "signal".to_string(), |code| code.to_string())
1073}
1074
1075// Extract the uploaded target snapshot ID from dfx upload output.
1076pub fn parse_uploaded_snapshot_id(output: &str) -> Option<String> {
1077    output
1078        .lines()
1079        .filter_map(|line| line.split_once(':').map(|(_, value)| value.trim()))
1080        .find(|value| !value.is_empty())
1081        .map(str::to_string)
1082}
1083
1084// Ensure a runner claim still matches the operation it previewed.
1085fn enforce_apply_claim_sequence(
1086    expected: usize,
1087    journal: &RestoreApplyJournal,
1088) -> Result<(), RestoreRunnerError> {
1089    let actual = journal
1090        .next_transition_operation()
1091        .map(|operation| operation.sequence);
1092
1093    if actual == Some(expected) {
1094        return Ok(());
1095    }
1096
1097    Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
1098}
1099
1100// Read and validate a restore apply journal from disk.
1101fn read_apply_journal_file(path: &Path) -> Result<RestoreApplyJournal, RestoreRunnerError> {
1102    let data = fs::read_to_string(path)?;
1103    let journal: RestoreApplyJournal = serde_json::from_str(&data)?;
1104    journal.validate()?;
1105    Ok(journal)
1106}
1107
1108// Return the caller-supplied journal update marker or the current timestamp.
1109fn state_updated_at(updated_at: Option<&String>) -> String {
1110    updated_at.cloned().unwrap_or_else(current_timestamp_marker)
1111}
1112
1113// Persist the restore apply journal to its canonical runner path.
1114fn write_apply_journal_file(
1115    path: &Path,
1116    journal: &RestoreApplyJournal,
1117) -> Result<(), RestoreRunnerError> {
1118    let data = serde_json::to_vec_pretty(journal)?;
1119    fs::write(path, data)?;
1120    Ok(())
1121}
1122
1123///
1124/// RestoreJournalLock
1125///
1126
1127struct RestoreJournalLock {
1128    path: PathBuf,
1129}
1130
1131impl RestoreJournalLock {
1132    // Acquire an atomic sidecar lock for mutating restore runner operations.
1133    fn acquire(journal_path: &Path) -> Result<Self, RestoreRunnerError> {
1134        let path = journal_lock_path(journal_path);
1135        match fs::OpenOptions::new()
1136            .write(true)
1137            .create_new(true)
1138            .open(&path)
1139        {
1140            Ok(mut file) => {
1141                writeln!(file, "pid={}", std::process::id())?;
1142                Ok(Self { path })
1143            }
1144            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
1145                Err(RestoreRunnerError::JournalLocked {
1146                    lock_path: path.to_string_lossy().to_string(),
1147                })
1148            }
1149            Err(error) => Err(error.into()),
1150        }
1151    }
1152}
1153
1154impl Drop for RestoreJournalLock {
1155    // Release the sidecar lock when the mutating command completes or fails.
1156    fn drop(&mut self) {
1157        let _ = fs::remove_file(&self.path);
1158    }
1159}
1160
1161// Derive the sidecar lock path for one apply journal.
1162fn journal_lock_path(path: &Path) -> PathBuf {
1163    let mut lock_path = path.as_os_str().to_os_string();
1164    lock_path.push(".lock");
1165    PathBuf::from(lock_path)
1166}
1167
1168#[cfg(test)]
1169mod tests {
1170    use super::*;
1171
1172    // Ensure stopped-canister status parsing accepts current dfx-style output.
1173    #[test]
1174    fn status_output_reports_stopped_status() {
1175        let output = RestoreApplyCommandOutputPair::from_bytes(b"Status: Stopped\n", b"", 1024);
1176
1177        assert!(status_output_reports_stopped(&output));
1178    }
1179
1180    // Ensure running status output does not satisfy snapshot-load preconditions.
1181    #[test]
1182    fn status_output_rejects_running_status() {
1183        let output = RestoreApplyCommandOutputPair::from_bytes(b"Status: Running\n", b"", 1024);
1184
1185        assert!(!status_output_reports_stopped(&output));
1186    }
1187}