Skip to main content

canic_backup/journal/
mod.rs

1use candid::Principal;
2use serde::{Deserialize, Serialize};
3use std::{collections::BTreeSet, str::FromStr};
4use thiserror::Error as ThisError;
5
/// The only journal format version accepted by validation.
const SUPPORTED_JOURNAL_VERSION: u16 = 1;
/// The only checksum algorithm name accepted for artifact entries.
const SHA256_ALGORITHM: &str = "sha256";
8
9///
10/// DownloadJournal
11///
12
13#[derive(Clone, Debug, Deserialize, Serialize)]
14pub struct DownloadJournal {
15    pub journal_version: u16,
16    pub backup_id: String,
17    #[serde(default)]
18    pub discovery_topology_hash: Option<String>,
19    #[serde(default)]
20    pub pre_snapshot_topology_hash: Option<String>,
21    #[serde(default)]
22    pub operation_metrics: DownloadOperationMetrics,
23    pub artifacts: Vec<ArtifactJournalEntry>,
24}
25
26impl DownloadJournal {
27    /// Validate resumable artifact state for one backup run.
28    pub fn validate(&self) -> Result<(), JournalValidationError> {
29        validate_journal_version(self.journal_version)?;
30        validate_nonempty("backup_id", &self.backup_id)?;
31        validate_optional_hash(
32            "discovery_topology_hash",
33            self.discovery_topology_hash.as_deref(),
34        )?;
35        validate_optional_hash(
36            "pre_snapshot_topology_hash",
37            self.pre_snapshot_topology_hash.as_deref(),
38        )?;
39
40        if self.artifacts.is_empty() {
41            return Err(JournalValidationError::EmptyCollection("artifacts"));
42        }
43
44        let mut keys = BTreeSet::new();
45        for artifact in &self.artifacts {
46            artifact.validate()?;
47            let key = (artifact.canister_id.clone(), artifact.snapshot_id.clone());
48            if !keys.insert(key) {
49                return Err(JournalValidationError::DuplicateArtifact {
50                    canister_id: artifact.canister_id.clone(),
51                    snapshot_id: artifact.snapshot_id.clone(),
52                });
53            }
54        }
55
56        Ok(())
57    }
58
59    /// Build a resumability report from the current journal state.
60    #[must_use]
61    pub fn resume_report(&self) -> JournalResumeReport {
62        let mut counts = JournalStateCounts::default();
63        let mut artifacts = Vec::with_capacity(self.artifacts.len());
64
65        for artifact in &self.artifacts {
66            counts.record(artifact.state, artifact.resume_action());
67            artifacts.push(ArtifactResumeReport {
68                canister_id: artifact.canister_id.clone(),
69                snapshot_id: artifact.snapshot_id.clone(),
70                state: artifact.state,
71                resume_action: artifact.resume_action(),
72                artifact_path: artifact.artifact_path.clone(),
73                temp_path: artifact.temp_path.clone(),
74                updated_at: artifact.updated_at.clone(),
75            });
76        }
77
78        JournalResumeReport {
79            backup_id: self.backup_id.clone(),
80            discovery_topology_hash: self.discovery_topology_hash.clone(),
81            pre_snapshot_topology_hash: self.pre_snapshot_topology_hash.clone(),
82            total_artifacts: self.artifacts.len(),
83            is_complete: counts.skip == self.artifacts.len(),
84            pending_artifacts: self.artifacts.len() - counts.skip,
85            counts,
86            operation_metrics: self.operation_metrics.clone(),
87            artifacts,
88        }
89    }
90}
91
92///
93/// DownloadOperationMetrics
94///
95
96#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
97pub struct DownloadOperationMetrics {
98    pub target_count: usize,
99    pub snapshot_create_started: usize,
100    pub snapshot_create_completed: usize,
101    pub snapshot_download_started: usize,
102    pub snapshot_download_completed: usize,
103    pub checksum_verify_started: usize,
104    pub checksum_verify_completed: usize,
105    pub artifact_finalize_started: usize,
106    pub artifact_finalize_completed: usize,
107}
108
109///
110/// ArtifactJournalEntry
111///
112
113#[derive(Clone, Debug, Deserialize, Serialize)]
114pub struct ArtifactJournalEntry {
115    pub canister_id: String,
116    pub snapshot_id: String,
117    pub state: ArtifactState,
118    pub temp_path: Option<String>,
119    pub artifact_path: String,
120    pub checksum_algorithm: String,
121    pub checksum: Option<String>,
122    pub updated_at: String,
123}
124
125impl ArtifactJournalEntry {
126    /// Return the idempotent action needed to resume this artifact.
127    #[must_use]
128    pub const fn resume_action(&self) -> ResumeAction {
129        match self.state {
130            ArtifactState::Created => ResumeAction::Download,
131            ArtifactState::Downloaded => ResumeAction::VerifyChecksum,
132            ArtifactState::ChecksumVerified => ResumeAction::Finalize,
133            ArtifactState::Durable => ResumeAction::Skip,
134        }
135    }
136
137    /// Advance this artifact to a later journal state.
138    pub fn advance_to(
139        &mut self,
140        next_state: ArtifactState,
141        updated_at: String,
142    ) -> Result<(), JournalValidationError> {
143        if !self.state.can_advance_to(next_state) {
144            return Err(JournalValidationError::InvalidStateTransition {
145                from: self.state,
146                to: next_state,
147            });
148        }
149
150        self.state = next_state;
151        self.updated_at = updated_at;
152        Ok(())
153    }
154
155    /// Validate one artifact's resumable state.
156    fn validate(&self) -> Result<(), JournalValidationError> {
157        validate_principal("artifacts[].canister_id", &self.canister_id)?;
158        validate_nonempty("artifacts[].snapshot_id", &self.snapshot_id)?;
159        validate_nonempty("artifacts[].artifact_path", &self.artifact_path)?;
160        validate_nonempty("artifacts[].checksum_algorithm", &self.checksum_algorithm)?;
161        validate_nonempty("artifacts[].updated_at", &self.updated_at)?;
162
163        if self.checksum_algorithm != SHA256_ALGORITHM {
164            return Err(JournalValidationError::UnsupportedHashAlgorithm(
165                self.checksum_algorithm.clone(),
166            ));
167        }
168
169        if matches!(
170            self.state,
171            ArtifactState::Downloaded | ArtifactState::ChecksumVerified
172        ) {
173            validate_required_option("artifacts[].temp_path", self.temp_path.as_deref())?;
174        }
175
176        if matches!(
177            self.state,
178            ArtifactState::ChecksumVerified | ArtifactState::Durable
179        ) {
180            validate_required_hash("artifacts[].checksum", self.checksum.as_deref())?;
181        }
182
183        Ok(())
184    }
185}
186
187///
188/// ArtifactState
189///
190
191#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
192#[serde(rename_all = "PascalCase")]
193pub enum ArtifactState {
194    Created,
195    Downloaded,
196    ChecksumVerified,
197    Durable,
198}
199
200impl ArtifactState {
201    /// Return whether this state can advance monotonically to `next`.
202    #[must_use]
203    pub const fn can_advance_to(self, next: Self) -> bool {
204        self.as_order() <= next.as_order()
205    }
206
207    /// Return the stable ordering used by the journal state machine.
208    const fn as_order(self) -> u8 {
209        match self {
210            Self::Created => 0,
211            Self::Downloaded => 1,
212            Self::ChecksumVerified => 2,
213            Self::Durable => 3,
214        }
215    }
216}
217
218///
219/// ResumeAction
220///
221
222#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
223#[serde(rename_all = "PascalCase")]
224pub enum ResumeAction {
225    Download,
226    VerifyChecksum,
227    Finalize,
228    Skip,
229}
230
231///
232/// JournalResumeReport
233///
234
235#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
236pub struct JournalResumeReport {
237    pub backup_id: String,
238    pub discovery_topology_hash: Option<String>,
239    pub pre_snapshot_topology_hash: Option<String>,
240    pub total_artifacts: usize,
241    pub is_complete: bool,
242    pub pending_artifacts: usize,
243    pub counts: JournalStateCounts,
244    pub operation_metrics: DownloadOperationMetrics,
245    pub artifacts: Vec<ArtifactResumeReport>,
246}
247
248///
249/// JournalStateCounts
250///
251
252#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
253pub struct JournalStateCounts {
254    pub created: usize,
255    pub downloaded: usize,
256    pub checksum_verified: usize,
257    pub durable: usize,
258    pub download: usize,
259    pub verify_checksum: usize,
260    pub finalize: usize,
261    pub skip: usize,
262}
263
264impl JournalStateCounts {
265    // Record one artifact's state and next idempotent resume action.
266    const fn record(&mut self, state: ArtifactState, action: ResumeAction) {
267        match state {
268            ArtifactState::Created => self.created += 1,
269            ArtifactState::Downloaded => self.downloaded += 1,
270            ArtifactState::ChecksumVerified => self.checksum_verified += 1,
271            ArtifactState::Durable => self.durable += 1,
272        }
273
274        match action {
275            ResumeAction::Download => self.download += 1,
276            ResumeAction::VerifyChecksum => self.verify_checksum += 1,
277            ResumeAction::Finalize => self.finalize += 1,
278            ResumeAction::Skip => self.skip += 1,
279        }
280    }
281}
282
283///
284/// ArtifactResumeReport
285///
286
287#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
288pub struct ArtifactResumeReport {
289    pub canister_id: String,
290    pub snapshot_id: String,
291    pub state: ArtifactState,
292    pub resume_action: ResumeAction,
293    pub artifact_path: String,
294    pub temp_path: Option<String>,
295    pub updated_at: String,
296}
297
298///
299/// JournalValidationError
300///
301
302#[derive(Debug, ThisError)]
303pub enum JournalValidationError {
304    #[error("unsupported journal version {0}")]
305    UnsupportedJournalVersion(u16),
306
307    #[error("field {0} must not be empty")]
308    EmptyField(&'static str),
309
310    #[error("collection {0} must not be empty")]
311    EmptyCollection(&'static str),
312
313    #[error("field {field} must be a valid principal: {value}")]
314    InvalidPrincipal { field: &'static str, value: String },
315
316    #[error("field {0} must be a non-empty sha256 hex string")]
317    InvalidHash(&'static str),
318
319    #[error("unsupported hash algorithm {0}")]
320    UnsupportedHashAlgorithm(String),
321
322    #[error("duplicate artifact entry for canister {canister_id} snapshot {snapshot_id}")]
323    DuplicateArtifact {
324        canister_id: String,
325        snapshot_id: String,
326    },
327
328    #[error("invalid journal transition from {from:?} to {to:?}")]
329    InvalidStateTransition {
330        from: ArtifactState,
331        to: ArtifactState,
332    },
333}
334
335// Validate the journal format version before checking nested entries.
336const fn validate_journal_version(version: u16) -> Result<(), JournalValidationError> {
337    if version == SUPPORTED_JOURNAL_VERSION {
338        Ok(())
339    } else {
340        Err(JournalValidationError::UnsupportedJournalVersion(version))
341    }
342}
343
344// Validate required string fields after trimming whitespace.
345fn validate_nonempty(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
346    if value.trim().is_empty() {
347        Err(JournalValidationError::EmptyField(field))
348    } else {
349        Ok(())
350    }
351}
352
353// Validate required string fields represented as optional journal fields.
354fn validate_required_option(
355    field: &'static str,
356    value: Option<&str>,
357) -> Result<(), JournalValidationError> {
358    match value {
359        Some(value) => validate_nonempty(field, value),
360        None => Err(JournalValidationError::EmptyField(field)),
361    }
362}
363
364// Validate textual principal fields used in JSON journals.
365fn validate_principal(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
366    validate_nonempty(field, value)?;
367    Principal::from_str(value)
368        .map(|_| ())
369        .map_err(|_| JournalValidationError::InvalidPrincipal {
370            field,
371            value: value.to_string(),
372        })
373}
374
375// Validate required SHA-256 hex fields represented as optional journal fields.
376fn validate_required_hash(
377    field: &'static str,
378    value: Option<&str>,
379) -> Result<(), JournalValidationError> {
380    match value {
381        Some(value) => validate_hash(field, value),
382        None => Err(JournalValidationError::EmptyField(field)),
383    }
384}
385
386// Validate optional SHA-256 hex fields when present.
387fn validate_optional_hash(
388    field: &'static str,
389    value: Option<&str>,
390) -> Result<(), JournalValidationError> {
391    if let Some(value) = value {
392        validate_hash(field, value)?;
393    }
394    Ok(())
395}
396
397// Validate SHA-256 hex values used for downloaded artifacts.
398fn validate_hash(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
399    const SHA256_HEX_LEN: usize = 64;
400    validate_nonempty(field, value)?;
401    if value.len() == SHA256_HEX_LEN && value.bytes().all(|b| b.is_ascii_hexdigit()) {
402        Ok(())
403    } else {
404        Err(JournalValidationError::InvalidHash(field))
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    const ROOT: &str = "aaaaa-aa";
    const HASH: &str = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";

    // Build one valid durable journal for validation tests.
    fn valid_journal() -> DownloadJournal {
        DownloadJournal {
            journal_version: 1,
            backup_id: "fbk_test_001".to_string(),
            discovery_topology_hash: Some(HASH.to_string()),
            pre_snapshot_topology_hash: Some(HASH.to_string()),
            operation_metrics: DownloadOperationMetrics::default(),
            artifacts: vec![ArtifactJournalEntry {
                canister_id: ROOT.to_string(),
                snapshot_id: "snap-1".to_string(),
                state: ArtifactState::Durable,
                temp_path: None,
                artifact_path: "artifacts/root".to_string(),
                checksum_algorithm: "sha256".to_string(),
                checksum: Some(HASH.to_string()),
                updated_at: "2026-04-10T12:00:00Z".to_string(),
            }],
        }
    }

    // Ensure durable artifact journals validate.
    #[test]
    fn valid_journal_passes_validation() {
        let journal = valid_journal();

        journal.validate().expect("journal should validate");
    }

    // Ensure state determines the next idempotent resume action.
    #[test]
    fn resume_action_matches_artifact_state() {
        let mut entry = valid_journal().artifacts.remove(0);

        entry.state = ArtifactState::Created;
        assert_eq!(entry.resume_action(), ResumeAction::Download);

        entry.state = ArtifactState::Downloaded;
        assert_eq!(entry.resume_action(), ResumeAction::VerifyChecksum);

        entry.state = ArtifactState::ChecksumVerified;
        assert_eq!(entry.resume_action(), ResumeAction::Finalize);

        entry.state = ArtifactState::Durable;
        assert_eq!(entry.resume_action(), ResumeAction::Skip);
    }

    // Ensure resume reports summarize states and next idempotent actions.
    #[test]
    fn resume_report_counts_states_and_actions() {
        let mut journal = valid_journal();
        journal.artifacts[0].state = ArtifactState::Created;
        journal.artifacts[0].checksum = None;
        let mut downloaded = journal.artifacts[0].clone();
        downloaded.snapshot_id = "snap-2".to_string();
        downloaded.state = ArtifactState::Downloaded;
        downloaded.temp_path = Some("artifacts/root.tmp".to_string());
        let mut durable = valid_journal().artifacts.remove(0);
        durable.snapshot_id = "snap-3".to_string();
        journal.artifacts.push(downloaded);
        journal.artifacts.push(durable);

        let report = journal.resume_report();

        assert_eq!(report.total_artifacts, 3);
        assert_eq!(report.discovery_topology_hash.as_deref(), Some(HASH));
        assert_eq!(report.pre_snapshot_topology_hash.as_deref(), Some(HASH));
        assert!(!report.is_complete);
        assert_eq!(report.pending_artifacts, 2);
        assert_eq!(report.counts.created, 1);
        assert_eq!(report.counts.downloaded, 1);
        assert_eq!(report.counts.durable, 1);
        assert_eq!(report.counts.download, 1);
        assert_eq!(report.counts.verify_checksum, 1);
        assert_eq!(report.counts.skip, 1);
        assert_eq!(report.artifacts[0].resume_action, ResumeAction::Download);
    }

    // Ensure a fully durable journal reports itself complete.
    #[test]
    fn resume_report_for_durable_journal_is_complete() {
        let report = valid_journal().resume_report();

        assert!(report.is_complete);
        assert_eq!(report.pending_artifacts, 0);
        assert_eq!(report.counts.skip, 1);
        assert_eq!(report.artifacts[0].resume_action, ResumeAction::Skip);
    }

    // Ensure journal transitions cannot move backward, and a rejected
    // transition leaves the entry untouched.
    #[test]
    fn state_transitions_are_monotonic() {
        let mut entry = valid_journal().artifacts.remove(0);

        let err = entry
            .advance_to(
                ArtifactState::Downloaded,
                "2026-04-10T12:01:00Z".to_string(),
            )
            .expect_err("durable cannot move back to downloaded");

        assert!(matches!(
            err,
            JournalValidationError::InvalidStateTransition { .. }
        ));
        assert_eq!(entry.state, ArtifactState::Durable);
        assert_eq!(entry.updated_at, "2026-04-10T12:00:00Z");
    }

    // Ensure forward transitions update both the state and the timestamp.
    #[test]
    fn state_transitions_advance_forward() {
        let mut entry = valid_journal().artifacts.remove(0);
        entry.state = ArtifactState::Created;

        entry
            .advance_to(
                ArtifactState::Downloaded,
                "2026-04-10T12:01:00Z".to_string(),
            )
            .expect("created can advance to downloaded");

        assert_eq!(entry.state, ArtifactState::Downloaded);
        assert_eq!(entry.updated_at, "2026-04-10T12:01:00Z");
    }

    // Ensure checksum is required once an artifact is durable.
    #[test]
    fn durable_artifact_requires_checksum() {
        let mut journal = valid_journal();
        journal.artifacts[0].checksum = None;

        let err = journal
            .validate()
            .expect_err("durable artifact without checksum should fail");

        assert!(matches!(err, JournalValidationError::EmptyField(_)));
    }

    // Ensure a temp path is required once an artifact is downloaded.
    #[test]
    fn downloaded_artifact_requires_temp_path() {
        let mut journal = valid_journal();
        journal.artifacts[0].state = ArtifactState::Downloaded;
        journal.artifacts[0].temp_path = None;
        journal.artifacts[0].checksum = None;

        let err = journal
            .validate()
            .expect_err("downloaded artifact without temp path should fail");

        assert!(matches!(
            err,
            JournalValidationError::EmptyField("artifacts[].temp_path")
        ));
    }

    // Ensure duplicate canister/snapshot rows are rejected.
    #[test]
    fn duplicate_artifacts_fail_validation() {
        let mut journal = valid_journal();
        journal.artifacts.push(journal.artifacts[0].clone());

        let err = journal
            .validate()
            .expect_err("duplicate artifact should fail");

        assert!(matches!(
            err,
            JournalValidationError::DuplicateArtifact { .. }
        ));
    }

    // Ensure only the supported journal format version validates.
    #[test]
    fn unsupported_journal_version_fails_validation() {
        let mut journal = valid_journal();
        journal.journal_version = 2;

        let err = journal
            .validate()
            .expect_err("unknown journal version should fail");

        assert!(matches!(
            err,
            JournalValidationError::UnsupportedJournalVersion(2)
        ));
    }

    // Ensure a journal must track at least one artifact.
    #[test]
    fn empty_artifacts_fail_validation() {
        let mut journal = valid_journal();
        journal.artifacts.clear();

        let err = journal
            .validate()
            .expect_err("journal without artifacts should fail");

        assert!(matches!(
            err,
            JournalValidationError::EmptyCollection("artifacts")
        ));
    }

    // Ensure malformed topology hashes are rejected.
    #[test]
    fn malformed_topology_hash_fails_validation() {
        let mut journal = valid_journal();
        journal.discovery_topology_hash = Some("not-a-hash".to_string());

        let err = journal
            .validate()
            .expect_err("malformed topology hash should fail");

        assert!(matches!(
            err,
            JournalValidationError::InvalidHash("discovery_topology_hash")
        ));
    }

    // Ensure canister ids must be textual principals.
    #[test]
    fn invalid_principal_fails_validation() {
        let mut journal = valid_journal();
        journal.artifacts[0].canister_id = "not a principal".to_string();

        let err = journal
            .validate()
            .expect_err("invalid principal should fail");

        assert!(matches!(
            err,
            JournalValidationError::InvalidPrincipal {
                field: "artifacts[].canister_id",
                ..
            }
        ));
    }

    // Ensure only sha256 checksums are accepted.
    #[test]
    fn unsupported_checksum_algorithm_fails_validation() {
        let mut journal = valid_journal();
        journal.artifacts[0].checksum_algorithm = "md5".to_string();

        let err = journal
            .validate()
            .expect_err("unsupported checksum algorithm should fail");

        assert!(matches!(
            err,
            JournalValidationError::UnsupportedHashAlgorithm(ref algorithm) if algorithm == "md5"
        ));
    }

    // Ensure journals round-trip through the JSON format.
    #[test]
    fn journal_round_trips_through_json() {
        let journal = valid_journal();

        let encoded = serde_json::to_string(&journal).expect("serialize journal");
        let decoded: DownloadJournal = serde_json::from_str(&encoded).expect("deserialize journal");

        decoded.validate().expect("decoded journal should validate");
    }
}