canic_backup/runner/
mod.rs1mod manifest;
2mod operations;
3mod support;
4mod types;
5
6pub use types::*;
7
8use crate::{
9 execution::{BackupExecutionJournal, BackupExecutionOperationState},
10 persistence::BackupLayout,
11 plan::BackupPlan,
12};
13use operations::execute_operation_receipt;
14use support::{BackupRunLock, state_updated_at, timestamp_marker, timestamp_seconds};
15
16const PREFLIGHT_TTL_SECONDS: u64 = 300;
17
18pub fn backup_run_execute_with_executor(
20 config: &BackupRunnerConfig,
21 executor: &mut impl BackupRunnerExecutor,
22) -> Result<BackupRunResponse, BackupRunnerError> {
23 let layout = BackupLayout::new(config.out.clone());
24 let _lock = BackupRunLock::acquire(&layout.execution_journal_path())?;
25 let mut plan = layout.read_backup_plan()?;
26 let mut journal = if layout.execution_journal_path().is_file() {
27 layout.read_execution_journal()?
28 } else {
29 let journal = BackupExecutionJournal::from_plan(&plan)?;
30 layout.write_execution_journal(&journal)?;
31 journal
32 };
33 layout.verify_execution_integrity()?;
34
35 accept_preflight_if_needed(config, executor, &layout, &mut plan, &mut journal)?;
36 execute_ready_operations(config, executor, &layout, &plan, &mut journal)
37}
38
39fn accept_preflight_if_needed(
40 config: &BackupRunnerConfig,
41 executor: &mut impl BackupRunnerExecutor,
42 layout: &BackupLayout,
43 plan: &mut BackupPlan,
44 journal: &mut BackupExecutionJournal,
45) -> Result<(), BackupRunnerError> {
46 if journal.preflight_accepted {
47 return Ok(());
48 }
49
50 let validated_at = state_updated_at(config.updated_at.as_ref());
51 let expires_at = timestamp_marker(timestamp_seconds(&validated_at) + PREFLIGHT_TTL_SECONDS);
52 let preflight_id = format!("preflight-{}", plan.run_id);
53 let receipts = executor
54 .preflight_receipts(plan, &preflight_id, &validated_at, &expires_at)
55 .map_err(|error| BackupRunnerError::PreflightFailed {
56 status: error.status,
57 message: error.message,
58 })?;
59 plan.apply_execution_preflight_receipts(&receipts, &validated_at)?;
60 layout.write_backup_plan(plan)?;
61 journal.accept_preflight_receipts_at(&receipts, Some(validated_at))?;
62 layout.write_execution_journal(journal)?;
63 Ok(())
64}
65
66fn execute_ready_operations(
67 config: &BackupRunnerConfig,
68 executor: &mut impl BackupRunnerExecutor,
69 layout: &BackupLayout,
70 plan: &BackupPlan,
71 journal: &mut BackupExecutionJournal,
72) -> Result<BackupRunResponse, BackupRunnerError> {
73 let mut executed = Vec::new();
74
75 loop {
76 let summary = journal.resume_summary();
77 if summary.completed_operations + summary.skipped_operations == summary.total_operations {
78 return Ok(run_response(plan, journal, executed, false));
79 }
80 if config
81 .max_steps
82 .is_some_and(|max_steps| executed.len() >= max_steps)
83 {
84 return Ok(run_response(plan, journal, executed, true));
85 }
86
87 let operation = journal
88 .next_ready_operation()
89 .cloned()
90 .ok_or(BackupRunnerError::NoReadyOperation)?;
91 if operation.state == BackupExecutionOperationState::Blocked {
92 return Err(BackupRunnerError::Blocked {
93 reasons: operation.blocking_reasons,
94 });
95 }
96
97 if operation.state != BackupExecutionOperationState::Pending {
98 journal.mark_operation_pending_at(
99 operation.sequence,
100 Some(state_updated_at(config.updated_at.as_ref())),
101 )?;
102 layout.write_execution_journal(journal)?;
103 }
104
105 match execute_operation_receipt(config, executor, layout, plan, journal, &operation) {
106 Ok(receipt) => {
107 journal.record_operation_receipt(receipt)?;
108 layout.write_execution_journal(journal)?;
109 executed.push(BackupRunExecutedOperation::completed(&operation));
110 }
111 Err(error) => {
112 let receipt = crate::execution::BackupExecutionOperationReceipt::failed(
113 journal,
114 &operation,
115 Some(state_updated_at(config.updated_at.as_ref())),
116 error.to_string(),
117 );
118 journal.record_operation_receipt(receipt)?;
119 layout.write_execution_journal(journal)?;
120 executed.push(BackupRunExecutedOperation::failed(&operation));
121 return Err(error);
122 }
123 }
124 }
125}
126
127fn run_response(
128 plan: &BackupPlan,
129 journal: &BackupExecutionJournal,
130 executed: Vec<BackupRunExecutedOperation>,
131 max_steps_reached: bool,
132) -> BackupRunResponse {
133 let execution = journal.resume_summary();
134 BackupRunResponse {
135 run_id: plan.run_id.clone(),
136 plan_id: plan.plan_id.clone(),
137 backup_id: plan.run_id.clone(),
138 complete: execution.completed_operations + execution.skipped_operations
139 == execution.total_operations,
140 max_steps_reached,
141 executed_operation_count: executed.len(),
142 executed_operations: executed,
143 execution,
144 }
145}
146
147#[cfg(test)]
148mod tests;