Skip to main content

syncular_runtime/core/
binary_snapshot.rs

1use crate::error::{Result, SyncularError};
2use serde_json::Value;
3use std::ops::Deref;
4use syncular_protocol::binary_snapshot as protocol_binary_snapshot;
5
6pub use protocol_binary_snapshot::{
7    BinarySnapshotCell, BinarySnapshotColumn, BinarySnapshotColumnType, BorrowedBinarySnapshotCell,
8    DecodedBinarySnapshotRows, DecodedBinarySnapshotTable,
9};
10
11#[derive(Debug, Clone, PartialEq)]
12pub struct BinarySnapshotPayload(protocol_binary_snapshot::BinarySnapshotPayload);
13
14impl BinarySnapshotPayload {
15    pub fn row_count(&self) -> usize {
16        self.0.row_count()
17    }
18
19    pub fn bytes(&self) -> &[u8] {
20        self.0.bytes()
21    }
22
23    pub fn row_cursor(&self) -> BinarySnapshotRowCursor<'_> {
24        BinarySnapshotRowCursor {
25            inner: self.0.row_cursor(),
26        }
27    }
28
29    pub fn into_decoded_rows(self) -> Result<DecodedBinarySnapshotRows> {
30        Ok(self.0.into_decoded_rows()?)
31    }
32
33    pub fn into_value_rows(self) -> Result<Vec<Value>> {
34        Ok(self.0.into_value_rows()?)
35    }
36}
37
38impl Deref for BinarySnapshotPayload {
39    type Target = protocol_binary_snapshot::BinarySnapshotPayload;
40
41    fn deref(&self) -> &Self::Target {
42        &self.0
43    }
44}
45
46impl From<protocol_binary_snapshot::BinarySnapshotPayload> for BinarySnapshotPayload {
47    fn from(payload: protocol_binary_snapshot::BinarySnapshotPayload) -> Self {
48        Self(payload)
49    }
50}
51
52pub trait BorrowedBinarySnapshotRawCellVisitor<'a> {
53    fn visit_null(&mut self) -> Result<()>;
54    fn visit_string_bytes(&mut self, value: &'a [u8]) -> Result<()>;
55    fn visit_integer(&mut self, value: i64) -> Result<()>;
56    fn visit_float(&mut self, value: f64) -> Result<()>;
57    fn visit_boolean(&mut self, value: bool) -> Result<()>;
58    fn visit_json_bytes(&mut self, value: &'a [u8]) -> Result<()>;
59    fn visit_bytes(&mut self, value: &'a [u8]) -> Result<()>;
60}
61
62pub struct BinarySnapshotRowCursor<'a> {
63    inner: protocol_binary_snapshot::BinarySnapshotRowCursor<'a>,
64}
65
66impl<'a> BinarySnapshotRowCursor<'a> {
67    pub fn read_next_row<F>(&mut self, mut on_cell: F) -> Result<bool>
68    where
69        F: FnMut(usize, &BinarySnapshotColumn, BorrowedBinarySnapshotCell<'a>) -> Result<()>,
70    {
71        self.inner
72            .read_next_row(|column_index, column, cell| on_cell(column_index, column, cell))
73            .map_err(syncular_error_from_visit)
74    }
75
76    pub fn read_next_row_with_raw_visitor_trusted<V>(&mut self, visitor: &mut V) -> Result<bool>
77    where
78        V: BorrowedBinarySnapshotRawCellVisitor<'a>,
79    {
80        let mut adapter = RawCellVisitorAdapter { visitor };
81        self.inner
82            .read_next_row_with_raw_visitor_trusted(&mut adapter)
83            .map_err(syncular_error_from_visit)
84    }
85
86    pub fn assert_done(&self) -> Result<()> {
87        Ok(self.inner.assert_done()?)
88    }
89}
90
91struct RawCellVisitorAdapter<'v, V> {
92    visitor: &'v mut V,
93}
94
95impl<'a, V> protocol_binary_snapshot::BorrowedBinarySnapshotRawCellVisitor<'a>
96    for RawCellVisitorAdapter<'_, V>
97where
98    V: BorrowedBinarySnapshotRawCellVisitor<'a>,
99{
100    type Error = SyncularError;
101
102    fn visit_null(&mut self) -> std::result::Result<(), Self::Error> {
103        self.visitor.visit_null()
104    }
105
106    fn visit_string_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error> {
107        self.visitor.visit_string_bytes(value)
108    }
109
110    fn visit_integer(&mut self, value: i64) -> std::result::Result<(), Self::Error> {
111        self.visitor.visit_integer(value)
112    }
113
114    fn visit_float(&mut self, value: f64) -> std::result::Result<(), Self::Error> {
115        self.visitor.visit_float(value)
116    }
117
118    fn visit_boolean(&mut self, value: bool) -> std::result::Result<(), Self::Error> {
119        self.visitor.visit_boolean(value)
120    }
121
122    fn visit_json_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error> {
123        self.visitor.visit_json_bytes(value)
124    }
125
126    fn visit_bytes(&mut self, value: &'a [u8]) -> std::result::Result<(), Self::Error> {
127        self.visitor.visit_bytes(value)
128    }
129}
130
131#[derive(Debug, Clone, PartialEq)]
132pub enum SnapshotChunkRows {
133    Json(Vec<Value>),
134    Binary(DecodedBinarySnapshotRows),
135    BinaryPayload(BinarySnapshotPayload),
136}
137
138impl SnapshotChunkRows {
139    pub fn row_count(&self) -> usize {
140        match self {
141            Self::Json(rows) => rows.len(),
142            Self::Binary(rows) => rows.row_count(),
143            Self::BinaryPayload(rows) => rows.row_count(),
144        }
145    }
146
147    pub fn is_empty(&self) -> bool {
148        self.row_count() == 0
149    }
150
151    pub fn into_value_rows(self) -> Vec<Value> {
152        self.try_into_value_rows()
153            .expect("binary snapshot payload decodes into JSON rows")
154    }
155
156    pub fn try_into_value_rows(self) -> Result<Vec<Value>> {
157        match self {
158            Self::Json(rows) => Ok(rows),
159            Self::Binary(rows) => Ok(rows.into_value_rows()),
160            Self::BinaryPayload(rows) => rows.into_value_rows(),
161        }
162    }
163}
164
165pub fn decode_binary_snapshot_table(bytes: &[u8]) -> Result<DecodedBinarySnapshotTable> {
166    Ok(protocol_binary_snapshot::decode_binary_snapshot_table(
167        bytes,
168    )?)
169}
170
171pub fn decode_binary_snapshot_rows(bytes: &[u8]) -> Result<DecodedBinarySnapshotRows> {
172    Ok(protocol_binary_snapshot::decode_binary_snapshot_rows(
173        bytes,
174    )?)
175}
176
177pub fn decode_binary_snapshot_payload(bytes: Vec<u8>) -> Result<BinarySnapshotPayload> {
178    Ok(protocol_binary_snapshot::decode_binary_snapshot_payload(bytes)?.into())
179}
180
181fn syncular_error_from_visit(
182    error: protocol_binary_snapshot::BinarySnapshotVisitError<SyncularError>,
183) -> SyncularError {
184    match error {
185        protocol_binary_snapshot::BinarySnapshotVisitError::Protocol(error) => error.into(),
186        protocol_binary_snapshot::BinarySnapshotVisitError::Visitor(error) => error,
187    }
188}