1use super::{
8 RestoreApplyCommandOutputPair, RestoreApplyJournal, RestoreApplyOperationKind,
9 RestoreApplyOperationReceipt,
10 constants::{
11 RESTORE_RUN_COMMAND_EXIT_PREFIX, RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID,
12 RESTORE_RUN_OUTPUT_RECEIPT_LIMIT, RESTORE_RUN_STOPPED_PRECONDITION_FAILED,
13 },
14 io::{RestoreJournalLock, read_apply_journal_file, state_updated_at, write_apply_journal_file},
15 precondition::enforce_stopped_canister_precondition,
16 status::{
17 enforce_restore_run_command_available, enforce_restore_run_executable,
18 parse_uploaded_snapshot_id, restore_command_unavailable_error,
19 restore_run_max_steps_reached, restore_run_next_action, restore_run_stopped_reason,
20 },
21 types::{
22 RestoreRunExecutedOperation, RestoreRunOperationReceipt, RestoreRunPreparedOperation,
23 RestoreRunResponse, RestoreRunResponseMode, RestoreRunStepOutcome,
24 RestoreRunnerCommandExecutor, RestoreRunnerConfig, RestoreRunnerError,
25 RestoreRunnerOutcome, RestoreStoppedPreconditionFailure,
26 },
27};
28
29pub fn restore_run_execute_with_executor(
31 config: &RestoreRunnerConfig,
32 executor: &mut impl RestoreRunnerCommandExecutor,
33) -> Result<RestoreRunResponse, RestoreRunnerError> {
34 let run = restore_run_execute_result_with_executor(config, executor)?;
35 if let Some(error) = run.error {
36 return Err(error);
37 }
38
39 Ok(run.response)
40}
41
42pub fn restore_run_execute_result_with_executor(
43 config: &RestoreRunnerConfig,
44 executor: &mut impl RestoreRunnerCommandExecutor,
45) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
46 let _lock = RestoreJournalLock::acquire(&config.journal)?;
47 let mut journal = read_apply_journal_file(&config.journal)?;
48 let mut executed_operations = Vec::new();
49 let mut operation_receipts = Vec::new();
50
51 loop {
52 let report = journal.report();
53 let max_steps_reached =
54 restore_run_max_steps_reached(config, executed_operations.len(), &report);
55 if report.complete || max_steps_reached {
56 return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
57 &journal,
58 executed_operations,
59 operation_receipts,
60 max_steps_reached,
61 config.updated_at.as_ref(),
62 )));
63 }
64
65 enforce_restore_run_executable(&journal, &report)?;
66 let prepared = restore_run_prepare_next_operation(config, &mut journal)?;
67 let sequence = prepared.sequence;
68 match restore_run_execute_prepared_operation(config, executor, &mut journal, prepared)? {
69 RestoreRunStepOutcome::Completed {
70 executed_operation,
71 operation_receipt,
72 } => {
73 executed_operations.push(executed_operation);
74 operation_receipts.push(operation_receipt);
75 }
76 RestoreRunStepOutcome::Failed {
77 executed_operation,
78 operation_receipt,
79 status,
80 } => {
81 executed_operations.push(executed_operation);
82 operation_receipts.push(operation_receipt);
83 let response = restore_run_execute_summary(
84 &journal,
85 executed_operations,
86 operation_receipts,
87 false,
88 config.updated_at.as_ref(),
89 );
90 return Ok(RestoreRunnerOutcome {
91 response,
92 error: Some(RestoreRunnerError::CommandFailed { sequence, status }),
93 });
94 }
95 }
96 }
97}
98
99fn restore_run_prepare_next_operation(
100 config: &RestoreRunnerConfig,
101 journal: &mut RestoreApplyJournal,
102) -> Result<RestoreRunPreparedOperation, RestoreRunnerError> {
103 let preview = journal.next_command_preview_with_config(&config.command);
104 enforce_restore_run_command_available(&preview)?;
105
106 let operation = preview
107 .operation
108 .clone()
109 .ok_or_else(|| restore_command_unavailable_error(&preview))?;
110 let command = preview
111 .command
112 .clone()
113 .ok_or_else(|| restore_command_unavailable_error(&preview))?;
114 let sequence = operation.sequence;
115 let attempt = journal
116 .operation_receipts
117 .iter()
118 .filter(|receipt| receipt.sequence == sequence)
119 .count()
120 + 1;
121
122 enforce_apply_claim_sequence(sequence, journal)?;
123 journal
124 .mark_operation_pending_at(sequence, Some(state_updated_at(config.updated_at.as_ref())))?;
125 write_apply_journal_file(&config.journal, journal)?;
126
127 Ok(RestoreRunPreparedOperation {
128 operation,
129 command,
130 sequence,
131 attempt,
132 })
133}
134
135fn restore_run_execute_prepared_operation(
136 config: &RestoreRunnerConfig,
137 executor: &mut impl RestoreRunnerCommandExecutor,
138 journal: &mut RestoreApplyJournal,
139 prepared: RestoreRunPreparedOperation,
140) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
141 if prepared.command.requires_stopped_canister
142 && let Some(outcome) = enforce_stopped_canister_precondition(
143 config,
144 executor,
145 &prepared.operation,
146 prepared.attempt,
147 config.updated_at.as_ref(),
148 )?
149 {
150 return restore_run_commit_precondition_failure(config, journal, prepared, outcome);
151 }
152
153 let output = executor.execute(&prepared.command)?;
154 let status_label = output.status;
155 let output_pair = RestoreApplyCommandOutputPair::from_bytes(
156 &output.stdout,
157 &output.stderr,
158 RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
159 );
160
161 if output.success {
162 let is_upload_snapshot =
163 prepared.operation.operation == RestoreApplyOperationKind::UploadSnapshot;
164 let uploaded_snapshot_id = is_upload_snapshot
165 .then(|| parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout)))
166 .flatten();
167 if is_upload_snapshot && uploaded_snapshot_id.is_none() {
168 return restore_run_commit_missing_uploaded_snapshot_id(
169 config,
170 journal,
171 prepared,
172 output_pair,
173 );
174 }
175
176 return restore_run_commit_command_success(
177 config,
178 journal,
179 prepared,
180 status_label,
181 output_pair,
182 uploaded_snapshot_id,
183 );
184 }
185
186 restore_run_commit_command_failure(config, journal, prepared, status_label, output_pair)
187}
188
189fn restore_run_commit_missing_uploaded_snapshot_id(
190 config: &RestoreRunnerConfig,
191 journal: &mut RestoreApplyJournal,
192 prepared: RestoreRunPreparedOperation,
193 output_pair: RestoreApplyCommandOutputPair,
194) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
195 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
196 let status = RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID.to_string();
197 journal.mark_operation_failed_at(
198 prepared.sequence,
199 status.clone(),
200 Some(failed_updated_at.clone()),
201 )?;
202 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
203 &prepared.operation,
204 prepared.command.clone(),
205 status.clone(),
206 Some(failed_updated_at.clone()),
207 output_pair,
208 prepared.attempt,
209 status.clone(),
210 ))?;
211 write_apply_journal_file(&config.journal, journal)?;
212
213 Ok(RestoreRunStepOutcome::Failed {
214 executed_operation: RestoreRunExecutedOperation::failed(
215 prepared.operation.clone(),
216 prepared.command.clone(),
217 RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID.to_string(),
218 ),
219 operation_receipt: RestoreRunOperationReceipt::failed(
220 prepared.operation,
221 prepared.command,
222 status.clone(),
223 Some(failed_updated_at),
224 ),
225 status,
226 })
227}
228
229fn restore_run_commit_precondition_failure(
230 config: &RestoreRunnerConfig,
231 journal: &mut RestoreApplyJournal,
232 prepared: RestoreRunPreparedOperation,
233 outcome: RestoreStoppedPreconditionFailure,
234) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
235 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
236 journal.mark_operation_failed_at(
237 prepared.sequence,
238 outcome.failure_reason.clone(),
239 Some(failed_updated_at.clone()),
240 )?;
241 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
242 &prepared.operation,
243 outcome.command.clone(),
244 outcome.status_label.clone(),
245 Some(failed_updated_at.clone()),
246 outcome.output,
247 prepared.attempt,
248 outcome.failure_reason,
249 ))?;
250 write_apply_journal_file(&config.journal, journal)?;
251
252 Ok(RestoreRunStepOutcome::Failed {
253 executed_operation: RestoreRunExecutedOperation::failed(
254 prepared.operation.clone(),
255 outcome.command.clone(),
256 outcome.status_label.clone(),
257 ),
258 operation_receipt: RestoreRunOperationReceipt::failed(
259 prepared.operation,
260 outcome.command,
261 outcome.status_label,
262 Some(failed_updated_at),
263 ),
264 status: RESTORE_RUN_STOPPED_PRECONDITION_FAILED.to_string(),
265 })
266}
267
268fn restore_run_commit_command_success(
269 config: &RestoreRunnerConfig,
270 journal: &mut RestoreApplyJournal,
271 prepared: RestoreRunPreparedOperation,
272 status_label: String,
273 output_pair: RestoreApplyCommandOutputPair,
274 uploaded_snapshot_id: Option<String>,
275) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
276 let completed_updated_at = state_updated_at(config.updated_at.as_ref());
277 journal.mark_operation_completed_at(prepared.sequence, Some(completed_updated_at.clone()))?;
278 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_completed(
279 &prepared.operation,
280 prepared.command.clone(),
281 status_label.clone(),
282 Some(completed_updated_at.clone()),
283 output_pair,
284 prepared.attempt,
285 uploaded_snapshot_id,
286 ))?;
287 write_apply_journal_file(&config.journal, journal)?;
288
289 Ok(RestoreRunStepOutcome::Completed {
290 executed_operation: RestoreRunExecutedOperation::completed(
291 prepared.operation.clone(),
292 prepared.command.clone(),
293 status_label.clone(),
294 ),
295 operation_receipt: RestoreRunOperationReceipt::completed(
296 prepared.operation,
297 prepared.command,
298 status_label,
299 Some(completed_updated_at),
300 ),
301 })
302}
303
304fn restore_run_commit_command_failure(
305 config: &RestoreRunnerConfig,
306 journal: &mut RestoreApplyJournal,
307 prepared: RestoreRunPreparedOperation,
308 status_label: String,
309 output_pair: RestoreApplyCommandOutputPair,
310) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
311 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
312 let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
313 journal.mark_operation_failed_at(
314 prepared.sequence,
315 failure_reason.clone(),
316 Some(failed_updated_at.clone()),
317 )?;
318 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
319 &prepared.operation,
320 prepared.command.clone(),
321 status_label.clone(),
322 Some(failed_updated_at.clone()),
323 output_pair,
324 prepared.attempt,
325 failure_reason,
326 ))?;
327 write_apply_journal_file(&config.journal, journal)?;
328
329 Ok(RestoreRunStepOutcome::Failed {
330 executed_operation: RestoreRunExecutedOperation::failed(
331 prepared.operation.clone(),
332 prepared.command.clone(),
333 status_label.clone(),
334 ),
335 operation_receipt: RestoreRunOperationReceipt::failed(
336 prepared.operation,
337 prepared.command,
338 status_label.clone(),
339 Some(failed_updated_at),
340 ),
341 status: status_label,
342 })
343}
344
345fn restore_run_execute_summary(
346 journal: &RestoreApplyJournal,
347 executed_operations: Vec<RestoreRunExecutedOperation>,
348 operation_receipts: Vec<RestoreRunOperationReceipt>,
349 max_steps_reached: bool,
350 requested_state_updated_at: Option<&String>,
351) -> RestoreRunResponse {
352 let report = journal.report();
353 let executed_operation_count = executed_operations.len();
354 let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
355 let next_action = restore_run_next_action(&report);
356
357 let mut response = RestoreRunResponse::from_report(
358 journal.backup_id.clone(),
359 report,
360 RestoreRunResponseMode::execute(stopped_reason, next_action),
361 );
362 response.set_requested_state_updated_at(requested_state_updated_at);
363 response.max_steps_reached = Some(max_steps_reached);
364 response.executed_operation_count = Some(executed_operation_count);
365 response.executed_operations = executed_operations;
366 response.set_operation_receipts(operation_receipts);
367 response
368}
369
370fn enforce_apply_claim_sequence(
371 expected: usize,
372 journal: &RestoreApplyJournal,
373) -> Result<(), RestoreRunnerError> {
374 let actual = journal
375 .next_transition_operation()
376 .map(|operation| operation.sequence);
377
378 if actual == Some(expected) {
379 return Ok(());
380 }
381
382 Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
383}