1use candid::Principal;
2use serde::{Deserialize, Serialize};
3use std::{
4 collections::BTreeSet,
5 path::{Component, PathBuf},
6 str::FromStr,
7};
8use thiserror::Error as ThisError;
9
10const SUPPORTED_JOURNAL_VERSION: u16 = 1;
11const SHA256_ALGORITHM: &str = "sha256";
12
13#[derive(Clone, Debug, Deserialize, Serialize)]
18pub struct DownloadJournal {
19 pub journal_version: u16,
20 pub backup_id: String,
21 #[serde(default)]
22 pub discovery_topology_hash: Option<String>,
23 #[serde(default)]
24 pub pre_snapshot_topology_hash: Option<String>,
25 #[serde(default)]
26 pub operation_metrics: DownloadOperationMetrics,
27 pub artifacts: Vec<ArtifactJournalEntry>,
28}
29
30impl DownloadJournal {
31 pub fn validate(&self) -> Result<(), JournalValidationError> {
33 validate_journal_version(self.journal_version)?;
34 validate_nonempty("backup_id", &self.backup_id)?;
35 validate_optional_hash(
36 "discovery_topology_hash",
37 self.discovery_topology_hash.as_deref(),
38 )?;
39 validate_optional_hash(
40 "pre_snapshot_topology_hash",
41 self.pre_snapshot_topology_hash.as_deref(),
42 )?;
43
44 if self.artifacts.is_empty() {
45 return Err(JournalValidationError::EmptyCollection("artifacts"));
46 }
47
48 let mut keys = BTreeSet::new();
49 for artifact in &self.artifacts {
50 artifact.validate()?;
51 let key = (artifact.canister_id.clone(), artifact.snapshot_id.clone());
52 if !keys.insert(key) {
53 return Err(JournalValidationError::DuplicateArtifact {
54 canister_id: artifact.canister_id.clone(),
55 snapshot_id: artifact.snapshot_id.clone(),
56 });
57 }
58 }
59
60 Ok(())
61 }
62
63 #[must_use]
65 pub fn resume_report(&self) -> JournalResumeReport {
66 let mut counts = JournalStateCounts::default();
67 let mut artifacts = Vec::with_capacity(self.artifacts.len());
68
69 for artifact in &self.artifacts {
70 counts.record(artifact.state, artifact.resume_action());
71 artifacts.push(ArtifactResumeReport {
72 canister_id: artifact.canister_id.clone(),
73 snapshot_id: artifact.snapshot_id.clone(),
74 state: artifact.state,
75 resume_action: artifact.resume_action(),
76 artifact_path: artifact.artifact_path.clone(),
77 temp_path: artifact.temp_path.clone(),
78 updated_at: artifact.updated_at.clone(),
79 });
80 }
81
82 JournalResumeReport {
83 backup_id: self.backup_id.clone(),
84 discovery_topology_hash: self.discovery_topology_hash.clone(),
85 pre_snapshot_topology_hash: self.pre_snapshot_topology_hash.clone(),
86 total_artifacts: self.artifacts.len(),
87 is_complete: counts.skip == self.artifacts.len(),
88 pending_artifacts: self.artifacts.len() - counts.skip,
89 counts,
90 operation_metrics: self.operation_metrics.clone(),
91 artifacts,
92 }
93 }
94}
95
96#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
101pub struct DownloadOperationMetrics {
102 pub target_count: usize,
103 pub snapshot_create_started: usize,
104 pub snapshot_create_completed: usize,
105 pub snapshot_download_started: usize,
106 pub snapshot_download_completed: usize,
107 pub checksum_verify_started: usize,
108 pub checksum_verify_completed: usize,
109 pub artifact_finalize_started: usize,
110 pub artifact_finalize_completed: usize,
111}
112
113#[derive(Clone, Debug, Deserialize, Serialize)]
118pub struct ArtifactJournalEntry {
119 pub canister_id: String,
120 pub snapshot_id: String,
121 pub state: ArtifactState,
122 pub temp_path: Option<String>,
123 pub artifact_path: String,
124 pub checksum_algorithm: String,
125 pub checksum: Option<String>,
126 pub updated_at: String,
127}
128
129impl ArtifactJournalEntry {
130 #[must_use]
132 pub const fn resume_action(&self) -> ResumeAction {
133 match self.state {
134 ArtifactState::Created => ResumeAction::Download,
135 ArtifactState::Downloaded => ResumeAction::VerifyChecksum,
136 ArtifactState::ChecksumVerified => ResumeAction::Finalize,
137 ArtifactState::Durable => ResumeAction::Skip,
138 }
139 }
140
141 pub fn advance_to(
143 &mut self,
144 next_state: ArtifactState,
145 updated_at: String,
146 ) -> Result<(), JournalValidationError> {
147 if !self.state.can_advance_to(next_state) {
148 return Err(JournalValidationError::InvalidStateTransition {
149 from: self.state,
150 to: next_state,
151 });
152 }
153
154 self.state = next_state;
155 self.updated_at = updated_at;
156 Ok(())
157 }
158
159 fn validate(&self) -> Result<(), JournalValidationError> {
161 validate_principal("artifacts[].canister_id", &self.canister_id)?;
162 validate_nonempty("artifacts[].snapshot_id", &self.snapshot_id)?;
163 validate_nonempty("artifacts[].artifact_path", &self.artifact_path)?;
164 validate_relative_artifact_path("artifacts[].artifact_path", &self.artifact_path)?;
165 validate_nonempty("artifacts[].checksum_algorithm", &self.checksum_algorithm)?;
166 validate_nonempty("artifacts[].updated_at", &self.updated_at)?;
167
168 if self.checksum_algorithm != SHA256_ALGORITHM {
169 return Err(JournalValidationError::UnsupportedHashAlgorithm(
170 self.checksum_algorithm.clone(),
171 ));
172 }
173
174 if matches!(
175 self.state,
176 ArtifactState::Downloaded | ArtifactState::ChecksumVerified
177 ) {
178 validate_required_option("artifacts[].temp_path", self.temp_path.as_deref())?;
179 }
180
181 if matches!(
182 self.state,
183 ArtifactState::ChecksumVerified | ArtifactState::Durable
184 ) {
185 validate_required_hash("artifacts[].checksum", self.checksum.as_deref())?;
186 }
187
188 Ok(())
189 }
190}
191
192#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
197#[serde(rename_all = "PascalCase")]
198pub enum ArtifactState {
199 Created,
200 Downloaded,
201 ChecksumVerified,
202 Durable,
203}
204
205impl ArtifactState {
206 #[must_use]
208 pub const fn can_advance_to(self, next: Self) -> bool {
209 self.as_order() <= next.as_order()
210 }
211
212 const fn as_order(self) -> u8 {
214 match self {
215 Self::Created => 0,
216 Self::Downloaded => 1,
217 Self::ChecksumVerified => 2,
218 Self::Durable => 3,
219 }
220 }
221}
222
223#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
228#[serde(rename_all = "PascalCase")]
229pub enum ResumeAction {
230 Download,
231 VerifyChecksum,
232 Finalize,
233 Skip,
234}
235
236#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
241pub struct JournalResumeReport {
242 pub backup_id: String,
243 pub discovery_topology_hash: Option<String>,
244 pub pre_snapshot_topology_hash: Option<String>,
245 pub total_artifacts: usize,
246 pub is_complete: bool,
247 pub pending_artifacts: usize,
248 pub counts: JournalStateCounts,
249 pub operation_metrics: DownloadOperationMetrics,
250 pub artifacts: Vec<ArtifactResumeReport>,
251}
252
253#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
258pub struct JournalStateCounts {
259 pub created: usize,
260 pub downloaded: usize,
261 pub checksum_verified: usize,
262 pub durable: usize,
263 pub download: usize,
264 pub verify_checksum: usize,
265 pub finalize: usize,
266 pub skip: usize,
267}
268
269impl JournalStateCounts {
270 const fn record(&mut self, state: ArtifactState, action: ResumeAction) {
272 match state {
273 ArtifactState::Created => self.created += 1,
274 ArtifactState::Downloaded => self.downloaded += 1,
275 ArtifactState::ChecksumVerified => self.checksum_verified += 1,
276 ArtifactState::Durable => self.durable += 1,
277 }
278
279 match action {
280 ResumeAction::Download => self.download += 1,
281 ResumeAction::VerifyChecksum => self.verify_checksum += 1,
282 ResumeAction::Finalize => self.finalize += 1,
283 ResumeAction::Skip => self.skip += 1,
284 }
285 }
286}
287
288#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
293pub struct ArtifactResumeReport {
294 pub canister_id: String,
295 pub snapshot_id: String,
296 pub state: ArtifactState,
297 pub resume_action: ResumeAction,
298 pub artifact_path: String,
299 pub temp_path: Option<String>,
300 pub updated_at: String,
301}
302
303#[derive(Debug, ThisError)]
308pub enum JournalValidationError {
309 #[error("unsupported journal version {0}")]
310 UnsupportedJournalVersion(u16),
311
312 #[error("field {0} must not be empty")]
313 EmptyField(&'static str),
314
315 #[error("collection {0} must not be empty")]
316 EmptyCollection(&'static str),
317
318 #[error("field {field} must be a valid principal: {value}")]
319 InvalidPrincipal { field: &'static str, value: String },
320
321 #[error("field {0} must be a non-empty sha256 hex string")]
322 InvalidHash(&'static str),
323
324 #[error("unsupported hash algorithm {0}")]
325 UnsupportedHashAlgorithm(String),
326
327 #[error("field {field} must be a relative artifact path under the backup root: {value}")]
328 InvalidArtifactPath { field: &'static str, value: String },
329
330 #[error("duplicate artifact entry for canister {canister_id} snapshot {snapshot_id}")]
331 DuplicateArtifact {
332 canister_id: String,
333 snapshot_id: String,
334 },
335
336 #[error("invalid journal transition from {from:?} to {to:?}")]
337 InvalidStateTransition {
338 from: ArtifactState,
339 to: ArtifactState,
340 },
341}
342
343const fn validate_journal_version(version: u16) -> Result<(), JournalValidationError> {
345 if version == SUPPORTED_JOURNAL_VERSION {
346 Ok(())
347 } else {
348 Err(JournalValidationError::UnsupportedJournalVersion(version))
349 }
350}
351
352fn validate_nonempty(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
354 if value.trim().is_empty() {
355 Err(JournalValidationError::EmptyField(field))
356 } else {
357 Ok(())
358 }
359}
360
361fn validate_relative_artifact_path(
363 field: &'static str,
364 value: &str,
365) -> Result<(), JournalValidationError> {
366 let path = PathBuf::from(value);
367 if path.is_absolute()
368 || !path
369 .components()
370 .all(|component| matches!(component, Component::Normal(_) | Component::CurDir))
371 {
372 return Err(JournalValidationError::InvalidArtifactPath {
373 field,
374 value: value.to_string(),
375 });
376 }
377 Ok(())
378}
379
380fn validate_required_option(
382 field: &'static str,
383 value: Option<&str>,
384) -> Result<(), JournalValidationError> {
385 match value {
386 Some(value) => validate_nonempty(field, value),
387 None => Err(JournalValidationError::EmptyField(field)),
388 }
389}
390
391fn validate_principal(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
393 validate_nonempty(field, value)?;
394 Principal::from_str(value)
395 .map(|_| ())
396 .map_err(|_| JournalValidationError::InvalidPrincipal {
397 field,
398 value: value.to_string(),
399 })
400}
401
402fn validate_required_hash(
404 field: &'static str,
405 value: Option<&str>,
406) -> Result<(), JournalValidationError> {
407 match value {
408 Some(value) => validate_hash(field, value),
409 None => Err(JournalValidationError::EmptyField(field)),
410 }
411}
412
413fn validate_optional_hash(
415 field: &'static str,
416 value: Option<&str>,
417) -> Result<(), JournalValidationError> {
418 if let Some(value) = value {
419 validate_hash(field, value)?;
420 }
421 Ok(())
422}
423
424fn validate_hash(field: &'static str, value: &str) -> Result<(), JournalValidationError> {
426 const SHA256_HEX_LEN: usize = 64;
427 validate_nonempty(field, value)?;
428 if value.len() == SHA256_HEX_LEN && value.bytes().all(|b| b.is_ascii_hexdigit()) {
429 Ok(())
430 } else {
431 Err(JournalValidationError::InvalidHash(field))
432 }
433}
434
435#[cfg(test)]
436mod tests;