Skip to main content

canic_backup/runner/
mod.rs

1mod types;
2
3pub use types::*;
4
5use crate::{
6    artifacts::ArtifactChecksum,
7    execution::{
8        BackupExecutionJournal, BackupExecutionJournalOperation, BackupExecutionOperationReceipt,
9        BackupExecutionOperationState,
10    },
11    journal::{ArtifactJournalEntry, ArtifactState, DownloadJournal, DownloadOperationMetrics},
12    manifest::{
13        BackupUnit, BackupUnitKind, ConsistencySection, FleetBackupManifest, FleetMember,
14        FleetSection, SourceMetadata, SourceSnapshot, ToolMetadata, VerificationCheck,
15        VerificationPlan,
16    },
17    persistence::BackupLayout,
18    plan::{BackupOperationKind, BackupPlan, ControlAuthoritySource},
19    timestamp::current_timestamp_marker,
20};
21use std::{
22    fs,
23    io::{self, Write},
24    path::{Path, PathBuf},
25    time::{SystemTime, UNIX_EPOCH},
26};
27
/// How long accepted preflight receipts stay valid, in seconds (five minutes).
///
/// Added to the preflight `validated_at` timestamp to produce `expires_at`.
const PREFLIGHT_TTL_SECONDS: u64 = 5 * 60;
29
30/// Execute a persisted backup plan through an injected host executor.
31pub fn backup_run_execute_with_executor(
32    config: &BackupRunnerConfig,
33    executor: &mut impl BackupRunnerExecutor,
34) -> Result<BackupRunResponse, BackupRunnerError> {
35    let layout = BackupLayout::new(config.out.clone());
36    let _lock = BackupRunLock::acquire(&layout.execution_journal_path())?;
37    let mut plan = layout.read_backup_plan()?;
38    let mut journal = if layout.execution_journal_path().is_file() {
39        layout.read_execution_journal()?
40    } else {
41        let journal = BackupExecutionJournal::from_plan(&plan)?;
42        layout.write_execution_journal(&journal)?;
43        journal
44    };
45    layout.verify_execution_integrity()?;
46
47    accept_preflight_if_needed(config, executor, &layout, &mut plan, &mut journal)?;
48    execute_ready_operations(config, executor, &layout, &plan, &mut journal)
49}
50
51fn accept_preflight_if_needed(
52    config: &BackupRunnerConfig,
53    executor: &mut impl BackupRunnerExecutor,
54    layout: &BackupLayout,
55    plan: &mut BackupPlan,
56    journal: &mut BackupExecutionJournal,
57) -> Result<(), BackupRunnerError> {
58    if journal.preflight_accepted {
59        return Ok(());
60    }
61
62    let validated_at = state_updated_at(config.updated_at.as_ref());
63    let expires_at = timestamp_marker(timestamp_seconds(&validated_at) + PREFLIGHT_TTL_SECONDS);
64    let preflight_id = format!("preflight-{}", plan.run_id);
65    let receipts = executor
66        .preflight_receipts(plan, &preflight_id, &validated_at, &expires_at)
67        .map_err(|error| BackupRunnerError::PreflightFailed {
68            status: error.status,
69            message: error.message,
70        })?;
71    plan.apply_execution_preflight_receipts(&receipts, &validated_at)?;
72    layout.write_backup_plan(plan)?;
73    journal.accept_preflight_receipts_at(&receipts, Some(validated_at))?;
74    layout.write_execution_journal(journal)?;
75    Ok(())
76}
77
78fn execute_ready_operations(
79    config: &BackupRunnerConfig,
80    executor: &mut impl BackupRunnerExecutor,
81    layout: &BackupLayout,
82    plan: &BackupPlan,
83    journal: &mut BackupExecutionJournal,
84) -> Result<BackupRunResponse, BackupRunnerError> {
85    let mut executed = Vec::new();
86
87    loop {
88        let summary = journal.resume_summary();
89        if summary.completed_operations + summary.skipped_operations == summary.total_operations {
90            return Ok(run_response(plan, journal, executed, false));
91        }
92        if config
93            .max_steps
94            .is_some_and(|max_steps| executed.len() >= max_steps)
95        {
96            return Ok(run_response(plan, journal, executed, true));
97        }
98
99        let operation = journal
100            .next_ready_operation()
101            .cloned()
102            .ok_or(BackupRunnerError::NoReadyOperation)?;
103        if operation.state == BackupExecutionOperationState::Blocked {
104            return Err(BackupRunnerError::Blocked {
105                reasons: operation.blocking_reasons,
106            });
107        }
108
109        if operation.state != BackupExecutionOperationState::Pending {
110            journal.mark_operation_pending_at(
111                operation.sequence,
112                Some(state_updated_at(config.updated_at.as_ref())),
113            )?;
114            layout.write_execution_journal(journal)?;
115        }
116
117        match execute_operation_receipt(config, executor, layout, plan, journal, &operation) {
118            Ok(receipt) => {
119                journal.record_operation_receipt(receipt)?;
120                layout.write_execution_journal(journal)?;
121                executed.push(BackupRunExecutedOperation::completed(&operation));
122            }
123            Err(error) => {
124                let receipt = BackupExecutionOperationReceipt::failed(
125                    journal,
126                    &operation,
127                    Some(state_updated_at(config.updated_at.as_ref())),
128                    error.to_string(),
129                );
130                journal.record_operation_receipt(receipt)?;
131                layout.write_execution_journal(journal)?;
132                executed.push(BackupRunExecutedOperation::failed(&operation));
133                return Err(error);
134            }
135        }
136    }
137}
138
139fn execute_operation_receipt(
140    config: &BackupRunnerConfig,
141    executor: &mut impl BackupRunnerExecutor,
142    layout: &BackupLayout,
143    plan: &BackupPlan,
144    journal: &BackupExecutionJournal,
145    operation: &BackupExecutionJournalOperation,
146) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
147    match operation.kind {
148        BackupOperationKind::Stop => execute_stop(executor, journal, operation),
149        BackupOperationKind::CreateSnapshot => {
150            execute_create_snapshot(executor, layout, plan, journal, operation)
151        }
152        BackupOperationKind::Start => execute_start(executor, journal, operation),
153        BackupOperationKind::DownloadSnapshot => {
154            execute_download_snapshot(executor, layout, journal, operation)
155        }
156        BackupOperationKind::VerifyArtifact => execute_verify_artifact(layout, journal, operation),
157        BackupOperationKind::FinalizeManifest => {
158            execute_finalize_manifest(config, layout, plan, journal, operation)
159        }
160        BackupOperationKind::ValidateTopology
161        | BackupOperationKind::ValidateControlAuthority
162        | BackupOperationKind::ValidateSnapshotReadAuthority
163        | BackupOperationKind::ValidateQuiescencePolicy => {
164            Ok(BackupExecutionOperationReceipt::completed(
165                journal,
166                operation,
167                Some(state_updated_at(config.updated_at.as_ref())),
168            ))
169        }
170    }
171}
172
173fn execute_stop(
174    executor: &mut impl BackupRunnerExecutor,
175    journal: &BackupExecutionJournal,
176    operation: &BackupExecutionJournalOperation,
177) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
178    let target = operation_target(operation)?;
179    executor
180        .stop_canister(&target)
181        .map_err(|error| command_failed(operation.sequence, error))?;
182    Ok(BackupExecutionOperationReceipt::completed(
183        journal,
184        operation,
185        Some(current_timestamp_marker()),
186    ))
187}
188
189fn execute_start(
190    executor: &mut impl BackupRunnerExecutor,
191    journal: &BackupExecutionJournal,
192    operation: &BackupExecutionJournalOperation,
193) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
194    let target = operation_target(operation)?;
195    executor
196        .start_canister(&target)
197        .map_err(|error| command_failed(operation.sequence, error))?;
198    Ok(BackupExecutionOperationReceipt::completed(
199        journal,
200        operation,
201        Some(current_timestamp_marker()),
202    ))
203}
204
205fn execute_create_snapshot(
206    executor: &mut impl BackupRunnerExecutor,
207    layout: &BackupLayout,
208    plan: &BackupPlan,
209    journal: &BackupExecutionJournal,
210    operation: &BackupExecutionJournalOperation,
211) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
212    let target = operation_target(operation)?;
213    let snapshot_id = executor
214        .create_snapshot(&target)
215        .map_err(|error| command_failed(operation.sequence, error))?;
216    let mut receipt = BackupExecutionOperationReceipt::completed(
217        journal,
218        operation,
219        Some(current_timestamp_marker()),
220    );
221    receipt.snapshot_id = Some(snapshot_id.clone());
222
223    let mut download_journal = read_or_new_download_journal(layout, plan, journal)?;
224    upsert_artifact_entry(
225        &mut download_journal,
226        ArtifactJournalEntry {
227            canister_id: target.clone(),
228            snapshot_id,
229            state: ArtifactState::Created,
230            temp_path: None,
231            artifact_path: artifact_relative_path(&target),
232            checksum_algorithm: "sha256".to_string(),
233            checksum: None,
234            updated_at: current_timestamp_marker(),
235        },
236    );
237    layout.write_journal(&download_journal)?;
238    Ok(receipt)
239}
240
241fn execute_download_snapshot(
242    executor: &mut impl BackupRunnerExecutor,
243    layout: &BackupLayout,
244    journal: &BackupExecutionJournal,
245    operation: &BackupExecutionJournalOperation,
246) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
247    let target = operation_target(operation)?;
248    let snapshot_id = snapshot_id_for_target(journal, operation.sequence, &target)?;
249    let temp_path = artifact_temp_path(layout.root(), &target);
250    if temp_path.exists() {
251        fs::remove_dir_all(&temp_path)?;
252    }
253    fs::create_dir_all(&temp_path)?;
254    executor
255        .download_snapshot(&target, &snapshot_id, &temp_path)
256        .map_err(|error| command_failed(operation.sequence, error))?;
257
258    let mut download_journal = layout.read_journal()?;
259    let entry = artifact_entry_mut(&mut download_journal, operation.sequence, &target)?;
260    entry.temp_path = Some(temp_path.display().to_string());
261    entry.advance_to(ArtifactState::Downloaded, current_timestamp_marker())?;
262    layout.write_journal(&download_journal)?;
263
264    let mut receipt = BackupExecutionOperationReceipt::completed(
265        journal,
266        operation,
267        Some(current_timestamp_marker()),
268    );
269    receipt.artifact_path = Some(artifact_relative_path(&target));
270    Ok(receipt)
271}
272
273fn execute_verify_artifact(
274    layout: &BackupLayout,
275    journal: &BackupExecutionJournal,
276    operation: &BackupExecutionJournalOperation,
277) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
278    let target = operation_target(operation)?;
279    let mut download_journal = layout.read_journal()?;
280    let entry = artifact_entry_mut(&mut download_journal, operation.sequence, &target)?;
281    let temp_path =
282        entry
283            .temp_path
284            .as_deref()
285            .ok_or_else(|| BackupRunnerError::MissingArtifactEntry {
286                sequence: operation.sequence,
287                target_canister_id: target.clone(),
288            })?;
289    let checksum = ArtifactChecksum::from_path(Path::new(temp_path))?;
290    entry.checksum = Some(checksum.hash.clone());
291    entry.advance_to(ArtifactState::ChecksumVerified, current_timestamp_marker())?;
292    layout.write_journal(&download_journal)?;
293
294    let mut receipt = BackupExecutionOperationReceipt::completed(
295        journal,
296        operation,
297        Some(current_timestamp_marker()),
298    );
299    receipt.checksum = Some(checksum.hash);
300    Ok(receipt)
301}
302
303fn execute_finalize_manifest(
304    config: &BackupRunnerConfig,
305    layout: &BackupLayout,
306    plan: &BackupPlan,
307    journal: &BackupExecutionJournal,
308    operation: &BackupExecutionJournalOperation,
309) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
310    let mut download_journal = layout.read_journal()?;
311    for index in 0..download_journal.artifacts.len() {
312        if download_journal.artifacts[index].state == ArtifactState::Durable {
313            continue;
314        }
315        let canister_id = download_journal.artifacts[index].canister_id.clone();
316        let temp_path = download_journal.artifacts[index].temp_path.clone().ok_or(
317            BackupRunnerError::MissingArtifactEntry {
318                sequence: operation.sequence,
319                target_canister_id: canister_id,
320            },
321        )?;
322        let artifact_path = layout
323            .root()
324            .join(&download_journal.artifacts[index].artifact_path);
325        if artifact_path.exists() {
326            return Err(io::Error::new(
327                io::ErrorKind::AlreadyExists,
328                format!("artifact path already exists: {}", artifact_path.display()),
329            )
330            .into());
331        }
332        fs::rename(&temp_path, artifact_path)?;
333        download_journal.artifacts[index].temp_path = None;
334        download_journal.artifacts[index]
335            .advance_to(ArtifactState::Durable, current_timestamp_marker())?;
336        layout.write_journal(&download_journal)?;
337    }
338
339    let manifest = build_manifest(config, plan, &download_journal)?;
340    layout.write_manifest(&manifest)?;
341    Ok(BackupExecutionOperationReceipt::completed(
342        journal,
343        operation,
344        Some(current_timestamp_marker()),
345    ))
346}
347
348fn build_manifest(
349    config: &BackupRunnerConfig,
350    plan: &BackupPlan,
351    journal: &DownloadJournal,
352) -> Result<FleetBackupManifest, BackupRunnerError> {
353    let roles = plan
354        .targets
355        .iter()
356        .enumerate()
357        .map(|(index, target)| target_role(index, target.role.as_deref()))
358        .collect::<Vec<_>>();
359    let manifest = FleetBackupManifest {
360        manifest_version: 1,
361        backup_id: plan.run_id.clone(),
362        created_at: state_updated_at(config.updated_at.as_ref()),
363        tool: ToolMetadata {
364            name: config.tool_name.clone(),
365            version: config.tool_version.clone(),
366        },
367        source: SourceMetadata {
368            environment: plan.network.clone(),
369            root_canister: plan.root_canister_id.clone(),
370        },
371        consistency: ConsistencySection {
372            backup_units: vec![BackupUnit {
373                unit_id: "backup-selection".to_string(),
374                kind: if plan.targets.len() == 1 {
375                    BackupUnitKind::Single
376                } else {
377                    BackupUnitKind::Subtree
378                },
379                roles,
380            }],
381        },
382        fleet: FleetSection {
383            topology_hash_algorithm: "sha256".to_string(),
384            topology_hash_input: format!("canic-backup-plan:{}", plan.plan_id),
385            discovery_topology_hash: plan.topology_hash_before_quiesce.clone(),
386            pre_snapshot_topology_hash: plan.topology_hash_before_quiesce.clone(),
387            topology_hash: plan.topology_hash_before_quiesce.clone(),
388            members: plan
389                .targets
390                .iter()
391                .enumerate()
392                .map(|(index, target)| {
393                    let role = target_role(index, target.role.as_deref());
394                    let entry = journal
395                        .artifacts
396                        .iter()
397                        .find(|entry| {
398                            entry.canister_id == target.canister_id
399                                && entry.state == ArtifactState::Durable
400                        })
401                        .ok_or_else(|| BackupRunnerError::MissingArtifactEntry {
402                            sequence: usize::MAX,
403                            target_canister_id: target.canister_id.clone(),
404                        })?;
405                    Ok(FleetMember {
406                        role: role.clone(),
407                        canister_id: target.canister_id.clone(),
408                        parent_canister_id: target.parent_canister_id.clone(),
409                        subnet_canister_id: None,
410                        controller_hint: controller_hint(plan, target),
411                        identity_mode: target.identity_mode.clone(),
412                        verification_checks: vec![VerificationCheck {
413                            kind: "status".to_string(),
414                            roles: vec![role],
415                        }],
416                        source_snapshot: SourceSnapshot {
417                            snapshot_id: entry.snapshot_id.clone(),
418                            module_hash: target.expected_module_hash.clone(),
419                            code_version: None,
420                            artifact_path: entry.artifact_path.clone(),
421                            checksum_algorithm: entry.checksum_algorithm.clone(),
422                            checksum: entry.checksum.clone(),
423                        },
424                    })
425                })
426                .collect::<Result<Vec<_>, BackupRunnerError>>()?,
427        },
428        verification: VerificationPlan::default(),
429    };
430    manifest.validate()?;
431    Ok(manifest)
432}
433
434fn controller_hint(plan: &BackupPlan, target: &crate::plan::BackupTarget) -> Option<String> {
435    if matches!(
436        target.control_authority.source,
437        ControlAuthoritySource::RootController
438    ) {
439        Some(plan.root_canister_id.clone())
440    } else {
441        None
442    }
443}
444
445fn run_response(
446    plan: &BackupPlan,
447    journal: &BackupExecutionJournal,
448    executed: Vec<BackupRunExecutedOperation>,
449    max_steps_reached: bool,
450) -> BackupRunResponse {
451    let execution = journal.resume_summary();
452    BackupRunResponse {
453        run_id: plan.run_id.clone(),
454        plan_id: plan.plan_id.clone(),
455        backup_id: plan.run_id.clone(),
456        complete: execution.completed_operations + execution.skipped_operations
457            == execution.total_operations,
458        max_steps_reached,
459        executed_operation_count: executed.len(),
460        executed_operations: executed,
461        execution,
462    }
463}
464
465fn read_or_new_download_journal(
466    layout: &BackupLayout,
467    plan: &BackupPlan,
468    journal: &BackupExecutionJournal,
469) -> Result<DownloadJournal, BackupRunnerError> {
470    if layout.journal_path().is_file() {
471        let mut journal = layout.read_journal()?;
472        journal.discovery_topology_hash = Some(plan.topology_hash_before_quiesce.clone());
473        journal.pre_snapshot_topology_hash = Some(plan.topology_hash_before_quiesce.clone());
474        return Ok(journal);
475    }
476
477    Ok(DownloadJournal {
478        journal_version: 1,
479        backup_id: journal.run_id.clone(),
480        discovery_topology_hash: Some(plan.topology_hash_before_quiesce.clone()),
481        pre_snapshot_topology_hash: Some(plan.topology_hash_before_quiesce.clone()),
482        operation_metrics: DownloadOperationMetrics::default(),
483        artifacts: Vec::new(),
484    })
485}
486
487fn upsert_artifact_entry(journal: &mut DownloadJournal, entry: ArtifactJournalEntry) {
488    if let Some(existing) = journal
489        .artifacts
490        .iter_mut()
491        .find(|existing| existing.canister_id == entry.canister_id)
492    {
493        *existing = entry;
494    } else {
495        journal.operation_metrics.target_count = journal.artifacts.len() + 1;
496        journal.artifacts.push(entry);
497    }
498}
499
500fn artifact_entry_mut<'a>(
501    journal: &'a mut DownloadJournal,
502    sequence: usize,
503    target: &str,
504) -> Result<&'a mut ArtifactJournalEntry, BackupRunnerError> {
505    journal
506        .artifacts
507        .iter_mut()
508        .find(|entry| entry.canister_id == target)
509        .ok_or_else(|| BackupRunnerError::MissingArtifactEntry {
510            sequence,
511            target_canister_id: target.to_string(),
512        })
513}
514
515fn snapshot_id_for_target(
516    journal: &BackupExecutionJournal,
517    sequence: usize,
518    target: &str,
519) -> Result<String, BackupRunnerError> {
520    journal
521        .operation_receipts
522        .iter()
523        .rev()
524        .find(|receipt| {
525            receipt.kind == BackupOperationKind::CreateSnapshot
526                && receipt.target_canister_id.as_deref() == Some(target)
527                && receipt.snapshot_id.is_some()
528        })
529        .and_then(|receipt| receipt.snapshot_id.clone())
530        .ok_or_else(|| BackupRunnerError::MissingSnapshotId {
531            sequence,
532            target_canister_id: target.to_string(),
533        })
534}
535
536fn operation_target(
537    operation: &BackupExecutionJournalOperation,
538) -> Result<String, BackupRunnerError> {
539    operation
540        .target_canister_id
541        .clone()
542        .ok_or(BackupRunnerError::MissingOperationTarget {
543            sequence: operation.sequence,
544        })
545}
546
547fn command_failed(sequence: usize, error: BackupRunnerCommandError) -> BackupRunnerError {
548    BackupRunnerError::CommandFailed {
549        sequence,
550        status: error.status,
551        message: error.message,
552    }
553}
554
/// Artifact location relative to the backup root: the sanitized canister id.
fn artifact_relative_path(canister_id: &str) -> String {
    safe_path_segment(canister_id)
}

/// Staging directory for a canister's in-progress download:
/// `<root>/<sanitized id>.tmp`.
fn artifact_temp_path(root: &Path, canister_id: &str) -> PathBuf {
    let mut dir_name = safe_path_segment(canister_id);
    dir_name.push_str(".tmp");
    root.join(dir_name)
}

/// Turn `value` into a single filesystem-safe path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept. Every other character,
/// including `.`, `/` and any non-ASCII character, becomes `_`.
fn safe_path_segment(value: &str) -> String {
    let mut segment = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        segment.push(if allowed { ch } else { '_' });
    }
    segment
}
572
/// Role name for a plan target: the explicit role, or `member-<index>`.
fn target_role(index: usize, role: Option<&str>) -> String {
    match role {
        Some(name) => name.to_owned(),
        None => format!("member-{index}"),
    }
}
576
577fn state_updated_at(updated_at: Option<&String>) -> String {
578    updated_at.cloned().unwrap_or_else(current_timestamp_marker)
579}
580
/// Seconds encoded in a `unix:<seconds>` marker.
///
/// Any marker that lacks the `unix:` prefix or does not parse as `u64` falls
/// back to the current Unix time.
fn timestamp_seconds(marker: &str) -> u64 {
    match marker
        .strip_prefix("unix:")
        .map(|digits| digits.parse::<u64>())
    {
        Some(Ok(seconds)) => seconds,
        _ => current_unix_seconds(),
    }
}

/// Format `seconds` as a `unix:<seconds>` marker.
fn timestamp_marker(seconds: u64) -> String {
    let mut marker = String::from("unix:");
    marker.push_str(&seconds.to_string());
    marker
}

/// Current Unix time in whole seconds; 0 if the clock reads before the epoch.
fn current_unix_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
597
598struct BackupRunLock {
599    path: PathBuf,
600}
601
602impl BackupRunLock {
603    fn acquire(journal_path: &Path) -> Result<Self, BackupRunnerError> {
604        let path = journal_lock_path(journal_path);
605        match fs::OpenOptions::new()
606            .write(true)
607            .create_new(true)
608            .open(&path)
609        {
610            Ok(mut file) => {
611                writeln!(file, "pid={}", std::process::id())?;
612                Ok(Self { path })
613            }
614            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
615                Err(BackupRunnerError::JournalLocked {
616                    lock_path: path.to_string_lossy().to_string(),
617                })
618            }
619            Err(error) => Err(error.into()),
620        }
621    }
622}
623
624impl Drop for BackupRunLock {
625    fn drop(&mut self) {
626        let _ = fs::remove_file(&self.path);
627    }
628}
629
/// Lock-file path for a journal: the journal path with `.lock` appended.
///
/// The suffix is appended to the full name (`run.json` -> `run.json.lock`),
/// not substituted for the extension.
fn journal_lock_path(path: &Path) -> PathBuf {
    let mut file_name = path.as_os_str().to_owned();
    file_name.push(".lock");
    file_name.into()
}
635
636#[cfg(test)]
637mod tests;