Skip to main content

canic_backup/restore/runner/
mod.rs

1use super::{
2    RestoreApplyCommandConfig, RestoreApplyCommandOutputPair, RestoreApplyCommandPreview,
3    RestoreApplyJournal, RestoreApplyJournalError, RestoreApplyJournalOperation,
4    RestoreApplyJournalReport, RestoreApplyOperationKind, RestoreApplyOperationKindCounts,
5    RestoreApplyOperationReceipt, RestoreApplyOperationState, RestoreApplyPendingSummary,
6    RestoreApplyProgressSummary, RestoreApplyReportOperation, RestoreApplyReportOutcome,
7    RestoreApplyRunnerCommand,
8};
9use serde::Serialize;
10use std::{
11    fs,
12    io::{self, Write},
13    path::{Path, PathBuf},
14    process::Command as ProcessCommand,
15};
16use thiserror::Error as ThisError;
17
// Labels written to the `run_mode` field of a runner response, one per entry point.
const RESTORE_RUN_MODE_DRY_RUN: &str = "dry-run";
const RESTORE_RUN_MODE_EXECUTE: &str = "execute";
const RESTORE_RUN_MODE_UNCLAIM_PENDING: &str = "unclaim-pending";

// Labels written to the `stopped_reason` field of a runner response.
const RESTORE_RUN_STOPPED_BLOCKED: &str = "blocked";
const RESTORE_RUN_STOPPED_COMMAND_FAILED: &str = "command-failed";
const RESTORE_RUN_STOPPED_COMPLETE: &str = "complete";
const RESTORE_RUN_STOPPED_MAX_STEPS: &str = "max-steps-reached";
const RESTORE_RUN_STOPPED_PENDING: &str = "pending";
const RESTORE_RUN_STOPPED_PREVIEW: &str = "preview";
const RESTORE_RUN_STOPPED_READY: &str = "ready";
const RESTORE_RUN_STOPPED_RECOVERED_PENDING: &str = "recovered-pending";

// Labels written to the `next_action` field of a runner response.
const RESTORE_RUN_ACTION_DONE: &str = "done";
const RESTORE_RUN_ACTION_FIX_BLOCKED: &str = "fix-blocked-journal";
const RESTORE_RUN_ACTION_INSPECT_FAILED: &str = "inspect-failed-operation";
const RESTORE_RUN_ACTION_RERUN: &str = "rerun";
const RESTORE_RUN_ACTION_UNCLAIM_PENDING: &str = "unclaim-pending";

/// Receipt `event` label for a runner command that completed.
pub const RESTORE_RUN_RECEIPT_COMPLETED: &str = "command-completed";
/// Receipt `event` label for a runner command that failed.
pub const RESTORE_RUN_RECEIPT_FAILED: &str = "command-failed";
/// Receipt `event` label for a pending operation that was unclaimed.
pub const RESTORE_RUN_RECEIPT_RECOVERED_PENDING: &str = "pending-recovered";

// `state` labels attached to executed-operation rows and command receipts.
const RESTORE_RUN_EXECUTED_COMPLETED: &str = "completed";
const RESTORE_RUN_EXECUTED_FAILED: &str = "failed";
// `state` label attached to a recovered-pending receipt.
const RESTORE_RUN_RECEIPT_STATE_READY: &str = "ready";
// Prefix joined with the exit status label (by `-`) to form a journal failure reason.
const RESTORE_RUN_COMMAND_EXIT_PREFIX: &str = "runner-command-exit";
// Value reported in the `run_version` field of every runner response.
const RESTORE_RUN_RESPONSE_VERSION: u16 = 1;
// Limit (64 KiB) passed to `RestoreApplyCommandOutputPair::from_bytes` together with the
// command's stdout and stderr; presumably a byte cap on retained output — TODO confirm.
const RESTORE_RUN_OUTPUT_RECEIPT_LIMIT: usize = 64 * 1024;
47
///
/// RestoreRunnerConfig
///
/// Inputs shared by the dry-run, unclaim-pending, and execute runner entry points.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RestoreRunnerConfig {
    /// Path of the apply journal file; it is read by every mode and locked and
    /// rewritten by the unclaim-pending and execute modes.
    pub journal: PathBuf,
    /// Command configuration handed to the journal when previewing the next runner command.
    pub command: RestoreApplyCommandConfig,
    /// Optional number of operations after which an execute pass stops; also echoed
    /// in the batch summary as `requested_max_steps`.
    pub max_steps: Option<usize>,
    /// Optional caller-supplied state marker; echoed in responses as
    /// `requested_state_updated_at` and passed to `state_updated_at` when stamping
    /// journal transitions.
    pub updated_at: Option<String>,
}
59
///
/// RestoreRunnerError
///
/// Errors surfaced by the native restore runner entry points.
///

#[derive(Debug, ThisError)]
pub enum RestoreRunnerError {
    /// A runner command exited unsuccessfully; `status` carries the exit status label.
    #[error("restore run command failed for operation {sequence}: status={status}")]
    CommandFailed { sequence: usize, status: String },

    /// The apply journal is locked; `lock_path` names the lock.
    #[error("restore apply journal is locked: {lock_path}")]
    JournalLocked { lock_path: String },

    /// The journal still has pending operations.
    #[error(
        "restore apply journal for backup {backup_id} has pending operations: pending={pending_operations}, next={next_transition_sequence:?}"
    )]
    Pending {
        backup_id: String,
        pending_operations: usize,
        next_transition_sequence: Option<usize>,
    },

    /// The journal has failed operations.
    #[error(
        "restore apply journal for backup {backup_id} has failed operations: failed={failed_operations}"
    )]
    Failed {
        backup_id: String,
        failed_operations: usize,
    },

    /// The journal is not ready; `reasons` lists why.
    #[error("restore apply journal for backup {backup_id} is not ready: reasons={reasons:?}")]
    NotReady {
        backup_id: String,
        reasons: Vec<String>,
    },

    /// No executable command could be produced for the next journal operation.
    #[error(
        "restore apply journal for backup {backup_id} has no executable command: operation_available={operation_available}, complete={complete}, blocked_reasons={blocked_reasons:?}"
    )]
    CommandUnavailable {
        backup_id: String,
        operation_available: bool,
        complete: bool,
        blocked_reasons: Vec<String>,
    },

    /// The journal's next operation differed from the one about to be claimed.
    #[error(
        "restore apply journal next operation changed before claim: expected={expected}, actual={actual:?}"
    )]
    ClaimSequenceMismatch {
        expected: usize,
        actual: Option<usize>,
    },

    /// Underlying I/O failure, forwarded unchanged.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// JSON (de)serialization failure, forwarded unchanged.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// Apply journal failure, forwarded unchanged.
    #[error(transparent)]
    Journal(#[from] RestoreApplyJournalError),
}
122
///
/// RestoreRunResponse
///
/// Serializable summary returned by every native runner mode. Most counters are
/// copied from the apply journal report; mode-specific fields are optional and
/// omitted from the serialized output when unset.
///

#[derive(Clone, Debug, Serialize)]
#[expect(
    clippy::struct_excessive_bools,
    reason = "Runner response exposes stable JSON status flags for operators and CI"
)]
pub struct RestoreRunResponse {
    /// Response format version (`RESTORE_RUN_RESPONSE_VERSION`).
    pub run_version: u16,
    /// Backup identifier taken from the journal.
    pub backup_id: String,
    /// Mode label: dry-run, execute, or unclaim-pending.
    pub run_mode: &'static str,
    /// Set when the response came from the dry-run mode.
    pub dry_run: bool,
    /// Set when the response came from the execute mode.
    pub execute: bool,
    /// Set when the response came from the unclaim-pending mode.
    pub unclaim_pending: bool,
    /// Label describing why the runner stopped.
    pub stopped_reason: &'static str,
    /// Label recommending the next operator action.
    pub next_action: &'static str,
    /// Echo of the caller-supplied `updated_at` marker, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub requested_state_updated_at: Option<String>,
    /// Whether the execute pass stopped on its step limit; only set by execute mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_steps_reached: Option<bool>,
    /// Rows for the operations run during this pass; only filled by execute mode.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub executed_operations: Vec<RestoreRunExecutedOperation>,
    /// Detailed receipts produced during this pass.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub operation_receipts: Vec<RestoreRunOperationReceipt>,
    /// Number of entries in `operation_receipts`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_receipt_count: Option<usize>,
    /// Per-event counts over `operation_receipts`.
    pub operation_receipt_summary: RestoreRunReceiptSummary,
    /// Number of entries in `executed_operations`; only set by execute mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub executed_operation_count: Option<usize>,
    /// The pending operation that was unclaimed; only set by unclaim-pending mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recovered_operation: Option<RestoreApplyJournalOperation>,
    /// Before/after counters for this pass.
    pub batch_summary: RestoreRunBatchSummary,
    // The fields below, through `next_transition`, are copied verbatim from the journal report.
    pub ready: bool,
    pub complete: bool,
    pub attention_required: bool,
    pub outcome: RestoreApplyReportOutcome,
    pub operation_count: usize,
    pub operation_counts: RestoreApplyOperationKindCounts,
    pub operation_counts_supplied: bool,
    pub progress: RestoreApplyProgressSummary,
    pub pending_summary: RestoreApplyPendingSummary,
    pub pending_operations: usize,
    pub ready_operations: usize,
    pub blocked_operations: usize,
    pub completed_operations: usize,
    pub failed_operations: usize,
    pub blocked_reasons: Vec<String>,
    pub next_transition: Option<RestoreApplyReportOperation>,
    /// Preview flag copied from the command preview; only set by dry-run mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_available: Option<bool>,
    /// Preview flag copied from the command preview; only set by dry-run mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command_available: Option<bool>,
    /// Previewed runner command, if the preview produced one; only set by dry-run mode.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<RestoreApplyRunnerCommand>,
}
180
181impl RestoreRunResponse {
182    // Build the shared native runner response fields from an apply journal report.
183    fn from_report(
184        backup_id: String,
185        report: RestoreApplyJournalReport,
186        mode: RestoreRunResponseMode,
187    ) -> Self {
188        Self {
189            run_version: RESTORE_RUN_RESPONSE_VERSION,
190            backup_id,
191            run_mode: mode.run_mode,
192            dry_run: mode.dry_run,
193            execute: mode.execute,
194            unclaim_pending: mode.unclaim_pending,
195            stopped_reason: mode.stopped_reason,
196            next_action: mode.next_action,
197            requested_state_updated_at: None,
198            max_steps_reached: None,
199            executed_operations: Vec::new(),
200            operation_receipts: Vec::new(),
201            operation_receipt_count: Some(0),
202            operation_receipt_summary: RestoreRunReceiptSummary::default(),
203            executed_operation_count: None,
204            recovered_operation: None,
205            batch_summary: RestoreRunBatchSummary::from_counts(
206                RestoreRunBatchStart::new(
207                    None,
208                    report.ready_operations,
209                    report.progress.remaining_operations,
210                ),
211                0,
212                report.ready_operations,
213                report.progress.remaining_operations,
214                false,
215                report.complete,
216            ),
217            ready: report.ready,
218            complete: report.complete,
219            attention_required: report.attention_required,
220            outcome: report.outcome,
221            operation_count: report.operation_count,
222            operation_counts: report.operation_counts,
223            operation_counts_supplied: report.operation_counts_supplied,
224            progress: report.progress,
225            pending_summary: report.pending_summary,
226            pending_operations: report.pending_operations,
227            ready_operations: report.ready_operations,
228            blocked_operations: report.blocked_operations,
229            completed_operations: report.completed_operations,
230            failed_operations: report.failed_operations,
231            blocked_reasons: report.blocked_reasons,
232            next_transition: report.next_transition,
233            operation_available: None,
234            command_available: None,
235            command: None,
236        }
237    }
238
239    // Replace the detailed receipt stream and refresh the compact counters.
240    fn set_operation_receipts(&mut self, receipts: Vec<RestoreRunOperationReceipt>) {
241        self.operation_receipt_summary = RestoreRunReceiptSummary::from_receipts(&receipts);
242        self.operation_receipt_count = Some(receipts.len());
243        self.operation_receipts = receipts;
244    }
245
246    // Echo the caller-provided state marker for receipt-free runner summaries.
247    fn set_requested_state_updated_at(&mut self, updated_at: Option<&String>) {
248        self.requested_state_updated_at = updated_at.cloned();
249    }
250
251    // Refresh batch counters after a dry-run, recovery, or execute pass.
252    const fn set_batch_summary(
253        &mut self,
254        batch_start: RestoreRunBatchStart,
255        executed_operations: usize,
256        stopped_by_max_steps: bool,
257    ) {
258        self.batch_summary = RestoreRunBatchSummary::from_counts(
259            batch_start,
260            executed_operations,
261            self.ready_operations,
262            self.progress.remaining_operations,
263            stopped_by_max_steps,
264            self.complete,
265        );
266    }
267}
268
///
/// RestoreRunBatchSummary
///
/// Before/after counters for one runner pass.
///

#[derive(Clone, Debug, Serialize)]
pub struct RestoreRunBatchSummary {
    /// Step limit recorded at the start of the pass, if any.
    pub requested_max_steps: Option<usize>,
    /// Ready-operation count observed before the pass.
    pub initial_ready_operations: usize,
    /// Remaining-operation count observed before the pass.
    pub initial_remaining_operations: usize,
    /// Number of operations executed during the pass.
    pub executed_operations: usize,
    /// Ready-operation count after the pass.
    pub remaining_ready_operations: usize,
    /// Remaining-operation count after the pass.
    pub remaining_operations: usize,
    /// `remaining_ready_operations - initial_ready_operations`, signed.
    pub ready_operations_delta: isize,
    /// `remaining_operations - initial_remaining_operations`, signed.
    pub remaining_operations_delta: isize,
    /// Whether the pass stopped because the step limit was reached.
    pub stopped_by_max_steps: bool,
    /// Whether the journal was complete after the pass.
    pub complete: bool,
}
286
///
/// RestoreRunReceiptSummary
///
/// Per-event counts over a list of runner receipts.
///

#[derive(Clone, Debug, Default, Serialize)]
pub struct RestoreRunReceiptSummary {
    /// Total number of receipts, including any with an unrecognized event label.
    pub total_receipts: usize,
    /// Receipts whose event is `RESTORE_RUN_RECEIPT_COMPLETED`.
    pub command_completed: usize,
    /// Receipts whose event is `RESTORE_RUN_RECEIPT_FAILED`.
    pub command_failed: usize,
    /// Receipts whose event is `RESTORE_RUN_RECEIPT_RECOVERED_PENDING`.
    pub pending_recovered: usize,
}
298
///
/// RestoreRunOperationReceipt
///
/// Compact audit record for one runner event on a journal operation.
///

#[derive(Clone, Debug, Serialize)]
pub struct RestoreRunOperationReceipt {
    /// Event label; one of the `RESTORE_RUN_RECEIPT_*` event constants.
    pub event: &'static str,
    /// Sequence number copied from the journal operation.
    pub sequence: usize,
    /// Operation kind copied from the journal operation.
    pub operation: RestoreApplyOperationKind,
    /// Target canister copied from the journal operation.
    pub target_canister: String,
    /// State label the operation ended in for this event.
    pub state: &'static str,
    /// State marker recorded for the transition, when one was supplied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub updated_at: Option<String>,
    /// Command that was run; absent on recovered-pending receipts.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<RestoreApplyRunnerCommand>,
    /// Exit status label of the command; absent on recovered-pending receipts.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status: Option<String>,
}
317
///
/// RestoreRunExecutedOperation
///
/// Summary row for one operation whose command was run during an execute pass.
///

#[derive(Clone, Debug, Serialize)]
pub struct RestoreRunExecutedOperation {
    /// Sequence number copied from the journal operation.
    pub sequence: usize,
    /// Operation kind copied from the journal operation.
    pub operation: RestoreApplyOperationKind,
    /// Target canister copied from the journal operation.
    pub target_canister: String,
    /// Command that was run for the operation.
    pub command: RestoreApplyRunnerCommand,
    /// Exit status label of the command.
    pub status: String,
    /// `"completed"` or `"failed"`.
    pub state: &'static str,
}
331
332///
333/// RestoreRunnerOutcome
334///
335
336pub struct RestoreRunnerOutcome {
337    pub response: RestoreRunResponse,
338    pub error: Option<RestoreRunnerError>,
339}
340
341impl RestoreRunnerOutcome {
342    // Build a successful runner response with no deferred error.
343    const fn ok(response: RestoreRunResponse) -> Self {
344        Self {
345            response,
346            error: None,
347        }
348    }
349}
350
///
/// RestoreRunBatchStart
///
/// Counters captured before a runner pass, used later to compute batch deltas.
///

#[derive(Clone, Copy, Debug)]
struct RestoreRunBatchStart {
    // Step limit requested for the pass, if any.
    requested_max_steps: Option<usize>,
    // Ready-operation count observed before the pass.
    initial_ready_operations: usize,
    // Remaining-operation count observed before the pass.
    initial_remaining_operations: usize,
}
361
362impl RestoreRunBatchStart {
363    // Capture the runner counters observed before a dry-run, recovery, or execute pass.
364    const fn new(
365        requested_max_steps: Option<usize>,
366        initial_ready_operations: usize,
367        initial_remaining_operations: usize,
368    ) -> Self {
369        Self {
370            requested_max_steps,
371            initial_ready_operations,
372            initial_remaining_operations,
373        }
374    }
375}
376
377impl RestoreRunBatchSummary {
378    // Build the compact batch counters shown in every native runner response.
379    const fn from_counts(
380        batch_start: RestoreRunBatchStart,
381        executed_operations: usize,
382        remaining_ready_operations: usize,
383        remaining_operations: usize,
384        stopped_by_max_steps: bool,
385        complete: bool,
386    ) -> Self {
387        Self {
388            requested_max_steps: batch_start.requested_max_steps,
389            initial_ready_operations: batch_start.initial_ready_operations,
390            initial_remaining_operations: batch_start.initial_remaining_operations,
391            executed_operations,
392            remaining_ready_operations,
393            remaining_operations,
394            ready_operations_delta: remaining_ready_operations.cast_signed()
395                - batch_start.initial_ready_operations.cast_signed(),
396            remaining_operations_delta: remaining_operations.cast_signed()
397                - batch_start.initial_remaining_operations.cast_signed(),
398            stopped_by_max_steps,
399            complete,
400        }
401    }
402}
403
404impl RestoreRunReceiptSummary {
405    // Count restore runner receipt classes for script-friendly summaries.
406    fn from_receipts(receipts: &[RestoreRunOperationReceipt]) -> Self {
407        let mut summary = Self {
408            total_receipts: receipts.len(),
409            ..Self::default()
410        };
411
412        for receipt in receipts {
413            match receipt.event {
414                RESTORE_RUN_RECEIPT_COMPLETED => summary.command_completed += 1,
415                RESTORE_RUN_RECEIPT_FAILED => summary.command_failed += 1,
416                RESTORE_RUN_RECEIPT_RECOVERED_PENDING => summary.pending_recovered += 1,
417                _ => {}
418            }
419        }
420
421        summary
422    }
423}
424
425impl RestoreRunOperationReceipt {
426    // Build a receipt for a completed runner command.
427    fn completed(
428        operation: RestoreApplyJournalOperation,
429        command: RestoreApplyRunnerCommand,
430        status: String,
431        updated_at: Option<String>,
432    ) -> Self {
433        Self::from_operation(
434            RESTORE_RUN_RECEIPT_COMPLETED,
435            operation,
436            RESTORE_RUN_EXECUTED_COMPLETED,
437            updated_at,
438            Some(command),
439            Some(status),
440        )
441    }
442
443    // Build a receipt for a failed runner command.
444    fn failed(
445        operation: RestoreApplyJournalOperation,
446        command: RestoreApplyRunnerCommand,
447        status: String,
448        updated_at: Option<String>,
449    ) -> Self {
450        Self::from_operation(
451            RESTORE_RUN_RECEIPT_FAILED,
452            operation,
453            RESTORE_RUN_EXECUTED_FAILED,
454            updated_at,
455            Some(command),
456            Some(status),
457        )
458    }
459
460    // Build a receipt for a recovered pending operation.
461    fn recovered_pending(
462        operation: RestoreApplyJournalOperation,
463        updated_at: Option<String>,
464    ) -> Self {
465        Self::from_operation(
466            RESTORE_RUN_RECEIPT_RECOVERED_PENDING,
467            operation,
468            RESTORE_RUN_RECEIPT_STATE_READY,
469            updated_at,
470            None,
471            None,
472        )
473    }
474
475    // Map one operation event into a compact audit receipt.
476    fn from_operation(
477        event: &'static str,
478        operation: RestoreApplyJournalOperation,
479        state: &'static str,
480        updated_at: Option<String>,
481        command: Option<RestoreApplyRunnerCommand>,
482        status: Option<String>,
483    ) -> Self {
484        Self {
485            event,
486            sequence: operation.sequence,
487            operation: operation.operation,
488            target_canister: operation.target_canister,
489            state,
490            updated_at,
491            command,
492            status,
493        }
494    }
495}
496
497impl RestoreRunExecutedOperation {
498    // Build a completed executed-operation summary row from a runner operation.
499    fn completed(
500        operation: RestoreApplyJournalOperation,
501        command: RestoreApplyRunnerCommand,
502        status: String,
503    ) -> Self {
504        Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_COMPLETED)
505    }
506
507    // Build a failed executed-operation summary row from a runner operation.
508    fn failed(
509        operation: RestoreApplyJournalOperation,
510        command: RestoreApplyRunnerCommand,
511        status: String,
512    ) -> Self {
513        Self::from_operation(operation, command, status, RESTORE_RUN_EXECUTED_FAILED)
514    }
515
516    // Map a journal operation into the compact runner execution row.
517    fn from_operation(
518        operation: RestoreApplyJournalOperation,
519        command: RestoreApplyRunnerCommand,
520        status: String,
521        state: &'static str,
522    ) -> Self {
523        Self {
524            sequence: operation.sequence,
525            operation: operation.operation,
526            target_canister: operation.target_canister,
527            command,
528            status,
529            state,
530        }
531    }
532}
533
///
/// RestoreRunResponseMode
///
/// Mode flags and labels copied into the matching `RestoreRunResponse` fields.
///

struct RestoreRunResponseMode {
    // Mode label: dry-run, execute, or unclaim-pending.
    run_mode: &'static str,
    // Flag set for the dry-run mode.
    dry_run: bool,
    // Flag set for the execute mode.
    execute: bool,
    // Flag set for the unclaim-pending mode.
    unclaim_pending: bool,
    // Label describing why the runner stopped.
    stopped_reason: &'static str,
    // Label recommending the next operator action.
    next_action: &'static str,
}
546
547impl RestoreRunResponseMode {
548    // Build a response mode from the stable JSON mode flags and action labels.
549    const fn new(
550        run_mode: &'static str,
551        dry_run: bool,
552        execute: bool,
553        unclaim_pending: bool,
554        stopped_reason: &'static str,
555        next_action: &'static str,
556    ) -> Self {
557        Self {
558            run_mode,
559            dry_run,
560            execute,
561            unclaim_pending,
562            stopped_reason,
563            next_action,
564        }
565    }
566
567    // Build a dry-run response mode with a computed stop reason and action.
568    const fn dry_run(stopped_reason: &'static str, next_action: &'static str) -> Self {
569        Self::new(
570            RESTORE_RUN_MODE_DRY_RUN,
571            true,
572            false,
573            false,
574            stopped_reason,
575            next_action,
576        )
577    }
578
579    // Build an execute response mode with a computed stop reason and action.
580    const fn execute(stopped_reason: &'static str, next_action: &'static str) -> Self {
581        Self::new(
582            RESTORE_RUN_MODE_EXECUTE,
583            false,
584            true,
585            false,
586            stopped_reason,
587            next_action,
588        )
589    }
590
591    // Build the pending-operation recovery response mode.
592    const fn unclaim_pending(next_action: &'static str) -> Self {
593        Self::new(
594            RESTORE_RUN_MODE_UNCLAIM_PENDING,
595            false,
596            false,
597            true,
598            RESTORE_RUN_STOPPED_RECOVERED_PENDING,
599            next_action,
600        )
601    }
602}
603
604/// Build a no-mutation native restore runner preview from a journal file.
605pub fn restore_run_dry_run(
606    config: &RestoreRunnerConfig,
607) -> Result<RestoreRunResponse, RestoreRunnerError> {
608    let journal = read_apply_journal_file(&config.journal)?;
609    let report = journal.report();
610    let initial_ready_operations = report.ready_operations;
611    let initial_remaining_operations = report.progress.remaining_operations;
612    let preview = journal.next_command_preview_with_config(&config.command);
613    let stopped_reason = restore_run_stopped_reason(&report, false, false);
614    let next_action = restore_run_next_action(&report, false);
615
616    let mut response = RestoreRunResponse::from_report(
617        journal.backup_id,
618        report,
619        RestoreRunResponseMode::dry_run(stopped_reason, next_action),
620    );
621    response.set_requested_state_updated_at(config.updated_at.as_ref());
622    response.set_batch_summary(
623        RestoreRunBatchStart::new(
624            config.max_steps,
625            initial_ready_operations,
626            initial_remaining_operations,
627        ),
628        0,
629        false,
630    );
631    response.operation_available = Some(preview.operation_available);
632    response.command_available = Some(preview.command_available);
633    response.command = preview.command;
634    Ok(response)
635}
636
637/// Recover an interrupted restore runner by unclaiming the pending operation.
638pub fn restore_run_unclaim_pending(
639    config: &RestoreRunnerConfig,
640) -> Result<RestoreRunResponse, RestoreRunnerError> {
641    let _lock = RestoreJournalLock::acquire(&config.journal)?;
642    let mut journal = read_apply_journal_file(&config.journal)?;
643    let initial_report = journal.report();
644    let initial_ready_operations = initial_report.ready_operations;
645    let initial_remaining_operations = initial_report.progress.remaining_operations;
646    let recovered_operation = journal
647        .next_transition_operation()
648        .filter(|operation| operation.state == RestoreApplyOperationState::Pending)
649        .cloned()
650        .ok_or(RestoreApplyJournalError::NoPendingOperation)?;
651
652    let recovered_updated_at = state_updated_at(config.updated_at.as_ref());
653    journal.mark_next_operation_ready_at(Some(recovered_updated_at.clone()))?;
654    write_apply_journal_file(&config.journal, &journal)?;
655
656    let report = journal.report();
657    let next_action = restore_run_next_action(&report, true);
658    let mut response = RestoreRunResponse::from_report(
659        journal.backup_id,
660        report,
661        RestoreRunResponseMode::unclaim_pending(next_action),
662    );
663    response.set_requested_state_updated_at(config.updated_at.as_ref());
664    response.set_batch_summary(
665        RestoreRunBatchStart::new(
666            config.max_steps,
667            initial_ready_operations,
668            initial_remaining_operations,
669        ),
670        0,
671        false,
672    );
673    response.set_operation_receipts(vec![RestoreRunOperationReceipt::recovered_pending(
674        recovered_operation.clone(),
675        Some(recovered_updated_at),
676    )]);
677    response.recovered_operation = Some(recovered_operation);
678    Ok(response)
679}
680
681/// Execute ready restore apply journal operations through generated runner commands.
682pub fn restore_run_execute(
683    config: &RestoreRunnerConfig,
684) -> Result<RestoreRunResponse, RestoreRunnerError> {
685    let run = restore_run_execute_result(config)?;
686    if let Some(error) = run.error {
687        return Err(error);
688    }
689
690    Ok(run.response)
691}
692
693#[expect(
694    clippy::too_many_lines,
695    reason = "runner execution keeps claim, command, receipt, and journal commit steps together"
696)]
697// Execute ready restore apply operations and retain any deferred runner error.
698pub fn restore_run_execute_result(
699    config: &RestoreRunnerConfig,
700) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
701    let _lock = RestoreJournalLock::acquire(&config.journal)?;
702    let mut journal = read_apply_journal_file(&config.journal)?;
703    let initial_report = journal.report();
704    let batch_start = RestoreRunBatchStart::new(
705        config.max_steps,
706        initial_report.ready_operations,
707        initial_report.progress.remaining_operations,
708    );
709    let mut executed_operations = Vec::new();
710    let mut operation_receipts = Vec::new();
711
712    loop {
713        let report = journal.report();
714        let max_steps_reached =
715            restore_run_max_steps_reached(config, executed_operations.len(), &report);
716        if report.complete || max_steps_reached {
717            return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
718                &journal,
719                executed_operations,
720                operation_receipts,
721                max_steps_reached,
722                config.updated_at.as_ref(),
723                batch_start,
724            )));
725        }
726
727        enforce_restore_run_executable(&journal, &report)?;
728        let preview = journal.next_command_preview_with_config(&config.command);
729        enforce_restore_run_command_available(&preview)?;
730
731        let operation = preview
732            .operation
733            .clone()
734            .ok_or_else(|| restore_command_unavailable_error(&preview))?;
735        let command = preview
736            .command
737            .clone()
738            .ok_or_else(|| restore_command_unavailable_error(&preview))?;
739        let sequence = operation.sequence;
740        let attempt = journal
741            .operation_receipts
742            .iter()
743            .filter(|receipt| receipt.sequence == sequence)
744            .count()
745            + 1;
746
747        enforce_apply_claim_sequence(sequence, &journal)?;
748        journal.mark_operation_pending_at(
749            sequence,
750            Some(state_updated_at(config.updated_at.as_ref())),
751        )?;
752        write_apply_journal_file(&config.journal, &journal)?;
753
754        let output = ProcessCommand::new(&command.program)
755            .args(&command.args)
756            .output()?;
757        let status_label = exit_status_label(output.status);
758        let output_pair = RestoreApplyCommandOutputPair::from_bytes(
759            &output.stdout,
760            &output.stderr,
761            RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
762        );
763        if output.status.success() {
764            let uploaded_snapshot_id =
765                parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout));
766            let completed_updated_at = state_updated_at(config.updated_at.as_ref());
767            journal.mark_operation_completed_at(sequence, Some(completed_updated_at.clone()))?;
768            if operation.operation != RestoreApplyOperationKind::UploadSnapshot
769                || uploaded_snapshot_id.is_some()
770            {
771                journal.record_operation_receipt(
772                    RestoreApplyOperationReceipt::command_completed(
773                        &operation,
774                        command.clone(),
775                        status_label.clone(),
776                        Some(completed_updated_at.clone()),
777                        output_pair.clone(),
778                        attempt,
779                        uploaded_snapshot_id,
780                    ),
781                )?;
782            }
783            write_apply_journal_file(&config.journal, &journal)?;
784            executed_operations.push(RestoreRunExecutedOperation::completed(
785                operation.clone(),
786                command.clone(),
787                status_label.clone(),
788            ));
789            operation_receipts.push(RestoreRunOperationReceipt::completed(
790                operation,
791                command,
792                status_label,
793                Some(completed_updated_at),
794            ));
795            continue;
796        }
797
798        let failed_updated_at = state_updated_at(config.updated_at.as_ref());
799        let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
800        journal.mark_operation_failed_at(
801            sequence,
802            failure_reason.clone(),
803            Some(failed_updated_at.clone()),
804        )?;
805        journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
806            &operation,
807            command.clone(),
808            status_label.clone(),
809            Some(failed_updated_at.clone()),
810            output_pair,
811            attempt,
812            failure_reason,
813        ))?;
814        write_apply_journal_file(&config.journal, &journal)?;
815        executed_operations.push(RestoreRunExecutedOperation::failed(
816            operation.clone(),
817            command.clone(),
818            status_label.clone(),
819        ));
820        operation_receipts.push(RestoreRunOperationReceipt::failed(
821            operation,
822            command,
823            status_label.clone(),
824            Some(failed_updated_at),
825        ));
826        let response = restore_run_execute_summary(
827            &journal,
828            executed_operations,
829            operation_receipts,
830            false,
831            config.updated_at.as_ref(),
832            batch_start,
833        );
834        return Ok(RestoreRunnerOutcome {
835            response,
836            error: Some(RestoreRunnerError::CommandFailed {
837                sequence,
838                status: status_label,
839            }),
840        });
841    }
842}
843
844// Check whether execute mode has reached its requested operation batch size.
845fn restore_run_max_steps_reached(
846    config: &RestoreRunnerConfig,
847    executed_operation_count: usize,
848    report: &RestoreApplyJournalReport,
849) -> bool {
850    config.max_steps == Some(executed_operation_count) && !report.complete
851}
852
853// Build the final native runner execution summary.
854fn restore_run_execute_summary(
855    journal: &RestoreApplyJournal,
856    executed_operations: Vec<RestoreRunExecutedOperation>,
857    operation_receipts: Vec<RestoreRunOperationReceipt>,
858    max_steps_reached: bool,
859    requested_state_updated_at: Option<&String>,
860    batch_start: RestoreRunBatchStart,
861) -> RestoreRunResponse {
862    let report = journal.report();
863    let executed_operation_count = executed_operations.len();
864    let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
865    let next_action = restore_run_next_action(&report, false);
866
867    let mut response = RestoreRunResponse::from_report(
868        journal.backup_id.clone(),
869        report,
870        RestoreRunResponseMode::execute(stopped_reason, next_action),
871    );
872    response.set_requested_state_updated_at(requested_state_updated_at);
873    response.set_batch_summary(batch_start, executed_operation_count, max_steps_reached);
874    response.max_steps_reached = Some(max_steps_reached);
875    response.executed_operation_count = Some(executed_operation_count);
876    response.executed_operations = executed_operations;
877    response.set_operation_receipts(operation_receipts);
878    response
879}
880
881// Classify why the native runner stopped for operator summaries.
882const fn restore_run_stopped_reason(
883    report: &RestoreApplyJournalReport,
884    max_steps_reached: bool,
885    executed: bool,
886) -> &'static str {
887    if report.complete {
888        return RESTORE_RUN_STOPPED_COMPLETE;
889    }
890    if report.failed_operations > 0 {
891        return RESTORE_RUN_STOPPED_COMMAND_FAILED;
892    }
893    if report.pending_operations > 0 {
894        return RESTORE_RUN_STOPPED_PENDING;
895    }
896    if !report.ready || report.blocked_operations > 0 {
897        return RESTORE_RUN_STOPPED_BLOCKED;
898    }
899    if max_steps_reached {
900        return RESTORE_RUN_STOPPED_MAX_STEPS;
901    }
902    if executed {
903        return RESTORE_RUN_STOPPED_READY;
904    }
905    RESTORE_RUN_STOPPED_PREVIEW
906}
907
908// Recommend the next operator action for the native runner summary.
909const fn restore_run_next_action(
910    report: &RestoreApplyJournalReport,
911    recovered_pending: bool,
912) -> &'static str {
913    if report.complete {
914        return RESTORE_RUN_ACTION_DONE;
915    }
916    if report.failed_operations > 0 {
917        return RESTORE_RUN_ACTION_INSPECT_FAILED;
918    }
919    if report.pending_operations > 0 {
920        return RESTORE_RUN_ACTION_UNCLAIM_PENDING;
921    }
922    if !report.ready || report.blocked_operations > 0 {
923        return RESTORE_RUN_ACTION_FIX_BLOCKED;
924    }
925    if recovered_pending {
926        return RESTORE_RUN_ACTION_RERUN;
927    }
928    RESTORE_RUN_ACTION_RERUN
929}
930
931// Ensure the journal can be advanced by the native restore runner.
932fn enforce_restore_run_executable(
933    journal: &RestoreApplyJournal,
934    report: &RestoreApplyJournalReport,
935) -> Result<(), RestoreRunnerError> {
936    if report.pending_operations > 0 {
937        return Err(RestoreRunnerError::Pending {
938            backup_id: report.backup_id.clone(),
939            pending_operations: report.pending_operations,
940            next_transition_sequence: report
941                .next_transition
942                .as_ref()
943                .map(|operation| operation.sequence),
944        });
945    }
946
947    if report.failed_operations > 0 {
948        return Err(RestoreRunnerError::Failed {
949            backup_id: report.backup_id.clone(),
950            failed_operations: report.failed_operations,
951        });
952    }
953
954    if report.ready {
955        return Ok(());
956    }
957
958    Err(RestoreRunnerError::NotReady {
959        backup_id: journal.backup_id.clone(),
960        reasons: report.blocked_reasons.clone(),
961    })
962}
963
964// Convert an unavailable native runner command into the shared fail-closed error.
965fn enforce_restore_run_command_available(
966    preview: &RestoreApplyCommandPreview,
967) -> Result<(), RestoreRunnerError> {
968    if preview.command_available {
969        return Ok(());
970    }
971
972    Err(restore_command_unavailable_error(preview))
973}
974
975// Build a shared command-unavailable error from a preview.
976fn restore_command_unavailable_error(preview: &RestoreApplyCommandPreview) -> RestoreRunnerError {
977    RestoreRunnerError::CommandUnavailable {
978        backup_id: preview.backup_id.clone(),
979        operation_available: preview.operation_available,
980        complete: preview.complete,
981        blocked_reasons: preview.blocked_reasons.clone(),
982    }
983}
984
// Turn a process exit status into a short label: the decimal exit code when
// one is reported, otherwise the literal "signal".
fn exit_status_label(status: std::process::ExitStatus) -> String {
    match status.code() {
        Some(code) => code.to_string(),
        None => "signal".to_string(),
    }
}
991
// Extract the uploaded target snapshot ID from dfx upload output.
//
// Scans the output line by line and returns the trimmed text following the
// first `:` of the first line where that text is non-empty. Lines without a
// `:` are skipped; `None` is returned when no line qualifies.
pub fn parse_uploaded_snapshot_id(output: &str) -> Option<String> {
    output.lines().find_map(|line| {
        let (_label, rest) = line.split_once(':')?;
        let snapshot_id = rest.trim();
        (!snapshot_id.is_empty()).then(|| snapshot_id.to_string())
    })
}
1000
1001// Ensure a runner claim still matches the operation it previewed.
1002fn enforce_apply_claim_sequence(
1003    expected: usize,
1004    journal: &RestoreApplyJournal,
1005) -> Result<(), RestoreRunnerError> {
1006    let actual = journal
1007        .next_transition_operation()
1008        .map(|operation| operation.sequence);
1009
1010    if actual == Some(expected) {
1011        return Ok(());
1012    }
1013
1014    Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
1015}
1016
1017// Read and validate a restore apply journal from disk.
1018fn read_apply_journal_file(path: &Path) -> Result<RestoreApplyJournal, RestoreRunnerError> {
1019    let data = fs::read_to_string(path)?;
1020    let journal: RestoreApplyJournal = serde_json::from_str(&data)?;
1021    journal.validate()?;
1022    Ok(journal)
1023}
1024
1025// Return the caller-supplied journal update marker or the current placeholder.
1026fn state_updated_at(updated_at: Option<&String>) -> String {
1027    updated_at.cloned().unwrap_or_else(timestamp_placeholder)
1028}
1029
// Stand-in timestamp used until the runner owns a clock abstraction; always
// the literal "unknown".
fn timestamp_placeholder() -> String {
    String::from("unknown")
}
1034
1035// Persist the restore apply journal to its canonical runner path.
1036fn write_apply_journal_file(
1037    path: &Path,
1038    journal: &RestoreApplyJournal,
1039) -> Result<(), RestoreRunnerError> {
1040    let data = serde_json::to_vec_pretty(journal)?;
1041    fs::write(path, data)?;
1042    Ok(())
1043}
1044
///
/// RestoreJournalLock
///
/// Guard for the sidecar lock file of one restore apply journal.
/// `path` is the lock file's location; the guard's `Drop` implementation
/// removes that file.
///

struct RestoreJournalLock {
    path: PathBuf,
}
1052
1053impl RestoreJournalLock {
1054    // Acquire an atomic sidecar lock for mutating restore runner operations.
1055    fn acquire(journal_path: &Path) -> Result<Self, RestoreRunnerError> {
1056        let path = journal_lock_path(journal_path);
1057        match fs::OpenOptions::new()
1058            .write(true)
1059            .create_new(true)
1060            .open(&path)
1061        {
1062            Ok(mut file) => {
1063                writeln!(file, "pid={}", std::process::id())?;
1064                Ok(Self { path })
1065            }
1066            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
1067                Err(RestoreRunnerError::JournalLocked {
1068                    lock_path: path.to_string_lossy().to_string(),
1069                })
1070            }
1071            Err(error) => Err(error.into()),
1072        }
1073    }
1074}
1075
1076impl Drop for RestoreJournalLock {
1077    // Release the sidecar lock when the mutating command completes or fails.
1078    fn drop(&mut self) {
1079        let _ = fs::remove_file(&self.path);
1080    }
1081}
1082
// Build the sidecar lock path by appending ".lock" to the journal path's
// full OS string, so any existing extension is kept rather than replaced.
fn journal_lock_path(path: &Path) -> PathBuf {
    let mut raw = path.as_os_str().to_owned();
    raw.push(".lock");
    raw.into()
}