Skip to main content

syncular_protocol/
binary_snapshot.rs

1use crate::error::{ProtocolError, Result};
2use serde_json::{Map, Number, Value};
3use std::fmt;
4
/// Magic bytes expected at the very start of a binary snapshot table payload.
const MAGIC: &[u8; 4] = b"SBT1";
/// The only format version the header reader accepts; any other value is rejected.
const VERSION: u16 = 1;
/// Header flags value meaning "no flags set"; the header reader rejects anything else.
const FLAG_NONE: u16 = 0;
/// Bit in a column's flags byte that marks the column as nullable.
const COLUMN_FLAG_NULLABLE: u8 = 1;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum BinarySnapshotColumnType {
12    String,
13    Integer,
14    Float,
15    Boolean,
16    Json,
17    Bytes,
18}
19
20impl BinarySnapshotColumnType {
21    fn from_tag(tag: u8) -> Result<Self> {
22        match tag {
23            1 => Ok(Self::String),
24            2 => Ok(Self::Integer),
25            3 => Ok(Self::Float),
26            4 => Ok(Self::Boolean),
27            5 => Ok(Self::Json),
28            6 => Ok(Self::Bytes),
29            _ => Err(ProtocolError::message(format!(
30                "unsupported binary snapshot type tag: {tag}"
31            ))),
32        }
33    }
34}
35
/// Metadata for one column as declared in a binary snapshot header.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BinarySnapshotColumn {
    /// Column name read from the header.
    pub name: String,
    /// Value type decoded from the column's type tag.
    pub column_type: BinarySnapshotColumnType,
    /// Whether the nullable bit was set in the column's flags byte.
    pub nullable: bool,
}
42
/// A fully decoded snapshot table whose rows are JSON object maps.
#[derive(Debug, Clone, PartialEq)]
pub struct DecodedBinarySnapshotTable {
    /// Table name read from the header.
    pub table: String,
    /// Column metadata in header order.
    pub columns: Vec<BinarySnapshotColumn>,
    /// One map per row, keyed by column name.
    pub rows: Vec<Map<String, Value>>,
}
49
50#[derive(Debug, Clone, PartialEq)]
51pub enum BinarySnapshotCell {
52    Null,
53    String(String),
54    Integer(i64),
55    Float(f64),
56    Boolean(bool),
57    Json(Value),
58    Bytes(Vec<u8>),
59}
60
61impl BinarySnapshotCell {
62    pub fn into_json_value(self) -> Value {
63        match self {
64            Self::Null => Value::Null,
65            Self::String(value) => Value::String(value),
66            Self::Integer(value) => Value::Number(Number::from(value)),
67            Self::Float(value) => Number::from_f64(value)
68                .map(Value::Number)
69                .unwrap_or(Value::Null),
70            Self::Boolean(value) => Value::Bool(value),
71            Self::Json(value) => value,
72            Self::Bytes(bytes) => Value::Array(
73                bytes
74                    .into_iter()
75                    .map(|byte| Value::Number(Number::from(byte)))
76                    .collect(),
77            ),
78        }
79    }
80}
81
82#[derive(Debug, Clone, PartialEq)]
83pub struct DecodedBinarySnapshotRows {
84    pub table: String,
85    pub columns: Vec<BinarySnapshotColumn>,
86    pub rows: Vec<Vec<BinarySnapshotCell>>,
87}
88
89impl DecodedBinarySnapshotRows {
90    pub fn row_count(&self) -> usize {
91        self.rows.len()
92    }
93
94    pub fn into_value_rows(self) -> Vec<Value> {
95        self.into_maps().into_iter().map(Value::Object).collect()
96    }
97
98    pub fn into_maps(self) -> Vec<Map<String, Value>> {
99        let columns = self.columns;
100        self.rows
101            .into_iter()
102            .map(|row| {
103                columns
104                    .iter()
105                    .zip(row)
106                    .map(|(column, value)| (column.name.clone(), value.into_json_value()))
107                    .collect()
108            })
109            .collect()
110    }
111}
112
113#[derive(Debug, Clone, PartialEq)]
114pub struct BinarySnapshotPayload {
115    bytes: Vec<u8>,
116    pub table: String,
117    pub columns: Vec<BinarySnapshotColumn>,
118    pub row_count: usize,
119    rows_offset: usize,
120}
121
122impl BinarySnapshotPayload {
123    pub fn row_count(&self) -> usize {
124        self.row_count
125    }
126
127    pub fn bytes(&self) -> &[u8] {
128        &self.bytes
129    }
130
131    pub fn row_cursor(&self) -> BinarySnapshotRowCursor<'_> {
132        BinarySnapshotRowCursor {
133            reader: BinarySnapshotReader {
134                bytes: &self.bytes,
135                offset: self.rows_offset,
136            },
137            columns: &self.columns,
138            null_bitmap_bytes: self.columns.len().div_ceil(8),
139            remaining: self.row_count,
140        }
141    }
142
143    pub fn into_decoded_rows(self) -> Result<DecodedBinarySnapshotRows> {
144        decode_binary_snapshot_rows(&self.bytes)
145    }
146
147    pub fn into_value_rows(self) -> Result<Vec<Value>> {
148        self.into_decoded_rows()
149            .map(DecodedBinarySnapshotRows::into_value_rows)
150    }
151}
152
/// A decoded cell whose variable-length data borrows from the payload bytes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BorrowedBinarySnapshotCell<'a> {
    /// The cell's bit was set in the row's null bitmap.
    Null,
    /// UTF-8 validated text.
    String(&'a str),
    Integer(i64),
    Float(f64),
    Boolean(bool),
    /// UTF-8 validated JSON text; the reader does not parse it.
    Json(&'a str),
    Bytes(&'a [u8]),
}
163
164#[derive(Debug)]
165pub enum BinarySnapshotVisitError<E> {
166    Protocol(ProtocolError),
167    Visitor(E),
168}
169
170impl<E> BinarySnapshotVisitError<E> {
171    fn protocol(error: ProtocolError) -> Self {
172        Self::Protocol(error)
173    }
174
175    fn visitor(error: E) -> Self {
176        Self::Visitor(error)
177    }
178}
179
180impl<E: fmt::Display> fmt::Display for BinarySnapshotVisitError<E> {
181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182        match self {
183            Self::Protocol(error) => error.fmt(f),
184            Self::Visitor(error) => error.fmt(f),
185        }
186    }
187}
188
189impl<E> std::error::Error for BinarySnapshotVisitError<E> where E: std::error::Error + 'static {}
190
/// Receives the cells of a row one at a time from
/// `BinarySnapshotRowCursor::read_next_row_with_raw_visitor_trusted`.
///
/// String and JSON cells are handed over as raw bytes; the trusted read path does not
/// check that they are valid UTF-8.
pub trait BorrowedBinarySnapshotRawCellVisitor<'a> {
    /// Error type the visitor may return to abort the row.
    type Error;

    /// Called for a cell whose bit is set in the row's null bitmap.
    fn visit_null(&mut self) -> std::result::Result<(), Self::Error>;
    /// Called with the unvalidated bytes of a string cell.
    fn visit_string_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error>;
    fn visit_integer(&mut self, value: i64) -> std::result::Result<(), Self::Error>;
    /// Called with a float cell; the reader rejects non-finite values before calling this.
    fn visit_float(&mut self, value: f64) -> std::result::Result<(), Self::Error>;
    fn visit_boolean(&mut self, value: bool) -> std::result::Result<(), Self::Error>;
    /// Called with the unvalidated, unparsed bytes of a JSON cell.
    fn visit_json_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error>;
    fn visit_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error>;
}
202
203pub struct BinarySnapshotRowCursor<'a> {
204    reader: BinarySnapshotReader<'a>,
205    columns: &'a [BinarySnapshotColumn],
206    null_bitmap_bytes: usize,
207    remaining: usize,
208}
209
210impl<'a> BinarySnapshotRowCursor<'a> {
211    pub fn read_next_row<F, E>(
212        &mut self,
213        mut on_cell: F,
214    ) -> std::result::Result<bool, BinarySnapshotVisitError<E>>
215    where
216        F: FnMut(
217            usize,
218            &BinarySnapshotColumn,
219            BorrowedBinarySnapshotCell<'a>,
220        ) -> std::result::Result<(), E>,
221    {
222        if self.remaining == 0 {
223            return Ok(false);
224        }
225
226        let null_bitmap = self
227            .reader
228            .read_bytes(self.null_bitmap_bytes, "binary snapshot row null bitmap")
229            .map_err(BinarySnapshotVisitError::protocol)?;
230        for (column_index, column) in self.columns.iter().enumerate() {
231            let is_null = null_bitmap[column_index / 8] & (1u8 << (column_index % 8)) != 0;
232            if is_null {
233                if !column.nullable {
234                    return Err(BinarySnapshotVisitError::protocol(ProtocolError::message(
235                        format!("binary snapshot column {} is not nullable", column.name),
236                    )));
237                }
238                on_cell(column_index, column, BorrowedBinarySnapshotCell::Null)
239                    .map_err(BinarySnapshotVisitError::visitor)?;
240                continue;
241            }
242            let value = self
243                .reader
244                .read_borrowed_cell(column.column_type, &column.name)
245                .map_err(BinarySnapshotVisitError::protocol)?;
246            on_cell(column_index, column, value).map_err(BinarySnapshotVisitError::visitor)?;
247        }
248        self.remaining -= 1;
249        Ok(true)
250    }
251
252    pub fn read_next_row_with_raw_visitor_trusted<V>(
253        &mut self,
254        visitor: &mut V,
255    ) -> std::result::Result<bool, BinarySnapshotVisitError<V::Error>>
256    where
257        V: BorrowedBinarySnapshotRawCellVisitor<'a>,
258    {
259        if self.remaining == 0 {
260            return Ok(false);
261        }
262
263        let null_bitmap = self
264            .reader
265            .read_bytes(self.null_bitmap_bytes, "binary snapshot row null bitmap")
266            .map_err(BinarySnapshotVisitError::protocol)?;
267        for (column_index, column) in self.columns.iter().enumerate() {
268            let is_null = null_bitmap[column_index / 8] & (1u8 << (column_index % 8)) != 0;
269            if is_null {
270                visitor
271                    .visit_null()
272                    .map_err(BinarySnapshotVisitError::visitor)?;
273                continue;
274            }
275            self.reader
276                .visit_raw_cell_trusted(column.column_type, visitor)?;
277        }
278        self.remaining -= 1;
279        Ok(true)
280    }
281
282    pub fn assert_done(&self) -> Result<()> {
283        self.reader.assert_done()
284    }
285}
286
287pub fn decode_binary_snapshot_table(bytes: &[u8]) -> Result<DecodedBinarySnapshotTable> {
288    let DecodedBinarySnapshotRows {
289        table,
290        columns,
291        rows,
292    } = decode_binary_snapshot_rows(bytes)?;
293    let value_rows = rows
294        .into_iter()
295        .map(|row| {
296            columns
297                .iter()
298                .zip(row)
299                .map(|(column, value)| (column.name.clone(), value.into_json_value()))
300                .collect()
301        })
302        .collect();
303    Ok(DecodedBinarySnapshotTable {
304        table,
305        columns,
306        rows: value_rows,
307    })
308}
309
310pub fn decode_binary_snapshot_rows(bytes: &[u8]) -> Result<DecodedBinarySnapshotRows> {
311    let mut reader = BinarySnapshotReader::new(bytes);
312    let (table, columns, row_count, _) = read_binary_snapshot_header(&mut reader)?;
313    let null_bitmap_bytes = columns.len().div_ceil(8);
314    let mut rows = Vec::with_capacity(row_count);
315    for _ in 0..row_count {
316        let null_bitmap =
317            reader.read_bytes(null_bitmap_bytes, "binary snapshot row null bitmap")?;
318        let mut row = Vec::with_capacity(columns.len());
319        for (column_index, column) in columns.iter().enumerate() {
320            let is_null = null_bitmap[column_index / 8] & (1u8 << (column_index % 8) as u32) != 0;
321            if is_null {
322                if !column.nullable {
323                    return Err(ProtocolError::message(format!(
324                        "binary snapshot column {} is not nullable",
325                        column.name
326                    )));
327                }
328                row.push(BinarySnapshotCell::Null);
329                continue;
330            }
331            row.push(reader.read_cell(column.column_type, &column.name)?);
332        }
333        rows.push(row);
334    }
335    reader.assert_done()?;
336
337    Ok(DecodedBinarySnapshotRows {
338        table,
339        columns,
340        rows,
341    })
342}
343
344pub fn decode_binary_snapshot_payload(bytes: Vec<u8>) -> Result<BinarySnapshotPayload> {
345    let mut reader = BinarySnapshotReader::new(&bytes);
346    let (table, columns, row_count, rows_offset) = read_binary_snapshot_header(&mut reader)?;
347    Ok(BinarySnapshotPayload {
348        bytes,
349        table,
350        columns,
351        row_count,
352        rows_offset,
353    })
354}
355
356fn read_binary_snapshot_header(
357    reader: &mut BinarySnapshotReader<'_>,
358) -> Result<(String, Vec<BinarySnapshotColumn>, usize, usize)> {
359    reader.expect_magic(MAGIC, "binary snapshot table")?;
360
361    let version = reader.read_u16("binary snapshot version")?;
362    if version != VERSION {
363        return Err(ProtocolError::message(format!(
364            "unsupported binary snapshot version: {version}"
365        )));
366    }
367    let flags = reader.read_u16("binary snapshot flags")?;
368    if flags != FLAG_NONE {
369        return Err(ProtocolError::message(format!(
370            "unsupported binary snapshot flags: {flags}"
371        )));
372    }
373
374    let table = reader.read_string16("binary snapshot table name")?;
375    let column_count = reader.read_u16("binary snapshot column count")? as usize;
376    let mut columns = Vec::with_capacity(column_count);
377    for _ in 0..column_count {
378        let name = reader.read_string16("binary snapshot column name")?;
379        let column_type =
380            BinarySnapshotColumnType::from_tag(reader.read_u8("binary snapshot column type")?)?;
381        let column_flags = reader.read_u8("binary snapshot column flags")?;
382        if column_flags & !COLUMN_FLAG_NULLABLE != 0 {
383            return Err(ProtocolError::message(format!(
384                "unsupported binary snapshot column flags: {column_flags}"
385            )));
386        }
387        columns.push(BinarySnapshotColumn {
388            name,
389            column_type,
390            nullable: column_flags & COLUMN_FLAG_NULLABLE != 0,
391        });
392    }
393
394    let row_count = reader.read_u32("binary snapshot row count")? as usize;
395    let rows_offset = reader.offset;
396    Ok((table, columns, row_count, rows_offset))
397}
398
399struct BinarySnapshotReader<'a> {
400    bytes: &'a [u8],
401    offset: usize,
402}
403
404impl<'a> BinarySnapshotReader<'a> {
405    fn new(bytes: &'a [u8]) -> Self {
406        Self { bytes, offset: 0 }
407    }
408
409    fn expect_magic(&mut self, magic: &[u8], label: &str) -> Result<()> {
410        let actual = self.read_bytes(magic.len(), &format!("{label} magic"))?;
411        if actual != magic {
412            return Err(ProtocolError::message(format!("unexpected {label} magic")));
413        }
414        Ok(())
415    }
416
417    fn read_u8(&mut self, label: &str) -> Result<u8> {
418        self.require(1, label)?;
419        let value = self.bytes[self.offset];
420        self.offset += 1;
421        Ok(value)
422    }
423
424    fn read_u16(&mut self, label: &str) -> Result<u16> {
425        self.require(2, label)?;
426        let value = u16::from_le_bytes(
427            self.bytes[self.offset..self.offset + 2]
428                .try_into()
429                .expect("slice length checked"),
430        );
431        self.offset += 2;
432        Ok(value)
433    }
434
435    fn read_u32(&mut self, label: &str) -> Result<u32> {
436        self.require(4, label)?;
437        let value = u32::from_le_bytes(
438            self.bytes[self.offset..self.offset + 4]
439                .try_into()
440                .expect("slice length checked"),
441        );
442        self.offset += 4;
443        Ok(value)
444    }
445
446    fn read_i64(&mut self, label: &str) -> Result<i64> {
447        self.require(8, label)?;
448        let value = i64::from_le_bytes(
449            self.bytes[self.offset..self.offset + 8]
450                .try_into()
451                .expect("slice length checked"),
452        );
453        self.offset += 8;
454        Ok(value)
455    }
456
457    fn read_f64(&mut self, label: &str) -> Result<f64> {
458        self.require(8, label)?;
459        let value = f64::from_le_bytes(
460            self.bytes[self.offset..self.offset + 8]
461                .try_into()
462                .expect("slice length checked"),
463        );
464        self.offset += 8;
465        Ok(value)
466    }
467
468    fn read_string16(&mut self, label: &str) -> Result<String> {
469        let len = self.read_u16(&format!("{label} length"))? as usize;
470        let bytes = self.read_bytes(len, label)?;
471        String::from_utf8(bytes.to_vec())
472            .map_err(|err| ProtocolError::message(format!("decode {label} as utf8: {err}")))
473    }
474
475    fn read_string32(&mut self, label: &str) -> Result<String> {
476        let len = self.read_u32(&format!("{label} length"))? as usize;
477        let bytes = self.read_bytes(len, label)?;
478        String::from_utf8(bytes.to_vec())
479            .map_err(|err| ProtocolError::message(format!("decode {label} as utf8: {err}")))
480    }
481
482    fn read_str32(&mut self, label: &str) -> Result<&'a str> {
483        let len = self.read_u32(&format!("{label} length"))? as usize;
484        let bytes = self.read_bytes(len, label)?;
485        std::str::from_utf8(bytes)
486            .map_err(|err| ProtocolError::message(format!("decode {label} as utf8: {err}")))
487    }
488
489    fn read_bytes32(&mut self, label: &str) -> Result<&'a [u8]> {
490        let len = self.read_u32(&format!("{label} length"))? as usize;
491        self.read_bytes(len, label)
492    }
493
494    fn read_bytes(&mut self, len: usize, label: &str) -> Result<&'a [u8]> {
495        self.require(len, label)?;
496        let bytes = &self.bytes[self.offset..self.offset + len];
497        self.offset += len;
498        Ok(bytes)
499    }
500
501    fn read_cell(
502        &mut self,
503        column_type: BinarySnapshotColumnType,
504        column: &str,
505    ) -> Result<BinarySnapshotCell> {
506        match column_type {
507            BinarySnapshotColumnType::String => Ok(BinarySnapshotCell::String(
508                self.read_string32("binary snapshot string")?,
509            )),
510            BinarySnapshotColumnType::Integer => Ok(BinarySnapshotCell::Integer(
511                self.read_i64("binary snapshot integer")?,
512            )),
513            BinarySnapshotColumnType::Float => {
514                let value = self.read_f64("binary snapshot float")?;
515                Number::from_f64(value).ok_or_else(|| {
516                    ProtocolError::message(format!(
517                        "binary snapshot {column} contained non-finite float"
518                    ))
519                })?;
520                Ok(BinarySnapshotCell::Float(value))
521            }
522            BinarySnapshotColumnType::Boolean => {
523                let value = self.read_u8("binary snapshot boolean")?;
524                match value {
525                    0 => Ok(BinarySnapshotCell::Boolean(false)),
526                    1 => Ok(BinarySnapshotCell::Boolean(true)),
527                    _ => Err(ProtocolError::message(format!(
528                        "binary snapshot {column} expected boolean byte"
529                    ))),
530                }
531            }
532            BinarySnapshotColumnType::Json => {
533                let value = self.read_string32("binary snapshot json")?;
534                Ok(BinarySnapshotCell::Json(serde_json::from_str(&value)?))
535            }
536            BinarySnapshotColumnType::Bytes => {
537                let len = self.read_u32("binary snapshot bytes length")? as usize;
538                let bytes = self.read_bytes(len, "binary snapshot bytes")?;
539                Ok(BinarySnapshotCell::Bytes(bytes.to_vec()))
540            }
541        }
542    }
543
544    fn read_borrowed_cell(
545        &mut self,
546        column_type: BinarySnapshotColumnType,
547        column: &str,
548    ) -> Result<BorrowedBinarySnapshotCell<'a>> {
549        match column_type {
550            BinarySnapshotColumnType::String => Ok(BorrowedBinarySnapshotCell::String(
551                self.read_str32("binary snapshot string")?,
552            )),
553            BinarySnapshotColumnType::Integer => Ok(BorrowedBinarySnapshotCell::Integer(
554                self.read_i64("binary snapshot integer")?,
555            )),
556            BinarySnapshotColumnType::Float => {
557                let value = self.read_f64("binary snapshot float")?;
558                Number::from_f64(value).ok_or_else(|| {
559                    ProtocolError::message(format!(
560                        "binary snapshot {column} contained non-finite float"
561                    ))
562                })?;
563                Ok(BorrowedBinarySnapshotCell::Float(value))
564            }
565            BinarySnapshotColumnType::Boolean => {
566                let value = self.read_u8("binary snapshot boolean")?;
567                match value {
568                    0 => Ok(BorrowedBinarySnapshotCell::Boolean(false)),
569                    1 => Ok(BorrowedBinarySnapshotCell::Boolean(true)),
570                    _ => Err(ProtocolError::message(format!(
571                        "binary snapshot {column} expected boolean byte"
572                    ))),
573                }
574            }
575            BinarySnapshotColumnType::Json => Ok(BorrowedBinarySnapshotCell::Json(
576                self.read_str32("binary snapshot json")?,
577            )),
578            BinarySnapshotColumnType::Bytes => {
579                let len = self.read_u32("binary snapshot bytes length")? as usize;
580                let bytes = self.read_bytes(len, "binary snapshot bytes")?;
581                Ok(BorrowedBinarySnapshotCell::Bytes(bytes))
582            }
583        }
584    }
585
586    fn visit_raw_cell_trusted<V>(
587        &mut self,
588        column_type: BinarySnapshotColumnType,
589        visitor: &mut V,
590    ) -> std::result::Result<(), BinarySnapshotVisitError<V::Error>>
591    where
592        V: BorrowedBinarySnapshotRawCellVisitor<'a>,
593    {
594        match column_type {
595            BinarySnapshotColumnType::String => {
596                let value = self
597                    .read_bytes32("binary snapshot string")
598                    .map_err(BinarySnapshotVisitError::protocol)?;
599                visitor
600                    .visit_string_bytes(value)
601                    .map_err(BinarySnapshotVisitError::visitor)
602            }
603            BinarySnapshotColumnType::Integer => {
604                let value = self
605                    .read_i64("binary snapshot integer")
606                    .map_err(BinarySnapshotVisitError::protocol)?;
607                visitor
608                    .visit_integer(value)
609                    .map_err(BinarySnapshotVisitError::visitor)
610            }
611            BinarySnapshotColumnType::Float => {
612                let value = self
613                    .read_f64("binary snapshot float")
614                    .map_err(BinarySnapshotVisitError::protocol)?;
615                if !value.is_finite() {
616                    return Err(BinarySnapshotVisitError::protocol(ProtocolError::message(
617                        "binary snapshot contained non-finite float",
618                    )));
619                }
620                visitor
621                    .visit_float(value)
622                    .map_err(BinarySnapshotVisitError::visitor)
623            }
624            BinarySnapshotColumnType::Boolean => {
625                let value = self
626                    .read_u8("binary snapshot boolean")
627                    .map_err(BinarySnapshotVisitError::protocol)?;
628                match value {
629                    0 => visitor
630                        .visit_boolean(false)
631                        .map_err(BinarySnapshotVisitError::visitor),
632                    1 => visitor
633                        .visit_boolean(true)
634                        .map_err(BinarySnapshotVisitError::visitor),
635                    _ => Err(BinarySnapshotVisitError::protocol(ProtocolError::message(
636                        "binary snapshot expected boolean byte",
637                    ))),
638                }
639            }
640            BinarySnapshotColumnType::Json => {
641                let value = self
642                    .read_bytes32("binary snapshot json")
643                    .map_err(BinarySnapshotVisitError::protocol)?;
644                visitor
645                    .visit_json_bytes(value)
646                    .map_err(BinarySnapshotVisitError::visitor)
647            }
648            BinarySnapshotColumnType::Bytes => {
649                let len =
650                    self.read_u32("binary snapshot bytes length")
651                        .map_err(BinarySnapshotVisitError::protocol)? as usize;
652                let bytes = self
653                    .read_bytes(len, "binary snapshot bytes")
654                    .map_err(BinarySnapshotVisitError::protocol)?;
655                visitor
656                    .visit_bytes(bytes)
657                    .map_err(BinarySnapshotVisitError::visitor)
658            }
659        }
660    }
661
662    fn assert_done(&self) -> Result<()> {
663        if self.offset != self.bytes.len() {
664            return Err(ProtocolError::message(
665                "binary snapshot payload has trailing bytes",
666            ));
667        }
668        Ok(())
669    }
670
671    fn require(&self, len: usize, label: &str) -> Result<()> {
672        if self.offset + len > self.bytes.len() {
673            return Err(ProtocolError::message(format!(
674                "{label} exceeds binary snapshot payload bounds"
675            )));
676        }
677        Ok(())
678    }
679}
680
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Little-endian encoding helpers that mirror what the reader decodes.

    fn push_u16(bytes: &mut Vec<u8>, value: u16) {
        bytes.extend_from_slice(&value.to_le_bytes());
    }

    fn push_u32(bytes: &mut Vec<u8>, value: u32) {
        bytes.extend_from_slice(&value.to_le_bytes());
    }

    fn push_i64(bytes: &mut Vec<u8>, value: i64) {
        bytes.extend_from_slice(&value.to_le_bytes());
    }

    fn push_f64(bytes: &mut Vec<u8>, value: f64) {
        bytes.extend_from_slice(&value.to_le_bytes());
    }

    /// Appends a string with a u16 length prefix (used for table and column names).
    fn push_string16(bytes: &mut Vec<u8>, value: &str) {
        push_u16(bytes, value.len() as u16);
        bytes.extend_from_slice(value.as_bytes());
    }

    /// Appends a string with a u32 length prefix (used for string and JSON cells).
    fn push_string32(bytes: &mut Vec<u8>, value: &str) {
        push_u32(bytes, value.len() as u32);
        bytes.extend_from_slice(value.as_bytes());
    }

    /// Builds a two-row, six-column payload by hand and checks it through the full decoder,
    /// the borrowed-cell cursor and the raw visitor cursor.
    #[test]
    fn decodes_binary_snapshot_table_rows() {
        // Header: magic, version 1, flags 0, table name, column count.
        let mut bytes = Vec::new();
        bytes.extend_from_slice(b"SBT1");
        push_u16(&mut bytes, 1);
        push_u16(&mut bytes, 0);
        push_string16(&mut bytes, "tasks");
        push_u16(&mut bytes, 6);
        // One column of every type tag (1..=6); only "metadata" is nullable.
        for (name, tag, flags) in [
            ("id", 1u8, 0u8),
            ("completed", 4, 0),
            ("server_version", 2, 0),
            ("score", 3, 0),
            ("metadata", 5, COLUMN_FLAG_NULLABLE),
            ("payload", 6, 0),
        ] {
            push_string16(&mut bytes, name);
            bytes.push(tag);
            bytes.push(flags);
        }
        // Row count.
        push_u32(&mut bytes, 2);

        // Row 1: null bitmap 0 (no nulls), then every column's value in order.
        bytes.push(0);
        push_string32(&mut bytes, "task-1");
        bytes.push(0);
        push_i64(&mut bytes, 42);
        push_f64(&mut bytes, 1.5);
        push_string32(&mut bytes, r#"{"priority":"high"}"#);
        push_u32(&mut bytes, 3);
        bytes.extend_from_slice(&[1, 2, 3]);

        // Row 2: bit 4 set, so column index 4 ("metadata") is null and has no encoded value;
        // the final u32 is the zero length of an empty "payload" cell.
        bytes.push(1 << 4);
        push_string32(&mut bytes, "task-2");
        bytes.push(1);
        push_i64(&mut bytes, 43);
        push_f64(&mut bytes, 2.25);
        push_u32(&mut bytes, 0);

        let decoded = decode_binary_snapshot_table(&bytes).unwrap();

        assert_eq!(decoded.table, "tasks");
        assert_eq!(decoded.columns.len(), 6);
        assert_eq!(decoded.rows[0]["id"], json!("task-1"));
        assert_eq!(decoded.rows[0]["completed"], json!(false));
        assert_eq!(decoded.rows[0]["server_version"], json!(42));
        assert_eq!(decoded.rows[0]["score"], json!(1.5));
        assert_eq!(decoded.rows[0]["metadata"], json!({"priority": "high"}));
        assert_eq!(decoded.rows[0]["payload"], json!([1, 2, 3]));
        assert_eq!(decoded.rows[1]["metadata"], Value::Null);

        // Same payload through the borrowed-cell cursor; only the first row is read.
        let payload = decode_binary_snapshot_payload(bytes).unwrap();
        let mut cursor = payload.row_cursor();
        let mut first_row = Vec::new();
        assert!(cursor
            .read_next_row(|_, _, value| {
                first_row.push(value);
                Ok::<(), ProtocolError>(())
            })
            .unwrap());
        assert_eq!(first_row[0], BorrowedBinarySnapshotCell::String("task-1"));
        assert_eq!(first_row[1], BorrowedBinarySnapshotCell::Boolean(false));
        assert_eq!(first_row[2], BorrowedBinarySnapshotCell::Integer(42));
        assert_eq!(first_row[3], BorrowedBinarySnapshotCell::Float(1.5));
        assert_eq!(
            first_row[4],
            BorrowedBinarySnapshotCell::Json(r#"{"priority":"high"}"#)
        );
        assert_eq!(first_row[5], BorrowedBinarySnapshotCell::Bytes(&[1, 2, 3]));

        // Records what the raw visitor receives so it can be compared as a whole.
        #[derive(Debug, PartialEq)]
        enum RawCell<'a> {
            Null,
            String(&'a [u8]),
            Integer(i64),
            Float(f64),
            Boolean(bool),
            Json(&'a [u8]),
            Bytes(&'a [u8]),
        }

        struct RawRecordingVisitor<'a> {
            values: Vec<RawCell<'a>>,
        }

        impl<'a> BorrowedBinarySnapshotRawCellVisitor<'a> for RawRecordingVisitor<'a> {
            type Error = ProtocolError;

            fn visit_null(&mut self) -> Result<()> {
                self.values.push(RawCell::Null);
                Ok(())
            }

            fn visit_string_bytes(&mut self, value: &'a [u8]) -> Result<()> {
                self.values.push(RawCell::String(value));
                Ok(())
            }

            fn visit_integer(&mut self, value: i64) -> Result<()> {
                self.values.push(RawCell::Integer(value));
                Ok(())
            }

            fn visit_float(&mut self, value: f64) -> Result<()> {
                self.values.push(RawCell::Float(value));
                Ok(())
            }

            fn visit_boolean(&mut self, value: bool) -> Result<()> {
                self.values.push(RawCell::Boolean(value));
                Ok(())
            }

            fn visit_json_bytes(&mut self, value: &'a [u8]) -> Result<()> {
                self.values.push(RawCell::Json(value));
                Ok(())
            }

            fn visit_bytes(&mut self, value: &'a [u8]) -> Result<()> {
                self.values.push(RawCell::Bytes(value));
                Ok(())
            }
        }

        // A fresh cursor starts again at the first row; string and JSON arrive as raw bytes.
        let mut cursor = payload.row_cursor();
        let mut raw_visitor = RawRecordingVisitor { values: Vec::new() };
        assert!(cursor
            .read_next_row_with_raw_visitor_trusted(&mut raw_visitor)
            .unwrap());
        assert_eq!(
            raw_visitor.values,
            vec![
                RawCell::String(&b"task-1"[..]),
                RawCell::Boolean(false),
                RawCell::Integer(42),
                RawCell::Float(1.5),
                RawCell::Json(&br#"{"priority":"high"}"#[..]),
                RawCell::Bytes(&[1, 2, 3]),
            ]
        );
    }
}
852}