Skip to main content

syncular_protocol/
binary_sync_pack.rs

1use crate::binary_snapshot::decode_binary_snapshot_table;
2use crate::error::{ProtocolError, Result};
3use crate::{
4    CombinedResponse, OperationResult, PullResponse, PushBatchResponse, PushCommitResponse,
5    SnapshotChunkRef, SubscriptionIntegrity, SubscriptionResponse, SyncChange, SyncCommit,
6    SyncSnapshot, BINARY_SYNC_PACK_WIRE_VERSION,
7};
8use serde_json::{Map, Value};
9
10pub use crate::{SYNC_PACK_CONTENT_TYPE, SYNC_PACK_ENCODING_BINARY_V1};
11
/// Four-byte marker expected at the very start of every binary sync pack.
const MAGIC: &[u8; 4] = b"SSP1";
/// The only wire version this decoder accepts; any other value is rejected.
const VERSION: u16 = BINARY_SYNC_PACK_WIRE_VERSION;
/// Flags word with no bits set, which is the only flags value this decoder accepts.
const FLAG_NONE: u16 = 0;
15
/// Deferred link between a decoded change and a row stored in a binary row group.
///
/// One of these is recorded while change metadata is read (row payload kind `2`)
/// and resolved after the row groups that follow the change list have been decoded.
struct PendingBinaryChangeRowRef {
    /// Position of the owning change within the commit's change list.
    change_index: usize,
    /// Table name of the owning change, as resolved from the table dictionary.
    table: String,
    /// Index of the row group that holds the row.
    group_index: usize,
    /// Index of the row inside that row group.
    row_index: usize,
}
22
23pub fn is_binary_sync_pack_content_type(content_type: Option<&str>) -> bool {
24    content_type
25        .and_then(|value| value.split(';').next())
26        .is_some_and(|value| value.trim() == SYNC_PACK_CONTENT_TYPE)
27}
28
29pub fn decode_binary_sync_pack(bytes: &[u8]) -> Result<CombinedResponse> {
30    let mut reader = BinarySyncPackReader::new(bytes);
31    reader.expect_magic(MAGIC, "binary sync pack")?;
32
33    let version = reader.read_u16("binary sync pack version")?;
34    if version != VERSION {
35        return Err(ProtocolError::message(format!(
36            "unsupported binary sync pack version: {version}"
37        )));
38    }
39    let flags = reader.read_u16("binary sync pack flags")?;
40    if flags != FLAG_NONE {
41        return Err(ProtocolError::message(format!(
42            "unsupported binary sync pack flags: {flags}"
43        )));
44    }
45
46    let response = CombinedResponse {
47        ok: reader.read_bool("combined response ok")?,
48        required_schema_version: reader.read_optional_i32("required schema version")?,
49        latest_schema_version: reader.read_optional_i32("latest schema version")?,
50        push: reader.read_optional_value(read_push_response)?,
51        pull: reader.read_optional_value(read_pull_response)?,
52    };
53    reader.assert_done()?;
54    Ok(response)
55}
56
57fn read_push_response(reader: &mut BinarySyncPackReader<'_>) -> Result<PushBatchResponse> {
58    Ok(PushBatchResponse {
59        ok: reader.read_bool("push response ok")?,
60        commits: reader.read_array("push commits", read_push_commit_response)?,
61    })
62}
63
64fn read_push_commit_response(reader: &mut BinarySyncPackReader<'_>) -> Result<PushCommitResponse> {
65    let _ok = reader.read_bool("push commit ok")?;
66    Ok(PushCommitResponse {
67        client_commit_id: reader.read_string32("push client commit id")?,
68        status: read_push_commit_status(reader)?,
69        commit_seq: reader.read_optional_i64("push commit seq")?,
70        results: reader.read_array("push operation results", read_operation_result)?,
71    })
72}
73
74fn read_operation_result(reader: &mut BinarySyncPackReader<'_>) -> Result<OperationResult> {
75    let op_index = reader.read_i32("operation result index")?;
76    let status = reader.read_u8("operation result status")?;
77    match status {
78        1 => Ok(OperationResult {
79            op_index,
80            status: "applied".to_string(),
81            message: None,
82            error: None,
83            code: None,
84            retriable: None,
85            server_version: None,
86            server_row: None,
87        }),
88        2 => Ok(OperationResult {
89            op_index,
90            status: "conflict".to_string(),
91            message: Some(reader.read_string32("operation result conflict message")?),
92            error: None,
93            code: reader.read_optional_string32("operation result conflict code")?,
94            retriable: None,
95            server_version: Some(reader.read_i64("operation result conflict server version")?),
96            server_row: Some(reader.read_json("operation result conflict server row")?),
97        }),
98        3 => Ok(OperationResult {
99            op_index,
100            status: "error".to_string(),
101            message: None,
102            error: Some(reader.read_string32("operation result error message")?),
103            code: reader.read_optional_string32("operation result error code")?,
104            retriable: reader.read_optional_bool("operation result error retriable")?,
105            server_version: None,
106            server_row: None,
107        }),
108        value => Err(ProtocolError::message(format!(
109            "unsupported binary sync pack operation result status byte: {value}"
110        ))),
111    }
112}
113
114fn read_push_commit_status(reader: &mut BinarySyncPackReader<'_>) -> Result<String> {
115    match reader.read_u8("push commit status")? {
116        1 => Ok("applied".to_string()),
117        2 => Ok("cached".to_string()),
118        3 => Ok("rejected".to_string()),
119        value => Err(ProtocolError::message(format!(
120            "unsupported binary sync pack push commit status byte: {value}"
121        ))),
122    }
123}
124
125fn read_pull_response(reader: &mut BinarySyncPackReader<'_>) -> Result<PullResponse> {
126    Ok(PullResponse {
127        ok: reader.read_bool("pull response ok")?,
128        subscriptions: reader.read_array("pull subscriptions", read_subscription_response)?,
129    })
130}
131
132fn read_subscription_response(
133    reader: &mut BinarySyncPackReader<'_>,
134) -> Result<SubscriptionResponse> {
135    let id = reader.read_string32("subscription id")?;
136    let status = reader.read_string16("subscription status")?;
137    let scopes = reader.read_json_map("subscription scopes")?;
138    let bootstrap = reader.read_bool("subscription bootstrap")?;
139    let bootstrap_state = reader
140        .read_optional_json("subscription bootstrap state")?
141        .map(serde_json::from_value)
142        .transpose()?;
143    let next_cursor = reader.read_i64("subscription next cursor")?;
144    let integrity = reader.read_optional_value(read_subscription_integrity)?;
145    let commits = reader.read_array("subscription commits", read_commit)?;
146    let snapshots = reader.read_optional_array("subscription snapshots", read_snapshot)?;
147    Ok(SubscriptionResponse {
148        id,
149        status,
150        scopes,
151        bootstrap,
152        bootstrap_state,
153        next_cursor,
154        integrity,
155        commits,
156        snapshots,
157    })
158}
159
160fn read_subscription_integrity(
161    reader: &mut BinarySyncPackReader<'_>,
162) -> Result<SubscriptionIntegrity> {
163    Ok(SubscriptionIntegrity {
164        partition_id: reader.read_string32("subscription integrity partitionId")?,
165        previous_chain_root: reader.read_string32("subscription integrity previous root")?,
166        commit_chain_root: reader.read_string32("subscription integrity chain root")?,
167        commit_seq: reader.read_i64("subscription integrity commit seq")?,
168    })
169}
170
171fn read_commit(reader: &mut BinarySyncPackReader<'_>) -> Result<SyncCommit> {
172    Ok(SyncCommit {
173        commit_seq: reader.read_i64("commit seq")?,
174        created_at: reader.read_string32("commit createdAt")?,
175        actor_id: reader.read_string32("commit actorId")?,
176        changes: read_changes_v8(reader)?,
177    })
178}
179
180fn read_changes_v8(reader: &mut BinarySyncPackReader<'_>) -> Result<Vec<SyncChange>> {
181    let table_names = reader.read_array("commit change table dictionary", |reader| {
182        reader.read_string16("commit change table")
183    })?;
184    let scope_values_by_index = reader.read_array("commit change scope dictionary", |reader| {
185        reader.read_string_map("commit change scopes")
186    })?;
187    let change_count = reader.read_u32("commit changes length")? as usize;
188    let mut changes = Vec::with_capacity(change_count);
189    let mut row_refs = Vec::new();
190    for change_index in 0..change_count {
191        changes.push(read_change_metadata_v8(
192            reader,
193            change_index,
194            &table_names,
195            &scope_values_by_index,
196            &mut row_refs,
197        )?);
198    }
199
200    let group_count = reader.read_u32("binary change row group count")? as usize;
201    let mut group_rows = Vec::with_capacity(group_count);
202    for _ in 0..group_count {
203        let table = table_name_at(
204            &table_names,
205            reader.read_u16("binary change row group table index")? as usize,
206        )?;
207        let payload = reader.read_bytes32("binary change row group payload")?;
208        let decoded = decode_binary_snapshot_table(&payload)?;
209        if decoded.table != table {
210            return Err(ProtocolError::message(format!(
211                "binary sync pack row group table mismatch: expected {table}, got {}",
212                decoded.table
213            )));
214        }
215        group_rows.push(decoded.rows.into_iter().map(Some).collect::<Vec<_>>());
216    }
217
218    for row_ref in row_refs {
219        let Some(rows) = group_rows.get_mut(row_ref.group_index) else {
220            return Err(ProtocolError::message(format!(
221                "binary sync pack change row ref has invalid group index: {}",
222                row_ref.group_index
223            )));
224        };
225        let Some(row) = rows.get_mut(row_ref.row_index) else {
226            return Err(ProtocolError::message(format!(
227                "binary sync pack change row ref has invalid row index: group={}, row={}",
228                row_ref.group_index, row_ref.row_index
229            )));
230        };
231        let Some(row) = row.take() else {
232            return Err(ProtocolError::message(format!(
233                "binary sync pack change row ref was already consumed: group={}, row={}",
234                row_ref.group_index, row_ref.row_index
235            )));
236        };
237        let Some(change) = changes.get_mut(row_ref.change_index) else {
238            return Err(ProtocolError::message(format!(
239                "binary sync pack change row ref has invalid change index: {}",
240                row_ref.change_index
241            )));
242        };
243        if change.table != row_ref.table {
244            return Err(ProtocolError::message(
245                "binary sync pack row ref table mismatch",
246            ));
247        }
248        change.row_json = Some(Value::Object(row));
249    }
250
251    Ok(changes)
252}
253
254fn read_change_metadata_v8(
255    reader: &mut BinarySyncPackReader<'_>,
256    change_index: usize,
257    table_names: &[String],
258    scope_values_by_index: &[Map<String, Value>],
259    row_refs: &mut Vec<PendingBinaryChangeRowRef>,
260) -> Result<SyncChange> {
261    let table = table_name_at(table_names, reader.read_u16("change table index")? as usize)?;
262    let row_id = reader.read_string32("change row id")?;
263    let op = match reader.read_u8("change op")? {
264        1 => "upsert".to_string(),
265        2 => "delete".to_string(),
266        value => {
267            return Err(ProtocolError::message(format!(
268                "unsupported binary sync pack change op byte: {value}"
269            )));
270        }
271    };
272    let row_json = match reader.read_u8("change row payload kind")? {
273        0 => None,
274        1 => Some(reader.read_json("change row json")?),
275        2 => {
276            row_refs.push(PendingBinaryChangeRowRef {
277                change_index,
278                table: table.clone(),
279                group_index: reader.read_u32("change row group index")? as usize,
280                row_index: reader.read_u32("change row group row index")? as usize,
281            });
282            None
283        }
284        value => {
285            return Err(ProtocolError::message(format!(
286                "unsupported binary sync pack change row payload kind: {value}"
287            )));
288        }
289    };
290    Ok(SyncChange {
291        table,
292        row_id,
293        op,
294        row_json,
295        row_version: reader.read_optional_i64("change row version")?,
296        scopes: scope_values_at(
297            scope_values_by_index,
298            reader.read_u32("change scopes index")? as usize,
299        )?,
300    })
301}
302
303fn table_name_at(table_names: &[String], index: usize) -> Result<String> {
304    table_names.get(index).cloned().ok_or_else(|| {
305        ProtocolError::message(format!("binary sync pack table index is invalid: {index}"))
306    })
307}
308
309fn scope_values_at(
310    scope_values_by_index: &[Map<String, Value>],
311    index: usize,
312) -> Result<Map<String, Value>> {
313    scope_values_by_index.get(index).cloned().ok_or_else(|| {
314        ProtocolError::message(format!("binary sync pack scope index is invalid: {index}"))
315    })
316}
317
318fn read_snapshot(reader: &mut BinarySyncPackReader<'_>) -> Result<SyncSnapshot> {
319    let mut snapshot = SyncSnapshot {
320        table: reader.read_string16("snapshot table")?,
321        rows: reader.read_array("snapshot rows", |reader| reader.read_json("snapshot row"))?,
322        chunks: reader.read_optional_array("snapshot chunks", read_snapshot_chunk_ref)?,
323        artifacts: None,
324        manifest: None,
325        is_first_page: reader.read_bool("snapshot first page")?,
326        is_last_page: reader.read_bool("snapshot last page")?,
327        bootstrap_state_after: reader
328            .read_optional_json("snapshot bootstrap state after")?
329            .map(serde_json::from_value)
330            .transpose()?,
331    };
332    snapshot.manifest = reader
333        .read_optional_json("snapshot manifest")?
334        .map(serde_json::from_value)
335        .transpose()?;
336    snapshot.artifacts = reader
337        .read_optional_json("snapshot artifacts")?
338        .map(serde_json::from_value)
339        .transpose()?;
340    Ok(snapshot)
341}
342
343fn read_snapshot_chunk_ref(reader: &mut BinarySyncPackReader<'_>) -> Result<SnapshotChunkRef> {
344    let id = reader.read_string32("snapshot chunk id")?;
345    let byte_length = reader.read_i64("snapshot chunk byte length")?;
346    let sha256 = reader.read_string16("snapshot chunk sha256")?;
347    let encoding = reader.read_string16("snapshot chunk encoding")?;
348    let compression = reader.read_string16("snapshot chunk compression")?;
349    Ok(SnapshotChunkRef {
350        id,
351        byte_length,
352        sha256,
353        encoding,
354        compression,
355    })
356}
357
358struct BinarySyncPackReader<'a> {
359    bytes: &'a [u8],
360    offset: usize,
361}
362
363impl<'a> BinarySyncPackReader<'a> {
364    fn new(bytes: &'a [u8]) -> Self {
365        Self { bytes, offset: 0 }
366    }
367
368    fn expect_magic(&mut self, magic: &[u8], label: &str) -> Result<()> {
369        let actual = self.read_bytes(magic.len(), &format!("{label} magic"))?;
370        if actual != magic {
371            return Err(ProtocolError::message(format!("unexpected {label} magic")));
372        }
373        Ok(())
374    }
375
376    fn read_bool(&mut self, label: &str) -> Result<bool> {
377        match self.read_u8(label)? {
378            0 => Ok(false),
379            1 => Ok(true),
380            value => Err(ProtocolError::message(format!(
381                "{label} expected boolean byte, got {value}"
382            ))),
383        }
384    }
385
386    fn read_optional_bool(&mut self, label: &str) -> Result<Option<bool>> {
387        self.read_optional_value(|reader| reader.read_bool(label))
388    }
389
390    fn read_u8(&mut self, label: &str) -> Result<u8> {
391        self.require(1, label)?;
392        let value = self.bytes[self.offset];
393        self.offset += 1;
394        Ok(value)
395    }
396
397    fn read_u16(&mut self, label: &str) -> Result<u16> {
398        let bytes = self.read_array_bytes::<2>(label)?;
399        Ok(u16::from_le_bytes(bytes))
400    }
401
402    fn read_u32(&mut self, label: &str) -> Result<u32> {
403        let bytes = self.read_array_bytes::<4>(label)?;
404        Ok(u32::from_le_bytes(bytes))
405    }
406
407    fn read_i32(&mut self, label: &str) -> Result<i32> {
408        let bytes = self.read_array_bytes::<4>(label)?;
409        Ok(i32::from_le_bytes(bytes))
410    }
411
412    fn read_optional_i32(&mut self, label: &str) -> Result<Option<i32>> {
413        self.read_optional_value(|reader| reader.read_i32(label))
414    }
415
416    fn read_i64(&mut self, label: &str) -> Result<i64> {
417        let bytes = self.read_array_bytes::<8>(label)?;
418        Ok(i64::from_le_bytes(bytes))
419    }
420
421    fn read_optional_i64(&mut self, label: &str) -> Result<Option<i64>> {
422        self.read_optional_value(|reader| reader.read_i64(label))
423    }
424
425    fn read_string16(&mut self, label: &str) -> Result<String> {
426        let length = self.read_u16(&format!("{label} length"))? as usize;
427        self.read_string(length, label)
428    }
429
430    fn read_string32(&mut self, label: &str) -> Result<String> {
431        let length = self.read_u32(&format!("{label} length"))? as usize;
432        self.read_string(length, label)
433    }
434
435    fn read_optional_string32(&mut self, label: &str) -> Result<Option<String>> {
436        self.read_optional_value(|reader| reader.read_string32(label))
437    }
438
439    fn read_bytes32(&mut self, label: &str) -> Result<Vec<u8>> {
440        let length = self.read_u32(&format!("{label} length"))? as usize;
441        Ok(self.read_bytes(length, label)?.to_vec())
442    }
443
444    fn read_json(&mut self, label: &str) -> Result<Value> {
445        Ok(serde_json::from_str(&self.read_string32(label)?)?)
446    }
447
448    fn read_optional_json(&mut self, label: &str) -> Result<Option<Value>> {
449        self.read_optional_value(|reader| reader.read_json(label))
450    }
451
452    fn read_json_map(&mut self, label: &str) -> Result<Map<String, Value>> {
453        match self.read_json(label)? {
454            Value::Object(map) => Ok(map),
455            _ => Err(ProtocolError::message(format!(
456                "{label} expected JSON object"
457            ))),
458        }
459    }
460
461    fn read_string_map(&mut self, label: &str) -> Result<Map<String, Value>> {
462        let length = self.read_u32(&format!("{label} length"))? as usize;
463        let mut map = Map::with_capacity(length);
464        for _ in 0..length {
465            let key = self.read_string16(&format!("{label} key"))?;
466            let value = self.read_string32(&format!("{label} value"))?;
467            map.insert(key, Value::String(value));
468        }
469        Ok(map)
470    }
471
472    fn read_array<T>(
473        &mut self,
474        label: &str,
475        mut read: impl FnMut(&mut Self) -> Result<T>,
476    ) -> Result<Vec<T>> {
477        let length = self.read_u32(&format!("{label} length"))? as usize;
478        let mut values = Vec::with_capacity(length);
479        for _ in 0..length {
480            values.push(read(self)?);
481        }
482        Ok(values)
483    }
484
485    fn read_optional_array<T>(
486        &mut self,
487        label: &str,
488        mut read: impl FnMut(&mut Self) -> Result<T>,
489    ) -> Result<Option<Vec<T>>> {
490        self.read_optional_value(|reader| reader.read_array(label, &mut read))
491    }
492
493    fn read_optional_value<T>(
494        &mut self,
495        read: impl FnOnce(&mut Self) -> Result<T>,
496    ) -> Result<Option<T>> {
497        match self.read_u8("optional value present")? {
498            0 => Ok(None),
499            1 => read(self).map(Some),
500            value => Err(ProtocolError::message(format!(
501                "optional value marker must be 0 or 1, got {value}"
502            ))),
503        }
504    }
505
506    fn read_string(&mut self, length: usize, label: &str) -> Result<String> {
507        String::from_utf8(self.read_bytes(length, label)?.to_vec())
508            .map_err(|err| ProtocolError::message(format!("{label} is not valid UTF-8: {err}")))
509    }
510
511    fn read_array_bytes<const N: usize>(&mut self, label: &str) -> Result<[u8; N]> {
512        let mut out = [0u8; N];
513        out.copy_from_slice(self.read_bytes(N, label)?);
514        Ok(out)
515    }
516
517    fn read_bytes(&mut self, length: usize, label: &str) -> Result<&'a [u8]> {
518        self.require(length, label)?;
519        let value = &self.bytes[self.offset..self.offset + length];
520        self.offset += length;
521        Ok(value)
522    }
523
524    fn assert_done(&self) -> Result<()> {
525        if self.offset != self.bytes.len() {
526            return Err(ProtocolError::message(
527                "binary sync pack has trailing bytes",
528            ));
529        }
530        Ok(())
531    }
532
533    fn require(&self, length: usize, label: &str) -> Result<()> {
534        if self.offset + length > self.bytes.len() {
535            return Err(ProtocolError::message(format!(
536                "{label} exceeds binary sync pack bounds"
537            )));
538        }
539        Ok(())
540    }
541}
542
#[cfg(test)]
mod tests {
    use super::{
        decode_binary_sync_pack, is_binary_sync_pack_content_type, SYNC_PACK_CONTENT_TYPE,
    };

    /// Parses the combined-response fixture document embedded at compile time.
    fn load_fixture() -> serde_json::Value {
        serde_json::from_str(include_str!(
            "../../runtime/tests/fixtures/binary-sync-pack-v1-combined-response.json"
        ))
        .expect("fixture json")
    }

    /// Decodes the fixture's hex-encoded sync pack into raw bytes.
    fn fixture_bytes(fixture: &serde_json::Value) -> Vec<u8> {
        let encoded_hex = fixture["encodedHex"]
            .as_str()
            .expect("fixture encodedHex string");
        hex::decode(encoded_hex).expect("fixture encoded hex")
    }

    /// The current fixture decodes and exposes the expected push, pull,
    /// integrity, change and snapshot manifest values.
    #[test]
    fn decodes_current_typescript_fixture() {
        let fixture = load_fixture();
        assert_eq!(
            fixture["contentType"].as_str(),
            Some(SYNC_PACK_CONTENT_TYPE)
        );
        assert!(is_binary_sync_pack_content_type(Some(
            "application/vnd.syncular.sync-pack.v1; charset=binary"
        )));

        let encoded = fixture_bytes(&fixture);
        let response = decode_binary_sync_pack(&encoded).expect("decode current fixture");
        assert_eq!(response.required_schema_version, Some(2));
        assert_eq!(response.latest_schema_version, Some(3));

        let push = response.push.as_ref().expect("push response");
        let first_commit = &push.commits[0];
        let second_commit = &push.commits[1];
        assert_eq!(first_commit.client_commit_id, "fixture-local-1");
        assert_eq!(second_commit.status, "rejected");
        assert_eq!(second_commit.results[0].server_version, Some(7));

        let pull = response.pull.unwrap();
        let subscription = &pull.subscriptions[0];
        let integrity_seq = subscription
            .integrity
            .as_ref()
            .map(|integrity| integrity.commit_seq);
        assert_eq!(integrity_seq, Some(42));

        let change = &pull.subscriptions[0].commits[0].changes[0];
        assert_eq!(change.table, "tasks");
        assert_eq!(change.row_id, "task-1");
        let title = change.row_json.as_ref().unwrap()["title"].as_str();
        assert_eq!(title, Some("Remote"));

        let first_snapshot = &subscription.snapshots.as_ref().unwrap()[0];
        let manifest_digest = first_snapshot
            .manifest
            .as_ref()
            .map(|manifest| manifest.digest.as_str());
        assert_eq!(
            manifest_digest,
            Some("28906bb034df33f281391be2cc697cdf669646f5e2158f07b6b9a35277cc4b6b")
        );
    }

    /// Overwriting the version field (bytes 4..6, right after the magic)
    /// with `10` makes decoding fail with the unsupported-version error.
    #[test]
    fn rejects_old_binary_sync_pack_versions() {
        let fixture = load_fixture();
        let mut encoded = fixture_bytes(&fixture);
        encoded[4..6].copy_from_slice(&10u16.to_le_bytes());

        let error = decode_binary_sync_pack(&encoded).expect_err("old version is rejected");
        let message = error.to_string();
        assert!(message.contains("unsupported binary sync pack version: 10"));
    }
}