1use serde::{Deserialize, Serialize};
2use serde_json::{Map, Value};
3use sha2::{Digest, Sha256};
4use std::fmt::Write as _;
5
6pub mod auth_lease;
7pub mod binary_snapshot;
8pub mod binary_sync_pack;
9pub mod blob;
10pub mod error;
11pub mod integrity;
12pub mod realtime;
13pub mod snapshot_artifact;
14pub mod snapshot_chunk;
15pub mod snapshot_manifest;
16pub mod validation;
17
18pub use auth_lease::{
19 AuthLeaseCapabilities, AuthLeaseIssueRequest, AuthLeaseIssueResponse, AuthLeasePayload,
20 AuthLeaseProtectedHeader, AuthLeaseProvenance, AuthLeaseScope, AuthLeaseValidationResult,
21 AUTH_LEASE_ALG_ES256, AUTH_LEASE_CODE_BUSINESS_REJECTED, AUTH_LEASE_CODE_EXPIRED,
22 AUTH_LEASE_CODE_INVALID, AUTH_LEASE_CODE_MISSING, AUTH_LEASE_CODE_SCHEMA_MISMATCH,
23 AUTH_LEASE_CODE_SCOPE_MISMATCH, AUTH_LEASE_CODE_SCOPE_REVOKED, AUTH_LEASE_PROTOCOL_VERSION,
24 AUTH_LEASE_TYP, AUTH_LEASE_VERSION,
25};
26pub use blob::{
27 blob_hash, normalize_blob_mime_type, validate_blob_bytes, validate_blob_digest,
28 validate_blob_hash, validate_blob_ref, BlobDownloadUrlResponse, BlobRef,
29 BlobUploadCompleteResponse, BlobUploadInitRequest, BlobUploadInitResponse,
30};
31pub use error::{ProtocolError, Result};
32pub use integrity::{
33 validate_pull_commit_integrity_metadata, verify_subscription_commit_integrity,
34 wire_commit_chain_root, wire_commit_chain_root_from_digest, wire_commit_digest,
35 VerifiedCommitRoot,
36};
37pub use realtime::{
38 realtime_presence_event_from_value, realtime_push_response_from_value, RealtimePresenceEntry,
39 RealtimePresenceEvent, RealtimePresenceRequest, RealtimePushRequest, RealtimePushResponseData,
40 RealtimeServerMessage, REALTIME_CLIENT_MESSAGE_PRESENCE, REALTIME_CLIENT_MESSAGE_PUSH,
41 REALTIME_SERVER_EVENT_PRESENCE, REALTIME_SERVER_EVENT_PUSH_RESPONSE,
42 REALTIME_SERVER_EVENT_SYNC,
43};
44pub use snapshot_artifact::{
45 scoped_snapshot_artifact_manifest_digest, validate_scoped_snapshot_artifact_manifest,
46 validate_scoped_snapshot_artifact_ref, ScopedSnapshotArtifactManifest,
47 ScopedSnapshotArtifactRef, SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1,
48 SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION, SNAPSHOT_ARTIFACT_COMPRESSION_NONE,
49};
50pub use snapshot_chunk::{
51 decode_snapshot_chunk_sha256, validate_snapshot_chunk_format,
52 validate_snapshot_chunk_hash_bytes, validate_snapshot_chunk_hash_hex,
53 SNAPSHOT_CHUNK_COMPRESSION_GZIP,
54};
55pub use snapshot_manifest::{snapshot_manifest_digest, validate_pull_snapshot_manifests};
56pub use validation::{
57 validate_combined_request, validate_combined_response, validate_realtime_presence_request,
58 validate_realtime_push_request, validate_realtime_server_message,
59};
60
/// Length, in hexadecimal characters, of a commit-integrity value.
///
/// [`COMMIT_INTEGRITY_GENESIS_ROOT`] has exactly this many characters.
pub const COMMIT_INTEGRITY_HEX_LENGTH: usize = 64;

/// All-zero hex string of [`COMMIT_INTEGRITY_HEX_LENGTH`] characters.
///
/// NOTE(review): presumably the chain root that precedes the first commit of a
/// partition; confirm against the `integrity` module.
pub const COMMIT_INTEGRITY_GENESIS_ROOT: &str =
    "0000000000000000000000000000000000000000000000000000000000000000";

/// Version tag string for the wire commit digest format.
pub const WIRE_COMMIT_DIGEST_VERSION: &str = "syncular-wire-commit-digest-v1";

/// Version tag string for the wire commit chain-root format.
pub const WIRE_COMMIT_CHAIN_ROOT_VERSION: &str = "syncular-wire-commit-chain-root-v1";

/// Encoding identifier for binary-table (v1) snapshot chunks.
pub const SNAPSHOT_CHUNK_ENCODING_BINARY_TABLE_V1: &str = "binary-table-v1";

/// Encoding identifier for binary sync packs (v1).
pub const SYNC_PACK_ENCODING_BINARY_V1: &str = "binary-sync-pack-v1";

/// Content type string associated with sync packs.
pub const SYNC_PACK_CONTENT_TYPE: &str = "application/vnd.syncular.sync-pack.v1";

/// Numeric wire version of the binary sync pack format.
pub const BINARY_SYNC_PACK_WIRE_VERSION: u16 = 14;

/// Version number of the snapshot manifest format.
pub const SNAPSHOT_MANIFEST_VERSION: i32 = 1;
71
72pub type ScopeValues = Map<String, Value>;
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct SyncOperation {
76 pub table: String,
77 pub row_id: String,
78 pub op: String,
79 pub payload: Option<Value>,
80 pub base_version: Option<i64>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct PushCommitRequest {
85 #[serde(rename = "clientCommitId")]
86 pub client_commit_id: String,
87 pub operations: Vec<SyncOperation>,
88 #[serde(rename = "schemaVersion")]
89 pub schema_version: i32,
90 #[serde(rename = "authLease", default, skip_serializing_if = "Option::is_none")]
91 pub auth_lease: Option<AuthLeaseProvenance>,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct PushBatchRequest {
96 pub commits: Vec<PushCommitRequest>,
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
100pub struct BootstrapState {
101 #[serde(rename = "asOfCommitSeq")]
102 pub as_of_commit_seq: i64,
103 pub tables: Vec<String>,
104 #[serde(rename = "tableIndex")]
105 pub table_index: i64,
106 #[serde(rename = "rowCursor")]
107 pub row_cursor: Option<String>,
108}
109
110#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub struct CrdtStateVectorHint {
112 #[serde(rename = "rowId")]
113 pub row_id: String,
114 pub field: String,
115 #[serde(rename = "stateColumn")]
116 pub state_column: String,
117 #[serde(rename = "stateVectorBase64")]
118 pub state_vector_base64: String,
119 #[serde(rename = "syncMode")]
120 pub sync_mode: String,
121 #[serde(rename = "updatedAt")]
122 pub updated_at: i64,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct SubscriptionRequest {
127 pub id: String,
128 pub table: String,
129 pub scopes: ScopeValues,
130 #[serde(default, skip_serializing_if = "Map::is_empty")]
131 pub params: Map<String, Value>,
132 pub cursor: i64,
133 #[serde(rename = "bootstrapState", skip_serializing_if = "Option::is_none")]
134 pub bootstrap_state: Option<BootstrapState>,
135 #[serde(rename = "verifiedRoot", skip_serializing_if = "Option::is_none")]
136 pub verified_root: Option<String>,
137 #[serde(rename = "crdtStateVectors")]
138 pub crdt_state_vectors: Vec<CrdtStateVectorHint>,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct SnapshotArtifactsRequest {
143 #[serde(rename = "artifactKinds")]
144 pub artifact_kinds: Vec<String>,
145 #[serde(default, skip_serializing_if = "Vec::is_empty")]
146 pub compressions: Vec<String>,
147 #[serde(rename = "featureSet", default, skip_serializing_if = "Vec::is_empty")]
148 pub feature_set: Vec<String>,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct PullRequest {
153 #[serde(rename = "schemaVersion")]
154 pub schema_version: i32,
155 #[serde(rename = "limitCommits")]
156 pub limit_commits: i64,
157 #[serde(rename = "limitSnapshotRows")]
158 pub limit_snapshot_rows: i64,
159 #[serde(rename = "maxSnapshotPages")]
160 pub max_snapshot_pages: i64,
161 #[serde(rename = "dedupeRows", skip_serializing_if = "Option::is_none")]
162 pub dedupe_rows: Option<bool>,
163 #[serde(rename = "snapshotArtifacts", skip_serializing_if = "Option::is_none")]
164 pub snapshot_artifacts: Option<SnapshotArtifactsRequest>,
165 pub subscriptions: Vec<SubscriptionRequest>,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct CombinedRequest {
170 #[serde(rename = "clientId")]
171 pub client_id: String,
172 #[serde(skip_serializing_if = "Option::is_none")]
173 pub push: Option<PushBatchRequest>,
174 #[serde(skip_serializing_if = "Option::is_none")]
175 pub pull: Option<PullRequest>,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct CombinedResponse {
180 pub ok: bool,
181 #[serde(
182 rename = "requiredSchemaVersion",
183 skip_serializing_if = "Option::is_none"
184 )]
185 pub required_schema_version: Option<i32>,
186 #[serde(
187 rename = "latestSchemaVersion",
188 skip_serializing_if = "Option::is_none"
189 )]
190 pub latest_schema_version: Option<i32>,
191 pub push: Option<PushBatchResponse>,
192 pub pull: Option<PullResponse>,
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct PushBatchResponse {
197 pub ok: bool,
198 pub commits: Vec<PushCommitResponse>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct PushCommitResponse {
203 #[serde(rename = "clientCommitId")]
204 pub client_commit_id: String,
205 pub status: String,
206 #[serde(rename = "commitSeq")]
207 pub commit_seq: Option<i64>,
208 pub results: Vec<OperationResult>,
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct OperationResult {
213 #[serde(rename = "opIndex")]
214 pub op_index: i32,
215 pub status: String,
216 pub message: Option<String>,
217 pub error: Option<String>,
218 pub code: Option<String>,
219 pub retriable: Option<bool>,
220 #[serde(rename = "server_version")]
221 pub server_version: Option<i64>,
222 #[serde(rename = "server_row")]
223 pub server_row: Option<Value>,
224}
225
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct PullResponse {
228 pub ok: bool,
229 pub subscriptions: Vec<SubscriptionResponse>,
230}
231
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct SubscriptionResponse {
234 pub id: String,
235 pub status: String,
236 pub scopes: ScopeValues,
237 pub bootstrap: bool,
238 #[serde(rename = "bootstrapState")]
239 pub bootstrap_state: Option<BootstrapState>,
240 #[serde(rename = "nextCursor")]
241 pub next_cursor: i64,
242 pub integrity: Option<SubscriptionIntegrity>,
243 pub commits: Vec<SyncCommit>,
244 pub snapshots: Option<Vec<SyncSnapshot>>,
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
248#[serde(deny_unknown_fields)]
249pub struct SubscriptionIntegrity {
250 #[serde(rename = "partitionId")]
251 pub partition_id: String,
252 #[serde(rename = "previousChainRoot")]
253 pub previous_chain_root: String,
254 #[serde(rename = "commitChainRoot")]
255 pub commit_chain_root: String,
256 #[serde(rename = "commitSeq")]
257 pub commit_seq: i64,
258}
259
260#[derive(Debug, Clone, Serialize, Deserialize)]
261#[serde(deny_unknown_fields)]
262pub struct SyncCommit {
263 #[serde(rename = "commitSeq")]
264 pub commit_seq: i64,
265 #[serde(rename = "createdAt")]
266 pub created_at: String,
267 #[serde(rename = "actorId")]
268 pub actor_id: String,
269 pub changes: Vec<SyncChange>,
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
273pub struct SyncChange {
274 pub table: String,
275 pub row_id: String,
276 pub op: String,
277 pub row_json: Option<Value>,
278 pub row_version: Option<i64>,
279 pub scopes: ScopeValues,
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct SyncSnapshot {
284 pub table: String,
285 pub rows: Vec<Value>,
286 pub chunks: Option<Vec<SnapshotChunkRef>>,
287 #[serde(skip_serializing_if = "Option::is_none")]
288 pub artifacts: Option<Vec<ScopedSnapshotArtifactRef>>,
289 #[serde(skip_serializing_if = "Option::is_none")]
290 pub manifest: Option<SnapshotManifest>,
291 #[serde(rename = "isFirstPage")]
292 pub is_first_page: bool,
293 #[serde(rename = "isLastPage")]
294 pub is_last_page: bool,
295 #[serde(rename = "bootstrapStateAfter")]
296 pub bootstrap_state_after: Option<BootstrapState>,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct SnapshotChunkRef {
301 pub id: String,
302 #[serde(rename = "byteLength")]
303 pub byte_length: i64,
304 pub sha256: String,
305 pub encoding: String,
306 pub compression: String,
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct SnapshotManifest {
311 pub version: i32,
312 pub digest: String,
313 pub table: String,
314 #[serde(rename = "asOfCommitSeq")]
315 pub as_of_commit_seq: i64,
316 #[serde(rename = "scopeDigest")]
317 pub scope_digest: String,
318 #[serde(rename = "rowCursor")]
319 pub row_cursor: Option<String>,
320 #[serde(rename = "rowLimit")]
321 pub row_limit: i64,
322 #[serde(rename = "nextRowCursor")]
323 pub next_row_cursor: Option<String>,
324 #[serde(rename = "isFirstPage")]
325 pub is_first_page: bool,
326 #[serde(rename = "isLastPage")]
327 pub is_last_page: bool,
328 pub chunks: Vec<SnapshotManifestChunkRef>,
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize)]
332pub struct SnapshotManifestChunkRef {
333 pub id: String,
334 #[serde(rename = "byteLength")]
335 pub byte_length: i64,
336 pub sha256: String,
337 pub encoding: String,
338 pub compression: String,
339}
340
341pub fn sha256_hex(value: &str) -> String {
342 hex::encode(Sha256::digest(value.as_bytes()))
343}
344
345pub fn canonical_json_string(value: &Value) -> Result<String> {
346 let mut out = String::new();
347 append_canonical_json(&mut out, value)?;
348 Ok(out)
349}
350
351pub fn append_canonical_json(out: &mut String, value: &Value) -> Result<()> {
352 match value {
353 Value::Null => out.push_str("null"),
354 Value::Bool(value) => out.push_str(if *value { "true" } else { "false" }),
355 Value::Number(value) => write!(out, "{value}").expect("writing to String should not fail"),
356 Value::String(value) => append_json_string(out, value)?,
357 Value::Array(values) => {
358 out.push('[');
359 for (index, item) in values.iter().enumerate() {
360 if index > 0 {
361 out.push(',');
362 }
363 append_canonical_json(out, item)?;
364 }
365 out.push(']');
366 }
367 Value::Object(values) => {
368 append_canonical_object(out, values)?;
369 }
370 }
371 Ok(())
372}
373
374pub fn append_canonical_object(out: &mut String, values: &Map<String, Value>) -> Result<()> {
375 out.push('{');
376 let body_start = out.len();
377 let mut previous: Option<&str> = None;
378 for (index, (key, value)) in values.iter().enumerate() {
379 if let Some(previous) = previous {
380 if previous > key.as_str() {
381 out.truncate(body_start);
382 append_canonical_object_sorted_body(out, values)?;
383 out.push('}');
384 return Ok(());
385 }
386 }
387 if index > 0 {
388 out.push(',');
389 }
390 append_json_string(out, key)?;
391 out.push(':');
392 append_canonical_json(out, value)?;
393 previous = Some(key.as_str());
394 }
395 out.push('}');
396 Ok(())
397}
398
399fn append_canonical_object_sorted_body(
400 out: &mut String,
401 values: &Map<String, Value>,
402) -> Result<()> {
403 let mut keys = values.keys().collect::<Vec<_>>();
404 keys.sort();
405 for (index, key) in keys.into_iter().enumerate() {
406 if index > 0 {
407 out.push(',');
408 }
409 append_json_string(out, key)?;
410 out.push(':');
411 append_canonical_json(
412 out,
413 values
414 .get(key)
415 .expect("serde_json object key should resolve"),
416 )?;
417 }
418 Ok(())
419}
420
421pub(crate) fn append_json_string(out: &mut String, value: &str) -> Result<()> {
422 const HEX: &[u8; 16] = b"0123456789abcdef";
423
424 out.push('"');
425 let mut chunk_start = 0;
426 for (index, byte) in value.bytes().enumerate() {
427 let escaped = match byte {
428 b'"' => Some("\\\""),
429 b'\\' => Some("\\\\"),
430 b'\n' => Some("\\n"),
431 b'\r' => Some("\\r"),
432 b'\t' => Some("\\t"),
433 0x08 => Some("\\b"),
434 0x0c => Some("\\f"),
435 0x00..=0x1f => {
436 out.push_str(&value[chunk_start..index]);
437 out.push_str("\\u00");
438 out.push(HEX[(byte >> 4) as usize] as char);
439 out.push(HEX[(byte & 0x0f) as usize] as char);
440 chunk_start = index + 1;
441 continue;
442 }
443 _ => None,
444 };
445 if let Some(escaped) = escaped {
446 out.push_str(&value[chunk_start..index]);
447 out.push_str(escaped);
448 chunk_start = index + 1;
449 }
450 }
451 out.push_str(&value[chunk_start..]);
452 out.push('"');
453 Ok(())
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The hand-written escaper must produce exactly what `serde_json` emits
    /// for the same string.
    #[test]
    fn append_json_string_matches_serde_json_string_escaping() {
        const SAMPLES: [&str; 6] = [
            "",
            "plain",
            "quote\"slash\\",
            "line\nreturn\rtab\tbackspace\u{0008}form\u{000c}",
            "\u{0000}\u{0001}\u{001f}",
            "München 日本語",
        ];

        for sample in SAMPLES.iter().copied() {
            let expected = serde_json::to_string(sample).expect("serde string");
            let mut actual = String::new();
            append_json_string(&mut actual, sample).expect("append string");
            assert_eq!(actual, expected);
        }
    }

    /// Escaping applies to object keys as well as string values, at every
    /// nesting level.
    #[test]
    fn canonical_json_string_escapes_object_keys_and_string_values() {
        let value = json!({
            "line\nkey": "quote\"slash\\",
            "nested": {
                "control": "\u{0001}",
                "unicode": "München 日本語"
            }
        });
        let expected = r#"{"line\nkey":"quote\"slash\\","nested":{"control":"\u0001","unicode":"München 日本語"}}"#;

        let canonical = canonical_json_string(&value).expect("canonical json");
        assert_eq!(canonical, expected);
    }
}