Skip to main content

canic_backup/runner/
mod.rs

1mod manifest;
2mod operations;
3mod support;
4mod types;
5
6pub use types::*;
7
8use crate::{
9    execution::{BackupExecutionJournal, BackupExecutionOperationState},
10    persistence::BackupLayout,
11    plan::BackupPlan,
12};
13use operations::execute_operation_receipt;
14use support::{BackupRunLock, state_updated_at, timestamp_marker, timestamp_seconds};
15
16const PREFLIGHT_TTL_SECONDS: u64 = 300;
17
18/// Execute a persisted backup plan through an injected host executor.
19pub fn backup_run_execute_with_executor(
20    config: &BackupRunnerConfig,
21    executor: &mut impl BackupRunnerExecutor,
22) -> Result<BackupRunResponse, BackupRunnerError> {
23    let layout = BackupLayout::new(config.out.clone());
24    let _lock = BackupRunLock::acquire(&layout.execution_journal_path())?;
25    let mut plan = layout.read_backup_plan()?;
26    let mut journal = if layout.execution_journal_path().is_file() {
27        layout.read_execution_journal()?
28    } else {
29        let journal = BackupExecutionJournal::from_plan(&plan)?;
30        layout.write_execution_journal(&journal)?;
31        journal
32    };
33    layout.verify_execution_integrity()?;
34
35    accept_preflight_if_needed(config, executor, &layout, &mut plan, &mut journal)?;
36    execute_ready_operations(config, executor, &layout, &plan, &mut journal)
37}
38
39fn accept_preflight_if_needed(
40    config: &BackupRunnerConfig,
41    executor: &mut impl BackupRunnerExecutor,
42    layout: &BackupLayout,
43    plan: &mut BackupPlan,
44    journal: &mut BackupExecutionJournal,
45) -> Result<(), BackupRunnerError> {
46    if journal.preflight_accepted {
47        return Ok(());
48    }
49
50    let validated_at = state_updated_at(config.updated_at.as_ref());
51    let expires_at = timestamp_marker(timestamp_seconds(&validated_at) + PREFLIGHT_TTL_SECONDS);
52    let preflight_id = format!("preflight-{}", plan.run_id);
53    let receipts = executor
54        .preflight_receipts(plan, &preflight_id, &validated_at, &expires_at)
55        .map_err(|error| BackupRunnerError::PreflightFailed {
56            status: error.status,
57            message: error.message,
58        })?;
59    plan.apply_execution_preflight_receipts(&receipts, &validated_at)?;
60    layout.write_backup_plan(plan)?;
61    journal.accept_preflight_receipts_at(&receipts, Some(validated_at))?;
62    layout.write_execution_journal(journal)?;
63    Ok(())
64}
65
66fn execute_ready_operations(
67    config: &BackupRunnerConfig,
68    executor: &mut impl BackupRunnerExecutor,
69    layout: &BackupLayout,
70    plan: &BackupPlan,
71    journal: &mut BackupExecutionJournal,
72) -> Result<BackupRunResponse, BackupRunnerError> {
73    let mut executed = Vec::new();
74
75    loop {
76        let summary = journal.resume_summary();
77        if summary.completed_operations + summary.skipped_operations == summary.total_operations {
78            return Ok(run_response(plan, journal, executed, false));
79        }
80        if config
81            .max_steps
82            .is_some_and(|max_steps| executed.len() >= max_steps)
83        {
84            return Ok(run_response(plan, journal, executed, true));
85        }
86
87        let operation = journal
88            .next_ready_operation()
89            .cloned()
90            .ok_or(BackupRunnerError::NoReadyOperation)?;
91        if operation.state == BackupExecutionOperationState::Blocked {
92            return Err(BackupRunnerError::Blocked {
93                reasons: operation.blocking_reasons,
94            });
95        }
96
97        if operation.state != BackupExecutionOperationState::Pending {
98            journal.mark_operation_pending_at(
99                operation.sequence,
100                Some(state_updated_at(config.updated_at.as_ref())),
101            )?;
102            layout.write_execution_journal(journal)?;
103        }
104
105        match execute_operation_receipt(config, executor, layout, plan, journal, &operation) {
106            Ok(receipt) => {
107                journal.record_operation_receipt(receipt)?;
108                layout.write_execution_journal(journal)?;
109                executed.push(BackupRunExecutedOperation::completed(&operation));
110            }
111            Err(error) => {
112                let receipt = crate::execution::BackupExecutionOperationReceipt::failed(
113                    journal,
114                    &operation,
115                    Some(state_updated_at(config.updated_at.as_ref())),
116                    error.to_string(),
117                );
118                journal.record_operation_receipt(receipt)?;
119                layout.write_execution_journal(journal)?;
120                executed.push(BackupRunExecutedOperation::failed(&operation));
121                return Err(error);
122            }
123        }
124    }
125}
126
127fn run_response(
128    plan: &BackupPlan,
129    journal: &BackupExecutionJournal,
130    executed: Vec<BackupRunExecutedOperation>,
131    max_steps_reached: bool,
132) -> BackupRunResponse {
133    let execution = journal.resume_summary();
134    BackupRunResponse {
135        run_id: plan.run_id.clone(),
136        plan_id: plan.plan_id.clone(),
137        backup_id: plan.run_id.clone(),
138        complete: execution.completed_operations + execution.skipped_operations
139            == execution.total_operations,
140        max_steps_reached,
141        executed_operation_count: executed.len(),
142        executed_operations: executed,
143        execution,
144    }
145}
146
147#[cfg(test)]
148mod tests;