1use crate::integrity::validate_commit_integrity_hex;
2use crate::{ProtocolError, Result, SNAPSHOT_CHUNK_COMPRESSION_GZIP};
3use serde::{Deserialize, Serialize};
4use sha2::{Digest, Sha256};
5use std::collections::BTreeSet;
6
/// Compression value for an uncompressed artifact. It is one of the two
/// compression values [`validate_scoped_snapshot_artifact_manifest`] accepts;
/// the other is `SNAPSHOT_CHUNK_COMPRESSION_GZIP`.
pub const SNAPSHOT_ARTIFACT_COMPRESSION_NONE: &str = "none";
/// The only manifest `version` accepted by
/// [`validate_scoped_snapshot_artifact_manifest`].
pub const SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION: i32 = 1;
/// The only `artifactKind` accepted by
/// [`validate_scoped_snapshot_artifact_manifest`].
pub const SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1: &str = "sqlite-snapshot-v1";
10
11#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
12pub struct ScopedSnapshotArtifactManifest {
13 pub version: i32,
14 #[serde(rename = "artifactKind")]
15 pub artifact_kind: String,
16 pub digest: String,
17 #[serde(rename = "partitionId")]
18 pub partition_id: String,
19 #[serde(rename = "subscriptionId")]
20 pub subscription_id: String,
21 pub table: String,
22 #[serde(rename = "schemaVersion")]
23 pub schema_version: String,
24 #[serde(rename = "asOfCommitSeq")]
25 pub as_of_commit_seq: i64,
26 #[serde(rename = "scopeDigest")]
27 pub scope_digest: String,
28 #[serde(rename = "rowCursor")]
29 pub row_cursor: Option<String>,
30 #[serde(rename = "rowLimit")]
31 pub row_limit: i64,
32 #[serde(rename = "rowCount")]
33 pub row_count: i64,
34 #[serde(rename = "nextRowCursor")]
35 pub next_row_cursor: Option<String>,
36 #[serde(rename = "isFirstPage")]
37 pub is_first_page: bool,
38 #[serde(rename = "isLastPage")]
39 pub is_last_page: bool,
40 pub compression: String,
41 #[serde(rename = "byteLength")]
42 pub byte_length: i64,
43 pub sha256: String,
44 #[serde(rename = "featureSet", default)]
45 pub feature_set: Vec<String>,
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49pub struct ScopedSnapshotArtifactRef {
50 pub id: String,
51 #[serde(rename = "byteLength")]
52 pub byte_length: i64,
53 pub sha256: String,
54 #[serde(rename = "manifestDigest")]
55 pub manifest_digest: String,
56 #[serde(rename = "artifactKind")]
57 pub artifact_kind: String,
58 pub compression: String,
59 #[serde(rename = "rowCount")]
60 pub row_count: i64,
61 #[serde(rename = "nextRowCursor")]
62 pub next_row_cursor: Option<String>,
63 #[serde(rename = "isFirstPage")]
64 pub is_first_page: bool,
65 #[serde(rename = "isLastPage")]
66 pub is_last_page: bool,
67 pub manifest: ScopedSnapshotArtifactManifest,
68}
69
70pub fn validate_scoped_snapshot_artifact_manifest(
71 manifest: &ScopedSnapshotArtifactManifest,
72) -> Result<()> {
73 if manifest.version != SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION {
74 return Err(ProtocolError::message(format!(
75 "unsupported scoped snapshot artifact manifest version {}",
76 manifest.version
77 )));
78 }
79 if manifest.artifact_kind != SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1 {
80 return Err(ProtocolError::message(format!(
81 "unsupported scoped snapshot artifact kind {}",
82 manifest.artifact_kind
83 )));
84 }
85 if manifest.compression != SNAPSHOT_ARTIFACT_COMPRESSION_NONE
86 && manifest.compression != SNAPSHOT_CHUNK_COMPRESSION_GZIP
87 {
88 return Err(ProtocolError::message(format!(
89 "unsupported scoped snapshot artifact compression {}",
90 manifest.compression
91 )));
92 }
93 if manifest.row_limit < 1 {
94 return Err(ProtocolError::message(format!(
95 "scoped snapshot artifact rowLimit must be positive: {}",
96 manifest.row_limit
97 )));
98 }
99 if manifest.row_count < 0 {
100 return Err(ProtocolError::message(format!(
101 "scoped snapshot artifact rowCount must be non-negative: {}",
102 manifest.row_count
103 )));
104 }
105 if manifest.byte_length < 0 {
106 return Err(ProtocolError::message(format!(
107 "scoped snapshot artifact byteLength must be non-negative: {}",
108 manifest.byte_length
109 )));
110 }
111 validate_commit_integrity_hex(
112 "scoped snapshot artifact digest",
113 &manifest.subscription_id,
114 manifest.as_of_commit_seq,
115 &manifest.digest,
116 )?;
117 validate_commit_integrity_hex(
118 "scoped snapshot artifact scope digest",
119 &manifest.subscription_id,
120 manifest.as_of_commit_seq,
121 &manifest.scope_digest,
122 )?;
123 validate_commit_integrity_hex(
124 "scoped snapshot artifact sha256",
125 &manifest.subscription_id,
126 manifest.as_of_commit_seq,
127 &manifest.sha256,
128 )?;
129
130 let actual_digest = scoped_snapshot_artifact_manifest_digest(manifest)?;
131 if actual_digest != manifest.digest {
132 return Err(ProtocolError::message(format!(
133 "scoped snapshot artifact digest mismatch: expected {}, got {}",
134 manifest.digest, actual_digest
135 )));
136 }
137 Ok(())
138}
139
140pub fn validate_scoped_snapshot_artifact_ref(artifact: &ScopedSnapshotArtifactRef) -> Result<()> {
141 if artifact.id.is_empty() {
142 return Err(ProtocolError::message(
143 "scoped snapshot artifact id must not be empty",
144 ));
145 }
146 validate_scoped_snapshot_artifact_manifest(&artifact.manifest)?;
147 if artifact.manifest_digest != artifact.manifest.digest {
148 return Err(ProtocolError::message(format!(
149 "scoped snapshot artifact ref manifest digest mismatch: {} != {}",
150 artifact.manifest_digest, artifact.manifest.digest
151 )));
152 }
153 if artifact.artifact_kind != artifact.manifest.artifact_kind {
154 return Err(ProtocolError::message(format!(
155 "scoped snapshot artifact ref kind mismatch: {} != {}",
156 artifact.artifact_kind, artifact.manifest.artifact_kind
157 )));
158 }
159 if artifact.compression != artifact.manifest.compression {
160 return Err(ProtocolError::message(format!(
161 "scoped snapshot artifact ref compression mismatch: {} != {}",
162 artifact.compression, artifact.manifest.compression
163 )));
164 }
165 if artifact.byte_length != artifact.manifest.byte_length {
166 return Err(ProtocolError::message(format!(
167 "scoped snapshot artifact ref byte length mismatch: {} != {}",
168 artifact.byte_length, artifact.manifest.byte_length
169 )));
170 }
171 if artifact.sha256 != artifact.manifest.sha256 {
172 return Err(ProtocolError::message(format!(
173 "scoped snapshot artifact ref sha256 mismatch: {} != {}",
174 artifact.sha256, artifact.manifest.sha256
175 )));
176 }
177 if artifact.row_count != artifact.manifest.row_count {
178 return Err(ProtocolError::message(format!(
179 "scoped snapshot artifact ref row count mismatch: {} != {}",
180 artifact.row_count, artifact.manifest.row_count
181 )));
182 }
183 if artifact.next_row_cursor != artifact.manifest.next_row_cursor {
184 return Err(ProtocolError::message(
185 "scoped snapshot artifact ref next cursor mismatch",
186 ));
187 }
188 if artifact.is_first_page != artifact.manifest.is_first_page {
189 return Err(ProtocolError::message(
190 "scoped snapshot artifact ref first-page flag mismatch",
191 ));
192 }
193 if artifact.is_last_page != artifact.manifest.is_last_page {
194 return Err(ProtocolError::message(
195 "scoped snapshot artifact ref last-page flag mismatch",
196 ));
197 }
198 Ok(())
199}
200
201pub fn scoped_snapshot_artifact_manifest_digest(
202 manifest: &ScopedSnapshotArtifactManifest,
203) -> Result<String> {
204 if manifest.row_limit < 1 {
205 return Err(ProtocolError::message(format!(
206 "scoped snapshot artifact rowLimit must be positive: {}",
207 manifest.row_limit
208 )));
209 }
210 if manifest.row_count < 0 {
211 return Err(ProtocolError::message(format!(
212 "scoped snapshot artifact rowCount must be non-negative: {}",
213 manifest.row_count
214 )));
215 }
216 if manifest.byte_length < 0 {
217 return Err(ProtocolError::message(format!(
218 "scoped snapshot artifact byteLength must be non-negative: {}",
219 manifest.byte_length
220 )));
221 }
222 Ok(hex::encode(Sha256::digest(
223 scoped_snapshot_artifact_digest_payload(manifest),
224 )))
225}
226
/// Builds the canonical text payload that
/// `scoped_snapshot_artifact_manifest_digest` hashes with SHA-256.
///
/// The payload starts with a fixed header line. It then has one typed line per
/// manifest field in the exact order below, followed by an index line and a
/// name line per normalized feature. Every line, including the last, ends in
/// `\n`. The manifest's `digest` field is deliberately not included, so the
/// digest can be stored inside the manifest it covers.
///
/// Field order and encoding are part of the hash input; changing either
/// changes every digest. NOTE(review): presumably any other producer or
/// verifier of these manifests builds the same byte sequence — confirm before
/// editing.
fn scoped_snapshot_artifact_digest_payload(manifest: &ScopedSnapshotArtifactManifest) -> String {
    // Sorted and deduplicated, so caller-supplied order and repeats do not
    // change the digest.
    let feature_set = normalized_feature_set(&manifest.feature_set);
    // 19 = header line + 18 fixed field lines; each feature adds two lines.
    let mut parts = Vec::with_capacity(19 + feature_set.len() * 2);
    parts.push("syncular.scoped-snapshot-artifact.v1".to_string());
    // `version` is an i32 widened to i64 for the shared integer encoder.
    append_manifest_int_field(&mut parts, "version", manifest.version.into());
    append_manifest_string_field(&mut parts, "artifactKind", &manifest.artifact_kind);
    append_manifest_string_field(&mut parts, "partitionId", &manifest.partition_id);
    append_manifest_string_field(&mut parts, "subscriptionId", &manifest.subscription_id);
    append_manifest_string_field(&mut parts, "table", &manifest.table);
    append_manifest_string_field(&mut parts, "schemaVersion", &manifest.schema_version);
    append_manifest_int_field(&mut parts, "asOfCommitSeq", manifest.as_of_commit_seq);
    append_manifest_string_field(&mut parts, "scopeDigest", &manifest.scope_digest);
    append_manifest_nullable_string_field(&mut parts, "rowCursor", manifest.row_cursor.as_deref());
    append_manifest_int_field(&mut parts, "rowLimit", manifest.row_limit);
    append_manifest_int_field(&mut parts, "rowCount", manifest.row_count);
    append_manifest_nullable_string_field(
        &mut parts,
        "nextRowCursor",
        manifest.next_row_cursor.as_deref(),
    );
    append_manifest_bool_field(&mut parts, "isFirstPage", manifest.is_first_page);
    append_manifest_bool_field(&mut parts, "isLastPage", manifest.is_last_page);
    append_manifest_string_field(&mut parts, "compression", &manifest.compression);
    append_manifest_int_field(&mut parts, "byteLength", manifest.byte_length);
    append_manifest_string_field(&mut parts, "sha256", &manifest.sha256);
    // Records how many feature entries follow.
    append_manifest_int_field(&mut parts, "featureCount", feature_set.len() as i64);

    // Each feature contributes an explicit index line plus its name.
    for (index, feature) in feature_set.iter().enumerate() {
        append_manifest_int_field(&mut parts, &format!("feature.{index}.index"), index as i64);
        append_manifest_string_field(&mut parts, &format!("feature.{index}.name"), feature);
    }

    // Newline-joined, with a trailing newline after the last line.
    format!("{}\n", parts.join("\n"))
}
261
/// Returns the features sorted in ascending `Ord` (byte-wise) order with
/// duplicates removed, so the manifest digest ignores feature order and repeats.
fn normalized_feature_set(feature_set: &[String]) -> Vec<String> {
    // Borrow into the set first so only the unique survivors are cloned.
    let unique: BTreeSet<&String> = feature_set.iter().collect();
    unique.into_iter().cloned().collect()
}
270
/// Appends a `name:s:<byte length>:<value>` line. The UTF-8 byte-length prefix
/// keeps values containing `:` or newlines unambiguous.
fn append_manifest_string_field(parts: &mut Vec<String>, name: &str, value: &str) {
    let line = format!(
        "{field}:s:{byte_len}:{text}",
        field = name,
        byte_len = value.len(),
        text = value
    );
    parts.push(line);
}
274
275fn append_manifest_nullable_string_field(parts: &mut Vec<String>, name: &str, value: Option<&str>) {
276 match value {
277 Some(value) => append_manifest_string_field(parts, name, value),
278 None => parts.push(format!("{name}:n")),
279 }
280}
281
/// Appends a `name:i:<decimal value>` line.
fn append_manifest_int_field(parts: &mut Vec<String>, name: &str, value: i64) {
    let line = format!("{}:i:{}", name, value);
    parts.push(line);
}
285
/// Appends a `name:b:1` (true) or `name:b:0` (false) line.
fn append_manifest_bool_field(parts: &mut Vec<String>, name: &str, value: bool) {
    let flag = u8::from(value);
    parts.push(format!("{name}:b:{flag}"));
}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// First-page manifest fixture with an empty `digest`; callers seal it as needed.
    fn artifact(feature_set: Vec<String>) -> ScopedSnapshotArtifactManifest {
        ScopedSnapshotArtifactManifest {
            version: SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION,
            artifact_kind: SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1.to_string(),
            digest: String::new(),
            partition_id: "partition-1".to_string(),
            subscription_id: "sub-tasks".to_string(),
            table: "tasks".to_string(),
            schema_version: "7".to_string(),
            as_of_commit_seq: 42,
            scope_digest: "a".repeat(64),
            row_cursor: None,
            row_limit: 50_000,
            row_count: 12_345,
            next_row_cursor: Some("task-12345".to_string()),
            is_first_page: true,
            is_last_page: false,
            compression: SNAPSHOT_ARTIFACT_COMPRESSION_NONE.to_string(),
            byte_length: 4096,
            sha256: "b".repeat(64),
            feature_set,
        }
    }

    /// Converts string literals into an owned feature list.
    fn features(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Builds the fixture and stores its correctly computed digest.
    fn sealed(names: &[&str]) -> ScopedSnapshotArtifactManifest {
        let mut manifest = artifact(features(names));
        manifest.digest = scoped_snapshot_artifact_manifest_digest(&manifest).expect("digest");
        manifest
    }

    #[test]
    fn validates_scoped_snapshot_artifact_manifest() {
        let with_duplicates = sealed(&["crdt-yjs", "blobs", "crdt-yjs"]);
        validate_scoped_snapshot_artifact_manifest(&with_duplicates)
            .expect("valid artifact manifest");

        // Feature order and duplicates must not influence the digest.
        let canonical = sealed(&["blobs", "crdt-yjs"]);
        assert_eq!(with_duplicates.digest, canonical.digest);
    }

    #[test]
    fn rejects_scope_mismatch() {
        let mut manifest = sealed(&["blobs"]);
        manifest.scope_digest = "c".repeat(64);

        let error = validate_scoped_snapshot_artifact_manifest(&manifest)
            .expect_err("scope mismatch rejects");
        let rendered = error.to_string();
        assert!(
            rendered.contains("scoped snapshot artifact digest mismatch"),
            "{error}"
        );
    }

    #[test]
    fn validates_scoped_snapshot_artifact_refs() {
        let manifest = sealed(&["blobs"]);
        let artifact_ref = ScopedSnapshotArtifactRef {
            id: "artifact-1".to_string(),
            byte_length: manifest.byte_length,
            sha256: manifest.sha256.clone(),
            manifest_digest: manifest.digest.clone(),
            artifact_kind: manifest.artifact_kind.clone(),
            compression: manifest.compression.clone(),
            row_count: manifest.row_count,
            next_row_cursor: manifest.next_row_cursor.clone(),
            is_first_page: manifest.is_first_page,
            is_last_page: manifest.is_last_page,
            manifest,
        };

        validate_scoped_snapshot_artifact_ref(&artifact_ref).expect("valid artifact ref");

        let tampered = ScopedSnapshotArtifactRef {
            sha256: "c".repeat(64),
            ..artifact_ref
        };
        assert!(validate_scoped_snapshot_artifact_ref(&tampered).is_err());
    }
}