Skip to main content

syncular_protocol/
validation.rs

1use crate::{
2    decode_snapshot_chunk_sha256, validate_pull_snapshot_manifests,
3    validate_scoped_snapshot_artifact_ref, validate_snapshot_chunk_format, AuthLeaseProvenance,
4    CombinedRequest, CombinedResponse, OperationResult, ProtocolError, PullRequest, PullResponse,
5    PushBatchRequest, PushBatchResponse, PushCommitRequest, PushCommitResponse,
6    RealtimePresenceRequest, RealtimePushRequest, RealtimeServerMessage, Result, ScopeValues,
7    SnapshotChunkRef, SyncChange, SyncOperation, SyncSnapshot, SYNC_PACK_ENCODING_BINARY_V1,
8};
9use serde_json::Value;
10
11pub fn validate_combined_request(request: &CombinedRequest) -> Result<()> {
12    ensure_non_empty("clientId", &request.client_id)?;
13    if let Some(push) = &request.push {
14        validate_push_batch_request(push)?;
15    }
16    if let Some(pull) = &request.pull {
17        validate_pull_request(pull)?;
18    }
19    if request.push.is_none() && request.pull.is_none() {
20        return Err(ProtocolError::message(
21            "combined request must include push or pull",
22        ));
23    }
24    Ok(())
25}
26
27pub fn validate_combined_response(response: &CombinedResponse) -> Result<()> {
28    if !response.ok {
29        return Err(ProtocolError::message("combined response ok must be true"));
30    }
31    if response
32        .required_schema_version
33        .is_some_and(|value| value < 1)
34    {
35        return Err(ProtocolError::message(
36            "combined response requiredSchemaVersion must be positive",
37        ));
38    }
39    if response
40        .latest_schema_version
41        .is_some_and(|value| value < 1)
42    {
43        return Err(ProtocolError::message(
44            "combined response latestSchemaVersion must be positive",
45        ));
46    }
47    if let Some(push) = &response.push {
48        validate_push_batch_response(push)?;
49    }
50    if let Some(pull) = &response.pull {
51        validate_pull_response(pull)?;
52    }
53    Ok(())
54}
55
56pub fn validate_realtime_push_request(request: &RealtimePushRequest) -> Result<()> {
57    if request.message_type != crate::REALTIME_CLIENT_MESSAGE_PUSH {
58        return Err(ProtocolError::message(format!(
59            "realtime push type must be {}, got {}",
60            crate::REALTIME_CLIENT_MESSAGE_PUSH,
61            request.message_type
62        )));
63    }
64    ensure_non_empty("realtime push requestId", &request.request_id)?;
65    validate_push_commit_request(&PushCommitRequest {
66        client_commit_id: request.client_commit_id.clone(),
67        operations: request.operations.clone(),
68        schema_version: request.schema_version,
69        auth_lease: request.auth_lease.clone(),
70    })
71}
72
73pub fn validate_realtime_presence_request(request: &RealtimePresenceRequest) -> Result<()> {
74    if request.message_type != crate::REALTIME_CLIENT_MESSAGE_PRESENCE {
75        return Err(ProtocolError::message(format!(
76            "realtime presence type must be {}, got {}",
77            crate::REALTIME_CLIENT_MESSAGE_PRESENCE,
78            request.message_type
79        )));
80    }
81    ensure_non_empty("realtime presence action", &request.action)?;
82    ensure_non_empty("realtime presence scopeKey", &request.scope_key)
83}
84
85pub fn validate_realtime_server_message(message: &RealtimeServerMessage) -> Result<()> {
86    match message.event.as_str() {
87        crate::REALTIME_SERVER_EVENT_SYNC => validate_realtime_sync_data(&message.data),
88        crate::REALTIME_SERVER_EVENT_PRESENCE => {
89            if crate::realtime_presence_event_from_value(&serde_json::json!({
90                "event": message.event,
91                "data": message.data
92            }))
93            .is_none()
94            {
95                return Err(ProtocolError::message(
96                    "realtime presence message is missing presence data",
97                ));
98            }
99            Ok(())
100        }
101        crate::REALTIME_SERVER_EVENT_PUSH_RESPONSE => {
102            let data = message
103                .data
104                .as_object()
105                .ok_or_else(|| ProtocolError::message("push-response data must be an object"))?;
106            ensure_value_string("push-response requestId", data.get("requestId"))?;
107            if let Some(results) = data.get("results") {
108                let results = results.as_array().ok_or_else(|| {
109                    ProtocolError::message("push-response results must be an array")
110                })?;
111                for result in results {
112                    let result: OperationResult = serde_json::from_value(result.clone())?;
113                    validate_operation_result(&result)?;
114                }
115            }
116            Ok(())
117        }
118        "hello" | "heartbeat" | "error" => Ok(()),
119        event => Err(ProtocolError::message(format!(
120            "unsupported realtime server event: {event}"
121        ))),
122    }
123}
124
125fn validate_push_batch_request(push: &PushBatchRequest) -> Result<()> {
126    if push.commits.is_empty() {
127        return Err(ProtocolError::message(
128            "push request must include at least one commit",
129        ));
130    }
131    for commit in &push.commits {
132        validate_push_commit_request(commit)?;
133    }
134    Ok(())
135}
136
137fn validate_push_commit_request(commit: &PushCommitRequest) -> Result<()> {
138    ensure_non_empty("clientCommitId", &commit.client_commit_id)?;
139    if commit.schema_version < 1 {
140        return Err(ProtocolError::message(
141            "push commit schemaVersion must be positive",
142        ));
143    }
144    if commit.operations.is_empty() {
145        return Err(ProtocolError::message(
146            "push commit must include at least one operation",
147        ));
148    }
149    for operation in &commit.operations {
150        validate_operation(operation)?;
151    }
152    if let Some(auth_lease) = &commit.auth_lease {
153        validate_auth_lease_provenance(auth_lease)?;
154    }
155    Ok(())
156}
157
158fn validate_operation(operation: &SyncOperation) -> Result<()> {
159    ensure_non_empty("operation table", &operation.table)?;
160    ensure_non_empty("operation row_id", &operation.row_id)?;
161    match operation.op.as_str() {
162        "upsert" | "delete" => Ok(()),
163        op => Err(ProtocolError::message(format!(
164            "unsupported operation op: {op}"
165        ))),
166    }
167}
168
169fn validate_auth_lease_provenance(auth_lease: &AuthLeaseProvenance) -> Result<()> {
170    ensure_non_empty("authLease leaseId", &auth_lease.lease_id)?;
171    if auth_lease.lease_expires_at_ms < 0 {
172        return Err(ProtocolError::message(
173            "authLease leaseExpiresAtMs must be non-negative",
174        ));
175    }
176    ensure_non_empty(
177        "authLease leaseStatusAtEnqueue",
178        &auth_lease.lease_status_at_enqueue,
179    )?;
180    if auth_lease.lease_token.as_deref().is_some_and(str::is_empty) {
181        return Err(ProtocolError::message(
182            "authLease leaseToken must not be empty",
183        ));
184    }
185    Ok(())
186}
187
188fn validate_pull_request(pull: &PullRequest) -> Result<()> {
189    if pull.schema_version < 1 {
190        return Err(ProtocolError::message(
191            "pull request schemaVersion must be positive",
192        ));
193    }
194    if pull.limit_commits < 1 {
195        return Err(ProtocolError::message(
196            "pull request limitCommits must be positive",
197        ));
198    }
199    if pull.limit_snapshot_rows < 1 {
200        return Err(ProtocolError::message(
201            "pull request limitSnapshotRows must be positive",
202        ));
203    }
204    if pull.max_snapshot_pages < 1 {
205        return Err(ProtocolError::message(
206            "pull request maxSnapshotPages must be positive",
207        ));
208    }
209    if let Some(artifacts) = &pull.snapshot_artifacts {
210        if artifacts.artifact_kinds.is_empty() {
211            return Err(ProtocolError::message(
212                "snapshotArtifacts artifactKinds must not be empty",
213            ));
214        }
215        for artifact_kind in &artifacts.artifact_kinds {
216            if artifact_kind != crate::SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1 {
217                return Err(ProtocolError::message(format!(
218                    "unsupported snapshot artifact kind: {artifact_kind}"
219                )));
220            }
221        }
222        for compression in &artifacts.compressions {
223            if compression != crate::SNAPSHOT_ARTIFACT_COMPRESSION_NONE
224                && compression != crate::SNAPSHOT_CHUNK_COMPRESSION_GZIP
225            {
226                return Err(ProtocolError::message(format!(
227                    "unsupported snapshot artifact compression: {compression}"
228                )));
229            }
230        }
231    }
232    for subscription in &pull.subscriptions {
233        ensure_non_empty("subscription id", &subscription.id)?;
234        ensure_non_empty("subscription table", &subscription.table)?;
235        validate_request_scopes(&subscription.scopes)?;
236        if subscription.cursor < 0 {
237            return Err(ProtocolError::message(
238                "subscription cursor must be non-negative",
239            ));
240        }
241        if subscription
242            .verified_root
243            .as_deref()
244            .is_some_and(|root| validate_hex_root("subscription verifiedRoot", root).is_err())
245        {
246            return Err(ProtocolError::message(
247                "subscription verifiedRoot must be a 64-character hex root",
248            ));
249        }
250    }
251    Ok(())
252}
253
254fn validate_push_batch_response(push: &PushBatchResponse) -> Result<()> {
255    if !push.ok {
256        return Err(ProtocolError::message("push response ok must be true"));
257    }
258    for commit in &push.commits {
259        validate_push_commit_response(commit)?;
260    }
261    Ok(())
262}
263
264fn validate_push_commit_response(commit: &PushCommitResponse) -> Result<()> {
265    ensure_non_empty("push response clientCommitId", &commit.client_commit_id)?;
266    match commit.status.as_str() {
267        "applied" | "cached" | "rejected" => {}
268        status => {
269            return Err(ProtocolError::message(format!(
270                "unsupported push response status: {status}"
271            )))
272        }
273    }
274    for result in &commit.results {
275        validate_operation_result(result)?;
276    }
277    Ok(())
278}
279
280fn validate_operation_result(result: &OperationResult) -> Result<()> {
281    if result.op_index < 0 {
282        return Err(ProtocolError::message(
283            "operation result opIndex must be non-negative",
284        ));
285    }
286    match result.status.as_str() {
287        "applied" => Ok(()),
288        "conflict" => {
289            if result.message.as_deref().unwrap_or("").is_empty() {
290                return Err(ProtocolError::message(
291                    "conflict operation result must include message",
292                ));
293            }
294            if result.server_version.is_none() {
295                return Err(ProtocolError::message(
296                    "conflict operation result must include server_version",
297                ));
298            }
299            Ok(())
300        }
301        "error" => {
302            if result.error.as_deref().unwrap_or("").is_empty() {
303                return Err(ProtocolError::message(
304                    "error operation result must include error",
305                ));
306            }
307            Ok(())
308        }
309        status => Err(ProtocolError::message(format!(
310            "unsupported operation result status: {status}"
311        ))),
312    }
313}
314
315fn validate_pull_response(pull: &PullResponse) -> Result<()> {
316    if !pull.ok {
317        return Err(ProtocolError::message("pull response ok must be true"));
318    }
319    validate_pull_snapshot_manifests(pull)?;
320    for subscription in &pull.subscriptions {
321        ensure_non_empty("pull subscription id", &subscription.id)?;
322        match subscription.status.as_str() {
323            "active" | "revoked" => {}
324            status => {
325                return Err(ProtocolError::message(format!(
326                    "unsupported pull subscription status: {status}"
327                )))
328            }
329        }
330        validate_request_scopes(&subscription.scopes)?;
331        if subscription.next_cursor < 0 {
332            return Err(ProtocolError::message(
333                "pull subscription nextCursor must be non-negative",
334            ));
335        }
336        if let Some(integrity) = &subscription.integrity {
337            ensure_non_empty(
338                "subscription integrity partitionId",
339                &integrity.partition_id,
340            )?;
341            validate_hex_root(
342                "subscription integrity previousChainRoot",
343                &integrity.previous_chain_root,
344            )?;
345            validate_hex_root(
346                "subscription integrity commitChainRoot",
347                &integrity.commit_chain_root,
348            )?;
349        }
350        if let Some(snapshots) = &subscription.snapshots {
351            for snapshot in snapshots {
352                validate_snapshot(snapshot)?;
353            }
354        }
355        for commit in &subscription.commits {
356            if commit.commit_seq < 0 {
357                return Err(ProtocolError::message(
358                    "sync commit commitSeq must be non-negative",
359                ));
360            }
361            ensure_non_empty("sync commit actorId", &commit.actor_id)?;
362            for change in &commit.changes {
363                validate_change(change)?;
364            }
365        }
366    }
367    Ok(())
368}
369
370fn validate_change(change: &SyncChange) -> Result<()> {
371    ensure_non_empty("sync change table", &change.table)?;
372    ensure_non_empty("sync change row_id", &change.row_id)?;
373    match change.op.as_str() {
374        "upsert" | "delete" => {}
375        op => {
376            return Err(ProtocolError::message(format!(
377                "unsupported sync change op: {op}"
378            )))
379        }
380    }
381    validate_stored_scopes(&change.scopes)
382}
383
384fn validate_snapshot(snapshot: &SyncSnapshot) -> Result<()> {
385    ensure_non_empty("snapshot table", &snapshot.table)?;
386    if let Some(chunks) = &snapshot.chunks {
387        for chunk in chunks {
388            validate_snapshot_chunk_ref(chunk)?;
389        }
390    }
391    if let Some(artifacts) = &snapshot.artifacts {
392        for artifact in artifacts {
393            validate_scoped_snapshot_artifact_ref(artifact)?;
394        }
395    }
396    Ok(())
397}
398
399fn validate_snapshot_chunk_ref(chunk: &SnapshotChunkRef) -> Result<()> {
400    ensure_non_empty("snapshot chunk id", &chunk.id)?;
401    if chunk.byte_length < 0 {
402        return Err(ProtocolError::message(
403            "snapshot chunk byteLength must be non-negative",
404        ));
405    }
406    validate_snapshot_chunk_format(chunk)?;
407    decode_snapshot_chunk_sha256(chunk)?;
408    Ok(())
409}
410
411fn validate_realtime_sync_data(value: &Value) -> Result<()> {
412    let data = value
413        .as_object()
414        .ok_or_else(|| ProtocolError::message("realtime sync data must be an object"))?;
415    if let Some(cursor) = data.get("cursor") {
416        if cursor.as_i64().is_none_or(|cursor| cursor < 0) {
417            return Err(ProtocolError::message(
418                "realtime sync cursor must be a non-negative integer",
419            ));
420        }
421    }
422    if let Some(dropped_count) = data.get("droppedCount") {
423        if dropped_count
424            .as_i64()
425            .is_none_or(|dropped_count| dropped_count < 0)
426        {
427            return Err(ProtocolError::message(
428                "realtime sync droppedCount must be a non-negative integer",
429            ));
430        }
431    }
432    if let Some(encoding) = data.get("syncPackEncoding").and_then(Value::as_str) {
433        if encoding != SYNC_PACK_ENCODING_BINARY_V1 {
434            return Err(ProtocolError::message(format!(
435                "unsupported realtime sync pack encoding: {encoding}"
436            )));
437        }
438    }
439    Ok(())
440}
441
442fn validate_request_scopes(scopes: &ScopeValues) -> Result<()> {
443    for (key, value) in scopes {
444        ensure_non_empty("scope key", key)?;
445        match value {
446            Value::String(_) => {}
447            Value::Array(values) if values.iter().all(Value::is_string) => {}
448            _ => {
449                return Err(ProtocolError::message(format!(
450                    "scope {key} must be a string or string array"
451                )))
452            }
453        }
454    }
455    Ok(())
456}
457
458fn validate_stored_scopes(scopes: &ScopeValues) -> Result<()> {
459    for (key, value) in scopes {
460        ensure_non_empty("stored scope key", key)?;
461        if !value.is_string() {
462            return Err(ProtocolError::message(format!(
463                "stored scope {key} must be a string"
464            )));
465        }
466    }
467    Ok(())
468}
469
470fn ensure_value_string(label: &str, value: Option<&Value>) -> Result<()> {
471    let value = value
472        .and_then(Value::as_str)
473        .ok_or_else(|| ProtocolError::message(format!("{label} must be a string")))?;
474    ensure_non_empty(label, value)
475}
476
477fn validate_hex_root(label: &str, value: &str) -> Result<()> {
478    if value.len() != crate::COMMIT_INTEGRITY_HEX_LENGTH
479        || !value.bytes().all(|byte| byte.is_ascii_hexdigit())
480    {
481        return Err(ProtocolError::message(format!(
482            "{label} must be a 64-character hex root"
483        )));
484    }
485    Ok(())
486}
487
488fn ensure_non_empty(label: &str, value: &str) -> Result<()> {
489    if value.is_empty() {
490        return Err(ProtocolError::message(format!("{label} must not be empty")));
491    }
492    Ok(())
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498    use crate::{
499        binary_sync_pack::decode_binary_sync_pack, validate_blob_ref, BlobRef,
500        RealtimePresenceRequest, RealtimePushRequest,
501    };
502    use serde::Deserialize;
503    use serde_json::Value;
504
505    #[derive(Deserialize)]
506    #[serde(rename_all = "camelCase")]
507    struct RelayProtocolBoundaryFixture {
508        combined: CombinedFixture,
509        binary_sync_pack: BinarySyncPackFixture,
510        blob: BlobFixture,
511        realtime: RealtimeFixture,
512    }
513
514    #[derive(Deserialize)]
515    struct CombinedFixture {
516        request: CombinedRequest,
517        response: CombinedResponse,
518    }
519
520    #[derive(Deserialize)]
521    #[serde(rename_all = "camelCase")]
522    struct BinarySyncPackFixture {
523        encoded_hex: String,
524        decoded_response: CombinedResponse,
525    }
526
527    #[derive(Deserialize)]
528    #[serde(rename_all = "camelCase")]
529    struct BlobFixture {
530        r#ref: BlobRef,
531    }
532
533    #[derive(Deserialize)]
534    #[serde(rename_all = "camelCase")]
535    struct RealtimeFixture {
536        push_request: RealtimePushRequest,
537        presence_request: RealtimePresenceRequest,
538        server_sync_message: RealtimeServerMessage,
539        server_presence_message: RealtimeServerMessage,
540        server_push_response_message: RealtimeServerMessage,
541        binary_sync_pack_hex: String,
542    }
543
544    #[derive(Deserialize)]
545    #[serde(rename_all = "camelCase")]
546    struct RustCanonicalFixture {
547        combined_request: Value,
548        realtime_push_request: Value,
549        realtime_presence_request: Value,
550        blob_ref: Value,
551    }
552
553    #[test]
554    fn validates_relay_protocol_boundary_fixture() {
555        let fixture: RelayProtocolBoundaryFixture = serde_json::from_str(include_str!(
556            "../../runtime/tests/fixtures/relay-protocol-boundary-v1.json"
557        ))
558        .expect("relay boundary fixture");
559
560        validate_combined_request(&fixture.combined.request).expect("combined request");
561        validate_combined_response(&fixture.combined.response).expect("combined response");
562        validate_combined_response(&fixture.binary_sync_pack.decoded_response)
563            .expect("binary decoded response fixture");
564
565        let encoded = hex::decode(fixture.binary_sync_pack.encoded_hex).expect("binary hex");
566        let decoded = decode_binary_sync_pack(&encoded).expect("decode binary sync pack");
567        validate_combined_response(&decoded).expect("decoded binary response");
568
569        validate_blob_ref(&fixture.blob.r#ref).expect("blob ref");
570        validate_realtime_push_request(&fixture.realtime.push_request).expect("push request");
571        validate_realtime_presence_request(&fixture.realtime.presence_request)
572            .expect("presence request");
573        validate_realtime_server_message(&fixture.realtime.server_sync_message)
574            .expect("sync message");
575        validate_realtime_server_message(&fixture.realtime.server_presence_message)
576            .expect("presence message");
577        validate_realtime_server_message(&fixture.realtime.server_push_response_message)
578            .expect("push response message");
579
580        let realtime_pack =
581            hex::decode(fixture.realtime.binary_sync_pack_hex).expect("realtime sync pack hex");
582        let decoded_realtime_pack =
583            decode_binary_sync_pack(&realtime_pack).expect("decode realtime sync pack");
584        validate_combined_response(&decoded_realtime_pack).expect("realtime sync pack response");
585    }
586
587    #[test]
588    fn rejects_stale_binary_sync_pack_versions_for_relay_boundary() {
589        let fixture: Value = serde_json::from_str(include_str!(
590            "../../runtime/tests/fixtures/relay-protocol-boundary-v1.json"
591        ))
592        .expect("relay boundary fixture");
593        let mut encoded = hex::decode(
594            fixture["binarySyncPack"]["encodedHex"]
595                .as_str()
596                .expect("encoded hex"),
597        )
598        .expect("hex");
599        encoded[4..6].copy_from_slice(&10u16.to_le_bytes());
600
601        let error = decode_binary_sync_pack(&encoded).expect_err("old version rejects");
602        assert!(
603            error
604                .to_string()
605                .contains("unsupported binary sync pack version: 10"),
606            "{error}"
607        );
608    }
609
610    #[test]
611    fn keeps_rust_canonical_relay_examples_stable() {
612        let fixture: RustCanonicalFixture = serde_json::from_str(include_str!(
613            "../../runtime/tests/fixtures/rust-relay-protocol-canonical-v1.json"
614        ))
615        .expect("rust canonical relay fixture");
616        let operation = crate::SyncOperation {
617            table: "tasks".to_string(),
618            row_id: "rust-relay-task-1".to_string(),
619            op: "upsert".to_string(),
620            payload: Some(serde_json::json!({
621                "id": "rust-relay-task-1",
622                "title": "Rust relay canonical"
623            })),
624            base_version: None,
625        };
626        let commit = PushCommitRequest {
627            client_commit_id: "rust-relay-commit-1".to_string(),
628            operations: vec![operation.clone()],
629            schema_version: 7,
630            auth_lease: None,
631        };
632        let combined = CombinedRequest {
633            client_id: "rust-relay-client-1".to_string(),
634            push: Some(PushBatchRequest {
635                commits: vec![commit.clone()],
636            }),
637            pull: None,
638        };
639        let realtime_push =
640            RealtimePushRequest::from_commit("rust-relay-request-1", commit.clone());
641        let realtime_presence = RealtimePresenceRequest::new(
642            "join",
643            "project:rust-relay",
644            Some(serde_json::json!({"relayId": "rust-relay-1"})),
645        );
646        let blob = BlobRef {
647            hash: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
648                .to_string(),
649            size: 17,
650            mime_type: "text/plain".to_string(),
651            encrypted: true,
652            key_id: Some("rust-relay-key-1".to_string()),
653        };
654
655        validate_combined_request(&combined).expect("combined request");
656        validate_realtime_push_request(&realtime_push).expect("realtime push");
657        validate_realtime_presence_request(&realtime_presence).expect("presence");
658        validate_blob_ref(&blob).expect("blob ref");
659        assert_eq!(
660            serde_json::to_value(combined).expect("combined json"),
661            fixture.combined_request
662        );
663        assert_eq!(
664            serde_json::to_value(realtime_push).expect("push json"),
665            fixture.realtime_push_request
666        );
667        assert_eq!(
668            serde_json::to_value(realtime_presence).expect("presence json"),
669            fixture.realtime_presence_request
670        );
671        assert_eq!(
672            serde_json::to_value(blob).expect("blob json"),
673            fixture.blob_ref
674        );
675    }
676
677    #[test]
678    fn rejects_invalid_relay_protocol_shapes() {
679        let request = CombinedRequest {
680            client_id: "relay-client".to_string(),
681            push: None,
682            pull: None,
683        };
684        let error = validate_combined_request(&request).expect_err("invalid request");
685        assert!(
686            error
687                .to_string()
688                .contains("combined request must include push or pull"),
689            "{error}"
690        );
691    }
692}