1use super::{
2 RestoreApplyCommandOutputPair, RestoreApplyJournal, RestoreApplyOperationKind,
3 RestoreApplyOperationReceipt,
4 constants::{
5 RESTORE_RUN_COMMAND_EXIT_PREFIX, RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID,
6 RESTORE_RUN_OUTPUT_RECEIPT_LIMIT, RESTORE_RUN_STOPPED_PRECONDITION_FAILED,
7 },
8 io::{RestoreJournalLock, read_apply_journal_file, state_updated_at, write_apply_journal_file},
9 precondition::enforce_stopped_canister_precondition,
10 status::{
11 enforce_restore_run_command_available, enforce_restore_run_executable,
12 parse_uploaded_snapshot_id, restore_command_unavailable_error,
13 restore_run_max_steps_reached, restore_run_next_action, restore_run_stopped_reason,
14 },
15 types::{
16 RestoreRunExecutedOperation, RestoreRunOperationReceipt, RestoreRunPreparedOperation,
17 RestoreRunResponse, RestoreRunResponseMode, RestoreRunStepOutcome,
18 RestoreRunnerCommandExecutor, RestoreRunnerConfig, RestoreRunnerError,
19 RestoreRunnerOutcome, RestoreStoppedPreconditionFailure,
20 },
21};
22
23pub fn restore_run_execute_with_executor(
25 config: &RestoreRunnerConfig,
26 executor: &mut impl RestoreRunnerCommandExecutor,
27) -> Result<RestoreRunResponse, RestoreRunnerError> {
28 let run = restore_run_execute_result_with_executor(config, executor)?;
29 if let Some(error) = run.error {
30 return Err(error);
31 }
32
33 Ok(run.response)
34}
35
36pub fn restore_run_execute_result_with_executor(
38 config: &RestoreRunnerConfig,
39 executor: &mut impl RestoreRunnerCommandExecutor,
40) -> Result<RestoreRunnerOutcome, RestoreRunnerError> {
41 let _lock = RestoreJournalLock::acquire(&config.journal)?;
42 let mut journal = read_apply_journal_file(&config.journal)?;
43 let mut executed_operations = Vec::new();
44 let mut operation_receipts = Vec::new();
45
46 loop {
47 let report = journal.report();
48 let max_steps_reached =
49 restore_run_max_steps_reached(config, executed_operations.len(), &report);
50 if report.complete || max_steps_reached {
51 return Ok(RestoreRunnerOutcome::ok(restore_run_execute_summary(
52 &journal,
53 executed_operations,
54 operation_receipts,
55 max_steps_reached,
56 config.updated_at.as_ref(),
57 )));
58 }
59
60 enforce_restore_run_executable(&journal, &report)?;
61 let prepared = restore_run_prepare_next_operation(config, &mut journal)?;
62 let sequence = prepared.sequence;
63 match restore_run_execute_prepared_operation(config, executor, &mut journal, prepared)? {
64 RestoreRunStepOutcome::Completed {
65 executed_operation,
66 operation_receipt,
67 } => {
68 executed_operations.push(executed_operation);
69 operation_receipts.push(operation_receipt);
70 }
71 RestoreRunStepOutcome::Failed {
72 executed_operation,
73 operation_receipt,
74 status,
75 } => {
76 executed_operations.push(executed_operation);
77 operation_receipts.push(operation_receipt);
78 let response = restore_run_execute_summary(
79 &journal,
80 executed_operations,
81 operation_receipts,
82 false,
83 config.updated_at.as_ref(),
84 );
85 return Ok(RestoreRunnerOutcome {
86 response,
87 error: Some(RestoreRunnerError::CommandFailed { sequence, status }),
88 });
89 }
90 }
91 }
92}
93
94fn restore_run_prepare_next_operation(
96 config: &RestoreRunnerConfig,
97 journal: &mut RestoreApplyJournal,
98) -> Result<RestoreRunPreparedOperation, RestoreRunnerError> {
99 let preview = journal.next_command_preview_with_config(&config.command);
100 enforce_restore_run_command_available(&preview)?;
101
102 let operation = preview
103 .operation
104 .clone()
105 .ok_or_else(|| restore_command_unavailable_error(&preview))?;
106 let command = preview
107 .command
108 .clone()
109 .ok_or_else(|| restore_command_unavailable_error(&preview))?;
110 let sequence = operation.sequence;
111 let attempt = journal
112 .operation_receipts
113 .iter()
114 .filter(|receipt| receipt.sequence == sequence)
115 .count()
116 + 1;
117
118 enforce_apply_claim_sequence(sequence, journal)?;
119 journal
120 .mark_operation_pending_at(sequence, Some(state_updated_at(config.updated_at.as_ref())))?;
121 write_apply_journal_file(&config.journal, journal)?;
122
123 Ok(RestoreRunPreparedOperation {
124 operation,
125 command,
126 sequence,
127 attempt,
128 })
129}
130
131fn restore_run_execute_prepared_operation(
133 config: &RestoreRunnerConfig,
134 executor: &mut impl RestoreRunnerCommandExecutor,
135 journal: &mut RestoreApplyJournal,
136 prepared: RestoreRunPreparedOperation,
137) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
138 if prepared.command.requires_stopped_canister
139 && let Some(outcome) = enforce_stopped_canister_precondition(
140 config,
141 executor,
142 &prepared.operation,
143 prepared.attempt,
144 config.updated_at.as_ref(),
145 )?
146 {
147 return restore_run_commit_precondition_failure(config, journal, prepared, outcome);
148 }
149
150 let output = executor.execute(&prepared.command)?;
151 let status_label = output.status;
152 let output_pair = RestoreApplyCommandOutputPair::from_bytes(
153 &output.stdout,
154 &output.stderr,
155 RESTORE_RUN_OUTPUT_RECEIPT_LIMIT,
156 );
157
158 if output.success {
159 let is_upload_snapshot =
160 prepared.operation.operation == RestoreApplyOperationKind::UploadSnapshot;
161 let uploaded_snapshot_id = is_upload_snapshot
162 .then(|| parse_uploaded_snapshot_id(&String::from_utf8_lossy(&output.stdout)))
163 .flatten();
164 if is_upload_snapshot && uploaded_snapshot_id.is_none() {
165 return restore_run_commit_missing_uploaded_snapshot_id(
166 config,
167 journal,
168 prepared,
169 output_pair,
170 );
171 }
172
173 return restore_run_commit_command_success(
174 config,
175 journal,
176 prepared,
177 status_label,
178 output_pair,
179 uploaded_snapshot_id,
180 );
181 }
182
183 restore_run_commit_command_failure(config, journal, prepared, status_label, output_pair)
184}
185
186fn restore_run_commit_missing_uploaded_snapshot_id(
188 config: &RestoreRunnerConfig,
189 journal: &mut RestoreApplyJournal,
190 prepared: RestoreRunPreparedOperation,
191 output_pair: RestoreApplyCommandOutputPair,
192) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
193 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
194 let status = RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID.to_string();
195 journal.mark_operation_failed_at(
196 prepared.sequence,
197 status.clone(),
198 Some(failed_updated_at.clone()),
199 )?;
200 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
201 &prepared.operation,
202 prepared.command.clone(),
203 status.clone(),
204 Some(failed_updated_at.clone()),
205 output_pair,
206 prepared.attempt,
207 status.clone(),
208 ))?;
209 write_apply_journal_file(&config.journal, journal)?;
210
211 Ok(RestoreRunStepOutcome::Failed {
212 executed_operation: RestoreRunExecutedOperation::failed(
213 prepared.operation.clone(),
214 prepared.command.clone(),
215 RESTORE_RUN_MISSING_UPLOADED_SNAPSHOT_ID.to_string(),
216 ),
217 operation_receipt: RestoreRunOperationReceipt::failed(
218 prepared.operation,
219 prepared.command,
220 status.clone(),
221 Some(failed_updated_at),
222 ),
223 status,
224 })
225}
226
227fn restore_run_commit_precondition_failure(
229 config: &RestoreRunnerConfig,
230 journal: &mut RestoreApplyJournal,
231 prepared: RestoreRunPreparedOperation,
232 outcome: RestoreStoppedPreconditionFailure,
233) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
234 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
235 journal.mark_operation_failed_at(
236 prepared.sequence,
237 outcome.failure_reason.clone(),
238 Some(failed_updated_at.clone()),
239 )?;
240 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
241 &prepared.operation,
242 outcome.command.clone(),
243 outcome.status_label.clone(),
244 Some(failed_updated_at.clone()),
245 outcome.output,
246 prepared.attempt,
247 outcome.failure_reason,
248 ))?;
249 write_apply_journal_file(&config.journal, journal)?;
250
251 Ok(RestoreRunStepOutcome::Failed {
252 executed_operation: RestoreRunExecutedOperation::failed(
253 prepared.operation.clone(),
254 outcome.command.clone(),
255 outcome.status_label.clone(),
256 ),
257 operation_receipt: RestoreRunOperationReceipt::failed(
258 prepared.operation,
259 outcome.command,
260 outcome.status_label,
261 Some(failed_updated_at),
262 ),
263 status: RESTORE_RUN_STOPPED_PRECONDITION_FAILED.to_string(),
264 })
265}
266
267fn restore_run_commit_command_success(
269 config: &RestoreRunnerConfig,
270 journal: &mut RestoreApplyJournal,
271 prepared: RestoreRunPreparedOperation,
272 status_label: String,
273 output_pair: RestoreApplyCommandOutputPair,
274 uploaded_snapshot_id: Option<String>,
275) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
276 let completed_updated_at = state_updated_at(config.updated_at.as_ref());
277 journal.mark_operation_completed_at(prepared.sequence, Some(completed_updated_at.clone()))?;
278 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_completed(
279 &prepared.operation,
280 prepared.command.clone(),
281 status_label.clone(),
282 Some(completed_updated_at.clone()),
283 output_pair,
284 prepared.attempt,
285 uploaded_snapshot_id,
286 ))?;
287 write_apply_journal_file(&config.journal, journal)?;
288
289 Ok(RestoreRunStepOutcome::Completed {
290 executed_operation: RestoreRunExecutedOperation::completed(
291 prepared.operation.clone(),
292 prepared.command.clone(),
293 status_label.clone(),
294 ),
295 operation_receipt: RestoreRunOperationReceipt::completed(
296 prepared.operation,
297 prepared.command,
298 status_label,
299 Some(completed_updated_at),
300 ),
301 })
302}
303
304fn restore_run_commit_command_failure(
306 config: &RestoreRunnerConfig,
307 journal: &mut RestoreApplyJournal,
308 prepared: RestoreRunPreparedOperation,
309 status_label: String,
310 output_pair: RestoreApplyCommandOutputPair,
311) -> Result<RestoreRunStepOutcome, RestoreRunnerError> {
312 let failed_updated_at = state_updated_at(config.updated_at.as_ref());
313 let failure_reason = format!("{RESTORE_RUN_COMMAND_EXIT_PREFIX}-{status_label}");
314 journal.mark_operation_failed_at(
315 prepared.sequence,
316 failure_reason.clone(),
317 Some(failed_updated_at.clone()),
318 )?;
319 journal.record_operation_receipt(RestoreApplyOperationReceipt::command_failed(
320 &prepared.operation,
321 prepared.command.clone(),
322 status_label.clone(),
323 Some(failed_updated_at.clone()),
324 output_pair,
325 prepared.attempt,
326 failure_reason,
327 ))?;
328 write_apply_journal_file(&config.journal, journal)?;
329
330 Ok(RestoreRunStepOutcome::Failed {
331 executed_operation: RestoreRunExecutedOperation::failed(
332 prepared.operation.clone(),
333 prepared.command.clone(),
334 status_label.clone(),
335 ),
336 operation_receipt: RestoreRunOperationReceipt::failed(
337 prepared.operation,
338 prepared.command,
339 status_label.clone(),
340 Some(failed_updated_at),
341 ),
342 status: status_label,
343 })
344}
345
346fn restore_run_execute_summary(
348 journal: &RestoreApplyJournal,
349 executed_operations: Vec<RestoreRunExecutedOperation>,
350 operation_receipts: Vec<RestoreRunOperationReceipt>,
351 max_steps_reached: bool,
352 requested_state_updated_at: Option<&String>,
353) -> RestoreRunResponse {
354 let report = journal.report();
355 let executed_operation_count = executed_operations.len();
356 let stopped_reason = restore_run_stopped_reason(&report, max_steps_reached, true);
357 let next_action = restore_run_next_action(&report);
358
359 let mut response = RestoreRunResponse::from_report(
360 journal.backup_id.clone(),
361 report,
362 RestoreRunResponseMode::execute(stopped_reason, next_action),
363 );
364 response.set_requested_state_updated_at(requested_state_updated_at);
365 response.max_steps_reached = Some(max_steps_reached);
366 response.executed_operation_count = Some(executed_operation_count);
367 response.executed_operations = executed_operations;
368 response.set_operation_receipts(operation_receipts);
369 response
370}
371
372fn enforce_apply_claim_sequence(
374 expected: usize,
375 journal: &RestoreApplyJournal,
376) -> Result<(), RestoreRunnerError> {
377 let actual = journal
378 .next_transition_operation()
379 .map(|operation| operation.sequence);
380
381 if actual == Some(expected) {
382 return Ok(());
383 }
384
385 Err(RestoreRunnerError::ClaimSequenceMismatch { expected, actual })
386}