1use crate::util::EPOCH;
19use crate::{buffer::Writer, error::Error};
20use byteorder::{ByteOrder, LittleEndian};
21use chrono::{DateTime, Days, NaiveDate, NaiveDateTime};
22use std::collections::BTreeMap;
23use std::marker::PhantomData;
24
25use super::{
26 reader::{ArrayViewer, MapViewer},
27 writer::{ArrayWriter, MapWriter},
28};
29
30pub trait Row<'a> {
31 type ReadResult;
32
33 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error>;
34
35 fn cast(bytes: &'a [u8]) -> Self::ReadResult;
36}
37
38fn read_i8_from_bytes(bytes: &[u8]) -> i8 {
39 bytes[0] as i8
40}
41
42macro_rules! impl_row_for_number {
43 ($tt: tt, $writer: expr ,$visitor: expr) => {
44 impl<'a> Row<'a> for $tt {
45 type ReadResult = Self;
46
47 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
48 $writer(writer, *v);
49 Ok(())
50 }
51
52 fn cast(bytes: &[u8]) -> Self::ReadResult {
53 $visitor(bytes)
54 }
55 }
56 };
57}
58impl_row_for_number!(i8, Writer::write_i8, read_i8_from_bytes);
59impl_row_for_number!(i16, Writer::write_i16, LittleEndian::read_i16);
60impl_row_for_number!(i32, Writer::write_i32, LittleEndian::read_i32);
61impl_row_for_number!(i64, Writer::write_i64, LittleEndian::read_i64);
62impl_row_for_number!(f32, Writer::write_f32, LittleEndian::read_f32);
63impl_row_for_number!(f64, Writer::write_f64, LittleEndian::read_f64);
64
65impl<'a> Row<'a> for String {
66 type ReadResult = &'a str;
67
68 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
69 writer.write_bytes(v.as_bytes());
70 Ok(())
71 }
72
73 fn cast(bytes: &'a [u8]) -> Self::ReadResult {
74 unsafe { std::str::from_utf8_unchecked(bytes) }
75 }
76}
77
78impl Row<'_> for bool {
79 type ReadResult = Self;
80
81 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
82 writer.write_u8(if *v { 1 } else { 0 });
83 Ok(())
84 }
85
86 fn cast(bytes: &[u8]) -> Self::ReadResult {
87 bytes[0] == 1
88 }
89}
90
91impl Row<'_> for NaiveDate {
92 type ReadResult = Result<NaiveDate, Error>;
93
94 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
95 let days_since_epoch = v.signed_duration_since(EPOCH).num_days();
96 writer.write_u32(days_since_epoch as u32);
97 Ok(())
98 }
99
100 fn cast(bytes: &[u8]) -> Self::ReadResult {
101 let days = LittleEndian::read_u32(bytes);
102 EPOCH
103 .checked_add_days(Days::new(days.into()))
104 .ok_or(Error::invalid_data(format!(
105 "Date out of range, {days} days since epoch"
106 )))
107 }
108}
109
110impl Row<'_> for NaiveDateTime {
111 type ReadResult = Result<NaiveDateTime, Error>;
112
113 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
114 writer.write_i64(v.and_utc().timestamp_millis());
115 Ok(())
116 }
117
118 fn cast(bytes: &[u8]) -> Self::ReadResult {
119 let timestamp = LittleEndian::read_u64(bytes);
120 DateTime::from_timestamp_millis(timestamp as i64)
121 .map(|dt| dt.naive_utc())
122 .ok_or(Error::invalid_data(format!(
123 "Date out of range, timestamp:{timestamp}"
124 )))
125 }
126}
127
128impl<'a> Row<'a> for Vec<u8> {
129 type ReadResult = &'a [u8];
130
131 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
132 writer.write_bytes(v);
133 Ok(())
134 }
135
136 fn cast(bytes: &'a [u8]) -> Self::ReadResult {
137 bytes
138 }
139}
140
141pub struct ArrayGetter<'a, T> {
142 array_data: ArrayViewer<'a>,
143 _marker: PhantomData<T>,
144}
145
146#[allow(clippy::needless_lifetimes)]
147impl<'a, T: Row<'a>> ArrayGetter<'a, T> {
148 pub fn size(&self) -> usize {
149 self.array_data.num_elements()
150 }
151
152 pub fn get(&self, idx: usize) -> T::ReadResult {
153 if idx >= self.array_data.num_elements() {
154 panic!("out of bound");
155 }
156 let bytes = self.array_data.get_field_bytes(idx);
157 <T as Row>::cast(bytes)
158 }
159}
160
161#[allow(clippy::needless_lifetimes)]
162impl<'a, T: Row<'a>> Row<'a> for Vec<T> {
163 type ReadResult = ArrayGetter<'a, T>;
164
165 fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
166 let mut array_writer = ArrayWriter::new(v.len(), writer)?;
167 for (idx, item) in v.iter().enumerate() {
168 let callback_info = array_writer.write_start(idx);
169 <T as Row>::write(item, array_writer.get_writer())?;
170 array_writer.write_end(callback_info);
171 }
172 Ok(())
173 }
174
175 fn cast(row: &'a [u8]) -> Self::ReadResult {
176 ArrayGetter {
177 array_data: ArrayViewer::new(row),
178 _marker: PhantomData::<T>,
179 }
180 }
181}
182
183pub struct MapGetter<'a, T1, T2>
184where
185 T1: Ord,
186 T2: Ord,
187{
188 map_data: MapViewer<'a>,
189 _key_marker: PhantomData<T1>,
190 _value_marker: PhantomData<T2>,
191}
192
193impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> MapGetter<'a, T1, T2> {
194 pub fn to_btree_map(&'a self) -> Result<BTreeMap<T1::ReadResult, T2::ReadResult>, Error>
195 where
196 <T1 as Row<'a>>::ReadResult: Ord,
197 {
198 let mut map = BTreeMap::new();
199 let keys = self.keys();
200 let values = self.values();
201
202 for i in 0..self.keys().size() {
203 map.insert(keys.get(i), values.get(i));
204 }
205 Ok(map)
206 }
207
208 pub fn keys(&'a self) -> ArrayGetter<'a, T1> {
209 ArrayGetter {
210 array_data: ArrayViewer::new(self.map_data.get_key_row()),
211 _marker: PhantomData::<T1>,
212 }
213 }
214
215 pub fn values(&'a self) -> ArrayGetter<'a, T2> {
216 ArrayGetter {
217 array_data: ArrayViewer::new(self.map_data.get_value_row()),
218 _marker: PhantomData::<T2>,
219 }
220 }
221}
222
223#[allow(clippy::needless_lifetimes)]
224impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> Row<'a> for BTreeMap<T1, T2> {
225 type ReadResult = MapGetter<'a, T1, T2>;
226
227 fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
228 let mut map_writer = MapWriter::new(writer);
229 {
230 let callback_info = map_writer.write_start(0);
231 let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
232 for (idx, item) in v.keys().enumerate() {
233 let callback_info = array_writer.write_start(idx);
234 <T1 as Row>::write(item, array_writer.get_writer())?;
235 array_writer.write_end(callback_info);
236 }
237 map_writer.write_end(callback_info);
238 }
239 {
240 let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
241 for (idx, item) in v.values().enumerate() {
242 let callback_info = array_writer.write_start(idx);
243 <T2 as Row>::write(item, array_writer.get_writer())?;
244 array_writer.write_end(callback_info);
245 }
246 }
247 Ok(())
248 }
249
250 fn cast(row: &'a [u8]) -> Self::ReadResult {
251 MapGetter {
252 map_data: MapViewer::new(row),
253 _key_marker: PhantomData::<T1>,
254 _value_marker: PhantomData::<T2>,
255 }
256 }
257}