Skip to main content

canic_backup/restore/runner/
execute.rs

1use super::{
2    RestoreApplyCommandOutputPair, RestoreApplyJournal, RestoreApplyOperationKind,
3    RestoreApplyOperationReceipt,
4    constants::{
5        RESTORE_RUN_COMMAND_EXIT_PREFIX, RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID,
6        RESTORE_RUN_OUTPUT_RECEIPT_LIMIT, RESTORE_RUN_STOPPED_PRECONDITION_FAILED,
7    },
8    io::{RestoreJournalLock, read_apply_journal_file, state_updated_at, write_apply_journal_file},
9    precondition::enforce_stopped_canister_precondition,
10    status::{
11        enforce_restore_run_command_available, enforce_restore_run_executable,
12        parse_uploaded_snapshot_id, restore_command_unavailable_error,
13        restore_run_max_steps_reached, restore_run_next_action, restore_run_stopped_reason,
14    },
15    types::{
16        RestoreRunExecutedOperation, RestoreRunOperationReceipt, RestoreRunPreparedOperation,
17        RestoreRunResponse, RestoreRunResponseMode, RestoreRunStepOutcome,
18        RestoreRunnerCommandExecutor, RestoreRunnerConfig, RestoreRunnerError,
19        RestoreRunnerOutcome, RestoreStoppedPreconditionFailure,
20    },
21};
22
23/// Execute ready restore apply journal operations through an injected command executor.
24pub fn restore_run_execute_with_executor(
25    config: &RestoreRunnerConfig,
26    executor: &mut impl RestoreRunnerCommandExecutor,
27) -> Result<RestoreRunResponse, RestoreRunnerError> {
28    let run = restore_run_execute_result_with_executor(config, executor)?;
29    if let Some(error) = run.error {
30        return Err(error);
31    }
32
33    Ok(run.response)
34}
35
36// Execute ready restore apply operations and retain any deferred runner error.
37pub fn restore_run_execute_result_with_executor(
38    config: &RestoreRunnerConfig,
39    executor: &mut impl RestoreRunnerCommandExecutor,
40) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
41    let _lock = RestoreJournalLock::acquire(&config.journal)?;
42    let mut journal = read_apply_journal_file(&config.journal)?;
43    let mut executed_operations = Vec::new();
44    let mut operation_receipts = Vec::new();
45
46    loop {
47        let report = journal.report();
48        let max_steps_reached =
49            restore_run_max_steps_reached(config, executed_operations.len(), &report);
50        if report.complete || max_steps_reached {
51            return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
52                &journal,
53                executed_operations,
54                operation_receipts,
55                max_steps_reached,
56                config.updated_at.as_ref(),
57            )));
58        }
59
60        enforce_restore_run_executable(&journal, &report)?;
61        let prepared = restore_run_prepare_next_operation(config, &mut journal)?;
62        let sequence = prepared.sequence;
63        match restore_run_execute_prepared_operation(config, executor, &mut journal, prepared)? {
64            RestoreRunStepOutcome::Completed {
65                executed_operation,
66                operation_receipt,
67            } => {
68                executed_operations.push(executed_operation);
69                operation_receipts.push(operation_receipt);
70            }
71            RestoreRunStepOutcome::Failed {
72                executed_operation,
73                operation_receipt,
74                status,
75            } => {
76                executed_operations.push(executed_operation);
77                operation_receipts.push(operation_receipt);
78                let response = restore_run_execute_summary(
79                    &journal,
80                    executed_operations,
81                    operation_receipts,
82                    false,
83                    config.updated_at.as_ref(),
84                );
85                return Ok(RestoreRunnerOutcome {
86                    response,
87                    error: Some(RestoreRunnerError::CommandFailed { sequence, status }),
88                });
89            }
90        }
91    }
92}
93
94// Claim the next renderable operation and persist the pending state.
95fn restore_run_prepare_next_operation(
96    config: &RestoreRunnerConfig,
97    journal: &mut RestoreApplyJournal,
98) -> Result<RestoreRunPreparedOperation, RestoreRunnerError> {
99    let preview = journal.next_command_preview_with_config(&config.command);
100    enforce_restore_run_command_available(&preview)?;
101
102    let operation = preview
103        .operation
104        .clone()
105        .ok_or_else(|| restore_command_unavailable_error(&preview))?;
106    let command = preview
107        .command
108        .clone()
109        .ok_or_else(|| restore_command_unavailable_error(&preview))?;
110    let sequence = operation.sequence;
111    let attempt = journal
112        .operation_receipts
113        .iter()
114        .filter(|receipt| receipt.sequence == sequence)
115        .count()
116        + 1;
117
118    enforce_apply_claim_sequence(sequence, journal)?;
119    journal
120        .mark_operation_pending_at(sequence, Some(state_updated_at(config.updated_at.as_ref())))?;
121    write_apply_journal_file(&config.journal, journal)?;
122
123    Ok(RestoreRunPreparedOperation {
124        operation,
125        command,
126        sequence,
127        attempt,
128    })
129}
130
131// Execute one claimed operation and commit either success or failure.
132fn restore_run_execute_prepared_operation(
133    config: &RestoreRunnerConfig,
134    executor: &mut impl RestoreRunnerCommandExecutor,
135    journal: &mut RestoreApplyJournal,
136    prepared: RestoreRunPreparedOperation,
137) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
138    if prepared.command.requires_stopped_canister
139        && let Some(outcome) = enforce_stopped_canister_precondition(
140            config,
141            executor,
142            &prepared.operation,
143            prepared.attempt,
144            config.updated_at.as_ref(),
145        )?
146    {
147        return restore_run_commit_precondition_failure(config, journal, prepared, outcome);
148    }
149
150    let output = executor.execute(&prepared.command)?;
151    let status_label = output.status;
152    let output_pair = RestoreApplyCommandOutputPair::from_bytes(
153        &output.stdout,
154        &output.stderr,
155        RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
156    );
157
158    if output.success {
159        let is_upload_snapshot =
160            prepared.operation.operation == RestoreApplyOperationKind::UploadSnapshot;
161        let uploaded_snapshot_id = is_upload_snapshot
162            .then(|| parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout)))
163            .flatten();
164        if is_upload_snapshot && uploaded_snapshot_id.is_none() {
165            return restore_run_commit_missing_uploaded_snapshot_id(
166                config,
167                journal,
168                prepared,
169                output_pair,
170            );
171        }
172
173        return restore_run_commit_command_success(
174            config,
175            journal,
176            prepared,
177            status_label,
178            output_pair,
179            uploaded_snapshot_id,
180        );
181    }
182
183    restore_run_commit_command_failure(config, journal, prepared, status_label, output_pair)
184}
185
186// Commit a successful upload command whose output is missing the required snapshot id.
187fn restore_run_commit_missing_uploaded_snapshot_id(
188    config: &RestoreRunnerConfig,
189    journal: &mut RestoreApplyJournal,
190    prepared: RestoreRunPreparedOperation,
191    output_pair: RestoreApplyCommandOutputPair,
192) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
193    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
194    let status = RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID.to_string();
195    journal.mark_operation_failed_at(
196        prepared.sequence,
197        status.clone(),
198        Some(failed_updated_at.clone()),
199    )?;
200    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
201        &prepared.operation,
202        prepared.command.clone(),
203        status.clone(),
204        Some(failed_updated_at.clone()),
205        output_pair,
206        prepared.attempt,
207        status.clone(),
208    ))?;
209    write_apply_journal_file(&config.journal, journal)?;
210
211    Ok(RestoreRunStepOutcome::Failed {
212        executed_operation: RestoreRunExecutedOperation::failed(
213            prepared.operation.clone(),
214            prepared.command.clone(),
215            RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID.to_string(),
216        ),
217        operation_receipt: RestoreRunOperationReceipt::failed(
218            prepared.operation,
219            prepared.command,
220            status.clone(),
221            Some(failed_updated_at),
222        ),
223        status,
224    })
225}
226
227// Commit a stopped-canister precondition failure for a claimed operation.
228fn restore_run_commit_precondition_failure(
229    config: &RestoreRunnerConfig,
230    journal: &mut RestoreApplyJournal,
231    prepared: RestoreRunPreparedOperation,
232    outcome: RestoreStoppedPreconditionFailure,
233) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
234    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
235    journal.mark_operation_failed_at(
236        prepared.sequence,
237        outcome.failure_reason.clone(),
238        Some(failed_updated_at.clone()),
239    )?;
240    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
241        &prepared.operation,
242        outcome.command.clone(),
243        outcome.status_label.clone(),
244        Some(failed_updated_at.clone()),
245        outcome.output,
246        prepared.attempt,
247        outcome.failure_reason,
248    ))?;
249    write_apply_journal_file(&config.journal, journal)?;
250
251    Ok(RestoreRunStepOutcome::Failed {
252        executed_operation: RestoreRunExecutedOperation::failed(
253            prepared.operation.clone(),
254            outcome.command.clone(),
255            outcome.status_label.clone(),
256        ),
257        operation_receipt: RestoreRunOperationReceipt::failed(
258            prepared.operation,
259            outcome.command,
260            outcome.status_label,
261            Some(failed_updated_at),
262        ),
263        status: RESTORE_RUN_STOPPED_PRECONDITION_FAILED.to_string(),
264    })
265}
266
267// Commit a successful process command for a claimed operation.
268fn restore_run_commit_command_success(
269    config: &RestoreRunnerConfig,
270    journal: &mut RestoreApplyJournal,
271    prepared: RestoreRunPreparedOperation,
272    status_label: String,
273    output_pair: RestoreApplyCommandOutputPair,
274    uploaded_snapshot_id: Option<String>,
275) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
276    let completed_updated_at = state_updated_at(config.updated_at.as_ref());
277    journal.mark_operation_completed_at(prepared.sequence, Some(completed_updated_at.clone()))?;
278    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_completed(
279        &prepared.operation,
280        prepared.command.clone(),
281        status_label.clone(),
282        Some(completed_updated_at.clone()),
283        output_pair,
284        prepared.attempt,
285        uploaded_snapshot_id,
286    ))?;
287    write_apply_journal_file(&config.journal, journal)?;
288
289    Ok(RestoreRunStepOutcome::Completed {
290        executed_operation: RestoreRunExecutedOperation::completed(
291            prepared.operation.clone(),
292            prepared.command.clone(),
293            status_label.clone(),
294        ),
295        operation_receipt: RestoreRunOperationReceipt::completed(
296            prepared.operation,
297            prepared.command,
298            status_label,
299            Some(completed_updated_at),
300        ),
301    })
302}
303
304// Commit a failed process command for a claimed operation.
305fn restore_run_commit_command_failure(
306    config: &RestoreRunnerConfig,
307    journal: &mut RestoreApplyJournal,
308    prepared: RestoreRunPreparedOperation,
309    status_label: String,
310    output_pair: RestoreApplyCommandOutputPair,
311) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
312    let failed_updated_at = state_updated_at(config.updated_at.as_ref());
313    let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
314    journal.mark_operation_failed_at(
315        prepared.sequence,
316        failure_reason.clone(),
317        Some(failed_updated_at.clone()),
318    )?;
319    journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
320        &prepared.operation,
321        prepared.command.clone(),
322        status_label.clone(),
323        Some(failed_updated_at.clone()),
324        output_pair,
325        prepared.attempt,
326        failure_reason,
327    ))?;
328    write_apply_journal_file(&config.journal, journal)?;
329
330    Ok(RestoreRunStepOutcome::Failed {
331        executed_operation: RestoreRunExecutedOperation::failed(
332            prepared.operation.clone(),
333            prepared.command.clone(),
334            status_label.clone(),
335        ),
336        operation_receipt: RestoreRunOperationReceipt::failed(
337            prepared.operation,
338            prepared.command,
339            status_label.clone(),
340            Some(failed_updated_at),
341        ),
342        status: status_label,
343    })
344}
345
346// Build the final native runner execution summary.
347fn restore_run_execute_summary(
348    journal: &RestoreApplyJournal,
349    executed_operations: Vec<RestoreRunExecutedOperation>,
350    operation_receipts: Vec<RestoreRunOperationReceipt>,
351    max_steps_reached: bool,
352    requested_state_updated_at: Option<&String>,
353) -> RestoreRunResponse {
354    let report = journal.report();
355    let executed_operation_count = executed_operations.len();
356    let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
357    let next_action = restore_run_next_action(&report);
358
359    let mut response = RestoreRunResponse::from_report(
360        journal.backup_id.clone(),
361        report,
362        RestoreRunResponseMode::execute(stopped_reason, next_action),
363    );
364    response.set_requested_state_updated_at(requested_state_updated_at);
365    response.max_steps_reached = Some(max_steps_reached);
366    response.executed_operation_count = Some(executed_operation_count);
367    response.executed_operations = executed_operations;
368    response.set_operation_receipts(operation_receipts);
369    response
370}
371
372// Ensure a runner claim still matches the operation it previewed.
373fn enforce_apply_claim_sequence(
374    expected: usize,
375    journal: &RestoreApplyJournal,
376) -> Result<(), RestoreRunnerError> {
377    let actual = journal
378        .next_transition_operation()
379        .map(|operation| operation.sequence);
380
381    if actual == Some(expected) {
382        return Ok(());
383    }
384
385    Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
386}