Skip to main content

canic_backup/restore/runner/
execute.rs

//! Module: restore::runner::execute
//!
//! Responsibility: execute ready restore apply journal operations.
//! Does not own: command rendering, apply journal validation, or restore planning.
//! Boundary: claims operations, invokes an injected executor, and persists receipts.
6
7use super::{
8    RestoreApplyCommandOutputPair, RestoreApplyJournal, RestoreApplyOperationKind,
9    RestoreApplyOperationReceipt,
10    constants::{
11        RESTORE_RUN_COMMAND_EXIT_PREFIX, RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID,
12        RESTORE_RUN_OUTPUT_RECEIPT_LIMIT, RESTORE_RUN_STOPPED_PRECONDITION_FAILED,
13    },
14    io::{RestoreJournalLock, read_apply_journal_file, state_updated_at, write_apply_journal_file},
15    precondition::enforce_stopped_canister_precondition,
16    status::{
17        enforce_restore_run_command_available, enforce_restore_run_executable,
18        parse_uploaded_snapshot_id, restore_command_unavailable_error,
19        restore_run_max_steps_reached, restore_run_next_action, restore_run_stopped_reason,
20    },
21    types::{
22        RestoreRunExecutedOperation, RestoreRunOperationReceipt, RestoreRunPreparedOperation,
23        RestoreRunResponse, RestoreRunResponseMode, RestoreRunStepOutcome,
24        RestoreRunnerCommandExecutor, RestoreRunnerConfig, RestoreRunnerError,
25        RestoreRunnerOutcome, RestoreStoppedPreconditionFailure,
26    },
27};
28
29/// Execute ready restore apply journal operations through an injected command executor.
30pub fn restore_run_execute_with_executor(
31    config: &RestoreRunnerConfig,
32    executor: &mut impl RestoreRunnerCommandExecutor,
33) -> Result<RestoreRunResponse, RestoreRunnerError> {
34    let run = restore_run_execute_result_with_executor(config, executor)?;
35    if let Some(error) = run.error {
36        return Err(error);
37    }
38
39    Ok(run.response)
40}
41
42pub fn restore_run_execute_result_with_executor(
43    config: &RestoreRunnerConfig,
44    executor: &mut impl RestoreRunnerCommandExecutor,
45) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
46    let _lock = RestoreJournalLock::acquire(&config.journal)?;
47    let mut journal = read_apply_journal_file(&config.journal)?;
48    let mut executed_operations = Vec::new();
49    let mut operation_receipts = Vec::new();
50
51    loop {
52        let report = journal.report();
53        let max_steps_reached =
54            restore_run_max_steps_reached(config, executed_operations.len(), &report);
55        if report.complete || max_steps_reached {
56            return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
57                &journal,
58                executed_operations,
59                operation_receipts,
60                max_steps_reached,
61                config.updated_at.as_ref(),
62            )));
63        }
64
65        enforce_restore_run_executable(&journal, &report)?;
66        let prepared = restore_run_prepare_next_operation(config, &mut journal)?;
67        let sequence = prepared.sequence;
68        match restore_run_execute_prepared_operation(config, executor, &mut journal, prepared)? {
69            RestoreRunStepOutcome::Completed {
70                executed_operation,
71                operation_receipt,
72            } => {
73                executed_operations.push(executed_operation);
74                operation_receipts.push(operation_receipt);
75            }
76            RestoreRunStepOutcome::Failed {
77                executed_operation,
78                operation_receipt,
79                status,
80            } => {
81                executed_operations.push(executed_operation);
82                operation_receipts.push(operation_receipt);
83                let response = restore_run_execute_summary(
84                    &journal,
85                    executed_operations,
86                    operation_receipts,
87                    false,
88                    config.updated_at.as_ref(),
89                );
90                return Ok(RestoreRunnerOutcome {
91                    response,
92                    error: Some(RestoreRunnerError::CommandFailed { sequence, status }),
93                });
94            }
95        }
96    }
97}
98
99fn restore_run_prepare_next_operation(
100    config: &RestoreRunnerConfig,
101    journal: &mut RestoreApplyJournal,
102) -> Result<RestoreRunPreparedOperation, RestoreRunnerError> {
103    let preview = journal.next_command_preview_with_config(&config.command);
104    enforce_restore_run_command_available(&preview)?;
105
106    let operation = preview
107        .operation
108        .clone()
109        .ok_or_else(|| restore_command_unavailable_error(&preview))?;
110    let command = preview
111        .command
112        .clone()
113        .ok_or_else(|| restore_command_unavailable_error(&preview))?;
114    let sequence = operation.sequence;
115    let attempt = journal
116        .operation_receipts
117        .iter()
118        .filter(|receipt| receipt.sequence == sequence)
119        .count()
120        + 1;
121
122    enforce_apply_claim_sequence(sequence, journal)?;
123    journal
124        .mark_operation_pending_at(sequence, Some(state_updated_at(config.updated_at.as_ref())))?;
125    write_apply_journal_file(&config.journal, journal)?;
126
127    Ok(RestoreRunPreparedOperation {
128        operation,
129        command,
130        sequence,
131        attempt,
132    })
133}
134
135fn restore_run_execute_prepared_operation(
136    config: &RestoreRunnerConfig,
137    executor: &mut impl RestoreRunnerCommandExecutor,
138    journal: &mut RestoreApplyJournal,
139    prepared: RestoreRunPreparedOperation,
140) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
141    if prepared.command.requires_stopped_canister
142        && let Some(outcome) = enforce_stopped_canister_precondition(
143            config,
144            executor,
145            &prepared.operation,
146            prepared.attempt,
147            config.updated_at.as_ref(),
148        )?
149    {
150        return restore_run_commit_precondition_failure(config, journal, prepared, outcome);
151    }
152
153    let output = executor.execute(&prepared.command)?;
154    let status_label = output.status;
155    let output_pair = RestoreApplyCommandOutputPair::from_bytes(
156        &output.stdout,
157        &output.stderr,
158        RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
159    );
160
161    if output.success {
162        let is_upload_snapshot =
163            prepared.operation.operation == RestoreApplyOperationKind::UploadSnapshot;
164        let uploaded_snapshot_id = is_upload_snapshot
165            .then(|| parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout)))
166            .flatten();
167        if is_upload_snapshot && uploaded_snapshot_id.is_none() {
168            return restore_run_commit_missing_uploaded_snapshot_id(
169                config,
170                journal,
171                prepared,
172                output_pair,
173            );
174        }
175
176        return restore_run_commit_command_success(
177            config,
178            journal,
179            prepared,
180            status_label,
181            output_pair,
182            uploaded_snapshot_id,
183        );
184    }
185
186    restore_run_commit_command_failure(config, journal, prepared, status_label, output_pair)
187}
188
189fn restore_run_commit_missing_uploaded_snapshot_id(
190    config: &RestoreRunnerConfig,
191    journal: &mut RestoreApplyJournal,
192    prepared: RestoreRunPreparedOperation,
193    output_pair: RestoreApplyCommandOutputPair,
194) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
195    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
196    let status = RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID.to_string();
197    journal.mark_operation_failed_at(
198        prepared.sequence,
199        status.clone(),
200        Some(failed_updated_at.clone()),
201    )?;
202    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
203        &prepared.operation,
204        prepared.command.clone(),
205        status.clone(),
206        Some(failed_updated_at.clone()),
207        output_pair,
208        prepared.attempt,
209        status.clone(),
210    ))?;
211    write_apply_journal_file(&config.journal, journal)?;
212
213    Ok(RestoreRunStepOutcome::Failed {
214        executed_operation: RestoreRunExecutedOperation::failed(
215            prepared.operation.clone(),
216            prepared.command.clone(),
217            RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID.to_string(),
218        ),
219        operation_receipt: RestoreRunOperationReceipt::failed(
220            prepared.operation,
221            prepared.command,
222            status.clone(),
223            Some(failed_updated_at),
224        ),
225        status,
226    })
227}
228
229fn restore_run_commit_precondition_failure(
230    config: &RestoreRunnerConfig,
231    journal: &mut RestoreApplyJournal,
232    prepared: RestoreRunPreparedOperation,
233    outcome: RestoreStoppedPreconditionFailure,
234) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
235    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
236    journal.mark_operation_failed_at(
237        prepared.sequence,
238        outcome.failure_reason.clone(),
239        Some(failed_updated_at.clone()),
240    )?;
241    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
242        &prepared.operation,
243        outcome.command.clone(),
244        outcome.status_label.clone(),
245        Some(failed_updated_at.clone()),
246        outcome.output,
247        prepared.attempt,
248        outcome.failure_reason,
249    ))?;
250    write_apply_journal_file(&config.journal, journal)?;
251
252    Ok(RestoreRunStepOutcome::Failed {
253        executed_operation: RestoreRunExecutedOperation::failed(
254            prepared.operation.clone(),
255            outcome.command.clone(),
256            outcome.status_label.clone(),
257        ),
258        operation_receipt: RestoreRunOperationReceipt::failed(
259            prepared.operation,
260            outcome.command,
261            outcome.status_label,
262            Some(failed_updated_at),
263        ),
264        status: RESTORE_RUN_STOPPED_PRECONDITION_FAILED.to_string(),
265    })
266}
267
268fn restore_run_commit_command_success(
269    config: &RestoreRunnerConfig,
270    journal: &mut RestoreApplyJournal,
271    prepared: RestoreRunPreparedOperation,
272    status_label: String,
273    output_pair: RestoreApplyCommandOutputPair,
274    uploaded_snapshot_id: Option<String>,
275) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
276    let completed_updated_at = state_updated_at(config.updated_at.as_ref());
277    journal.mark_operation_completed_at(prepared.sequence, Some(completed_updated_at.clone()))?;
278    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_completed(
279        &prepared.operation,
280        prepared.command.clone(),
281        status_label.clone(),
282        Some(completed_updated_at.clone()),
283        output_pair,
284        prepared.attempt,
285        uploaded_snapshot_id,
286    ))?;
287    write_apply_journal_file(&config.journal, journal)?;
288
289    Ok(RestoreRunStepOutcome::Completed {
290        executed_operation: RestoreRunExecutedOperation::completed(
291            prepared.operation.clone(),
292            prepared.command.clone(),
293            status_label.clone(),
294        ),
295        operation_receipt: RestoreRunOperationReceipt::completed(
296            prepared.operation,
297            prepared.command,
298            status_label,
299            Some(completed_updated_at),
300        ),
301    })
302}
303
304fn restore_run_commit_command_failure(
305    config: &RestoreRunnerConfig,
306    journal: &mut RestoreApplyJournal,
307    prepared: RestoreRunPreparedOperation,
308    status_label: String,
309    output_pair: RestoreApplyCommandOutputPair,
310) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
311    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
312    let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
313    journal.mark_operation_failed_at(
314        prepared.sequence,
315        failure_reason.clone(),
316        Some(failed_updated_at.clone()),
317    )?;
318    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
319        &prepared.operation,
320        prepared.command.clone(),
321        status_label.clone(),
322        Some(failed_updated_at.clone()),
323        output_pair,
324        prepared.attempt,
325        failure_reason,
326    ))?;
327    write_apply_journal_file(&config.journal, journal)?;
328
329    Ok(RestoreRunStepOutcome::Failed {
330        executed_operation: RestoreRunExecutedOperation::failed(
331            prepared.operation.clone(),
332            prepared.command.clone(),
333            status_label.clone(),
334        ),
335        operation_receipt: RestoreRunOperationReceipt::failed(
336            prepared.operation,
337            prepared.command,
338            status_label.clone(),
339            Some(failed_updated_at),
340        ),
341        status: status_label,
342    })
343}
344
345fn restore_run_execute_summary(
346    journal: &RestoreApplyJournal,
347    executed_operations: Vec<RestoreRunExecutedOperation>,
348    operation_receipts: Vec<RestoreRunOperationReceipt>,
349    max_steps_reached: bool,
350    requested_state_updated_at: Option<&String>,
351) -> RestoreRunResponse {
352    let report = journal.report();
353    let executed_operation_count = executed_operations.len();
354    let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
355    let next_action = restore_run_next_action(&report);
356
357    let mut response = RestoreRunResponse::from_report(
358        journal.backup_id.clone(),
359        report,
360        RestoreRunResponseMode::execute(stopped_reason, next_action),
361    );
362    response.set_requested_state_updated_at(requested_state_updated_at);
363    response.max_steps_reached = Some(max_steps_reached);
364    response.executed_operation_count = Some(executed_operation_count);
365    response.executed_operations = executed_operations;
366    response.set_operation_receipts(operation_receipts);
367    response
368}
369
370fn enforce_apply_claim_sequence(
371    expected: usize,
372    journal: &RestoreApplyJournal,
373) -> Result<(), RestoreRunnerError> {
374    let actual = journal
375        .next_transition_operation()
376        .map(|operation| operation.sequence);
377
378    if actual == Some(expected) {
379        return Ok(());
380    }
381
382    Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
383}