1use crate::binary_snapshot::decode_binary_snapshot_table;
2use crate::error::{ProtocolError, Result};
3use crate::{
4 CombinedResponse, OperationResult, PullResponse, PushBatchResponse, PushCommitResponse,
5 SnapshotChunkRef, SubscriptionIntegrity, SubscriptionResponse, SyncChange, SyncCommit,
6 SyncSnapshot, BINARY_SYNC_PACK_WIRE_VERSION,
7};
8use serde_json::{Map, Value};
9
10pub use crate::{SYNC_PACK_CONTENT_TYPE, SYNC_PACK_ENCODING_BINARY_V1};
11
12const MAGIC: &[u8; 4] = b"SSP1";
13const VERSION: u16 = BINARY_SYNC_PACK_WIRE_VERSION;
14const FLAG_NONE: u16 = 0;
15
/// A deferred link between a decoded change and a row stored in a binary row group.
///
/// One of these is recorded while change metadata is read (row payload kind `2`)
/// and resolved after the row groups that follow the change list are decoded.
struct PendingBinaryChangeRowRef {
    /// Position of the referencing change within the commit's change list.
    change_index: usize,
    /// Table name of the referencing change, kept for a cross-check at resolve time.
    table: String,
    /// Index of the row group that holds the row.
    group_index: usize,
    /// Index of the row inside that row group.
    row_index: usize,
}
22
23pub fn is_binary_sync_pack_content_type(content_type: Option<&str>) -> bool {
24 content_type
25 .and_then(|value| value.split(';').next())
26 .is_some_and(|value| value.trim() == SYNC_PACK_CONTENT_TYPE)
27}
28
29pub fn decode_binary_sync_pack(bytes: &[u8]) -> Result<CombinedResponse> {
30 let mut reader = BinarySyncPackReader::new(bytes);
31 reader.expect_magic(MAGIC, "binary sync pack")?;
32
33 let version = reader.read_u16("binary sync pack version")?;
34 if version != VERSION {
35 return Err(ProtocolError::message(format!(
36 "unsupported binary sync pack version: {version}"
37 )));
38 }
39 let flags = reader.read_u16("binary sync pack flags")?;
40 if flags != FLAG_NONE {
41 return Err(ProtocolError::message(format!(
42 "unsupported binary sync pack flags: {flags}"
43 )));
44 }
45
46 let response = CombinedResponse {
47 ok: reader.read_bool("combined response ok")?,
48 required_schema_version: reader.read_optional_i32("required schema version")?,
49 latest_schema_version: reader.read_optional_i32("latest schema version")?,
50 push: reader.read_optional_value(read_push_response)?,
51 pull: reader.read_optional_value(read_pull_response)?,
52 };
53 reader.assert_done()?;
54 Ok(response)
55}
56
57fn read_push_response(reader: &mut BinarySyncPackReader<'_>) -> Result<PushBatchResponse> {
58 Ok(PushBatchResponse {
59 ok: reader.read_bool("push response ok")?,
60 commits: reader.read_array("push commits", read_push_commit_response)?,
61 })
62}
63
64fn read_push_commit_response(reader: &mut BinarySyncPackReader<'_>) -> Result<PushCommitResponse> {
65 let _ok = reader.read_bool("push commit ok")?;
66 Ok(PushCommitResponse {
67 client_commit_id: reader.read_string32("push client commit id")?,
68 status: read_push_commit_status(reader)?,
69 commit_seq: reader.read_optional_i64("push commit seq")?,
70 results: reader.read_array("push operation results", read_operation_result)?,
71 })
72}
73
74fn read_operation_result(reader: &mut BinarySyncPackReader<'_>) -> Result<OperationResult> {
75 let op_index = reader.read_i32("operation result index")?;
76 let status = reader.read_u8("operation result status")?;
77 match status {
78 1 => Ok(OperationResult {
79 op_index,
80 status: "applied".to_string(),
81 message: None,
82 error: None,
83 code: None,
84 retriable: None,
85 server_version: None,
86 server_row: None,
87 }),
88 2 => Ok(OperationResult {
89 op_index,
90 status: "conflict".to_string(),
91 message: Some(reader.read_string32("operation result conflict message")?),
92 error: None,
93 code: reader.read_optional_string32("operation result conflict code")?,
94 retriable: None,
95 server_version: Some(reader.read_i64("operation result conflict server version")?),
96 server_row: Some(reader.read_json("operation result conflict server row")?),
97 }),
98 3 => Ok(OperationResult {
99 op_index,
100 status: "error".to_string(),
101 message: None,
102 error: Some(reader.read_string32("operation result error message")?),
103 code: reader.read_optional_string32("operation result error code")?,
104 retriable: reader.read_optional_bool("operation result error retriable")?,
105 server_version: None,
106 server_row: None,
107 }),
108 value => Err(ProtocolError::message(format!(
109 "unsupported binary sync pack operation result status byte: {value}"
110 ))),
111 }
112}
113
114fn read_push_commit_status(reader: &mut BinarySyncPackReader<'_>) -> Result<String> {
115 match reader.read_u8("push commit status")? {
116 1 => Ok("applied".to_string()),
117 2 => Ok("cached".to_string()),
118 3 => Ok("rejected".to_string()),
119 value => Err(ProtocolError::message(format!(
120 "unsupported binary sync pack push commit status byte: {value}"
121 ))),
122 }
123}
124
125fn read_pull_response(reader: &mut BinarySyncPackReader<'_>) -> Result<PullResponse> {
126 Ok(PullResponse {
127 ok: reader.read_bool("pull response ok")?,
128 subscriptions: reader.read_array("pull subscriptions", read_subscription_response)?,
129 })
130}
131
132fn read_subscription_response(
133 reader: &mut BinarySyncPackReader<'_>,
134) -> Result<SubscriptionResponse> {
135 let id = reader.read_string32("subscription id")?;
136 let status = reader.read_string16("subscription status")?;
137 let scopes = reader.read_json_map("subscription scopes")?;
138 let bootstrap = reader.read_bool("subscription bootstrap")?;
139 let bootstrap_state = reader
140 .read_optional_json("subscription bootstrap state")?
141 .map(serde_json::from_value)
142 .transpose()?;
143 let next_cursor = reader.read_i64("subscription next cursor")?;
144 let integrity = reader.read_optional_value(read_subscription_integrity)?;
145 let commits = reader.read_array("subscription commits", read_commit)?;
146 let snapshots = reader.read_optional_array("subscription snapshots", read_snapshot)?;
147 Ok(SubscriptionResponse {
148 id,
149 status,
150 scopes,
151 bootstrap,
152 bootstrap_state,
153 next_cursor,
154 integrity,
155 commits,
156 snapshots,
157 })
158}
159
160fn read_subscription_integrity(
161 reader: &mut BinarySyncPackReader<'_>,
162) -> Result<SubscriptionIntegrity> {
163 Ok(SubscriptionIntegrity {
164 partition_id: reader.read_string32("subscription integrity partitionId")?,
165 previous_chain_root: reader.read_string32("subscription integrity previous root")?,
166 commit_chain_root: reader.read_string32("subscription integrity chain root")?,
167 commit_seq: reader.read_i64("subscription integrity commit seq")?,
168 })
169}
170
171fn read_commit(reader: &mut BinarySyncPackReader<'_>) -> Result<SyncCommit> {
172 Ok(SyncCommit {
173 commit_seq: reader.read_i64("commit seq")?,
174 created_at: reader.read_string32("commit createdAt")?,
175 actor_id: reader.read_string32("commit actorId")?,
176 changes: read_changes_v8(reader)?,
177 })
178}
179
180fn read_changes_v8(reader: &mut BinarySyncPackReader<'_>) -> Result<Vec<SyncChange>> {
181 let table_names = reader.read_array("commit change table dictionary", |reader| {
182 reader.read_string16("commit change table")
183 })?;
184 let scope_values_by_index = reader.read_array("commit change scope dictionary", |reader| {
185 reader.read_string_map("commit change scopes")
186 })?;
187 let change_count = reader.read_u32("commit changes length")? as usize;
188 let mut changes = Vec::with_capacity(change_count);
189 let mut row_refs = Vec::new();
190 for change_index in 0..change_count {
191 changes.push(read_change_metadata_v8(
192 reader,
193 change_index,
194 &table_names,
195 &scope_values_by_index,
196 &mut row_refs,
197 )?);
198 }
199
200 let group_count = reader.read_u32("binary change row group count")? as usize;
201 let mut group_rows = Vec::with_capacity(group_count);
202 for _ in 0..group_count {
203 let table = table_name_at(
204 &table_names,
205 reader.read_u16("binary change row group table index")? as usize,
206 )?;
207 let payload = reader.read_bytes32("binary change row group payload")?;
208 let decoded = decode_binary_snapshot_table(&payload)?;
209 if decoded.table != table {
210 return Err(ProtocolError::message(format!(
211 "binary sync pack row group table mismatch: expected {table}, got {}",
212 decoded.table
213 )));
214 }
215 group_rows.push(decoded.rows.into_iter().map(Some).collect::<Vec<_>>());
216 }
217
218 for row_ref in row_refs {
219 let Some(rows) = group_rows.get_mut(row_ref.group_index) else {
220 return Err(ProtocolError::message(format!(
221 "binary sync pack change row ref has invalid group index: {}",
222 row_ref.group_index
223 )));
224 };
225 let Some(row) = rows.get_mut(row_ref.row_index) else {
226 return Err(ProtocolError::message(format!(
227 "binary sync pack change row ref has invalid row index: group={}, row={}",
228 row_ref.group_index, row_ref.row_index
229 )));
230 };
231 let Some(row) = row.take() else {
232 return Err(ProtocolError::message(format!(
233 "binary sync pack change row ref was already consumed: group={}, row={}",
234 row_ref.group_index, row_ref.row_index
235 )));
236 };
237 let Some(change) = changes.get_mut(row_ref.change_index) else {
238 return Err(ProtocolError::message(format!(
239 "binary sync pack change row ref has invalid change index: {}",
240 row_ref.change_index
241 )));
242 };
243 if change.table != row_ref.table {
244 return Err(ProtocolError::message(
245 "binary sync pack row ref table mismatch",
246 ));
247 }
248 change.row_json = Some(Value::Object(row));
249 }
250
251 Ok(changes)
252}
253
254fn read_change_metadata_v8(
255 reader: &mut BinarySyncPackReader<'_>,
256 change_index: usize,
257 table_names: &[String],
258 scope_values_by_index: &[Map<String, Value>],
259 row_refs: &mut Vec<PendingBinaryChangeRowRef>,
260) -> Result<SyncChange> {
261 let table = table_name_at(table_names, reader.read_u16("change table index")? as usize)?;
262 let row_id = reader.read_string32("change row id")?;
263 let op = match reader.read_u8("change op")? {
264 1 => "upsert".to_string(),
265 2 => "delete".to_string(),
266 value => {
267 return Err(ProtocolError::message(format!(
268 "unsupported binary sync pack change op byte: {value}"
269 )));
270 }
271 };
272 let row_json = match reader.read_u8("change row payload kind")? {
273 0 => None,
274 1 => Some(reader.read_json("change row json")?),
275 2 => {
276 row_refs.push(PendingBinaryChangeRowRef {
277 change_index,
278 table: table.clone(),
279 group_index: reader.read_u32("change row group index")? as usize,
280 row_index: reader.read_u32("change row group row index")? as usize,
281 });
282 None
283 }
284 value => {
285 return Err(ProtocolError::message(format!(
286 "unsupported binary sync pack change row payload kind: {value}"
287 )));
288 }
289 };
290 Ok(SyncChange {
291 table,
292 row_id,
293 op,
294 row_json,
295 row_version: reader.read_optional_i64("change row version")?,
296 scopes: scope_values_at(
297 scope_values_by_index,
298 reader.read_u32("change scopes index")? as usize,
299 )?,
300 })
301}
302
303fn table_name_at(table_names: &[String], index: usize) -> Result<String> {
304 table_names.get(index).cloned().ok_or_else(|| {
305 ProtocolError::message(format!("binary sync pack table index is invalid: {index}"))
306 })
307}
308
309fn scope_values_at(
310 scope_values_by_index: &[Map<String, Value>],
311 index: usize,
312) -> Result<Map<String, Value>> {
313 scope_values_by_index.get(index).cloned().ok_or_else(|| {
314 ProtocolError::message(format!("binary sync pack scope index is invalid: {index}"))
315 })
316}
317
318fn read_snapshot(reader: &mut BinarySyncPackReader<'_>) -> Result<SyncSnapshot> {
319 let mut snapshot = SyncSnapshot {
320 table: reader.read_string16("snapshot table")?,
321 rows: reader.read_array("snapshot rows", |reader| reader.read_json("snapshot row"))?,
322 chunks: reader.read_optional_array("snapshot chunks", read_snapshot_chunk_ref)?,
323 artifacts: None,
324 manifest: None,
325 is_first_page: reader.read_bool("snapshot first page")?,
326 is_last_page: reader.read_bool("snapshot last page")?,
327 bootstrap_state_after: reader
328 .read_optional_json("snapshot bootstrap state after")?
329 .map(serde_json::from_value)
330 .transpose()?,
331 };
332 snapshot.manifest = reader
333 .read_optional_json("snapshot manifest")?
334 .map(serde_json::from_value)
335 .transpose()?;
336 snapshot.artifacts = reader
337 .read_optional_json("snapshot artifacts")?
338 .map(serde_json::from_value)
339 .transpose()?;
340 Ok(snapshot)
341}
342
343fn read_snapshot_chunk_ref(reader: &mut BinarySyncPackReader<'_>) -> Result<SnapshotChunkRef> {
344 let id = reader.read_string32("snapshot chunk id")?;
345 let byte_length = reader.read_i64("snapshot chunk byte length")?;
346 let sha256 = reader.read_string16("snapshot chunk sha256")?;
347 let encoding = reader.read_string16("snapshot chunk encoding")?;
348 let compression = reader.read_string16("snapshot chunk compression")?;
349 Ok(SnapshotChunkRef {
350 id,
351 byte_length,
352 sha256,
353 encoding,
354 compression,
355 })
356}
357
/// Forward-only cursor over the bytes of a binary sync pack.
struct BinarySyncPackReader<'a> {
    /// The complete input buffer being decoded.
    bytes: &'a [u8],
    /// Index of the next unread byte in `bytes`.
    offset: usize,
}
362
363impl<'a> BinarySyncPackReader<'a> {
364 fn new(bytes: &'a [u8]) -> Self {
365 Self { bytes, offset: 0 }
366 }
367
368 fn expect_magic(&mut self, magic: &[u8], label: &str) -> Result<()> {
369 let actual = self.read_bytes(magic.len(), &format!("{label} magic"))?;
370 if actual != magic {
371 return Err(ProtocolError::message(format!("unexpected {label} magic")));
372 }
373 Ok(())
374 }
375
376 fn read_bool(&mut self, label: &str) -> Result<bool> {
377 match self.read_u8(label)? {
378 0 => Ok(false),
379 1 => Ok(true),
380 value => Err(ProtocolError::message(format!(
381 "{label} expected boolean byte, got {value}"
382 ))),
383 }
384 }
385
386 fn read_optional_bool(&mut self, label: &str) -> Result<Option<bool>> {
387 self.read_optional_value(|reader| reader.read_bool(label))
388 }
389
390 fn read_u8(&mut self, label: &str) -> Result<u8> {
391 self.require(1, label)?;
392 let value = self.bytes[self.offset];
393 self.offset += 1;
394 Ok(value)
395 }
396
397 fn read_u16(&mut self, label: &str) -> Result<u16> {
398 let bytes = self.read_array_bytes::<2>(label)?;
399 Ok(u16::from_le_bytes(bytes))
400 }
401
402 fn read_u32(&mut self, label: &str) -> Result<u32> {
403 let bytes = self.read_array_bytes::<4>(label)?;
404 Ok(u32::from_le_bytes(bytes))
405 }
406
407 fn read_i32(&mut self, label: &str) -> Result<i32> {
408 let bytes = self.read_array_bytes::<4>(label)?;
409 Ok(i32::from_le_bytes(bytes))
410 }
411
412 fn read_optional_i32(&mut self, label: &str) -> Result<Option<i32>> {
413 self.read_optional_value(|reader| reader.read_i32(label))
414 }
415
416 fn read_i64(&mut self, label: &str) -> Result<i64> {
417 let bytes = self.read_array_bytes::<8>(label)?;
418 Ok(i64::from_le_bytes(bytes))
419 }
420
421 fn read_optional_i64(&mut self, label: &str) -> Result<Option<i64>> {
422 self.read_optional_value(|reader| reader.read_i64(label))
423 }
424
425 fn read_string16(&mut self, label: &str) -> Result<String> {
426 let length = self.read_u16(&format!("{label} length"))? as usize;
427 self.read_string(length, label)
428 }
429
430 fn read_string32(&mut self, label: &str) -> Result<String> {
431 let length = self.read_u32(&format!("{label} length"))? as usize;
432 self.read_string(length, label)
433 }
434
435 fn read_optional_string32(&mut self, label: &str) -> Result<Option<String>> {
436 self.read_optional_value(|reader| reader.read_string32(label))
437 }
438
439 fn read_bytes32(&mut self, label: &str) -> Result<Vec<u8>> {
440 let length = self.read_u32(&format!("{label} length"))? as usize;
441 Ok(self.read_bytes(length, label)?.to_vec())
442 }
443
444 fn read_json(&mut self, label: &str) -> Result<Value> {
445 Ok(serde_json::from_str(&self.read_string32(label)?)?)
446 }
447
448 fn read_optional_json(&mut self, label: &str) -> Result<Option<Value>> {
449 self.read_optional_value(|reader| reader.read_json(label))
450 }
451
452 fn read_json_map(&mut self, label: &str) -> Result<Map<String, Value>> {
453 match self.read_json(label)? {
454 Value::Object(map) => Ok(map),
455 _ => Err(ProtocolError::message(format!(
456 "{label} expected JSON object"
457 ))),
458 }
459 }
460
461 fn read_string_map(&mut self, label: &str) -> Result<Map<String, Value>> {
462 let length = self.read_u32(&format!("{label} length"))? as usize;
463 let mut map = Map::with_capacity(length);
464 for _ in 0..length {
465 let key = self.read_string16(&format!("{label} key"))?;
466 let value = self.read_string32(&format!("{label} value"))?;
467 map.insert(key, Value::String(value));
468 }
469 Ok(map)
470 }
471
472 fn read_array<T>(
473 &mut self,
474 label: &str,
475 mut read: impl FnMut(&mut Self) -> Result<T>,
476 ) -> Result<Vec<T>> {
477 let length = self.read_u32(&format!("{label} length"))? as usize;
478 let mut values = Vec::with_capacity(length);
479 for _ in 0..length {
480 values.push(read(self)?);
481 }
482 Ok(values)
483 }
484
485 fn read_optional_array<T>(
486 &mut self,
487 label: &str,
488 mut read: impl FnMut(&mut Self) -> Result<T>,
489 ) -> Result<Option<Vec<T>>> {
490 self.read_optional_value(|reader| reader.read_array(label, &mut read))
491 }
492
493 fn read_optional_value<T>(
494 &mut self,
495 read: impl FnOnce(&mut Self) -> Result<T>,
496 ) -> Result<Option<T>> {
497 match self.read_u8("optional value present")? {
498 0 => Ok(None),
499 1 => read(self).map(Some),
500 value => Err(ProtocolError::message(format!(
501 "optional value marker must be 0 or 1, got {value}"
502 ))),
503 }
504 }
505
506 fn read_string(&mut self, length: usize, label: &str) -> Result<String> {
507 String::from_utf8(self.read_bytes(length, label)?.to_vec())
508 .map_err(|err| ProtocolError::message(format!("{label} is not valid UTF-8: {err}")))
509 }
510
511 fn read_array_bytes<const N: usize>(&mut self, label: &str) -> Result<[u8; N]> {
512 let mut out = [0u8; N];
513 out.copy_from_slice(self.read_bytes(N, label)?);
514 Ok(out)
515 }
516
517 fn read_bytes(&mut self, length: usize, label: &str) -> Result<&'a [u8]> {
518 self.require(length, label)?;
519 let value = &self.bytes[self.offset..self.offset + length];
520 self.offset += length;
521 Ok(value)
522 }
523
524 fn assert_done(&self) -> Result<()> {
525 if self.offset != self.bytes.len() {
526 return Err(ProtocolError::message(
527 "binary sync pack has trailing bytes",
528 ));
529 }
530 Ok(())
531 }
532
533 fn require(&self, length: usize, label: &str) -> Result<()> {
534 if self.offset + length > self.bytes.len() {
535 return Err(ProtocolError::message(format!(
536 "{label} exceeds binary sync pack bounds"
537 )));
538 }
539 Ok(())
540 }
541}
542
#[cfg(test)]
mod tests {
    use super::{
        decode_binary_sync_pack, is_binary_sync_pack_content_type, SYNC_PACK_CONTENT_TYPE,
    };

    /// Parses the shared JSON fixture that holds the encoded combined response.
    fn load_fixture() -> serde_json::Value {
        serde_json::from_str(include_str!(
            "../../runtime/tests/fixtures/binary-sync-pack-v1-combined-response.json"
        ))
        .expect("fixture json")
    }

    /// Decodes the fixture's `encodedHex` field into raw pack bytes.
    fn fixture_bytes(fixture: &serde_json::Value) -> Vec<u8> {
        let encoded_hex = fixture["encodedHex"]
            .as_str()
            .expect("fixture encodedHex string");
        hex::decode(encoded_hex).expect("fixture encoded hex")
    }

    #[test]
    fn decodes_current_typescript_fixture() {
        let fixture = load_fixture();
        assert_eq!(
            fixture["contentType"].as_str(),
            Some(SYNC_PACK_CONTENT_TYPE)
        );
        assert!(is_binary_sync_pack_content_type(Some(
            "application/vnd.syncular.sync-pack.v1; charset=binary"
        )));

        let encoded = fixture_bytes(&fixture);
        let response = decode_binary_sync_pack(&encoded).expect("decode current fixture");
        assert_eq!(response.required_schema_version, Some(2));
        assert_eq!(response.latest_schema_version, Some(3));

        // Push section.
        let push = response.push.as_ref().expect("push response");
        assert_eq!(push.commits[0].client_commit_id, "fixture-local-1");
        assert_eq!(push.commits[1].status, "rejected");
        assert_eq!(push.commits[1].results[0].server_version, Some(7));

        // Pull section.
        let pull = response.pull.unwrap();
        let subscription = &pull.subscriptions[0];
        let integrity_commit_seq = subscription
            .integrity
            .as_ref()
            .map(|integrity| integrity.commit_seq);
        assert_eq!(integrity_commit_seq, Some(42));

        let change = &pull.subscriptions[0].commits[0].changes[0];
        assert_eq!(change.table, "tasks");
        assert_eq!(change.row_id, "task-1");
        assert_eq!(
            change.row_json.as_ref().unwrap()["title"].as_str(),
            Some("Remote")
        );

        let manifest_digest = subscription.snapshots.as_ref().unwrap()[0]
            .manifest
            .as_ref()
            .map(|manifest| manifest.digest.as_str());
        assert_eq!(
            manifest_digest,
            Some("28906bb034df33f281391be2cc697cdf669646f5e2158f07b6b9a35277cc4b6b")
        );
    }

    #[test]
    fn rejects_old_binary_sync_pack_versions() {
        let fixture = load_fixture();
        let mut encoded = fixture_bytes(&fixture);
        // Bytes 4..6 hold the little-endian wire version that follows the magic.
        encoded[4..6].copy_from_slice(&10u16.to_le_bytes());

        let error = decode_binary_sync_pack(&encoded).expect_err("old version is rejected");
        assert!(error
            .to_string()
            .contains("unsupported binary sync pack version: 10"));
    }
}