1use crate::error::{ProtocolError, Result};
2use serde_json::{Map, Number, Value};
3use std::fmt;
4
/// Four-byte signature expected at the very start of a binary snapshot table payload.
const MAGIC: &[u8; 4] = b"SBT1";
/// The only wire-format version the header parser accepts.
const VERSION: u16 = 1;
/// Header flag word with no bits set; the header parser rejects any other value.
const FLAG_NONE: u16 = 0x0000;
/// Column flag bit marking a column as nullable (the only column flag accepted).
const COLUMN_FLAG_NULLABLE: u8 = 0b0000_0001;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum BinarySnapshotColumnType {
12 String,
13 Integer,
14 Float,
15 Boolean,
16 Json,
17 Bytes,
18}
19
20impl BinarySnapshotColumnType {
21 fn from_tag(tag: u8) -> Result<Self> {
22 match tag {
23 1 => Ok(Self::String),
24 2 => Ok(Self::Integer),
25 3 => Ok(Self::Float),
26 4 => Ok(Self::Boolean),
27 5 => Ok(Self::Json),
28 6 => Ok(Self::Bytes),
29 _ => Err(ProtocolError::message(format!(
30 "unsupported binary snapshot type tag: {tag}"
31 ))),
32 }
33 }
34}
35
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub struct BinarySnapshotColumn {
38 pub name: String,
39 pub column_type: BinarySnapshotColumnType,
40 pub nullable: bool,
41}
42
43#[derive(Debug, Clone, PartialEq)]
44pub struct DecodedBinarySnapshotTable {
45 pub table: String,
46 pub columns: Vec<BinarySnapshotColumn>,
47 pub rows: Vec<Map<String, Value>>,
48}
49
50#[derive(Debug, Clone, PartialEq)]
51pub enum BinarySnapshotCell {
52 Null,
53 String(String),
54 Integer(i64),
55 Float(f64),
56 Boolean(bool),
57 Json(Value),
58 Bytes(Vec<u8>),
59}
60
61impl BinarySnapshotCell {
62 pub fn into_json_value(self) -> Value {
63 match self {
64 Self::Null => Value::Null,
65 Self::String(value) => Value::String(value),
66 Self::Integer(value) => Value::Number(Number::from(value)),
67 Self::Float(value) => Number::from_f64(value)
68 .map(Value::Number)
69 .unwrap_or(Value::Null),
70 Self::Boolean(value) => Value::Bool(value),
71 Self::Json(value) => value,
72 Self::Bytes(bytes) => Value::Array(
73 bytes
74 .into_iter()
75 .map(|byte| Value::Number(Number::from(byte)))
76 .collect(),
77 ),
78 }
79 }
80}
81
82#[derive(Debug, Clone, PartialEq)]
83pub struct DecodedBinarySnapshotRows {
84 pub table: String,
85 pub columns: Vec<BinarySnapshotColumn>,
86 pub rows: Vec<Vec<BinarySnapshotCell>>,
87}
88
89impl DecodedBinarySnapshotRows {
90 pub fn row_count(&self) -> usize {
91 self.rows.len()
92 }
93
94 pub fn into_value_rows(self) -> Vec<Value> {
95 self.into_maps().into_iter().map(Value::Object).collect()
96 }
97
98 pub fn into_maps(self) -> Vec<Map<String, Value>> {
99 let columns = self.columns;
100 self.rows
101 .into_iter()
102 .map(|row| {
103 columns
104 .iter()
105 .zip(row)
106 .map(|(column, value)| (column.name.clone(), value.into_json_value()))
107 .collect()
108 })
109 .collect()
110 }
111}
112
113#[derive(Debug, Clone, PartialEq)]
114pub struct BinarySnapshotPayload {
115 bytes: Vec<u8>,
116 pub table: String,
117 pub columns: Vec<BinarySnapshotColumn>,
118 pub row_count: usize,
119 rows_offset: usize,
120}
121
122impl BinarySnapshotPayload {
123 pub fn row_count(&self) -> usize {
124 self.row_count
125 }
126
127 pub fn bytes(&self) -> &[u8] {
128 &self.bytes
129 }
130
131 pub fn row_cursor(&self) -> BinarySnapshotRowCursor<'_> {
132 BinarySnapshotRowCursor {
133 reader: BinarySnapshotReader {
134 bytes: &self.bytes,
135 offset: self.rows_offset,
136 },
137 columns: &self.columns,
138 null_bitmap_bytes: self.columns.len().div_ceil(8),
139 remaining: self.row_count,
140 }
141 }
142
143 pub fn into_decoded_rows(self) -> Result<DecodedBinarySnapshotRows> {
144 decode_binary_snapshot_rows(&self.bytes)
145 }
146
147 pub fn into_value_rows(self) -> Result<Vec<Value>> {
148 self.into_decoded_rows()
149 .map(DecodedBinarySnapshotRows::into_value_rows)
150 }
151}
152
/// A decoded cell value that borrows variable-length data instead of copying it.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BorrowedBinarySnapshotCell<'a> {
    /// The row's null bitmap marked this cell as null.
    Null,
    /// Borrowed text.
    String(&'a str),
    /// Signed 64-bit integer.
    Integer(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Boolean flag.
    Boolean(bool),
    /// JSON document kept as text rather than a parsed value.
    Json(&'a str),
    /// Borrowed opaque byte string.
    Bytes(&'a [u8]),
}
163
164#[derive(Debug)]
165pub enum BinarySnapshotVisitError<E> {
166 Protocol(ProtocolError),
167 Visitor(E),
168}
169
170impl<E> BinarySnapshotVisitError<E> {
171 fn protocol(error: ProtocolError) -> Self {
172 Self::Protocol(error)
173 }
174
175 fn visitor(error: E) -> Self {
176 Self::Visitor(error)
177 }
178}
179
180impl<E: fmt::Display> fmt::Display for BinarySnapshotVisitError<E> {
181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182 match self {
183 Self::Protocol(error) => error.fmt(f),
184 Self::Visitor(error) => error.fmt(f),
185 }
186 }
187}
188
189impl<E> std::error::Error for BinarySnapshotVisitError<E> where E: std::error::Error + 'static {}
190
/// Receives the cells of a snapshot row one at a time, in column order.
///
/// String and JSON cells arrive as raw bytes, so implementors decide whether
/// and how to validate or parse them.
pub trait BorrowedBinarySnapshotRawCellVisitor<'a> {
    /// Error type the visitor may return to abort the walk.
    type Error;

    /// Called for a cell marked null in the row's null bitmap.
    fn visit_null(&mut self) -> std::result::Result<(), Self::Error>;
    /// Called with the raw bytes of a string cell.
    fn visit_string_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error>;
    /// Called with an integer cell.
    fn visit_integer(&mut self, value: i64) -> std::result::Result<(), Self::Error>;
    /// Called with a float cell.
    fn visit_float(&mut self, value: f64) -> std::result::Result<(), Self::Error>;
    /// Called with a boolean cell.
    fn visit_boolean(&mut self, value: bool) -> std::result::Result<(), Self::Error>;
    /// Called with the raw bytes of a JSON cell.
    fn visit_json_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error>;
    /// Called with the contents of a bytes cell.
    fn visit_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error>;
}
202
203pub struct BinarySnapshotRowCursor<'a> {
204 reader: BinarySnapshotReader<'a>,
205 columns: &'a [BinarySnapshotColumn],
206 null_bitmap_bytes: usize,
207 remaining: usize,
208}
209
210impl<'a> BinarySnapshotRowCursor<'a> {
211 pub fn read_next_row<F, E>(
212 &mut self,
213 mut on_cell: F,
214 ) -> std::result::Result<bool, BinarySnapshotVisitError<E>>
215 where
216 F: FnMut(
217 usize,
218 &BinarySnapshotColumn,
219 BorrowedBinarySnapshotCell<'a>,
220 ) -> std::result::Result<(), E>,
221 {
222 if self.remaining == 0 {
223 return Ok(false);
224 }
225
226 let null_bitmap = self
227 .reader
228 .read_bytes(self.null_bitmap_bytes, "binary snapshot row null bitmap")
229 .map_err(BinarySnapshotVisitError::protocol)?;
230 for (column_index, column) in self.columns.iter().enumerate() {
231 let is_null = null_bitmap[column_index / 8] & (1u8 << (column_index % 8)) != 0;
232 if is_null {
233 if !column.nullable {
234 return Err(BinarySnapshotVisitError::protocol(ProtocolError::message(
235 format!("binary snapshot column {} is not nullable", column.name),
236 )));
237 }
238 on_cell(column_index, column, BorrowedBinarySnapshotCell::Null)
239 .map_err(BinarySnapshotVisitError::visitor)?;
240 continue;
241 }
242 let value = self
243 .reader
244 .read_borrowed_cell(column.column_type, &column.name)
245 .map_err(BinarySnapshotVisitError::protocol)?;
246 on_cell(column_index, column, value).map_err(BinarySnapshotVisitError::visitor)?;
247 }
248 self.remaining -= 1;
249 Ok(true)
250 }
251
252 pub fn read_next_row_with_raw_visitor_trusted<V>(
253 &mut self,
254 visitor: &mut V,
255 ) -> std::result::Result<bool, BinarySnapshotVisitError<V::Error>>
256 where
257 V: BorrowedBinarySnapshotRawCellVisitor<'a>,
258 {
259 if self.remaining == 0 {
260 return Ok(false);
261 }
262
263 let null_bitmap = self
264 .reader
265 .read_bytes(self.null_bitmap_bytes, "binary snapshot row null bitmap")
266 .map_err(BinarySnapshotVisitError::protocol)?;
267 for (column_index, column) in self.columns.iter().enumerate() {
268 let is_null = null_bitmap[column_index / 8] & (1u8 << (column_index % 8)) != 0;
269 if is_null {
270 visitor
271 .visit_null()
272 .map_err(BinarySnapshotVisitError::visitor)?;
273 continue;
274 }
275 self.reader
276 .visit_raw_cell_trusted(column.column_type, visitor)?;
277 }
278 self.remaining -= 1;
279 Ok(true)
280 }
281
282 pub fn assert_done(&self) -> Result<()> {
283 self.reader.assert_done()
284 }
285}
286
287pub fn decode_binary_snapshot_table(bytes: &[u8]) -> Result<DecodedBinarySnapshotTable> {
288 let DecodedBinarySnapshotRows {
289 table,
290 columns,
291 rows,
292 } = decode_binary_snapshot_rows(bytes)?;
293 let value_rows = rows
294 .into_iter()
295 .map(|row| {
296 columns
297 .iter()
298 .zip(row)
299 .map(|(column, value)| (column.name.clone(), value.into_json_value()))
300 .collect()
301 })
302 .collect();
303 Ok(DecodedBinarySnapshotTable {
304 table,
305 columns,
306 rows: value_rows,
307 })
308}
309
310pub fn decode_binary_snapshot_rows(bytes: &[u8]) -> Result<DecodedBinarySnapshotRows> {
311 let mut reader = BinarySnapshotReader::new(bytes);
312 let (table, columns, row_count, _) = read_binary_snapshot_header(&mut reader)?;
313 let null_bitmap_bytes = columns.len().div_ceil(8);
314 let mut rows = Vec::with_capacity(row_count);
315 for _ in 0..row_count {
316 let null_bitmap =
317 reader.read_bytes(null_bitmap_bytes, "binary snapshot row null bitmap")?;
318 let mut row = Vec::with_capacity(columns.len());
319 for (column_index, column) in columns.iter().enumerate() {
320 let is_null = null_bitmap[column_index / 8] & (1u8 << (column_index % 8) as u32) != 0;
321 if is_null {
322 if !column.nullable {
323 return Err(ProtocolError::message(format!(
324 "binary snapshot column {} is not nullable",
325 column.name
326 )));
327 }
328 row.push(BinarySnapshotCell::Null);
329 continue;
330 }
331 row.push(reader.read_cell(column.column_type, &column.name)?);
332 }
333 rows.push(row);
334 }
335 reader.assert_done()?;
336
337 Ok(DecodedBinarySnapshotRows {
338 table,
339 columns,
340 rows,
341 })
342}
343
344pub fn decode_binary_snapshot_payload(bytes: Vec<u8>) -> Result<BinarySnapshotPayload> {
345 let mut reader = BinarySnapshotReader::new(&bytes);
346 let (table, columns, row_count, rows_offset) = read_binary_snapshot_header(&mut reader)?;
347 Ok(BinarySnapshotPayload {
348 bytes,
349 table,
350 columns,
351 row_count,
352 rows_offset,
353 })
354}
355
356fn read_binary_snapshot_header(
357 reader: &mut BinarySnapshotReader<'_>,
358) -> Result<(String, Vec<BinarySnapshotColumn>, usize, usize)> {
359 reader.expect_magic(MAGIC, "binary snapshot table")?;
360
361 let version = reader.read_u16("binary snapshot version")?;
362 if version != VERSION {
363 return Err(ProtocolError::message(format!(
364 "unsupported binary snapshot version: {version}"
365 )));
366 }
367 let flags = reader.read_u16("binary snapshot flags")?;
368 if flags != FLAG_NONE {
369 return Err(ProtocolError::message(format!(
370 "unsupported binary snapshot flags: {flags}"
371 )));
372 }
373
374 let table = reader.read_string16("binary snapshot table name")?;
375 let column_count = reader.read_u16("binary snapshot column count")? as usize;
376 let mut columns = Vec::with_capacity(column_count);
377 for _ in 0..column_count {
378 let name = reader.read_string16("binary snapshot column name")?;
379 let column_type =
380 BinarySnapshotColumnType::from_tag(reader.read_u8("binary snapshot column type")?)?;
381 let column_flags = reader.read_u8("binary snapshot column flags")?;
382 if column_flags & !COLUMN_FLAG_NULLABLE != 0 {
383 return Err(ProtocolError::message(format!(
384 "unsupported binary snapshot column flags: {column_flags}"
385 )));
386 }
387 columns.push(BinarySnapshotColumn {
388 name,
389 column_type,
390 nullable: column_flags & COLUMN_FLAG_NULLABLE != 0,
391 });
392 }
393
394 let row_count = reader.read_u32("binary snapshot row count")? as usize;
395 let rows_offset = reader.offset;
396 Ok((table, columns, row_count, rows_offset))
397}
398
399struct BinarySnapshotReader<'a> {
400 bytes: &'a [u8],
401 offset: usize,
402}
403
404impl<'a> BinarySnapshotReader<'a> {
405 fn new(bytes: &'a [u8]) -> Self {
406 Self { bytes, offset: 0 }
407 }
408
409 fn expect_magic(&mut self, magic: &[u8], label: &str) -> Result<()> {
410 let actual = self.read_bytes(magic.len(), &format!("{label} magic"))?;
411 if actual != magic {
412 return Err(ProtocolError::message(format!("unexpected {label} magic")));
413 }
414 Ok(())
415 }
416
417 fn read_u8(&mut self, label: &str) -> Result<u8> {
418 self.require(1, label)?;
419 let value = self.bytes[self.offset];
420 self.offset += 1;
421 Ok(value)
422 }
423
424 fn read_u16(&mut self, label: &str) -> Result<u16> {
425 self.require(2, label)?;
426 let value = u16::from_le_bytes(
427 self.bytes[self.offset..self.offset + 2]
428 .try_into()
429 .expect("slice length checked"),
430 );
431 self.offset += 2;
432 Ok(value)
433 }
434
435 fn read_u32(&mut self, label: &str) -> Result<u32> {
436 self.require(4, label)?;
437 let value = u32::from_le_bytes(
438 self.bytes[self.offset..self.offset + 4]
439 .try_into()
440 .expect("slice length checked"),
441 );
442 self.offset += 4;
443 Ok(value)
444 }
445
446 fn read_i64(&mut self, label: &str) -> Result<i64> {
447 self.require(8, label)?;
448 let value = i64::from_le_bytes(
449 self.bytes[self.offset..self.offset + 8]
450 .try_into()
451 .expect("slice length checked"),
452 );
453 self.offset += 8;
454 Ok(value)
455 }
456
457 fn read_f64(&mut self, label: &str) -> Result<f64> {
458 self.require(8, label)?;
459 let value = f64::from_le_bytes(
460 self.bytes[self.offset..self.offset + 8]
461 .try_into()
462 .expect("slice length checked"),
463 );
464 self.offset += 8;
465 Ok(value)
466 }
467
468 fn read_string16(&mut self, label: &str) -> Result<String> {
469 let len = self.read_u16(&format!("{label} length"))? as usize;
470 let bytes = self.read_bytes(len, label)?;
471 String::from_utf8(bytes.to_vec())
472 .map_err(|err| ProtocolError::message(format!("decode {label} as utf8: {err}")))
473 }
474
475 fn read_string32(&mut self, label: &str) -> Result<String> {
476 let len = self.read_u32(&format!("{label} length"))? as usize;
477 let bytes = self.read_bytes(len, label)?;
478 String::from_utf8(bytes.to_vec())
479 .map_err(|err| ProtocolError::message(format!("decode {label} as utf8: {err}")))
480 }
481
482 fn read_str32(&mut self, label: &str) -> Result<&'a str> {
483 let len = self.read_u32(&format!("{label} length"))? as usize;
484 let bytes = self.read_bytes(len, label)?;
485 std::str::from_utf8(bytes)
486 .map_err(|err| ProtocolError::message(format!("decode {label} as utf8: {err}")))
487 }
488
489 fn read_bytes32(&mut self, label: &str) -> Result<&'a [u8]> {
490 let len = self.read_u32(&format!("{label} length"))? as usize;
491 self.read_bytes(len, label)
492 }
493
494 fn read_bytes(&mut self, len: usize, label: &str) -> Result<&'a [u8]> {
495 self.require(len, label)?;
496 let bytes = &self.bytes[self.offset..self.offset + len];
497 self.offset += len;
498 Ok(bytes)
499 }
500
501 fn read_cell(
502 &mut self,
503 column_type: BinarySnapshotColumnType,
504 column: &str,
505 ) -> Result<BinarySnapshotCell> {
506 match column_type {
507 BinarySnapshotColumnType::String => Ok(BinarySnapshotCell::String(
508 self.read_string32("binary snapshot string")?,
509 )),
510 BinarySnapshotColumnType::Integer => Ok(BinarySnapshotCell::Integer(
511 self.read_i64("binary snapshot integer")?,
512 )),
513 BinarySnapshotColumnType::Float => {
514 let value = self.read_f64("binary snapshot float")?;
515 Number::from_f64(value).ok_or_else(|| {
516 ProtocolError::message(format!(
517 "binary snapshot {column} contained non-finite float"
518 ))
519 })?;
520 Ok(BinarySnapshotCell::Float(value))
521 }
522 BinarySnapshotColumnType::Boolean => {
523 let value = self.read_u8("binary snapshot boolean")?;
524 match value {
525 0 => Ok(BinarySnapshotCell::Boolean(false)),
526 1 => Ok(BinarySnapshotCell::Boolean(true)),
527 _ => Err(ProtocolError::message(format!(
528 "binary snapshot {column} expected boolean byte"
529 ))),
530 }
531 }
532 BinarySnapshotColumnType::Json => {
533 let value = self.read_string32("binary snapshot json")?;
534 Ok(BinarySnapshotCell::Json(serde_json::from_str(&value)?))
535 }
536 BinarySnapshotColumnType::Bytes => {
537 let len = self.read_u32("binary snapshot bytes length")? as usize;
538 let bytes = self.read_bytes(len, "binary snapshot bytes")?;
539 Ok(BinarySnapshotCell::Bytes(bytes.to_vec()))
540 }
541 }
542 }
543
544 fn read_borrowed_cell(
545 &mut self,
546 column_type: BinarySnapshotColumnType,
547 column: &str,
548 ) -> Result<BorrowedBinarySnapshotCell<'a>> {
549 match column_type {
550 BinarySnapshotColumnType::String => Ok(BorrowedBinarySnapshotCell::String(
551 self.read_str32("binary snapshot string")?,
552 )),
553 BinarySnapshotColumnType::Integer => Ok(BorrowedBinarySnapshotCell::Integer(
554 self.read_i64("binary snapshot integer")?,
555 )),
556 BinarySnapshotColumnType::Float => {
557 let value = self.read_f64("binary snapshot float")?;
558 Number::from_f64(value).ok_or_else(|| {
559 ProtocolError::message(format!(
560 "binary snapshot {column} contained non-finite float"
561 ))
562 })?;
563 Ok(BorrowedBinarySnapshotCell::Float(value))
564 }
565 BinarySnapshotColumnType::Boolean => {
566 let value = self.read_u8("binary snapshot boolean")?;
567 match value {
568 0 => Ok(BorrowedBinarySnapshotCell::Boolean(false)),
569 1 => Ok(BorrowedBinarySnapshotCell::Boolean(true)),
570 _ => Err(ProtocolError::message(format!(
571 "binary snapshot {column} expected boolean byte"
572 ))),
573 }
574 }
575 BinarySnapshotColumnType::Json => Ok(BorrowedBinarySnapshotCell::Json(
576 self.read_str32("binary snapshot json")?,
577 )),
578 BinarySnapshotColumnType::Bytes => {
579 let len = self.read_u32("binary snapshot bytes length")? as usize;
580 let bytes = self.read_bytes(len, "binary snapshot bytes")?;
581 Ok(BorrowedBinarySnapshotCell::Bytes(bytes))
582 }
583 }
584 }
585
586 fn visit_raw_cell_trusted<V>(
587 &mut self,
588 column_type: BinarySnapshotColumnType,
589 visitor: &mut V,
590 ) -> std::result::Result<(), BinarySnapshotVisitError<V::Error>>
591 where
592 V: BorrowedBinarySnapshotRawCellVisitor<'a>,
593 {
594 match column_type {
595 BinarySnapshotColumnType::String => {
596 let value = self
597 .read_bytes32("binary snapshot string")
598 .map_err(BinarySnapshotVisitError::protocol)?;
599 visitor
600 .visit_string_bytes(value)
601 .map_err(BinarySnapshotVisitError::visitor)
602 }
603 BinarySnapshotColumnType::Integer => {
604 let value = self
605 .read_i64("binary snapshot integer")
606 .map_err(BinarySnapshotVisitError::protocol)?;
607 visitor
608 .visit_integer(value)
609 .map_err(BinarySnapshotVisitError::visitor)
610 }
611 BinarySnapshotColumnType::Float => {
612 let value = self
613 .read_f64("binary snapshot float")
614 .map_err(BinarySnapshotVisitError::protocol)?;
615 if !value.is_finite() {
616 return Err(BinarySnapshotVisitError::protocol(ProtocolError::message(
617 "binary snapshot contained non-finite float",
618 )));
619 }
620 visitor
621 .visit_float(value)
622 .map_err(BinarySnapshotVisitError::visitor)
623 }
624 BinarySnapshotColumnType::Boolean => {
625 let value = self
626 .read_u8("binary snapshot boolean")
627 .map_err(BinarySnapshotVisitError::protocol)?;
628 match value {
629 0 => visitor
630 .visit_boolean(false)
631 .map_err(BinarySnapshotVisitError::visitor),
632 1 => visitor
633 .visit_boolean(true)
634 .map_err(BinarySnapshotVisitError::visitor),
635 _ => Err(BinarySnapshotVisitError::protocol(ProtocolError::message(
636 "binary snapshot expected boolean byte",
637 ))),
638 }
639 }
640 BinarySnapshotColumnType::Json => {
641 let value = self
642 .read_bytes32("binary snapshot json")
643 .map_err(BinarySnapshotVisitError::protocol)?;
644 visitor
645 .visit_json_bytes(value)
646 .map_err(BinarySnapshotVisitError::visitor)
647 }
648 BinarySnapshotColumnType::Bytes => {
649 let len =
650 self.read_u32("binary snapshot bytes length")
651 .map_err(BinarySnapshotVisitError::protocol)? as usize;
652 let bytes = self
653 .read_bytes(len, "binary snapshot bytes")
654 .map_err(BinarySnapshotVisitError::protocol)?;
655 visitor
656 .visit_bytes(bytes)
657 .map_err(BinarySnapshotVisitError::visitor)
658 }
659 }
660 }
661
662 fn assert_done(&self) -> Result<()> {
663 if self.offset != self.bytes.len() {
664 return Err(ProtocolError::message(
665 "binary snapshot payload has trailing bytes",
666 ));
667 }
668 Ok(())
669 }
670
671 fn require(&self, len: usize, label: &str) -> Result<()> {
672 if self.offset + len > self.bytes.len() {
673 return Err(ProtocolError::message(format!(
674 "{label} exceeds binary snapshot payload bounds"
675 )));
676 }
677 Ok(())
678 }
679}
680
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Chainable little-endian writer used to assemble snapshot fixtures.
    #[derive(Default)]
    struct Fixture(Vec<u8>);

    impl Fixture {
        fn raw(&mut self, bytes: &[u8]) -> &mut Self {
            self.0.extend_from_slice(bytes);
            self
        }

        fn u8(&mut self, value: u8) -> &mut Self {
            self.0.push(value);
            self
        }

        fn u16(&mut self, value: u16) -> &mut Self {
            self.raw(&value.to_le_bytes())
        }

        fn u32(&mut self, value: u32) -> &mut Self {
            self.raw(&value.to_le_bytes())
        }

        fn i64(&mut self, value: i64) -> &mut Self {
            self.raw(&value.to_le_bytes())
        }

        fn f64(&mut self, value: f64) -> &mut Self {
            self.raw(&value.to_le_bytes())
        }

        fn string16(&mut self, value: &str) -> &mut Self {
            self.u16(value.len() as u16).raw(value.as_bytes())
        }

        fn string32(&mut self, value: &str) -> &mut Self {
            self.u32(value.len() as u32).raw(value.as_bytes())
        }
    }

    /// Encodes a two-row `tasks` table with one column of every supported type.
    fn sample_table_bytes() -> Vec<u8> {
        let mut out = Fixture::default();
        out.raw(b"SBT1")
            .u16(1)
            .u16(0)
            .string16("tasks")
            .u16(6);

        let columns: [(&str, u8, u8); 6] = [
            ("id", 1, 0),
            ("completed", 4, 0),
            ("server_version", 2, 0),
            ("score", 3, 0),
            ("metadata", 5, COLUMN_FLAG_NULLABLE),
            ("payload", 6, 0),
        ];
        for (name, tag, flags) in columns {
            out.string16(name).u8(tag).u8(flags);
        }
        out.u32(2);

        // First row: no nulls.
        out.u8(0)
            .string32("task-1")
            .u8(0)
            .i64(42)
            .f64(1.5)
            .string32(r#"{"priority":"high"}"#)
            .u32(3)
            .raw(&[1, 2, 3]);

        // Second row: bit 4 marks `metadata` null, so no value is written for it.
        out.u8(1 << 4)
            .string32("task-2")
            .u8(1)
            .i64(43)
            .f64(2.25)
            .u32(0);

        out.0
    }

    /// Raw cell as observed through the raw visitor interface.
    #[derive(Debug, PartialEq)]
    enum RawCell<'a> {
        Null,
        String(&'a [u8]),
        Integer(i64),
        Float(f64),
        Boolean(bool),
        Json(&'a [u8]),
        Bytes(&'a [u8]),
    }

    /// Visitor that records every cell it is handed, in order.
    struct RawRecordingVisitor<'a> {
        values: Vec<RawCell<'a>>,
    }

    impl<'a> RawRecordingVisitor<'a> {
        fn record(&mut self, cell: RawCell<'a>) -> Result<()> {
            self.values.push(cell);
            Ok(())
        }
    }

    impl<'a> BorrowedBinarySnapshotRawCellVisitor<'a> for RawRecordingVisitor<'a> {
        type Error = ProtocolError;

        fn visit_null(&mut self) -> Result<()> {
            self.record(RawCell::Null)
        }

        fn visit_string_bytes(&mut self, value: &'a [u8]) -> Result<()> {
            self.record(RawCell::String(value))
        }

        fn visit_integer(&mut self, value: i64) -> Result<()> {
            self.record(RawCell::Integer(value))
        }

        fn visit_float(&mut self, value: f64) -> Result<()> {
            self.record(RawCell::Float(value))
        }

        fn visit_boolean(&mut self, value: bool) -> Result<()> {
            self.record(RawCell::Boolean(value))
        }

        fn visit_json_bytes(&mut self, value: &'a [u8]) -> Result<()> {
            self.record(RawCell::Json(value))
        }

        fn visit_bytes(&mut self, value: &'a [u8]) -> Result<()> {
            self.record(RawCell::Bytes(value))
        }
    }

    #[test]
    fn decodes_binary_snapshot_table_rows() {
        let bytes = sample_table_bytes();

        // Eager decode into JSON maps.
        let decoded = decode_binary_snapshot_table(&bytes).unwrap();
        assert_eq!(decoded.table, "tasks");
        assert_eq!(decoded.columns.len(), 6);

        let first = &decoded.rows[0];
        assert_eq!(first["id"], json!("task-1"));
        assert_eq!(first["completed"], json!(false));
        assert_eq!(first["server_version"], json!(42));
        assert_eq!(first["score"], json!(1.5));
        assert_eq!(first["metadata"], json!({"priority": "high"}));
        assert_eq!(first["payload"], json!([1, 2, 3]));
        assert_eq!(decoded.rows[1]["metadata"], Value::Null);

        // Lazy decode through the borrowed-cell cursor.
        let payload = decode_binary_snapshot_payload(bytes).unwrap();
        let mut cursor = payload.row_cursor();
        let mut first_row = Vec::new();
        let had_row = cursor
            .read_next_row(|_, _, value| {
                first_row.push(value);
                Ok::<(), ProtocolError>(())
            })
            .unwrap();
        assert!(had_row);
        assert_eq!(first_row[0], BorrowedBinarySnapshotCell::String("task-1"));
        assert_eq!(first_row[1], BorrowedBinarySnapshotCell::Boolean(false));
        assert_eq!(first_row[2], BorrowedBinarySnapshotCell::Integer(42));
        assert_eq!(first_row[3], BorrowedBinarySnapshotCell::Float(1.5));
        assert_eq!(
            first_row[4],
            BorrowedBinarySnapshotCell::Json(r#"{"priority":"high"}"#)
        );
        assert_eq!(first_row[5], BorrowedBinarySnapshotCell::Bytes(&[1, 2, 3]));

        // Lazy decode through the raw visitor.
        let mut cursor = payload.row_cursor();
        let mut raw_visitor = RawRecordingVisitor { values: Vec::new() };
        let had_row = cursor
            .read_next_row_with_raw_visitor_trusted(&mut raw_visitor)
            .unwrap();
        assert!(had_row);
        assert_eq!(
            raw_visitor.values,
            vec![
                RawCell::String(&b"task-1"[..]),
                RawCell::Boolean(false),
                RawCell::Integer(42),
                RawCell::Float(1.5),
                RawCell::Json(&br#"{"priority":"high"}"#[..]),
                RawCell::Bytes(&[1, 2, 3]),
            ]
        );
    }
}