Skip to main content

syncular_protocol/
snapshot_artifact.rs

1use crate::integrity::validate_commit_integrity_hex;
2use crate::{ProtocolError, Result, SNAPSHOT_CHUNK_COMPRESSION_GZIP};
3use serde::{Deserialize, Serialize};
4use sha2::{Digest, Sha256};
5use std::collections::BTreeSet;
6
/// Compression identifier accepted by manifest validation (alongside the gzip
/// chunk identifier); presumably marks an artifact stored uncompressed.
pub const SNAPSHOT_ARTIFACT_COMPRESSION_NONE: &str = "none";

/// The only manifest `version` that [`validate_scoped_snapshot_artifact_manifest`] accepts.
pub const SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION: i32 = 1;

/// The only `artifactKind` that [`validate_scoped_snapshot_artifact_manifest`] accepts.
pub const SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1: &str = "sqlite-snapshot-v1";
10
11#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
12pub struct ScopedSnapshotArtifactManifest {
13    pub version: i32,
14    #[serde(rename = "artifactKind")]
15    pub artifact_kind: String,
16    pub digest: String,
17    #[serde(rename = "partitionId")]
18    pub partition_id: String,
19    #[serde(rename = "subscriptionId")]
20    pub subscription_id: String,
21    pub table: String,
22    #[serde(rename = "schemaVersion")]
23    pub schema_version: String,
24    #[serde(rename = "asOfCommitSeq")]
25    pub as_of_commit_seq: i64,
26    #[serde(rename = "scopeDigest")]
27    pub scope_digest: String,
28    #[serde(rename = "rowCursor")]
29    pub row_cursor: Option<String>,
30    #[serde(rename = "rowLimit")]
31    pub row_limit: i64,
32    #[serde(rename = "rowCount")]
33    pub row_count: i64,
34    #[serde(rename = "nextRowCursor")]
35    pub next_row_cursor: Option<String>,
36    #[serde(rename = "isFirstPage")]
37    pub is_first_page: bool,
38    #[serde(rename = "isLastPage")]
39    pub is_last_page: bool,
40    pub compression: String,
41    #[serde(rename = "byteLength")]
42    pub byte_length: i64,
43    pub sha256: String,
44    #[serde(rename = "featureSet", default)]
45    pub feature_set: Vec<String>,
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49pub struct ScopedSnapshotArtifactRef {
50    pub id: String,
51    #[serde(rename = "byteLength")]
52    pub byte_length: i64,
53    pub sha256: String,
54    #[serde(rename = "manifestDigest")]
55    pub manifest_digest: String,
56    #[serde(rename = "artifactKind")]
57    pub artifact_kind: String,
58    pub compression: String,
59    #[serde(rename = "rowCount")]
60    pub row_count: i64,
61    #[serde(rename = "nextRowCursor")]
62    pub next_row_cursor: Option<String>,
63    #[serde(rename = "isFirstPage")]
64    pub is_first_page: bool,
65    #[serde(rename = "isLastPage")]
66    pub is_last_page: bool,
67    pub manifest: ScopedSnapshotArtifactManifest,
68}
69
70pub fn validate_scoped_snapshot_artifact_manifest(
71    manifest: &ScopedSnapshotArtifactManifest,
72) -> Result<()> {
73    if manifest.version != SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION {
74        return Err(ProtocolError::message(format!(
75            "unsupported scoped snapshot artifact manifest version {}",
76            manifest.version
77        )));
78    }
79    if manifest.artifact_kind != SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1 {
80        return Err(ProtocolError::message(format!(
81            "unsupported scoped snapshot artifact kind {}",
82            manifest.artifact_kind
83        )));
84    }
85    if manifest.compression != SNAPSHOT_ARTIFACT_COMPRESSION_NONE
86        && manifest.compression != SNAPSHOT_CHUNK_COMPRESSION_GZIP
87    {
88        return Err(ProtocolError::message(format!(
89            "unsupported scoped snapshot artifact compression {}",
90            manifest.compression
91        )));
92    }
93    if manifest.row_limit < 1 {
94        return Err(ProtocolError::message(format!(
95            "scoped snapshot artifact rowLimit must be positive: {}",
96            manifest.row_limit
97        )));
98    }
99    if manifest.row_count < 0 {
100        return Err(ProtocolError::message(format!(
101            "scoped snapshot artifact rowCount must be non-negative: {}",
102            manifest.row_count
103        )));
104    }
105    if manifest.byte_length < 0 {
106        return Err(ProtocolError::message(format!(
107            "scoped snapshot artifact byteLength must be non-negative: {}",
108            manifest.byte_length
109        )));
110    }
111    validate_commit_integrity_hex(
112        "scoped snapshot artifact digest",
113        &manifest.subscription_id,
114        manifest.as_of_commit_seq,
115        &manifest.digest,
116    )?;
117    validate_commit_integrity_hex(
118        "scoped snapshot artifact scope digest",
119        &manifest.subscription_id,
120        manifest.as_of_commit_seq,
121        &manifest.scope_digest,
122    )?;
123    validate_commit_integrity_hex(
124        "scoped snapshot artifact sha256",
125        &manifest.subscription_id,
126        manifest.as_of_commit_seq,
127        &manifest.sha256,
128    )?;
129
130    let actual_digest = scoped_snapshot_artifact_manifest_digest(manifest)?;
131    if actual_digest != manifest.digest {
132        return Err(ProtocolError::message(format!(
133            "scoped snapshot artifact digest mismatch: expected {}, got {}",
134            manifest.digest, actual_digest
135        )));
136    }
137    Ok(())
138}
139
140pub fn validate_scoped_snapshot_artifact_ref(artifact: &ScopedSnapshotArtifactRef) -> Result<()> {
141    if artifact.id.is_empty() {
142        return Err(ProtocolError::message(
143            "scoped snapshot artifact id must not be empty",
144        ));
145    }
146    validate_scoped_snapshot_artifact_manifest(&artifact.manifest)?;
147    if artifact.manifest_digest != artifact.manifest.digest {
148        return Err(ProtocolError::message(format!(
149            "scoped snapshot artifact ref manifest digest mismatch: {} != {}",
150            artifact.manifest_digest, artifact.manifest.digest
151        )));
152    }
153    if artifact.artifact_kind != artifact.manifest.artifact_kind {
154        return Err(ProtocolError::message(format!(
155            "scoped snapshot artifact ref kind mismatch: {} != {}",
156            artifact.artifact_kind, artifact.manifest.artifact_kind
157        )));
158    }
159    if artifact.compression != artifact.manifest.compression {
160        return Err(ProtocolError::message(format!(
161            "scoped snapshot artifact ref compression mismatch: {} != {}",
162            artifact.compression, artifact.manifest.compression
163        )));
164    }
165    if artifact.byte_length != artifact.manifest.byte_length {
166        return Err(ProtocolError::message(format!(
167            "scoped snapshot artifact ref byte length mismatch: {} != {}",
168            artifact.byte_length, artifact.manifest.byte_length
169        )));
170    }
171    if artifact.sha256 != artifact.manifest.sha256 {
172        return Err(ProtocolError::message(format!(
173            "scoped snapshot artifact ref sha256 mismatch: {} != {}",
174            artifact.sha256, artifact.manifest.sha256
175        )));
176    }
177    if artifact.row_count != artifact.manifest.row_count {
178        return Err(ProtocolError::message(format!(
179            "scoped snapshot artifact ref row count mismatch: {} != {}",
180            artifact.row_count, artifact.manifest.row_count
181        )));
182    }
183    if artifact.next_row_cursor != artifact.manifest.next_row_cursor {
184        return Err(ProtocolError::message(
185            "scoped snapshot artifact ref next cursor mismatch",
186        ));
187    }
188    if artifact.is_first_page != artifact.manifest.is_first_page {
189        return Err(ProtocolError::message(
190            "scoped snapshot artifact ref first-page flag mismatch",
191        ));
192    }
193    if artifact.is_last_page != artifact.manifest.is_last_page {
194        return Err(ProtocolError::message(
195            "scoped snapshot artifact ref last-page flag mismatch",
196        ));
197    }
198    Ok(())
199}
200
201pub fn scoped_snapshot_artifact_manifest_digest(
202    manifest: &ScopedSnapshotArtifactManifest,
203) -> Result<String> {
204    if manifest.row_limit < 1 {
205        return Err(ProtocolError::message(format!(
206            "scoped snapshot artifact rowLimit must be positive: {}",
207            manifest.row_limit
208        )));
209    }
210    if manifest.row_count < 0 {
211        return Err(ProtocolError::message(format!(
212            "scoped snapshot artifact rowCount must be non-negative: {}",
213            manifest.row_count
214        )));
215    }
216    if manifest.byte_length < 0 {
217        return Err(ProtocolError::message(format!(
218            "scoped snapshot artifact byteLength must be non-negative: {}",
219            manifest.byte_length
220        )));
221    }
222    Ok(hex::encode(Sha256::digest(
223        scoped_snapshot_artifact_digest_payload(manifest),
224    )))
225}
226
227fn scoped_snapshot_artifact_digest_payload(manifest: &ScopedSnapshotArtifactManifest) -> String {
228    let feature_set = normalized_feature_set(&manifest.feature_set);
229    let mut parts = Vec::with_capacity(19 + feature_set.len() * 2);
230    parts.push("syncular.scoped-snapshot-artifact.v1".to_string());
231    append_manifest_int_field(&mut parts, "version", manifest.version.into());
232    append_manifest_string_field(&mut parts, "artifactKind", &manifest.artifact_kind);
233    append_manifest_string_field(&mut parts, "partitionId", &manifest.partition_id);
234    append_manifest_string_field(&mut parts, "subscriptionId", &manifest.subscription_id);
235    append_manifest_string_field(&mut parts, "table", &manifest.table);
236    append_manifest_string_field(&mut parts, "schemaVersion", &manifest.schema_version);
237    append_manifest_int_field(&mut parts, "asOfCommitSeq", manifest.as_of_commit_seq);
238    append_manifest_string_field(&mut parts, "scopeDigest", &manifest.scope_digest);
239    append_manifest_nullable_string_field(&mut parts, "rowCursor", manifest.row_cursor.as_deref());
240    append_manifest_int_field(&mut parts, "rowLimit", manifest.row_limit);
241    append_manifest_int_field(&mut parts, "rowCount", manifest.row_count);
242    append_manifest_nullable_string_field(
243        &mut parts,
244        "nextRowCursor",
245        manifest.next_row_cursor.as_deref(),
246    );
247    append_manifest_bool_field(&mut parts, "isFirstPage", manifest.is_first_page);
248    append_manifest_bool_field(&mut parts, "isLastPage", manifest.is_last_page);
249    append_manifest_string_field(&mut parts, "compression", &manifest.compression);
250    append_manifest_int_field(&mut parts, "byteLength", manifest.byte_length);
251    append_manifest_string_field(&mut parts, "sha256", &manifest.sha256);
252    append_manifest_int_field(&mut parts, "featureCount", feature_set.len() as i64);
253
254    for (index, feature) in feature_set.iter().enumerate() {
255        append_manifest_int_field(&mut parts, &format!("feature.{index}.index"), index as i64);
256        append_manifest_string_field(&mut parts, &format!("feature.{index}.name"), feature);
257    }
258
259    format!("{}\n", parts.join("\n"))
260}
261
/// Returns the feature names sorted ascending with duplicates removed, so that
/// input order and repetition do not influence the result.
fn normalized_feature_set(feature_set: &[String]) -> Vec<String> {
    let mut unique = feature_set.to_vec();
    // Sorting first makes equal names adjacent, which is what `dedup` needs.
    unique.sort_unstable();
    unique.dedup();
    unique
}
270
/// Appends a string field line of the form `name:s:<byte length>:<value>`.
///
/// The length is the value's UTF-8 byte length, not its character count.
fn append_manifest_string_field(parts: &mut Vec<String>, name: &str, value: &str) {
    let byte_len = value.len();
    let line = format!("{name}:s:{byte_len}:{value}");
    parts.push(line);
}
274
275fn append_manifest_nullable_string_field(parts: &mut Vec<String>, name: &str, value: Option<&str>) {
276    match value {
277        Some(value) => append_manifest_string_field(parts, name, value),
278        None => parts.push(format!("{name}:n")),
279    }
280}
281
/// Appends an integer field line of the form `name:i:<decimal value>`.
fn append_manifest_int_field(parts: &mut Vec<String>, name: &str, value: i64) {
    let line = format!("{name}:i:{value}");
    parts.push(line);
}
285
/// Appends a boolean field line of the form `name:b:1` (true) or `name:b:0` (false).
fn append_manifest_bool_field(parts: &mut Vec<String>, name: &str, value: bool) {
    let flag = u8::from(value);
    parts.push(format!("{name}:b:{flag}"));
}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a representative first-page manifest with the given features and
    /// an empty `digest`.
    fn sample_manifest(features: &[&str]) -> ScopedSnapshotArtifactManifest {
        ScopedSnapshotArtifactManifest {
            version: SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION,
            artifact_kind: String::from(SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1),
            digest: String::new(),
            partition_id: String::from("partition-1"),
            subscription_id: String::from("sub-tasks"),
            table: String::from("tasks"),
            schema_version: String::from("7"),
            as_of_commit_seq: 42,
            scope_digest: "a".repeat(64),
            row_cursor: None,
            row_limit: 50_000,
            row_count: 12_345,
            next_row_cursor: Some(String::from("task-12345")),
            is_first_page: true,
            is_last_page: false,
            compression: String::from(SNAPSHOT_ARTIFACT_COMPRESSION_NONE),
            byte_length: 4096,
            sha256: "b".repeat(64),
            feature_set: features.iter().copied().map(String::from).collect(),
        }
    }

    /// Same as [`sample_manifest`], but with `digest` filled in from the content.
    fn sealed_manifest(features: &[&str]) -> ScopedSnapshotArtifactManifest {
        let mut manifest = sample_manifest(features);
        manifest.digest = scoped_snapshot_artifact_manifest_digest(&manifest).expect("digest");
        manifest
    }

    #[test]
    fn validates_scoped_snapshot_artifact_manifest() {
        let manifest = sealed_manifest(&["crdt-yjs", "blobs", "crdt-yjs"]);
        validate_scoped_snapshot_artifact_manifest(&manifest).expect("valid artifact manifest");

        // Feature order and duplicates must not influence the digest.
        let reordered = sealed_manifest(&["blobs", "crdt-yjs"]);
        assert_eq!(manifest.digest, reordered.digest);
    }

    #[test]
    fn rejects_scope_mismatch() {
        // Change the scope digest after sealing so the stored digest is stale.
        let tampered = ScopedSnapshotArtifactManifest {
            scope_digest: "c".repeat(64),
            ..sealed_manifest(&["blobs"])
        };

        let error = validate_scoped_snapshot_artifact_manifest(&tampered)
            .expect_err("scope mismatch rejects");
        let rendered = error.to_string();
        assert!(
            rendered.contains("scoped snapshot artifact digest mismatch"),
            "{error}"
        );
    }

    #[test]
    fn validates_scoped_snapshot_artifact_refs() {
        let manifest = sealed_manifest(&["blobs"]);
        let artifact_ref = ScopedSnapshotArtifactRef {
            id: String::from("artifact-1"),
            byte_length: manifest.byte_length,
            sha256: manifest.sha256.clone(),
            manifest_digest: manifest.digest.clone(),
            artifact_kind: manifest.artifact_kind.clone(),
            compression: manifest.compression.clone(),
            row_count: manifest.row_count,
            next_row_cursor: manifest.next_row_cursor.clone(),
            is_first_page: manifest.is_first_page,
            is_last_page: manifest.is_last_page,
            manifest,
        };

        validate_scoped_snapshot_artifact_ref(&artifact_ref).expect("valid artifact ref");

        // A reference whose sha256 disagrees with its manifest must be rejected.
        let mismatched = ScopedSnapshotArtifactRef {
            sha256: "c".repeat(64),
            ..artifact_ref
        };
        assert!(validate_scoped_snapshot_artifact_ref(&mismatched).is_err());
    }
}