Skip to main content

syncular_protocol/
lib.rs

1use serde::{Deserialize, Serialize};
2use serde_json::{Map, Value};
3use sha2::{Digest, Sha256};
4use std::fmt::Write as _;
5
6pub mod auth_lease;
7pub mod binary_snapshot;
8pub mod binary_sync_pack;
9pub mod blob;
10pub mod error;
11pub mod integrity;
12pub mod realtime;
13pub mod snapshot_artifact;
14pub mod snapshot_chunk;
15pub mod snapshot_manifest;
16pub mod validation;
17
18pub use auth_lease::{
19    AuthLeaseCapabilities, AuthLeaseIssueRequest, AuthLeaseIssueResponse, AuthLeasePayload,
20    AuthLeaseProtectedHeader, AuthLeaseProvenance, AuthLeaseScope, AuthLeaseValidationResult,
21    AUTH_LEASE_ALG_ES256, AUTH_LEASE_CODE_BUSINESS_REJECTED, AUTH_LEASE_CODE_EXPIRED,
22    AUTH_LEASE_CODE_INVALID, AUTH_LEASE_CODE_MISSING, AUTH_LEASE_CODE_SCHEMA_MISMATCH,
23    AUTH_LEASE_CODE_SCOPE_MISMATCH, AUTH_LEASE_CODE_SCOPE_REVOKED, AUTH_LEASE_PROTOCOL_VERSION,
24    AUTH_LEASE_TYP, AUTH_LEASE_VERSION,
25};
26pub use blob::{
27    blob_hash, normalize_blob_mime_type, validate_blob_bytes, validate_blob_digest,
28    validate_blob_hash, validate_blob_ref, BlobDownloadUrlResponse, BlobRef,
29    BlobUploadCompleteResponse, BlobUploadInitRequest, BlobUploadInitResponse,
30};
31pub use error::{ProtocolError, Result};
32pub use integrity::{
33    validate_pull_commit_integrity_metadata, verify_subscription_commit_integrity,
34    wire_commit_chain_root, wire_commit_chain_root_from_digest, wire_commit_digest,
35    VerifiedCommitRoot,
36};
37pub use realtime::{
38    realtime_presence_event_from_value, realtime_push_response_from_value, RealtimePresenceEntry,
39    RealtimePresenceEvent, RealtimePresenceRequest, RealtimePushRequest, RealtimePushResponseData,
40    RealtimeServerMessage, REALTIME_CLIENT_MESSAGE_PRESENCE, REALTIME_CLIENT_MESSAGE_PUSH,
41    REALTIME_SERVER_EVENT_PRESENCE, REALTIME_SERVER_EVENT_PUSH_RESPONSE,
42    REALTIME_SERVER_EVENT_SYNC,
43};
44pub use snapshot_artifact::{
45    scoped_snapshot_artifact_manifest_digest, validate_scoped_snapshot_artifact_manifest,
46    validate_scoped_snapshot_artifact_ref, ScopedSnapshotArtifactManifest,
47    ScopedSnapshotArtifactRef, SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1,
48    SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION, SNAPSHOT_ARTIFACT_COMPRESSION_NONE,
49};
50pub use snapshot_chunk::{
51    decode_snapshot_chunk_sha256, validate_snapshot_chunk_format,
52    validate_snapshot_chunk_hash_bytes, validate_snapshot_chunk_hash_hex,
53    SNAPSHOT_CHUNK_COMPRESSION_GZIP,
54};
55pub use snapshot_manifest::{snapshot_manifest_digest, validate_pull_snapshot_manifests};
56pub use validation::{
57    validate_combined_request, validate_combined_response, validate_realtime_presence_request,
58    validate_realtime_push_request, validate_realtime_server_message,
59};
60
/// Length, in hex characters, of commit-integrity digests and chain roots.
///
/// 64 hex characters encode 32 bytes, the same width as the output of [`sha256_hex`].
pub const COMMIT_INTEGRITY_HEX_LENGTH: usize = 64;
/// Genesis commit-chain root: `COMMIT_INTEGRITY_HEX_LENGTH` zero hex digits.
pub const COMMIT_INTEGRITY_GENESIS_ROOT: &str =
    "0000000000000000000000000000000000000000000000000000000000000000";
/// Version label string for the wire commit digest format.
pub const WIRE_COMMIT_DIGEST_VERSION: &str = "syncular-wire-commit-digest-v1";
/// Version label string for the wire commit chain-root format.
pub const WIRE_COMMIT_CHAIN_ROOT_VERSION: &str = "syncular-wire-commit-chain-root-v1";
/// Encoding identifier for binary-table snapshot chunks.
pub const SNAPSHOT_CHUNK_ENCODING_BINARY_TABLE_V1: &str = "binary-table-v1";
/// Encoding identifier for binary sync packs.
pub const SYNC_PACK_ENCODING_BINARY_V1: &str = "binary-sync-pack-v1";
/// MIME content type used for sync packs.
pub const SYNC_PACK_CONTENT_TYPE: &str = "application/vnd.syncular.sync-pack.v1";
/// Wire version number of the binary sync pack format.
pub const BINARY_SYNC_PACK_WIRE_VERSION: u16 = 14;
/// Current snapshot manifest version (same type as `SnapshotManifest::version`).
pub const SNAPSHOT_MANIFEST_VERSION: i32 = 1;

// Compile-time guard: the genesis root must stay exactly one digest wide and consist only
// of '0' digits, so editing either constant in isolation fails the build instead of
// silently producing a malformed chain root.
const _: () = {
    let bytes = COMMIT_INTEGRITY_GENESIS_ROOT.as_bytes();
    assert!(bytes.len() == COMMIT_INTEGRITY_HEX_LENGTH);
    let mut index = 0;
    while index < bytes.len() {
        assert!(bytes[index] == b'0');
        index += 1;
    }
};
71
72pub type ScopeValues = Map<String, Value>;
73
/// A single row-level operation carried in [`PushCommitRequest::operations`].
///
/// No `serde` renames are applied, so the JSON keys are the snake_case field names
/// (`table`, `row_id`, `op`, `payload`, `base_version`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncOperation {
    /// Name of the table the operation targets.
    pub table: String,
    /// Identifier of the affected row.
    pub row_id: String,
    /// Operation kind as a free-form string; the accepted values are not defined in this file.
    pub op: String,
    /// Optional JSON payload; serialized as `null` when `None`.
    pub payload: Option<Value>,
    /// Optional base version — presumably the row version the client last observed
    /// (TODO confirm); serialized as `null` when `None`.
    pub base_version: Option<i64>,
}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct PushCommitRequest {
85    #[serde(rename = "clientCommitId")]
86    pub client_commit_id: String,
87    pub operations: Vec<SyncOperation>,
88    #[serde(rename = "schemaVersion")]
89    pub schema_version: i32,
90    #[serde(rename = "authLease", default, skip_serializing_if = "Option::is_none")]
91    pub auth_lease: Option<AuthLeaseProvenance>,
92}
93
/// The push half of a [`CombinedRequest`]: the commits a client submits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushBatchRequest {
    /// Commits to push (JSON key `commits`).
    pub commits: Vec<PushCommitRequest>,
}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
100pub struct BootstrapState {
101    #[serde(rename = "asOfCommitSeq")]
102    pub as_of_commit_seq: i64,
103    pub tables: Vec<String>,
104    #[serde(rename = "tableIndex")]
105    pub table_index: i64,
106    #[serde(rename = "rowCursor")]
107    pub row_cursor: Option<String>,
108}
109
110#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub struct CrdtStateVectorHint {
112    #[serde(rename = "rowId")]
113    pub row_id: String,
114    pub field: String,
115    #[serde(rename = "stateColumn")]
116    pub state_column: String,
117    #[serde(rename = "stateVectorBase64")]
118    pub state_vector_base64: String,
119    #[serde(rename = "syncMode")]
120    pub sync_mode: String,
121    #[serde(rename = "updatedAt")]
122    pub updated_at: i64,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct SubscriptionRequest {
127    pub id: String,
128    pub table: String,
129    pub scopes: ScopeValues,
130    #[serde(default, skip_serializing_if = "Map::is_empty")]
131    pub params: Map<String, Value>,
132    pub cursor: i64,
133    #[serde(rename = "bootstrapState", skip_serializing_if = "Option::is_none")]
134    pub bootstrap_state: Option<BootstrapState>,
135    #[serde(rename = "verifiedRoot", skip_serializing_if = "Option::is_none")]
136    pub verified_root: Option<String>,
137    #[serde(rename = "crdtStateVectors")]
138    pub crdt_state_vectors: Vec<CrdtStateVectorHint>,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct SnapshotArtifactsRequest {
143    #[serde(rename = "artifactKinds")]
144    pub artifact_kinds: Vec<String>,
145    #[serde(default, skip_serializing_if = "Vec::is_empty")]
146    pub compressions: Vec<String>,
147    #[serde(rename = "featureSet", default, skip_serializing_if = "Vec::is_empty")]
148    pub feature_set: Vec<String>,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct PullRequest {
153    #[serde(rename = "schemaVersion")]
154    pub schema_version: i32,
155    #[serde(rename = "limitCommits")]
156    pub limit_commits: i64,
157    #[serde(rename = "limitSnapshotRows")]
158    pub limit_snapshot_rows: i64,
159    #[serde(rename = "maxSnapshotPages")]
160    pub max_snapshot_pages: i64,
161    #[serde(rename = "dedupeRows", skip_serializing_if = "Option::is_none")]
162    pub dedupe_rows: Option<bool>,
163    #[serde(rename = "snapshotArtifacts", skip_serializing_if = "Option::is_none")]
164    pub snapshot_artifacts: Option<SnapshotArtifactsRequest>,
165    pub subscriptions: Vec<SubscriptionRequest>,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct CombinedRequest {
170    #[serde(rename = "clientId")]
171    pub client_id: String,
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub push: Option<PushBatchRequest>,
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub pull: Option<PullRequest>,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct CombinedResponse {
180    pub ok: bool,
181    #[serde(
182        rename = "requiredSchemaVersion",
183        skip_serializing_if = "Option::is_none"
184    )]
185    pub required_schema_version: Option<i32>,
186    #[serde(
187        rename = "latestSchemaVersion",
188        skip_serializing_if = "Option::is_none"
189    )]
190    pub latest_schema_version: Option<i32>,
191    pub push: Option<PushBatchResponse>,
192    pub pull: Option<PullResponse>,
193}
194
/// The push half of a [`CombinedResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushBatchResponse {
    /// Overall success flag for the batch.
    pub ok: bool,
    /// Per-commit outcomes — presumably one per pushed commit (TODO confirm).
    pub commits: Vec<PushCommitResponse>,
}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct PushCommitResponse {
203    #[serde(rename = "clientCommitId")]
204    pub client_commit_id: String,
205    pub status: String,
206    #[serde(rename = "commitSeq")]
207    pub commit_seq: Option<i64>,
208    pub results: Vec<OperationResult>,
209}
210
/// Outcome of a single operation, listed in [`PushCommitResponse::results`].
///
/// Wire keys mix conventions: `opIndex` is camelCase, while `server_version` and
/// `server_row` are pinned to snake_case by explicit `rename` attributes. The remaining
/// fields use their Rust names. `Option` fields serialize as `null` when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationResult {
    /// Operation index (`opIndex`) — presumably into the commit's `operations` (TODO confirm).
    #[serde(rename = "opIndex")]
    pub op_index: i32,
    /// Status as a free-form string; accepted values are not defined in this file.
    pub status: String,
    /// Optional human-readable message.
    pub message: Option<String>,
    /// Optional error text.
    pub error: Option<String>,
    /// Optional machine-readable code.
    pub code: Option<String>,
    /// Optional retriability flag.
    pub retriable: Option<bool>,
    // The identity renames below are intentional: they keep these keys snake_case on the
    // wire even though neighbouring keys such as `opIndex` are camelCase.
    /// Optional server-side row version.
    #[serde(rename = "server_version")]
    pub server_version: Option<i64>,
    /// Optional server-side row as JSON.
    #[serde(rename = "server_row")]
    pub server_row: Option<Value>,
}
225
/// The pull half of a [`CombinedResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PullResponse {
    /// Overall success flag for the pull.
    pub ok: bool,
    /// Per-subscription results — presumably one per requested subscription (TODO confirm).
    pub subscriptions: Vec<SubscriptionResponse>,
}
231
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct SubscriptionResponse {
234    pub id: String,
235    pub status: String,
236    pub scopes: ScopeValues,
237    pub bootstrap: bool,
238    #[serde(rename = "bootstrapState")]
239    pub bootstrap_state: Option<BootstrapState>,
240    #[serde(rename = "nextCursor")]
241    pub next_cursor: i64,
242    pub integrity: Option<SubscriptionIntegrity>,
243    pub commits: Vec<SyncCommit>,
244    pub snapshots: Option<Vec<SyncSnapshot>>,
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
248#[serde(deny_unknown_fields)]
249pub struct SubscriptionIntegrity {
250    #[serde(rename = "partitionId")]
251    pub partition_id: String,
252    #[serde(rename = "previousChainRoot")]
253    pub previous_chain_root: String,
254    #[serde(rename = "commitChainRoot")]
255    pub commit_chain_root: String,
256    #[serde(rename = "commitSeq")]
257    pub commit_seq: i64,
258}
259
260#[derive(Debug, Clone, Serialize, Deserialize)]
261#[serde(deny_unknown_fields)]
262pub struct SyncCommit {
263    #[serde(rename = "commitSeq")]
264    pub commit_seq: i64,
265    #[serde(rename = "createdAt")]
266    pub created_at: String,
267    #[serde(rename = "actorId")]
268    pub actor_id: String,
269    pub changes: Vec<SyncChange>,
270}
271
/// A single row change inside a [`SyncCommit`].
///
/// No `serde` renames are applied, so the JSON keys are the snake_case field names
/// (`table`, `row_id`, `op`, `row_json`, `row_version`, `scopes`). Unlike [`SyncCommit`],
/// unknown keys are not rejected.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncChange {
    /// Table the change applies to.
    pub table: String,
    /// Identifier of the changed row.
    pub row_id: String,
    /// Operation kind as a free-form string; accepted values are not defined in this file.
    pub op: String,
    /// Row contents as JSON, or `null`.
    pub row_json: Option<Value>,
    /// Row version, or `null`.
    pub row_version: Option<i64>,
    /// Scope values of the changed row.
    pub scopes: ScopeValues,
}
281
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct SyncSnapshot {
284    pub table: String,
285    pub rows: Vec<Value>,
286    pub chunks: Option<Vec<SnapshotChunkRef>>,
287    #[serde(skip_serializing_if = "Option::is_none")]
288    pub artifacts: Option<Vec<ScopedSnapshotArtifactRef>>,
289    #[serde(skip_serializing_if = "Option::is_none")]
290    pub manifest: Option<SnapshotManifest>,
291    #[serde(rename = "isFirstPage")]
292    pub is_first_page: bool,
293    #[serde(rename = "isLastPage")]
294    pub is_last_page: bool,
295    #[serde(rename = "bootstrapStateAfter")]
296    pub bootstrap_state_after: Option<BootstrapState>,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct SnapshotChunkRef {
301    pub id: String,
302    #[serde(rename = "byteLength")]
303    pub byte_length: i64,
304    pub sha256: String,
305    pub encoding: String,
306    pub compression: String,
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize)]
310pub struct SnapshotManifest {
311    pub version: i32,
312    pub digest: String,
313    pub table: String,
314    #[serde(rename = "asOfCommitSeq")]
315    pub as_of_commit_seq: i64,
316    #[serde(rename = "scopeDigest")]
317    pub scope_digest: String,
318    #[serde(rename = "rowCursor")]
319    pub row_cursor: Option<String>,
320    #[serde(rename = "rowLimit")]
321    pub row_limit: i64,
322    #[serde(rename = "nextRowCursor")]
323    pub next_row_cursor: Option<String>,
324    #[serde(rename = "isFirstPage")]
325    pub is_first_page: bool,
326    #[serde(rename = "isLastPage")]
327    pub is_last_page: bool,
328    pub chunks: Vec<SnapshotManifestChunkRef>,
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize)]
332pub struct SnapshotManifestChunkRef {
333    pub id: String,
334    #[serde(rename = "byteLength")]
335    pub byte_length: i64,
336    pub sha256: String,
337    pub encoding: String,
338    pub compression: String,
339}
340
341pub fn sha256_hex(value: &str) -> String {
342    hex::encode(Sha256::digest(value.as_bytes()))
343}
344
345pub fn canonical_json_string(value: &Value) -> Result<String> {
346    let mut out = String::new();
347    append_canonical_json(&mut out, value)?;
348    Ok(out)
349}
350
351pub fn append_canonical_json(out: &mut String, value: &Value) -> Result<()> {
352    match value {
353        Value::Null => out.push_str("null"),
354        Value::Bool(value) => out.push_str(if *value { "true" } else { "false" }),
355        Value::Number(value) => write!(out, "{value}").expect("writing to String should not fail"),
356        Value::String(value) => append_json_string(out, value)?,
357        Value::Array(values) => {
358            out.push('[');
359            for (index, item) in values.iter().enumerate() {
360                if index > 0 {
361                    out.push(',');
362                }
363                append_canonical_json(out, item)?;
364            }
365            out.push(']');
366        }
367        Value::Object(values) => {
368            append_canonical_object(out, values)?;
369        }
370    }
371    Ok(())
372}
373
374pub fn append_canonical_object(out: &mut String, values: &Map<String, Value>) -> Result<()> {
375    out.push('{');
376    let body_start = out.len();
377    let mut previous: Option<&str> = None;
378    for (index, (key, value)) in values.iter().enumerate() {
379        if let Some(previous) = previous {
380            if previous > key.as_str() {
381                out.truncate(body_start);
382                append_canonical_object_sorted_body(out, values)?;
383                out.push('}');
384                return Ok(());
385            }
386        }
387        if index > 0 {
388            out.push(',');
389        }
390        append_json_string(out, key)?;
391        out.push(':');
392        append_canonical_json(out, value)?;
393        previous = Some(key.as_str());
394    }
395    out.push('}');
396    Ok(())
397}
398
399fn append_canonical_object_sorted_body(
400    out: &mut String,
401    values: &Map<String, Value>,
402) -> Result<()> {
403    let mut keys = values.keys().collect::<Vec<_>>();
404    keys.sort();
405    for (index, key) in keys.into_iter().enumerate() {
406        if index > 0 {
407            out.push(',');
408        }
409        append_json_string(out, key)?;
410        out.push(':');
411        append_canonical_json(
412            out,
413            values
414                .get(key)
415                .expect("serde_json object key should resolve"),
416        )?;
417    }
418    Ok(())
419}
420
421pub(crate) fn append_json_string(out: &mut String, value: &str) -> Result<()> {
422    const HEX: &[u8; 16] = b"0123456789abcdef";
423
424    out.push('"');
425    let mut chunk_start = 0;
426    for (index, byte) in value.bytes().enumerate() {
427        let escaped = match byte {
428            b'"' => Some("\\\""),
429            b'\\' => Some("\\\\"),
430            b'\n' => Some("\\n"),
431            b'\r' => Some("\\r"),
432            b'\t' => Some("\\t"),
433            0x08 => Some("\\b"),
434            0x0c => Some("\\f"),
435            0x00..=0x1f => {
436                out.push_str(&value[chunk_start..index]);
437                out.push_str("\\u00");
438                out.push(HEX[(byte >> 4) as usize] as char);
439                out.push(HEX[(byte & 0x0f) as usize] as char);
440                chunk_start = index + 1;
441                continue;
442            }
443            _ => None,
444        };
445        if let Some(escaped) = escaped {
446            out.push_str(&value[chunk_start..index]);
447            out.push_str(escaped);
448            chunk_start = index + 1;
449        }
450    }
451    out.push_str(&value[chunk_start..]);
452    out.push('"');
453    Ok(())
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `append_json_string` must escape exactly as `serde_json` does for strings.
    #[test]
    fn append_json_string_matches_serde_json_string_escaping() {
        let samples = [
            "",
            "plain",
            "quote\"slash\\",
            "line\nreturn\rtab\tbackspace\u{0008}form\u{000c}",
            "\u{0000}\u{0001}\u{001f}",
            "München 日本語",
        ];

        for sample in samples {
            let mut actual = String::new();
            append_json_string(&mut actual, sample).expect("append string");
            assert_eq!(actual, serde_json::to_string(sample).expect("serde string"));
        }
    }

    #[test]
    fn canonical_json_string_escapes_object_keys_and_string_values() {
        let value = json!({
            "line\nkey": "quote\"slash\\",
            "nested": {
                "control": "\u{0001}",
                "unicode": "München 日本語"
            }
        });

        let canonical = canonical_json_string(&value).expect("canonical json");
        assert_eq!(
            canonical,
            "{\"line\\nkey\":\"quote\\\"slash\\\\\",\"nested\":{\"control\":\"\\u0001\",\"unicode\":\"München 日本語\"}}"
        );
    }

    /// Keys come out sorted at every level; scalars and empty containers render compactly.
    #[test]
    fn canonical_json_string_sorts_keys_and_renders_scalars() {
        let value = json!({
            "zeta": [1, -3, 2.5, null, true, false],
            "alpha": { "b": "x", "a": {} },
            "mid": []
        });

        let canonical = canonical_json_string(&value).expect("canonical json");
        assert_eq!(
            canonical,
            "{\"alpha\":{\"a\":{},\"b\":\"x\"},\"mid\":[],\"zeta\":[1,-3,2.5,null,true,false]}"
        );
    }

    /// Object output is appended after existing content, which must be left intact.
    #[test]
    fn append_canonical_object_sorts_and_preserves_existing_output() {
        let mut map = Map::new();
        map.insert("b".to_owned(), json!({ "y": 2, "x": 1 }));
        map.insert("a".to_owned(), json!([]));

        let mut out = String::from("prefix:");
        append_canonical_object(&mut out, &map).expect("canonical object");
        assert_eq!(out, "prefix:{\"a\":[],\"b\":{\"x\":1,\"y\":2}}");
    }

    /// The sorted-body fallback orders entries itself, regardless of map iteration order.
    #[test]
    fn sorted_body_orders_entries_without_braces() {
        let mut map = Map::new();
        map.insert("b".to_owned(), json!(2));
        map.insert("a".to_owned(), json!(1));

        let mut out = String::new();
        append_canonical_object_sorted_body(&mut out, &map).expect("sorted body");
        assert_eq!(out, "\"a\":1,\"b\":2");
    }
}