1use crate::{
2 decode_snapshot_chunk_sha256, validate_pull_snapshot_manifests,
3 validate_scoped_snapshot_artifact_ref, validate_snapshot_chunk_format, AuthLeaseProvenance,
4 CombinedRequest, CombinedResponse, OperationResult, ProtocolError, PullRequest, PullResponse,
5 PushBatchRequest, PushBatchResponse, PushCommitRequest, PushCommitResponse,
6 RealtimePresenceRequest, RealtimePushRequest, RealtimeServerMessage, Result, ScopeValues,
7 SnapshotChunkRef, SyncChange, SyncOperation, SyncSnapshot, SYNC_PACK_ENCODING_BINARY_V1,
8};
9use serde_json::Value;
10
11pub fn validate_combined_request(request: &CombinedRequest) -> Result<()> {
12 ensure_non_empty("clientId", &request.client_id)?;
13 if let Some(push) = &request.push {
14 validate_push_batch_request(push)?;
15 }
16 if let Some(pull) = &request.pull {
17 validate_pull_request(pull)?;
18 }
19 if request.push.is_none() && request.pull.is_none() {
20 return Err(ProtocolError::message(
21 "combined request must include push or pull",
22 ));
23 }
24 Ok(())
25}
26
27pub fn validate_combined_response(response: &CombinedResponse) -> Result<()> {
28 if !response.ok {
29 return Err(ProtocolError::message("combined response ok must be true"));
30 }
31 if response
32 .required_schema_version
33 .is_some_and(|value| value < 1)
34 {
35 return Err(ProtocolError::message(
36 "combined response requiredSchemaVersion must be positive",
37 ));
38 }
39 if response
40 .latest_schema_version
41 .is_some_and(|value| value < 1)
42 {
43 return Err(ProtocolError::message(
44 "combined response latestSchemaVersion must be positive",
45 ));
46 }
47 if let Some(push) = &response.push {
48 validate_push_batch_response(push)?;
49 }
50 if let Some(pull) = &response.pull {
51 validate_pull_response(pull)?;
52 }
53 Ok(())
54}
55
56pub fn validate_realtime_push_request(request: &RealtimePushRequest) -> Result<()> {
57 if request.message_type != crate::REALTIME_CLIENT_MESSAGE_PUSH {
58 return Err(ProtocolError::message(format!(
59 "realtime push type must be {}, got {}",
60 crate::REALTIME_CLIENT_MESSAGE_PUSH,
61 request.message_type
62 )));
63 }
64 ensure_non_empty("realtime push requestId", &request.request_id)?;
65 validate_push_commit_request(&PushCommitRequest {
66 client_commit_id: request.client_commit_id.clone(),
67 operations: request.operations.clone(),
68 schema_version: request.schema_version,
69 auth_lease: request.auth_lease.clone(),
70 })
71}
72
73pub fn validate_realtime_presence_request(request: &RealtimePresenceRequest) -> Result<()> {
74 if request.message_type != crate::REALTIME_CLIENT_MESSAGE_PRESENCE {
75 return Err(ProtocolError::message(format!(
76 "realtime presence type must be {}, got {}",
77 crate::REALTIME_CLIENT_MESSAGE_PRESENCE,
78 request.message_type
79 )));
80 }
81 ensure_non_empty("realtime presence action", &request.action)?;
82 ensure_non_empty("realtime presence scopeKey", &request.scope_key)
83}
84
85pub fn validate_realtime_server_message(message: &RealtimeServerMessage) -> Result<()> {
86 match message.event.as_str() {
87 crate::REALTIME_SERVER_EVENT_SYNC => validate_realtime_sync_data(&message.data),
88 crate::REALTIME_SERVER_EVENT_PRESENCE => {
89 if crate::realtime_presence_event_from_value(&serde_json::json!({
90 "event": message.event,
91 "data": message.data
92 }))
93 .is_none()
94 {
95 return Err(ProtocolError::message(
96 "realtime presence message is missing presence data",
97 ));
98 }
99 Ok(())
100 }
101 crate::REALTIME_SERVER_EVENT_PUSH_RESPONSE => {
102 let data = message
103 .data
104 .as_object()
105 .ok_or_else(|| ProtocolError::message("push-response data must be an object"))?;
106 ensure_value_string("push-response requestId", data.get("requestId"))?;
107 if let Some(results) = data.get("results") {
108 let results = results.as_array().ok_or_else(|| {
109 ProtocolError::message("push-response results must be an array")
110 })?;
111 for result in results {
112 let result: OperationResult = serde_json::from_value(result.clone())?;
113 validate_operation_result(&result)?;
114 }
115 }
116 Ok(())
117 }
118 "hello" | "heartbeat" | "error" => Ok(()),
119 event => Err(ProtocolError::message(format!(
120 "unsupported realtime server event: {event}"
121 ))),
122 }
123}
124
125fn validate_push_batch_request(push: &PushBatchRequest) -> Result<()> {
126 if push.commits.is_empty() {
127 return Err(ProtocolError::message(
128 "push request must include at least one commit",
129 ));
130 }
131 for commit in &push.commits {
132 validate_push_commit_request(commit)?;
133 }
134 Ok(())
135}
136
137fn validate_push_commit_request(commit: &PushCommitRequest) -> Result<()> {
138 ensure_non_empty("clientCommitId", &commit.client_commit_id)?;
139 if commit.schema_version < 1 {
140 return Err(ProtocolError::message(
141 "push commit schemaVersion must be positive",
142 ));
143 }
144 if commit.operations.is_empty() {
145 return Err(ProtocolError::message(
146 "push commit must include at least one operation",
147 ));
148 }
149 for operation in &commit.operations {
150 validate_operation(operation)?;
151 }
152 if let Some(auth_lease) = &commit.auth_lease {
153 validate_auth_lease_provenance(auth_lease)?;
154 }
155 Ok(())
156}
157
158fn validate_operation(operation: &SyncOperation) -> Result<()> {
159 ensure_non_empty("operation table", &operation.table)?;
160 ensure_non_empty("operation row_id", &operation.row_id)?;
161 match operation.op.as_str() {
162 "upsert" | "delete" => Ok(()),
163 op => Err(ProtocolError::message(format!(
164 "unsupported operation op: {op}"
165 ))),
166 }
167}
168
169fn validate_auth_lease_provenance(auth_lease: &AuthLeaseProvenance) -> Result<()> {
170 ensure_non_empty("authLease leaseId", &auth_lease.lease_id)?;
171 if auth_lease.lease_expires_at_ms < 0 {
172 return Err(ProtocolError::message(
173 "authLease leaseExpiresAtMs must be non-negative",
174 ));
175 }
176 ensure_non_empty(
177 "authLease leaseStatusAtEnqueue",
178 &auth_lease.lease_status_at_enqueue,
179 )?;
180 if auth_lease.lease_token.as_deref().is_some_and(str::is_empty) {
181 return Err(ProtocolError::message(
182 "authLease leaseToken must not be empty",
183 ));
184 }
185 Ok(())
186}
187
188fn validate_pull_request(pull: &PullRequest) -> Result<()> {
189 if pull.schema_version < 1 {
190 return Err(ProtocolError::message(
191 "pull request schemaVersion must be positive",
192 ));
193 }
194 if pull.limit_commits < 1 {
195 return Err(ProtocolError::message(
196 "pull request limitCommits must be positive",
197 ));
198 }
199 if pull.limit_snapshot_rows < 1 {
200 return Err(ProtocolError::message(
201 "pull request limitSnapshotRows must be positive",
202 ));
203 }
204 if pull.max_snapshot_pages < 1 {
205 return Err(ProtocolError::message(
206 "pull request maxSnapshotPages must be positive",
207 ));
208 }
209 if let Some(artifacts) = &pull.snapshot_artifacts {
210 if artifacts.artifact_kinds.is_empty() {
211 return Err(ProtocolError::message(
212 "snapshotArtifacts artifactKinds must not be empty",
213 ));
214 }
215 for artifact_kind in &artifacts.artifact_kinds {
216 if artifact_kind != crate::SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1 {
217 return Err(ProtocolError::message(format!(
218 "unsupported snapshot artifact kind: {artifact_kind}"
219 )));
220 }
221 }
222 for compression in &artifacts.compressions {
223 if compression != crate::SNAPSHOT_ARTIFACT_COMPRESSION_NONE
224 && compression != crate::SNAPSHOT_CHUNK_COMPRESSION_GZIP
225 {
226 return Err(ProtocolError::message(format!(
227 "unsupported snapshot artifact compression: {compression}"
228 )));
229 }
230 }
231 }
232 for subscription in &pull.subscriptions {
233 ensure_non_empty("subscription id", &subscription.id)?;
234 ensure_non_empty("subscription table", &subscription.table)?;
235 validate_request_scopes(&subscription.scopes)?;
236 if subscription.cursor < 0 {
237 return Err(ProtocolError::message(
238 "subscription cursor must be non-negative",
239 ));
240 }
241 if subscription
242 .verified_root
243 .as_deref()
244 .is_some_and(|root| validate_hex_root("subscription verifiedRoot", root).is_err())
245 {
246 return Err(ProtocolError::message(
247 "subscription verifiedRoot must be a 64-character hex root",
248 ));
249 }
250 }
251 Ok(())
252}
253
254fn validate_push_batch_response(push: &PushBatchResponse) -> Result<()> {
255 if !push.ok {
256 return Err(ProtocolError::message("push response ok must be true"));
257 }
258 for commit in &push.commits {
259 validate_push_commit_response(commit)?;
260 }
261 Ok(())
262}
263
264fn validate_push_commit_response(commit: &PushCommitResponse) -> Result<()> {
265 ensure_non_empty("push response clientCommitId", &commit.client_commit_id)?;
266 match commit.status.as_str() {
267 "applied" | "cached" | "rejected" => {}
268 status => {
269 return Err(ProtocolError::message(format!(
270 "unsupported push response status: {status}"
271 )))
272 }
273 }
274 for result in &commit.results {
275 validate_operation_result(result)?;
276 }
277 Ok(())
278}
279
280fn validate_operation_result(result: &OperationResult) -> Result<()> {
281 if result.op_index < 0 {
282 return Err(ProtocolError::message(
283 "operation result opIndex must be non-negative",
284 ));
285 }
286 match result.status.as_str() {
287 "applied" => Ok(()),
288 "conflict" => {
289 if result.message.as_deref().unwrap_or("").is_empty() {
290 return Err(ProtocolError::message(
291 "conflict operation result must include message",
292 ));
293 }
294 if result.server_version.is_none() {
295 return Err(ProtocolError::message(
296 "conflict operation result must include server_version",
297 ));
298 }
299 Ok(())
300 }
301 "error" => {
302 if result.error.as_deref().unwrap_or("").is_empty() {
303 return Err(ProtocolError::message(
304 "error operation result must include error",
305 ));
306 }
307 Ok(())
308 }
309 status => Err(ProtocolError::message(format!(
310 "unsupported operation result status: {status}"
311 ))),
312 }
313}
314
315fn validate_pull_response(pull: &PullResponse) -> Result<()> {
316 if !pull.ok {
317 return Err(ProtocolError::message("pull response ok must be true"));
318 }
319 validate_pull_snapshot_manifests(pull)?;
320 for subscription in &pull.subscriptions {
321 ensure_non_empty("pull subscription id", &subscription.id)?;
322 match subscription.status.as_str() {
323 "active" | "revoked" => {}
324 status => {
325 return Err(ProtocolError::message(format!(
326 "unsupported pull subscription status: {status}"
327 )))
328 }
329 }
330 validate_request_scopes(&subscription.scopes)?;
331 if subscription.next_cursor < 0 {
332 return Err(ProtocolError::message(
333 "pull subscription nextCursor must be non-negative",
334 ));
335 }
336 if let Some(integrity) = &subscription.integrity {
337 ensure_non_empty(
338 "subscription integrity partitionId",
339 &integrity.partition_id,
340 )?;
341 validate_hex_root(
342 "subscription integrity previousChainRoot",
343 &integrity.previous_chain_root,
344 )?;
345 validate_hex_root(
346 "subscription integrity commitChainRoot",
347 &integrity.commit_chain_root,
348 )?;
349 }
350 if let Some(snapshots) = &subscription.snapshots {
351 for snapshot in snapshots {
352 validate_snapshot(snapshot)?;
353 }
354 }
355 for commit in &subscription.commits {
356 if commit.commit_seq < 0 {
357 return Err(ProtocolError::message(
358 "sync commit commitSeq must be non-negative",
359 ));
360 }
361 ensure_non_empty("sync commit actorId", &commit.actor_id)?;
362 for change in &commit.changes {
363 validate_change(change)?;
364 }
365 }
366 }
367 Ok(())
368}
369
370fn validate_change(change: &SyncChange) -> Result<()> {
371 ensure_non_empty("sync change table", &change.table)?;
372 ensure_non_empty("sync change row_id", &change.row_id)?;
373 match change.op.as_str() {
374 "upsert" | "delete" => {}
375 op => {
376 return Err(ProtocolError::message(format!(
377 "unsupported sync change op: {op}"
378 )))
379 }
380 }
381 validate_stored_scopes(&change.scopes)
382}
383
384fn validate_snapshot(snapshot: &SyncSnapshot) -> Result<()> {
385 ensure_non_empty("snapshot table", &snapshot.table)?;
386 if let Some(chunks) = &snapshot.chunks {
387 for chunk in chunks {
388 validate_snapshot_chunk_ref(chunk)?;
389 }
390 }
391 if let Some(artifacts) = &snapshot.artifacts {
392 for artifact in artifacts {
393 validate_scoped_snapshot_artifact_ref(artifact)?;
394 }
395 }
396 Ok(())
397}
398
399fn validate_snapshot_chunk_ref(chunk: &SnapshotChunkRef) -> Result<()> {
400 ensure_non_empty("snapshot chunk id", &chunk.id)?;
401 if chunk.byte_length < 0 {
402 return Err(ProtocolError::message(
403 "snapshot chunk byteLength must be non-negative",
404 ));
405 }
406 validate_snapshot_chunk_format(chunk)?;
407 decode_snapshot_chunk_sha256(chunk)?;
408 Ok(())
409}
410
411fn validate_realtime_sync_data(value: &Value) -> Result<()> {
412 let data = value
413 .as_object()
414 .ok_or_else(|| ProtocolError::message("realtime sync data must be an object"))?;
415 if let Some(cursor) = data.get("cursor") {
416 if cursor.as_i64().is_none_or(|cursor| cursor < 0) {
417 return Err(ProtocolError::message(
418 "realtime sync cursor must be a non-negative integer",
419 ));
420 }
421 }
422 if let Some(dropped_count) = data.get("droppedCount") {
423 if dropped_count
424 .as_i64()
425 .is_none_or(|dropped_count| dropped_count < 0)
426 {
427 return Err(ProtocolError::message(
428 "realtime sync droppedCount must be a non-negative integer",
429 ));
430 }
431 }
432 if let Some(encoding) = data.get("syncPackEncoding").and_then(Value::as_str) {
433 if encoding != SYNC_PACK_ENCODING_BINARY_V1 {
434 return Err(ProtocolError::message(format!(
435 "unsupported realtime sync pack encoding: {encoding}"
436 )));
437 }
438 }
439 Ok(())
440}
441
442fn validate_request_scopes(scopes: &ScopeValues) -> Result<()> {
443 for (key, value) in scopes {
444 ensure_non_empty("scope key", key)?;
445 match value {
446 Value::String(_) => {}
447 Value::Array(values) if values.iter().all(Value::is_string) => {}
448 _ => {
449 return Err(ProtocolError::message(format!(
450 "scope {key} must be a string or string array"
451 )))
452 }
453 }
454 }
455 Ok(())
456}
457
458fn validate_stored_scopes(scopes: &ScopeValues) -> Result<()> {
459 for (key, value) in scopes {
460 ensure_non_empty("stored scope key", key)?;
461 if !value.is_string() {
462 return Err(ProtocolError::message(format!(
463 "stored scope {key} must be a string"
464 )));
465 }
466 }
467 Ok(())
468}
469
470fn ensure_value_string(label: &str, value: Option<&Value>) -> Result<()> {
471 let value = value
472 .and_then(Value::as_str)
473 .ok_or_else(|| ProtocolError::message(format!("{label} must be a string")))?;
474 ensure_non_empty(label, value)
475}
476
477fn validate_hex_root(label: &str, value: &str) -> Result<()> {
478 if value.len() != crate::COMMIT_INTEGRITY_HEX_LENGTH
479 || !value.bytes().all(|byte| byte.is_ascii_hexdigit())
480 {
481 return Err(ProtocolError::message(format!(
482 "{label} must be a 64-character hex root"
483 )));
484 }
485 Ok(())
486}
487
488fn ensure_non_empty(label: &str, value: &str) -> Result<()> {
489 if value.is_empty() {
490 return Err(ProtocolError::message(format!("{label} must not be empty")));
491 }
492 Ok(())
493}
494
495#[cfg(test)]
496mod tests {
497 use super::*;
498 use crate::{
499 binary_sync_pack::decode_binary_sync_pack, validate_blob_ref, BlobRef,
500 RealtimePresenceRequest, RealtimePushRequest,
501 };
502 use serde::Deserialize;
503 use serde_json::Value;
504
505 #[derive(Deserialize)]
506 #[serde(rename_all = "camelCase")]
507 struct RelayProtocolBoundaryFixture {
508 combined: CombinedFixture,
509 binary_sync_pack: BinarySyncPackFixture,
510 blob: BlobFixture,
511 realtime: RealtimeFixture,
512 }
513
514 #[derive(Deserialize)]
515 struct CombinedFixture {
516 request: CombinedRequest,
517 response: CombinedResponse,
518 }
519
520 #[derive(Deserialize)]
521 #[serde(rename_all = "camelCase")]
522 struct BinarySyncPackFixture {
523 encoded_hex: String,
524 decoded_response: CombinedResponse,
525 }
526
527 #[derive(Deserialize)]
528 #[serde(rename_all = "camelCase")]
529 struct BlobFixture {
530 r#ref: BlobRef,
531 }
532
533 #[derive(Deserialize)]
534 #[serde(rename_all = "camelCase")]
535 struct RealtimeFixture {
536 push_request: RealtimePushRequest,
537 presence_request: RealtimePresenceRequest,
538 server_sync_message: RealtimeServerMessage,
539 server_presence_message: RealtimeServerMessage,
540 server_push_response_message: RealtimeServerMessage,
541 binary_sync_pack_hex: String,
542 }
543
544 #[derive(Deserialize)]
545 #[serde(rename_all = "camelCase")]
546 struct RustCanonicalFixture {
547 combined_request: Value,
548 realtime_push_request: Value,
549 realtime_presence_request: Value,
550 blob_ref: Value,
551 }
552
553 #[test]
554 fn validates_relay_protocol_boundary_fixture() {
555 let fixture: RelayProtocolBoundaryFixture = serde_json::from_str(include_str!(
556 "../../runtime/tests/fixtures/relay-protocol-boundary-v1.json"
557 ))
558 .expect("relay boundary fixture");
559
560 validate_combined_request(&fixture.combined.request).expect("combined request");
561 validate_combined_response(&fixture.combined.response).expect("combined response");
562 validate_combined_response(&fixture.binary_sync_pack.decoded_response)
563 .expect("binary decoded response fixture");
564
565 let encoded = hex::decode(fixture.binary_sync_pack.encoded_hex).expect("binary hex");
566 let decoded = decode_binary_sync_pack(&encoded).expect("decode binary sync pack");
567 validate_combined_response(&decoded).expect("decoded binary response");
568
569 validate_blob_ref(&fixture.blob.r#ref).expect("blob ref");
570 validate_realtime_push_request(&fixture.realtime.push_request).expect("push request");
571 validate_realtime_presence_request(&fixture.realtime.presence_request)
572 .expect("presence request");
573 validate_realtime_server_message(&fixture.realtime.server_sync_message)
574 .expect("sync message");
575 validate_realtime_server_message(&fixture.realtime.server_presence_message)
576 .expect("presence message");
577 validate_realtime_server_message(&fixture.realtime.server_push_response_message)
578 .expect("push response message");
579
580 let realtime_pack =
581 hex::decode(fixture.realtime.binary_sync_pack_hex).expect("realtime sync pack hex");
582 let decoded_realtime_pack =
583 decode_binary_sync_pack(&realtime_pack).expect("decode realtime sync pack");
584 validate_combined_response(&decoded_realtime_pack).expect("realtime sync pack response");
585 }
586
587 #[test]
588 fn rejects_stale_binary_sync_pack_versions_for_relay_boundary() {
589 let fixture: Value = serde_json::from_str(include_str!(
590 "../../runtime/tests/fixtures/relay-protocol-boundary-v1.json"
591 ))
592 .expect("relay boundary fixture");
593 let mut encoded = hex::decode(
594 fixture["binarySyncPack"]["encodedHex"]
595 .as_str()
596 .expect("encoded hex"),
597 )
598 .expect("hex");
599 encoded[4..6].copy_from_slice(&10u16.to_le_bytes());
600
601 let error = decode_binary_sync_pack(&encoded).expect_err("old version rejects");
602 assert!(
603 error
604 .to_string()
605 .contains("unsupported binary sync pack version: 10"),
606 "{error}"
607 );
608 }
609
610 #[test]
611 fn keeps_rust_canonical_relay_examples_stable() {
612 let fixture: RustCanonicalFixture = serde_json::from_str(include_str!(
613 "../../runtime/tests/fixtures/rust-relay-protocol-canonical-v1.json"
614 ))
615 .expect("rust canonical relay fixture");
616 let operation = crate::SyncOperation {
617 table: "tasks".to_string(),
618 row_id: "rust-relay-task-1".to_string(),
619 op: "upsert".to_string(),
620 payload: Some(serde_json::json!({
621 "id": "rust-relay-task-1",
622 "title": "Rust relay canonical"
623 })),
624 base_version: None,
625 };
626 let commit = PushCommitRequest {
627 client_commit_id: "rust-relay-commit-1".to_string(),
628 operations: vec![operation.clone()],
629 schema_version: 7,
630 auth_lease: None,
631 };
632 let combined = CombinedRequest {
633 client_id: "rust-relay-client-1".to_string(),
634 push: Some(PushBatchRequest {
635 commits: vec![commit.clone()],
636 }),
637 pull: None,
638 };
639 let realtime_push =
640 RealtimePushRequest::from_commit("rust-relay-request-1", commit.clone());
641 let realtime_presence = RealtimePresenceRequest::new(
642 "join",
643 "project:rust-relay",
644 Some(serde_json::json!({"relayId": "rust-relay-1"})),
645 );
646 let blob = BlobRef {
647 hash: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
648 .to_string(),
649 size: 17,
650 mime_type: "text/plain".to_string(),
651 encrypted: true,
652 key_id: Some("rust-relay-key-1".to_string()),
653 };
654
655 validate_combined_request(&combined).expect("combined request");
656 validate_realtime_push_request(&realtime_push).expect("realtime push");
657 validate_realtime_presence_request(&realtime_presence).expect("presence");
658 validate_blob_ref(&blob).expect("blob ref");
659 assert_eq!(
660 serde_json::to_value(combined).expect("combined json"),
661 fixture.combined_request
662 );
663 assert_eq!(
664 serde_json::to_value(realtime_push).expect("push json"),
665 fixture.realtime_push_request
666 );
667 assert_eq!(
668 serde_json::to_value(realtime_presence).expect("presence json"),
669 fixture.realtime_presence_request
670 );
671 assert_eq!(
672 serde_json::to_value(blob).expect("blob json"),
673 fixture.blob_ref
674 );
675 }
676
677 #[test]
678 fn rejects_invalid_relay_protocol_shapes() {
679 let request = CombinedRequest {
680 client_id: "relay-client".to_string(),
681 push: None,
682 pull: None,
683 };
684 let error = validate_combined_request(&request).expect_err("invalid request");
685 assert!(
686 error
687 .to_string()
688 .contains("combined request must include push or pull"),
689 "{error}"
690 );
691 }
692}