1use crate::util::EPOCH;
19use crate::{buffer::Writer, error::Error};
20use byteorder::{ByteOrder, LittleEndian};
21use chrono::{DateTime, Days, NaiveDate, NaiveDateTime};
22use std::collections::BTreeMap;
23use std::marker::PhantomData;
24
25use super::{
26 reader::{ArrayViewer, MapViewer},
27 writer::{ArrayWriter, MapWriter},
28};
29
30pub trait Row<'a> {
31 type ReadResult;
32
33 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error>;
34
35 fn cast(bytes: &'a [u8]) -> Self::ReadResult;
36}
37
38fn read_i8_from_bytes(bytes: &[u8]) -> i8 {
39 bytes[0] as i8
40}
41
42macro_rules! impl_row_for_number {
43 ($tt: tt, $writer: expr ,$visitor: expr) => {
44 impl<'a> Row<'a> for $tt {
45 type ReadResult = Self;
46
47 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
48 $writer(writer, *v);
49 Ok(())
50 }
51
52 fn cast(bytes: &[u8]) -> Self::ReadResult {
53 $visitor(bytes)
54 }
55 }
56 };
57}
58impl_row_for_number!(i8, Writer::write_i8, read_i8_from_bytes);
59impl_row_for_number!(i16, Writer::write_i16, LittleEndian::read_i16);
60impl_row_for_number!(i32, Writer::write_i32, LittleEndian::read_i32);
61impl_row_for_number!(i64, Writer::write_i64, LittleEndian::read_i64);
62impl_row_for_number!(f32, Writer::write_f32, LittleEndian::read_f32);
63impl_row_for_number!(f64, Writer::write_f64, LittleEndian::read_f64);
64
65impl<'a> Row<'a> for String {
66 type ReadResult = &'a str;
67
68 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
69 writer.write_bytes(v.as_bytes());
70 Ok(())
71 }
72
73 fn cast(bytes: &'a [u8]) -> Self::ReadResult {
74 unsafe { std::str::from_utf8_unchecked(bytes) }
75 }
76}
77
78impl Row<'_> for bool {
79 type ReadResult = Self;
80
81 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
82 writer.write_u8(if *v { 1 } else { 0 });
83 Ok(())
84 }
85
86 fn cast(bytes: &[u8]) -> Self::ReadResult {
87 bytes[0] == 1
88 }
89}
90
91pub struct FixedArrayGetter<'a, T, const N: usize> {
93 array_data: ArrayViewer<'a>,
94 _marker: PhantomData<T>,
95}
96
97impl<'a, T: Row<'a>, const N: usize> FixedArrayGetter<'a, T, N> {
98 pub fn size(&self) -> usize {
99 self.array_data.num_elements()
100 }
101
102 pub fn get(&self, idx: usize) -> T::ReadResult {
103 if idx >= self.array_data.num_elements() {
104 panic!("out of bound");
105 }
106 let bytes = self.array_data.get_field_bytes(idx);
107 <T as Row>::cast(bytes)
108 }
109}
110
111impl<'a, T: Row<'a>, const N: usize> Row<'a> for [T; N] {
112 type ReadResult = FixedArrayGetter<'a, T, N>;
113
114 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
115 let mut array_writer = ArrayWriter::new(N, writer)?;
116 for (idx, item) in v.iter().enumerate() {
117 let callback_info = array_writer.write_start(idx);
118 <T as Row>::write(item, array_writer.get_writer())?;
119 array_writer.write_end(callback_info);
120 }
121 Ok(())
122 }
123
124 fn cast(row: &'a [u8]) -> Self::ReadResult {
125 FixedArrayGetter {
126 array_data: ArrayViewer::new(row),
127 _marker: PhantomData::<T>,
128 }
129 }
130}
131
132impl Row<'_> for NaiveDate {
133 type ReadResult = Result<NaiveDate, Error>;
134
135 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
136 let days_since_epoch = v.signed_duration_since(EPOCH).num_days();
137 writer.write_u32(days_since_epoch as u32);
138 Ok(())
139 }
140
141 fn cast(bytes: &[u8]) -> Self::ReadResult {
142 let days = LittleEndian::read_u32(bytes);
143 EPOCH
144 .checked_add_days(Days::new(days.into()))
145 .ok_or(Error::invalid_data(format!(
146 "Date out of range, {days} days since epoch"
147 )))
148 }
149}
150
151impl Row<'_> for NaiveDateTime {
152 type ReadResult = Result<NaiveDateTime, Error>;
153
154 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
155 writer.write_i64(v.and_utc().timestamp_millis());
156 Ok(())
157 }
158
159 fn cast(bytes: &[u8]) -> Self::ReadResult {
160 let timestamp = LittleEndian::read_u64(bytes);
161 DateTime::from_timestamp_millis(timestamp as i64)
162 .map(|dt| dt.naive_utc())
163 .ok_or(Error::invalid_data(format!(
164 "Date out of range, timestamp:{timestamp}"
165 )))
166 }
167}
168
169impl<'a> Row<'a> for Vec<u8> {
170 type ReadResult = &'a [u8];
171
172 fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
173 writer.write_bytes(v);
174 Ok(())
175 }
176
177 fn cast(bytes: &'a [u8]) -> Self::ReadResult {
178 bytes
179 }
180}
181
182pub struct ArrayGetter<'a, T> {
183 array_data: ArrayViewer<'a>,
184 _marker: PhantomData<T>,
185}
186
187#[allow(clippy::needless_lifetimes)]
188impl<'a, T: Row<'a>> ArrayGetter<'a, T> {
189 pub fn size(&self) -> usize {
190 self.array_data.num_elements()
191 }
192
193 pub fn get(&self, idx: usize) -> T::ReadResult {
194 if idx >= self.array_data.num_elements() {
195 panic!("out of bound");
196 }
197 let bytes = self.array_data.get_field_bytes(idx);
198 <T as Row>::cast(bytes)
199 }
200}
201
202#[allow(clippy::needless_lifetimes)]
203impl<'a, T: Row<'a>> Row<'a> for Vec<T> {
204 type ReadResult = ArrayGetter<'a, T>;
205
206 fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
207 let mut array_writer = ArrayWriter::new(v.len(), writer)?;
208 for (idx, item) in v.iter().enumerate() {
209 let callback_info = array_writer.write_start(idx);
210 <T as Row>::write(item, array_writer.get_writer())?;
211 array_writer.write_end(callback_info);
212 }
213 Ok(())
214 }
215
216 fn cast(row: &'a [u8]) -> Self::ReadResult {
217 ArrayGetter {
218 array_data: ArrayViewer::new(row),
219 _marker: PhantomData::<T>,
220 }
221 }
222}
223
224pub struct MapGetter<'a, T1, T2>
225where
226 T1: Ord,
227 T2: Ord,
228{
229 map_data: MapViewer<'a>,
230 _key_marker: PhantomData<T1>,
231 _value_marker: PhantomData<T2>,
232}
233
234impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> MapGetter<'a, T1, T2> {
235 pub fn to_btree_map(&'a self) -> Result<BTreeMap<T1::ReadResult, T2::ReadResult>, Error>
236 where
237 <T1 as Row<'a>>::ReadResult: Ord,
238 {
239 let mut map = BTreeMap::new();
240 let keys = self.keys();
241 let values = self.values();
242
243 for i in 0..self.keys().size() {
244 map.insert(keys.get(i), values.get(i));
245 }
246 Ok(map)
247 }
248
249 pub fn keys(&'a self) -> ArrayGetter<'a, T1> {
250 ArrayGetter {
251 array_data: ArrayViewer::new(self.map_data.get_key_row()),
252 _marker: PhantomData::<T1>,
253 }
254 }
255
256 pub fn values(&'a self) -> ArrayGetter<'a, T2> {
257 ArrayGetter {
258 array_data: ArrayViewer::new(self.map_data.get_value_row()),
259 _marker: PhantomData::<T2>,
260 }
261 }
262}
263
264#[allow(clippy::needless_lifetimes)]
265impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> Row<'a> for BTreeMap<T1, T2> {
266 type ReadResult = MapGetter<'a, T1, T2>;
267
268 fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
269 let mut map_writer = MapWriter::new(writer);
270 {
271 let callback_info = map_writer.write_start(0);
272 let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
273 for (idx, item) in v.keys().enumerate() {
274 let callback_info = array_writer.write_start(idx);
275 <T1 as Row>::write(item, array_writer.get_writer())?;
276 array_writer.write_end(callback_info);
277 }
278 map_writer.write_end(callback_info);
279 }
280 {
281 let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
282 for (idx, item) in v.values().enumerate() {
283 let callback_info = array_writer.write_start(idx);
284 <T2 as Row>::write(item, array_writer.get_writer())?;
285 array_writer.write_end(callback_info);
286 }
287 }
288 Ok(())
289 }
290
291 fn cast(row: &'a [u8]) -> Self::ReadResult {
292 MapGetter {
293 map_data: MapViewer::new(row),
294 _key_marker: PhantomData::<T1>,
295 _value_marker: PhantomData::<T2>,
296 }
297 }
298}