1mod types;
2
3pub use types::*;
4
5use crate::{
6 artifacts::ArtifactChecksum,
7 execution::{
8 BackupExecutionJournal, BackupExecutionJournalOperation, BackupExecutionOperationReceipt,
9 BackupExecutionOperationState,
10 },
11 journal::{ArtifactJournalEntry, ArtifactState, DownloadJournal, DownloadOperationMetrics},
12 manifest::{
13 BackupUnit, BackupUnitKind, ConsistencySection, FleetBackupManifest, FleetMember,
14 FleetSection, SourceMetadata, SourceSnapshot, ToolMetadata, VerificationCheck,
15 VerificationPlan,
16 },
17 persistence::BackupLayout,
18 plan::{BackupOperationKind, BackupPlan, ControlAuthoritySource},
19 timestamp::current_timestamp_marker,
20};
21use std::{
22 fs,
23 io::{self, Write},
24 path::{Path, PathBuf},
25 time::{SystemTime, UNIX_EPOCH},
26};
27
/// Number of seconds added to the preflight validation timestamp to derive
/// the expiry marker handed to the executor when requesting preflight
/// receipts.
const PREFLIGHT_TTL_SECONDS: u64 = 300;
29
30pub fn backup_run_execute_with_executor(
32 config: &BackupRunnerConfig,
33 executor: &mut impl BackupRunnerExecutor,
34) -> Result<BackupRunResponse, BackupRunnerError> {
35 let layout = BackupLayout::new(config.out.clone());
36 let _lock = BackupRunLock::acquire(&layout.execution_journal_path())?;
37 let mut plan = layout.read_backup_plan()?;
38 let mut journal = if layout.execution_journal_path().is_file() {
39 layout.read_execution_journal()?
40 } else {
41 let journal = BackupExecutionJournal::from_plan(&plan)?;
42 layout.write_execution_journal(&journal)?;
43 journal
44 };
45 layout.verify_execution_integrity()?;
46
47 accept_preflight_if_needed(config, executor, &layout, &mut plan, &mut journal)?;
48 execute_ready_operations(config, executor, &layout, &plan, &mut journal)
49}
50
51fn accept_preflight_if_needed(
52 config: &BackupRunnerConfig,
53 executor: &mut impl BackupRunnerExecutor,
54 layout: &BackupLayout,
55 plan: &mut BackupPlan,
56 journal: &mut BackupExecutionJournal,
57) -> Result<(), BackupRunnerError> {
58 if journal.preflight_accepted {
59 return Ok(());
60 }
61
62 let validated_at = state_updated_at(config.updated_at.as_ref());
63 let expires_at = timestamp_marker(timestamp_seconds(&validated_at) + PREFLIGHT_TTL_SECONDS);
64 let preflight_id = format!("preflight-{}", plan.run_id);
65 let receipts = executor
66 .preflight_receipts(plan, &preflight_id, &validated_at, &expires_at)
67 .map_err(|error| BackupRunnerError::PreflightFailed {
68 status: error.status,
69 message: error.message,
70 })?;
71 plan.apply_execution_preflight_receipts(&receipts, &validated_at)?;
72 layout.write_backup_plan(plan)?;
73 journal.accept_preflight_receipts_at(&receipts, Some(validated_at))?;
74 layout.write_execution_journal(journal)?;
75 Ok(())
76}
77
78fn execute_ready_operations(
79 config: &BackupRunnerConfig,
80 executor: &mut impl BackupRunnerExecutor,
81 layout: &BackupLayout,
82 plan: &BackupPlan,
83 journal: &mut BackupExecutionJournal,
84) -> Result<BackupRunResponse, BackupRunnerError> {
85 let mut executed = Vec::new();
86
87 loop {
88 let summary = journal.resume_summary();
89 if summary.completed_operations + summary.skipped_operations == summary.total_operations {
90 return Ok(run_response(plan, journal, executed, false));
91 }
92 if config
93 .max_steps
94 .is_some_and(|max_steps| executed.len() >= max_steps)
95 {
96 return Ok(run_response(plan, journal, executed, true));
97 }
98
99 let operation = journal
100 .next_ready_operation()
101 .cloned()
102 .ok_or(BackupRunnerError::NoReadyOperation)?;
103 if operation.state == BackupExecutionOperationState::Blocked {
104 return Err(BackupRunnerError::Blocked {
105 reasons: operation.blocking_reasons,
106 });
107 }
108
109 if operation.state != BackupExecutionOperationState::Pending {
110 journal.mark_operation_pending_at(
111 operation.sequence,
112 Some(state_updated_at(config.updated_at.as_ref())),
113 )?;
114 layout.write_execution_journal(journal)?;
115 }
116
117 match execute_operation_receipt(config, executor, layout, plan, journal, &operation) {
118 Ok(receipt) => {
119 journal.record_operation_receipt(receipt)?;
120 layout.write_execution_journal(journal)?;
121 executed.push(BackupRunExecutedOperation::completed(&operation));
122 }
123 Err(error) => {
124 let receipt = BackupExecutionOperationReceipt::failed(
125 journal,
126 &operation,
127 Some(state_updated_at(config.updated_at.as_ref())),
128 error.to_string(),
129 );
130 journal.record_operation_receipt(receipt)?;
131 layout.write_execution_journal(journal)?;
132 executed.push(BackupRunExecutedOperation::failed(&operation));
133 return Err(error);
134 }
135 }
136 }
137}
138
139fn execute_operation_receipt(
140 config: &BackupRunnerConfig,
141 executor: &mut impl BackupRunnerExecutor,
142 layout: &BackupLayout,
143 plan: &BackupPlan,
144 journal: &BackupExecutionJournal,
145 operation: &BackupExecutionJournalOperation,
146) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
147 match operation.kind {
148 BackupOperationKind::Stop => execute_stop(executor, journal, operation),
149 BackupOperationKind::CreateSnapshot => {
150 execute_create_snapshot(executor, layout, plan, journal, operation)
151 }
152 BackupOperationKind::Start => execute_start(executor, journal, operation),
153 BackupOperationKind::DownloadSnapshot => {
154 execute_download_snapshot(executor, layout, journal, operation)
155 }
156 BackupOperationKind::VerifyArtifact => execute_verify_artifact(layout, journal, operation),
157 BackupOperationKind::FinalizeManifest => {
158 execute_finalize_manifest(config, layout, plan, journal, operation)
159 }
160 BackupOperationKind::ValidateTopology
161 | BackupOperationKind::ValidateControlAuthority
162 | BackupOperationKind::ValidateSnapshotReadAuthority
163 | BackupOperationKind::ValidateQuiescencePolicy => {
164 Ok(BackupExecutionOperationReceipt::completed(
165 journal,
166 operation,
167 Some(state_updated_at(config.updated_at.as_ref())),
168 ))
169 }
170 }
171}
172
173fn execute_stop(
174 executor: &mut impl BackupRunnerExecutor,
175 journal: &BackupExecutionJournal,
176 operation: &BackupExecutionJournalOperation,
177) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
178 let target = operation_target(operation)?;
179 executor
180 .stop_canister(&target)
181 .map_err(|error| command_failed(operation.sequence, error))?;
182 Ok(BackupExecutionOperationReceipt::completed(
183 journal,
184 operation,
185 Some(current_timestamp_marker()),
186 ))
187}
188
189fn execute_start(
190 executor: &mut impl BackupRunnerExecutor,
191 journal: &BackupExecutionJournal,
192 operation: &BackupExecutionJournalOperation,
193) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
194 let target = operation_target(operation)?;
195 executor
196 .start_canister(&target)
197 .map_err(|error| command_failed(operation.sequence, error))?;
198 Ok(BackupExecutionOperationReceipt::completed(
199 journal,
200 operation,
201 Some(current_timestamp_marker()),
202 ))
203}
204
205fn execute_create_snapshot(
206 executor: &mut impl BackupRunnerExecutor,
207 layout: &BackupLayout,
208 plan: &BackupPlan,
209 journal: &BackupExecutionJournal,
210 operation: &BackupExecutionJournalOperation,
211) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
212 let target = operation_target(operation)?;
213 let snapshot_id = executor
214 .create_snapshot(&target)
215 .map_err(|error| command_failed(operation.sequence, error))?;
216 let mut receipt = BackupExecutionOperationReceipt::completed(
217 journal,
218 operation,
219 Some(current_timestamp_marker()),
220 );
221 receipt.snapshot_id = Some(snapshot_id.clone());
222
223 let mut download_journal = read_or_new_download_journal(layout, plan, journal)?;
224 upsert_artifact_entry(
225 &mut download_journal,
226 ArtifactJournalEntry {
227 canister_id: target.clone(),
228 snapshot_id,
229 state: ArtifactState::Created,
230 temp_path: None,
231 artifact_path: artifact_relative_path(&target),
232 checksum_algorithm: "sha256".to_string(),
233 checksum: None,
234 updated_at: current_timestamp_marker(),
235 },
236 );
237 layout.write_journal(&download_journal)?;
238 Ok(receipt)
239}
240
241fn execute_download_snapshot(
242 executor: &mut impl BackupRunnerExecutor,
243 layout: &BackupLayout,
244 journal: &BackupExecutionJournal,
245 operation: &BackupExecutionJournalOperation,
246) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
247 let target = operation_target(operation)?;
248 let snapshot_id = snapshot_id_for_target(journal, operation.sequence, &target)?;
249 let temp_path = artifact_temp_path(layout.root(), &target);
250 if temp_path.exists() {
251 fs::remove_dir_all(&temp_path)?;
252 }
253 fs::create_dir_all(&temp_path)?;
254 executor
255 .download_snapshot(&target, &snapshot_id, &temp_path)
256 .map_err(|error| command_failed(operation.sequence, error))?;
257
258 let mut download_journal = layout.read_journal()?;
259 let entry = artifact_entry_mut(&mut download_journal, operation.sequence, &target)?;
260 entry.temp_path = Some(temp_path.display().to_string());
261 entry.advance_to(ArtifactState::Downloaded, current_timestamp_marker())?;
262 layout.write_journal(&download_journal)?;
263
264 let mut receipt = BackupExecutionOperationReceipt::completed(
265 journal,
266 operation,
267 Some(current_timestamp_marker()),
268 );
269 receipt.artifact_path = Some(artifact_relative_path(&target));
270 Ok(receipt)
271}
272
273fn execute_verify_artifact(
274 layout: &BackupLayout,
275 journal: &BackupExecutionJournal,
276 operation: &BackupExecutionJournalOperation,
277) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
278 let target = operation_target(operation)?;
279 let mut download_journal = layout.read_journal()?;
280 let entry = artifact_entry_mut(&mut download_journal, operation.sequence, &target)?;
281 let temp_path =
282 entry
283 .temp_path
284 .as_deref()
285 .ok_or_else(|| BackupRunnerError::MissingArtifactEntry {
286 sequence: operation.sequence,
287 target_canister_id: target.clone(),
288 })?;
289 let checksum = ArtifactChecksum::from_path(Path::new(temp_path))?;
290 entry.checksum = Some(checksum.hash.clone());
291 entry.advance_to(ArtifactState::ChecksumVerified, current_timestamp_marker())?;
292 layout.write_journal(&download_journal)?;
293
294 let mut receipt = BackupExecutionOperationReceipt::completed(
295 journal,
296 operation,
297 Some(current_timestamp_marker()),
298 );
299 receipt.checksum = Some(checksum.hash);
300 Ok(receipt)
301}
302
303fn execute_finalize_manifest(
304 config: &BackupRunnerConfig,
305 layout: &BackupLayout,
306 plan: &BackupPlan,
307 journal: &BackupExecutionJournal,
308 operation: &BackupExecutionJournalOperation,
309) -> Result<BackupExecutionOperationReceipt, BackupRunnerError> {
310 let mut download_journal = layout.read_journal()?;
311 for index in 0..download_journal.artifacts.len() {
312 if download_journal.artifacts[index].state == ArtifactState::Durable {
313 continue;
314 }
315 let canister_id = download_journal.artifacts[index].canister_id.clone();
316 let temp_path = download_journal.artifacts[index].temp_path.clone().ok_or(
317 BackupRunnerError::MissingArtifactEntry {
318 sequence: operation.sequence,
319 target_canister_id: canister_id,
320 },
321 )?;
322 let artifact_path = layout
323 .root()
324 .join(&download_journal.artifacts[index].artifact_path);
325 if artifact_path.exists() {
326 return Err(io::Error::new(
327 io::ErrorKind::AlreadyExists,
328 format!("artifact path already exists: {}", artifact_path.display()),
329 )
330 .into());
331 }
332 fs::rename(&temp_path, artifact_path)?;
333 download_journal.artifacts[index].temp_path = None;
334 download_journal.artifacts[index]
335 .advance_to(ArtifactState::Durable, current_timestamp_marker())?;
336 layout.write_journal(&download_journal)?;
337 }
338
339 let manifest = build_manifest(config, plan, &download_journal)?;
340 layout.write_manifest(&manifest)?;
341 Ok(BackupExecutionOperationReceipt::completed(
342 journal,
343 operation,
344 Some(current_timestamp_marker()),
345 ))
346}
347
348fn build_manifest(
349 config: &BackupRunnerConfig,
350 plan: &BackupPlan,
351 journal: &DownloadJournal,
352) -> Result<FleetBackupManifest, BackupRunnerError> {
353 let roles = plan
354 .targets
355 .iter()
356 .enumerate()
357 .map(|(index, target)| target_role(index, target.role.as_deref()))
358 .collect::<Vec<_>>();
359 let manifest = FleetBackupManifest {
360 manifest_version: 1,
361 backup_id: plan.run_id.clone(),
362 created_at: state_updated_at(config.updated_at.as_ref()),
363 tool: ToolMetadata {
364 name: config.tool_name.clone(),
365 version: config.tool_version.clone(),
366 },
367 source: SourceMetadata {
368 environment: plan.network.clone(),
369 root_canister: plan.root_canister_id.clone(),
370 },
371 consistency: ConsistencySection {
372 backup_units: vec![BackupUnit {
373 unit_id: "backup-selection".to_string(),
374 kind: if plan.targets.len() == 1 {
375 BackupUnitKind::Single
376 } else {
377 BackupUnitKind::Subtree
378 },
379 roles,
380 }],
381 },
382 fleet: FleetSection {
383 topology_hash_algorithm: "sha256".to_string(),
384 topology_hash_input: format!("canic-backup-plan:{}", plan.plan_id),
385 discovery_topology_hash: plan.topology_hash_before_quiesce.clone(),
386 pre_snapshot_topology_hash: plan.topology_hash_before_quiesce.clone(),
387 topology_hash: plan.topology_hash_before_quiesce.clone(),
388 members: plan
389 .targets
390 .iter()
391 .enumerate()
392 .map(|(index, target)| {
393 let role = target_role(index, target.role.as_deref());
394 let entry = journal
395 .artifacts
396 .iter()
397 .find(|entry| {
398 entry.canister_id == target.canister_id
399 && entry.state == ArtifactState::Durable
400 })
401 .ok_or_else(|| BackupRunnerError::MissingArtifactEntry {
402 sequence: usize::MAX,
403 target_canister_id: target.canister_id.clone(),
404 })?;
405 Ok(FleetMember {
406 role: role.clone(),
407 canister_id: target.canister_id.clone(),
408 parent_canister_id: target.parent_canister_id.clone(),
409 subnet_canister_id: None,
410 controller_hint: controller_hint(plan, target),
411 identity_mode: target.identity_mode.clone(),
412 verification_checks: vec![VerificationCheck {
413 kind: "status".to_string(),
414 roles: vec![role],
415 }],
416 source_snapshot: SourceSnapshot {
417 snapshot_id: entry.snapshot_id.clone(),
418 module_hash: target.expected_module_hash.clone(),
419 code_version: None,
420 artifact_path: entry.artifact_path.clone(),
421 checksum_algorithm: entry.checksum_algorithm.clone(),
422 checksum: entry.checksum.clone(),
423 },
424 })
425 })
426 .collect::<Result<Vec<_>, BackupRunnerError>>()?,
427 },
428 verification: VerificationPlan::default(),
429 };
430 manifest.validate()?;
431 Ok(manifest)
432}
433
434fn controller_hint(plan: &BackupPlan, target: &crate::plan::BackupTarget) -> Option<String> {
435 if matches!(
436 target.control_authority.source,
437 ControlAuthoritySource::RootController
438 ) {
439 Some(plan.root_canister_id.clone())
440 } else {
441 None
442 }
443}
444
445fn run_response(
446 plan: &BackupPlan,
447 journal: &BackupExecutionJournal,
448 executed: Vec<BackupRunExecutedOperation>,
449 max_steps_reached: bool,
450) -> BackupRunResponse {
451 let execution = journal.resume_summary();
452 BackupRunResponse {
453 run_id: plan.run_id.clone(),
454 plan_id: plan.plan_id.clone(),
455 backup_id: plan.run_id.clone(),
456 complete: execution.completed_operations + execution.skipped_operations
457 == execution.total_operations,
458 max_steps_reached,
459 executed_operation_count: executed.len(),
460 executed_operations: executed,
461 execution,
462 }
463}
464
465fn read_or_new_download_journal(
466 layout: &BackupLayout,
467 plan: &BackupPlan,
468 journal: &BackupExecutionJournal,
469) -> Result<DownloadJournal, BackupRunnerError> {
470 if layout.journal_path().is_file() {
471 let mut journal = layout.read_journal()?;
472 journal.discovery_topology_hash = Some(plan.topology_hash_before_quiesce.clone());
473 journal.pre_snapshot_topology_hash = Some(plan.topology_hash_before_quiesce.clone());
474 return Ok(journal);
475 }
476
477 Ok(DownloadJournal {
478 journal_version: 1,
479 backup_id: journal.run_id.clone(),
480 discovery_topology_hash: Some(plan.topology_hash_before_quiesce.clone()),
481 pre_snapshot_topology_hash: Some(plan.topology_hash_before_quiesce.clone()),
482 operation_metrics: DownloadOperationMetrics::default(),
483 artifacts: Vec::new(),
484 })
485}
486
487fn upsert_artifact_entry(journal: &mut DownloadJournal, entry: ArtifactJournalEntry) {
488 if let Some(existing) = journal
489 .artifacts
490 .iter_mut()
491 .find(|existing| existing.canister_id == entry.canister_id)
492 {
493 *existing = entry;
494 } else {
495 journal.operation_metrics.target_count = journal.artifacts.len() + 1;
496 journal.artifacts.push(entry);
497 }
498}
499
500fn artifact_entry_mut<'a>(
501 journal: &'a mut DownloadJournal,
502 sequence: usize,
503 target: &str,
504) -> Result<&'a mut ArtifactJournalEntry, BackupRunnerError> {
505 journal
506 .artifacts
507 .iter_mut()
508 .find(|entry| entry.canister_id == target)
509 .ok_or_else(|| BackupRunnerError::MissingArtifactEntry {
510 sequence,
511 target_canister_id: target.to_string(),
512 })
513}
514
515fn snapshot_id_for_target(
516 journal: &BackupExecutionJournal,
517 sequence: usize,
518 target: &str,
519) -> Result<String, BackupRunnerError> {
520 journal
521 .operation_receipts
522 .iter()
523 .rev()
524 .find(|receipt| {
525 receipt.kind == BackupOperationKind::CreateSnapshot
526 && receipt.target_canister_id.as_deref() == Some(target)
527 && receipt.snapshot_id.is_some()
528 })
529 .and_then(|receipt| receipt.snapshot_id.clone())
530 .ok_or_else(|| BackupRunnerError::MissingSnapshotId {
531 sequence,
532 target_canister_id: target.to_string(),
533 })
534}
535
536fn operation_target(
537 operation: &BackupExecutionJournalOperation,
538) -> Result<String, BackupRunnerError> {
539 operation
540 .target_canister_id
541 .clone()
542 .ok_or(BackupRunnerError::MissingOperationTarget {
543 sequence: operation.sequence,
544 })
545}
546
547fn command_failed(sequence: usize, error: BackupRunnerCommandError) -> BackupRunnerError {
548 BackupRunnerError::CommandFailed {
549 sequence,
550 status: error.status,
551 message: error.message,
552 }
553}
554
555fn artifact_relative_path(canister_id: &str) -> String {
556 safe_path_segment(canister_id)
557}
558
559fn artifact_temp_path(root: &Path, canister_id: &str) -> PathBuf {
560 root.join(format!("{}.tmp", safe_path_segment(canister_id)))
561}
562
/// Maps `value` to a string that is safe to use as one path segment.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character
/// (including path separators, dots and non-ASCII characters) is replaced by
/// a single `_`. The number of characters is preserved.
fn safe_path_segment(value: &str) -> String {
    let mut segment = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        segment.push(if keep { ch } else { '_' });
    }
    segment
}
572
/// Returns the role name for a target: the explicit role when one is given,
/// otherwise the positional fallback `member-{index}`.
fn target_role(index: usize, role: Option<&str>) -> String {
    match role {
        Some(name) => name.to_string(),
        None => format!("member-{index}"),
    }
}
576
577fn state_updated_at(updated_at: Option<&String>) -> String {
578 updated_at.cloned().unwrap_or_else(current_timestamp_marker)
579}
580
581fn timestamp_seconds(marker: &str) -> u64 {
582 marker
583 .strip_prefix("unix:")
584 .and_then(|seconds| seconds.parse::<u64>().ok())
585 .unwrap_or_else(current_unix_seconds)
586}
587
/// Formats a second count as a `unix:<seconds>` timestamp marker.
fn timestamp_marker(seconds: u64) -> String {
    let mut marker = String::from("unix:");
    marker.push_str(&seconds.to_string());
    marker
}
591
/// Returns the whole seconds elapsed since the Unix epoch according to the
/// system clock, or `0` when the clock reports a time before the epoch.
fn current_unix_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
597
598struct BackupRunLock {
599 path: PathBuf,
600}
601
602impl BackupRunLock {
603 fn acquire(journal_path: &Path) -> Result<Self, BackupRunnerError> {
604 let path = journal_lock_path(journal_path);
605 match fs::OpenOptions::new()
606 .write(true)
607 .create_new(true)
608 .open(&path)
609 {
610 Ok(mut file) => {
611 writeln!(file, "pid={}", std::process::id())?;
612 Ok(Self { path })
613 }
614 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
615 Err(BackupRunnerError::JournalLocked {
616 lock_path: path.to_string_lossy().to_string(),
617 })
618 }
619 Err(error) => Err(error.into()),
620 }
621 }
622}
623
624impl Drop for BackupRunLock {
625 fn drop(&mut self) {
626 let _ = fs::remove_file(&self.path);
627 }
628}
629
/// Returns the lock-file path for a journal: the journal path with `.lock`
/// appended to its full name (the existing extension is kept, not replaced).
fn journal_lock_path(path: &Path) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(".lock");
    raw.into()
}
635
636#[cfg(test)]
637mod tests;