// canic_backup/journal/mod.rs

1use candid::Principal;
2use serde::{Deserialize, Serialize};
3use std::{
4    collections::BTreeSet,
5    path::{Component, PathBuf},
6    str::FromStr,
7};
8use thiserror::Error as ThisError;
9
// The only journal format version accepted by `validate_journal_version`.
const SUPPORTED_JOURNAL_VERSION: u16 = 1;
// The only checksum algorithm name accepted for an artifact's `checksum_algorithm`.
const SHA256_ALGORITHM: &str = "sha256";
12
///
/// DownloadJournal
///
/// Serializable record of one backup run: its identity, optional topology
/// hashes, operation metrics, and the per-artifact resume state.
/// The derived `Deserialize` only checks field types; semantic checks live in
/// [`DownloadJournal::validate`].
///

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DownloadJournal {
    /// Journal format version; `validate` accepts only `SUPPORTED_JOURNAL_VERSION`.
    pub journal_version: u16,
    /// Identifier of the backup run; `validate` rejects a blank value.
    pub backup_id: String,
    /// Optional hash; defaults to `None` when absent from the serialized form.
    /// When present, `validate` requires a 64-character hex string.
    #[serde(default)]
    pub discovery_topology_hash: Option<String>,
    /// Optional hash; defaults to `None` when absent from the serialized form.
    /// When present, `validate` requires a 64-character hex string.
    #[serde(default)]
    pub pre_snapshot_topology_hash: Option<String>,
    /// Operation counters; default (all zero) when absent from the serialized form.
    #[serde(default)]
    pub operation_metrics: DownloadOperationMetrics,
    /// Artifact entries; `validate` requires at least one and rejects two
    /// entries sharing the same `(canister_id, snapshot_id)` pair.
    pub artifacts: Vec<ArtifactJournalEntry>,
}
29
30impl DownloadJournal {
31    /// Validate resumable artifact state for one backup run.
32    pub fn validate(&self) -> Result<(), JournalValidationError> {
33        validate_journal_version(self.journal_version)?;
34        validate_nonempty("backup_id", &self.backup_id)?;
35        validate_optional_hash(
36            "discovery_topology_hash",
37            self.discovery_topology_hash.as_deref(),
38        )?;
39        validate_optional_hash(
40            "pre_snapshot_topology_hash",
41            self.pre_snapshot_topology_hash.as_deref(),
42        )?;
43
44        if self.artifacts.is_empty() {
45            return Err(JournalValidationError::EmptyCollection("artifacts"));
46        }
47
48        let mut keys = BTreeSet::new();
49        for artifact in &self.artifacts {
50            artifact.validate()?;
51            let key = (artifact.canister_id.clone(), artifact.snapshot_id.clone());
52            if !keys.insert(key) {
53                return Err(JournalValidationError::DuplicateArtifact {
54                    canister_id: artifact.canister_id.clone(),
55                    snapshot_id: artifact.snapshot_id.clone(),
56                });
57            }
58        }
59
60        Ok(())
61    }
62
63    /// Build a resumability report from the current journal state.
64    #[must_use]
65    pub fn resume_report(&self) -> JournalResumeReport {
66        let mut counts = JournalStateCounts::default();
67        let mut artifacts = Vec::with_capacity(self.artifacts.len());
68
69        for artifact in &self.artifacts {
70            counts.record(artifact.state, artifact.resume_action());
71            artifacts.push(ArtifactResumeReport {
72                canister_id: artifact.canister_id.clone(),
73                snapshot_id: artifact.snapshot_id.clone(),
74                state: artifact.state,
75                resume_action: artifact.resume_action(),
76                artifact_path: artifact.artifact_path.clone(),
77                temp_path: artifact.temp_path.clone(),
78                updated_at: artifact.updated_at.clone(),
79            });
80        }
81
82        JournalResumeReport {
83            backup_id: self.backup_id.clone(),
84            discovery_topology_hash: self.discovery_topology_hash.clone(),
85            pre_snapshot_topology_hash: self.pre_snapshot_topology_hash.clone(),
86            total_artifacts: self.artifacts.len(),
87            is_complete: counts.skip == self.artifacts.len(),
88            pending_artifacts: self.artifacts.len() - counts.skip,
89            counts,
90            operation_metrics: self.operation_metrics.clone(),
91            artifacts,
92        }
93    }
94}
95
///
/// DownloadOperationMetrics
///
/// Counters stored alongside the journal and copied verbatim into the resume
/// report. All fields start at zero via the derived `Default`.
/// NOTE(review): nothing in this module updates these counters; presumably the
/// backup runner maintains the started/completed pairs — confirm against the caller.
///

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DownloadOperationMetrics {
    pub target_count: usize,
    pub snapshot_create_started: usize,
    pub snapshot_create_completed: usize,
    pub snapshot_download_started: usize,
    pub snapshot_download_completed: usize,
    pub checksum_verify_started: usize,
    pub checksum_verify_completed: usize,
    pub artifact_finalize_started: usize,
    pub artifact_finalize_completed: usize,
}
112
///
/// ArtifactJournalEntry
///
/// Journal record for one artifact, identified by `(canister_id, snapshot_id)`.
/// The constraints noted on each field are enforced by the entry's `validate`
/// method, not by deserialization.
///

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ArtifactJournalEntry {
    /// Textual principal; must be non-blank and parse via `Principal::from_str`.
    pub canister_id: String,
    /// Snapshot identifier; must be non-blank.
    pub snapshot_id: String,
    /// Current position in the artifact state machine.
    pub state: ArtifactState,
    /// Required (non-blank) while the state is `Downloaded` or `ChecksumVerified`.
    pub temp_path: Option<String>,
    /// Must be non-blank and pass `validate_relative_artifact_path`.
    pub artifact_path: String,
    /// Must equal `SHA256_ALGORITHM` ("sha256").
    pub checksum_algorithm: String,
    /// Required 64-character hex string once the state is `ChecksumVerified` or `Durable`.
    pub checksum: Option<String>,
    /// Must be non-blank; the timestamp format is not checked in this module.
    pub updated_at: String,
}
128
129impl ArtifactJournalEntry {
130    /// Return the idempotent action needed to resume this artifact.
131    #[must_use]
132    pub const fn resume_action(&self) -> ResumeAction {
133        match self.state {
134            ArtifactState::Created => ResumeAction::Download,
135            ArtifactState::Downloaded => ResumeAction::VerifyChecksum,
136            ArtifactState::ChecksumVerified => ResumeAction::Finalize,
137            ArtifactState::Durable => ResumeAction::Skip,
138        }
139    }
140
141    /// Advance this artifact to a later journal state.
142    pub fn advance_to(
143        &mut self,
144        next_state: ArtifactState,
145        updated_at: String,
146    ) -> Result<(), JournalValidationError> {
147        if !self.state.can_advance_to(next_state) {
148            return Err(JournalValidationError::InvalidStateTransition {
149                from: self.state,
150                to: next_state,
151            });
152        }
153
154        self.state = next_state;
155        self.updated_at = updated_at;
156        Ok(())
157    }
158
159    /// Validate one artifact's resumable state.
160    fn validate(&self) -> Result<(), JournalValidationError> {
161        validate_principal("artifacts[].canister_id", &self.canister_id)?;
162        validate_nonempty("artifacts[].snapshot_id", &self.snapshot_id)?;
163        validate_nonempty("artifacts[].artifact_path", &self.artifact_path)?;
164        validate_relative_artifact_path("artifacts[].artifact_path", &self.artifact_path)?;
165        validate_nonempty("artifacts[].checksum_algorithm", &self.checksum_algorithm)?;
166        validate_nonempty("artifacts[].updated_at", &self.updated_at)?;
167
168        if self.checksum_algorithm != SHA256_ALGORITHM {
169            return Err(JournalValidationError::UnsupportedHashAlgorithm(
170                self.checksum_algorithm.clone(),
171            ));
172        }
173
174        if matches!(
175            self.state,
176            ArtifactState::Downloaded | ArtifactState::ChecksumVerified
177        ) {
178            validate_required_option("artifacts[].temp_path", self.temp_path.as_deref())?;
179        }
180
181        if matches!(
182            self.state,
183            ArtifactState::ChecksumVerified | ArtifactState::Durable
184        ) {
185            validate_required_hash("artifacts[].checksum", self.checksum.as_deref())?;
186        }
187
188        Ok(())
189    }
190}
191
///
/// ArtifactState
///
/// Position of one artifact in the journal state machine, declared from
/// earliest to latest. The derived `Ord` follows declaration order, which
/// matches the ranks returned by `as_order`.
///

#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(rename_all = "PascalCase")]
pub enum ArtifactState {
    /// Resuming from here requires `ResumeAction::Download`.
    Created,
    /// Resuming from here requires `ResumeAction::VerifyChecksum`.
    Downloaded,
    /// Resuming from here requires `ResumeAction::Finalize`.
    ChecksumVerified,
    /// Nothing left to do; resuming maps to `ResumeAction::Skip`.
    Durable,
}
204
205impl ArtifactState {
206    /// Return whether this state can advance monotonically to `next`.
207    #[must_use]
208    pub const fn can_advance_to(self, next: Self) -> bool {
209        self.as_order() <= next.as_order()
210    }
211
212    /// Return the stable ordering used by the journal state machine.
213    const fn as_order(self) -> u8 {
214        match self {
215            Self::Created => 0,
216            Self::Downloaded => 1,
217            Self::ChecksumVerified => 2,
218            Self::Durable => 3,
219        }
220    }
221}
222
///
/// ResumeAction
///
/// Next step for one artifact when a run is resumed, as derived from its
/// `ArtifactState` by `ArtifactJournalEntry::resume_action`.
///

#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "PascalCase")]
pub enum ResumeAction {
    /// Chosen for artifacts in state `Created`.
    Download,
    /// Chosen for artifacts in state `Downloaded`.
    VerifyChecksum,
    /// Chosen for artifacts in state `ChecksumVerified`.
    Finalize,
    /// Chosen for artifacts in state `Durable`.
    Skip,
}
235
///
/// JournalResumeReport
///
/// Summary produced by `DownloadJournal::resume_report`.
///

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct JournalResumeReport {
    /// Copied from the journal.
    pub backup_id: String,
    /// Copied from the journal.
    pub discovery_topology_hash: Option<String>,
    /// Copied from the journal.
    pub pre_snapshot_topology_hash: Option<String>,
    /// Number of artifact entries in the journal.
    pub total_artifacts: usize,
    /// True when every artifact's resume action is `Skip`.
    pub is_complete: bool,
    /// `total_artifacts` minus the number of artifacts whose action is `Skip`.
    pub pending_artifacts: usize,
    /// Per-state and per-action tallies.
    pub counts: JournalStateCounts,
    /// Copied from the journal.
    pub operation_metrics: DownloadOperationMetrics,
    /// One row per journal artifact, in journal order.
    pub artifacts: Vec<ArtifactResumeReport>,
}
252
///
/// JournalStateCounts
///
/// Tallies filled in by `record`: the first four fields count artifacts per
/// `ArtifactState`, the last four count artifacts per `ResumeAction`.
///

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct JournalStateCounts {
    /// Artifacts in state `Created`.
    pub created: usize,
    /// Artifacts in state `Downloaded`.
    pub downloaded: usize,
    /// Artifacts in state `ChecksumVerified`.
    pub checksum_verified: usize,
    /// Artifacts in state `Durable`.
    pub durable: usize,
    /// Artifacts whose resume action is `Download`.
    pub download: usize,
    /// Artifacts whose resume action is `VerifyChecksum`.
    pub verify_checksum: usize,
    /// Artifacts whose resume action is `Finalize`.
    pub finalize: usize,
    /// Artifacts whose resume action is `Skip`.
    pub skip: usize,
}
268
269impl JournalStateCounts {
270    // Record one artifact's state and next idempotent resume action.
271    const fn record(&mut self, state: ArtifactState, action: ResumeAction) {
272        match state {
273            ArtifactState::Created => self.created += 1,
274            ArtifactState::Downloaded => self.downloaded += 1,
275            ArtifactState::ChecksumVerified => self.checksum_verified += 1,
276            ArtifactState::Durable => self.durable += 1,
277        }
278
279        match action {
280            ResumeAction::Download => self.download += 1,
281            ResumeAction::VerifyChecksum => self.verify_checksum += 1,
282            ResumeAction::Finalize => self.finalize += 1,
283            ResumeAction::Skip => self.skip += 1,
284        }
285    }
286}
287
///
/// ArtifactResumeReport
///
/// One artifact's row in a `JournalResumeReport`. Every field except
/// `resume_action` is copied from the matching `ArtifactJournalEntry`.
///

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ArtifactResumeReport {
    pub canister_id: String,
    pub snapshot_id: String,
    pub state: ArtifactState,
    /// Derived from `state` via `ArtifactJournalEntry::resume_action`.
    pub resume_action: ResumeAction,
    pub artifact_path: String,
    pub temp_path: Option<String>,
    pub updated_at: String,
}
302
///
/// JournalValidationError
///
/// Failures reported by journal validation and by artifact state transitions.
///

#[derive(Debug, ThisError)]
pub enum JournalValidationError {
    /// The journal version differs from `SUPPORTED_JOURNAL_VERSION`.
    #[error("unsupported journal version {0}")]
    UnsupportedJournalVersion(u16),

    /// A required string is blank (empty or whitespace-only), or a required
    /// optional field is `None`.
    #[error("field {0} must not be empty")]
    EmptyField(&'static str),

    /// A collection that must hold at least one element is empty.
    #[error("collection {0} must not be empty")]
    EmptyCollection(&'static str),

    /// The value could not be parsed by `Principal::from_str`.
    #[error("field {field} must be a valid principal: {value}")]
    InvalidPrincipal { field: &'static str, value: String },

    /// The value is not exactly 64 ASCII hex digits.
    #[error("field {0} must be a non-empty sha256 hex string")]
    InvalidHash(&'static str),

    /// The checksum algorithm is not `SHA256_ALGORITHM`.
    #[error("unsupported hash algorithm {0}")]
    UnsupportedHashAlgorithm(String),

    /// The artifact path was rejected by `validate_relative_artifact_path`.
    #[error("field {field} must be a relative artifact path under the backup root: {value}")]
    InvalidArtifactPath { field: &'static str, value: String },

    /// Two artifact entries share the same canister and snapshot identifiers.
    #[error("duplicate artifact entry for canister {canister_id} snapshot {snapshot_id}")]
    DuplicateArtifact {
        canister_id: String,
        snapshot_id: String,
    },

    /// `ArtifactJournalEntry::advance_to` was asked to move to an earlier state.
    #[error("invalid journal transition from {from:?} to {to:?}")]
    InvalidStateTransition {
        from: ArtifactState,
        to: ArtifactState,
    },
}
342
343// Validate the journal format version before checking nested entries.
344const fn validate_journal_version(version: u16) -> Result<(), JournalValidationError> {
345    if version == SUPPORTED_JOURNAL_VERSION {
346        Ok(())
347    } else {
348        Err(JournalValidationError::UnsupportedJournalVersion(version))
349    }
350}
351
352// Validate required string fields after trimming whitespace.
353fn validate_nonempty(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
354    if value.trim().is_empty() {
355        Err(JournalValidationError::EmptyField(field))
356    } else {
357        Ok(())
358    }
359}
360
361// Validate durable artifact paths before any runner resume code trusts them.
362fn validate_relative_artifact_path(
363    field: &'static str,
364    value: &str,
365) -> Result<(), JournalValidationError> {
366    let path = PathBuf::from(value);
367    if path.is_absolute()
368        || !path
369            .components()
370            .all(|component| matches!(component, Component::Normal(_) | Component::CurDir))
371    {
372        return Err(JournalValidationError::InvalidArtifactPath {
373            field,
374            value: value.to_string(),
375        });
376    }
377    Ok(())
378}
379
380// Validate required string fields represented as optional journal fields.
381fn validate_required_option(
382    field: &'static str,
383    value: Option<&str>,
384) -> Result<(), JournalValidationError> {
385    match value {
386        Some(value) => validate_nonempty(field, value),
387        None => Err(JournalValidationError::EmptyField(field)),
388    }
389}
390
391// Validate textual principal fields used in JSON journals.
392fn validate_principal(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
393    validate_nonempty(field, value)?;
394    Principal::from_str(value)
395        .map(|_| ())
396        .map_err(|_| JournalValidationError::InvalidPrincipal {
397            field,
398            value: value.to_string(),
399        })
400}
401
402// Validate required SHA-256 hex fields represented as optional journal fields.
403fn validate_required_hash(
404    field: &'static str,
405    value: Option<&str>,
406) -> Result<(), JournalValidationError> {
407    match value {
408        Some(value) => validate_hash(field, value),
409        None => Err(JournalValidationError::EmptyField(field)),
410    }
411}
412
413// Validate optional SHA-256 hex fields when present.
414fn validate_optional_hash(
415    field: &'static str,
416    value: Option<&str>,
417) -> Result<(), JournalValidationError> {
418    if let Some(value) = value {
419        validate_hash(field, value)?;
420    }
421    Ok(())
422}
423
424// Validate SHA-256 hex values used for downloaded artifacts.
425fn validate_hash(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
426    const SHA256_HEX_LEN: usize = 64;
427    validate_nonempty(field, value)?;
428    if value.len() == SHA256_HEX_LEN && value.bytes().all(|b| b.is_ascii_hexdigit()) {
429        Ok(())
430    } else {
431        Err(JournalValidationError::InvalidHash(field))
432    }
433}
434
// Unit tests live in a separate `tests` module file and are compiled only
// for test builds.
#[cfg(test)]
mod tests;