fory_core/row/
row.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::util::EPOCH;
19use crate::{buffer::Writer, error::Error};
20use byteorder::{ByteOrder, LittleEndian};
21use chrono::{DateTime, Days, NaiveDate, NaiveDateTime};
22use std::collections::BTreeMap;
23use std::marker::PhantomData;
24
25use super::{
26    reader::{ArrayViewer, MapViewer},
27    writer::{ArrayWriter, MapWriter},
28};
29
30pub trait Row<'a> {
31    type ReadResult;
32
33    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error>;
34
35    fn cast(bytes: &'a [u8]) -> Self::ReadResult;
36}
37
/// Decodes a signed byte from the first element of `bytes`.
///
/// # Panics
/// Panics if `bytes` is empty.
fn read_i8_from_bytes(bytes: &[u8]) -> i8 {
    let first = bytes[0];
    i8::from_le_bytes([first])
}
41
42macro_rules! impl_row_for_number {
43    ($tt: tt, $writer: expr ,$visitor: expr) => {
44        impl<'a> Row<'a> for $tt {
45            type ReadResult = Self;
46
47            fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
48                $writer(writer, *v);
49                Ok(())
50            }
51
52            fn cast(bytes: &[u8]) -> Self::ReadResult {
53                $visitor(bytes)
54            }
55        }
56    };
57}
58impl_row_for_number!(i8, Writer::write_i8, read_i8_from_bytes);
59impl_row_for_number!(i16, Writer::write_i16, LittleEndian::read_i16);
60impl_row_for_number!(i32, Writer::write_i32, LittleEndian::read_i32);
61impl_row_for_number!(i64, Writer::write_i64, LittleEndian::read_i64);
62impl_row_for_number!(f32, Writer::write_f32, LittleEndian::read_f32);
63impl_row_for_number!(f64, Writer::write_f64, LittleEndian::read_f64);
64
65impl<'a> Row<'a> for String {
66    type ReadResult = &'a str;
67
68    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
69        writer.write_bytes(v.as_bytes());
70        Ok(())
71    }
72
73    fn cast(bytes: &'a [u8]) -> Self::ReadResult {
74        unsafe { std::str::from_utf8_unchecked(bytes) }
75    }
76}
77
78impl Row<'_> for bool {
79    type ReadResult = Self;
80
81    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
82        writer.write_u8(if *v { 1 } else { 0 });
83        Ok(())
84    }
85
86    fn cast(bytes: &[u8]) -> Self::ReadResult {
87        bytes[0] == 1
88    }
89}
90
91impl Row<'_> for NaiveDate {
92    type ReadResult = Result<NaiveDate, Error>;
93
94    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
95        let days_since_epoch = v.signed_duration_since(EPOCH).num_days();
96        writer.write_u32(days_since_epoch as u32);
97        Ok(())
98    }
99
100    fn cast(bytes: &[u8]) -> Self::ReadResult {
101        let days = LittleEndian::read_u32(bytes);
102        EPOCH
103            .checked_add_days(Days::new(days.into()))
104            .ok_or(Error::invalid_data(format!(
105                "Date out of range, {days} days since epoch"
106            )))
107    }
108}
109
110impl Row<'_> for NaiveDateTime {
111    type ReadResult = Result<NaiveDateTime, Error>;
112
113    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
114        writer.write_i64(v.and_utc().timestamp_millis());
115        Ok(())
116    }
117
118    fn cast(bytes: &[u8]) -> Self::ReadResult {
119        let timestamp = LittleEndian::read_u64(bytes);
120        DateTime::from_timestamp_millis(timestamp as i64)
121            .map(|dt| dt.naive_utc())
122            .ok_or(Error::invalid_data(format!(
123                "Date out of range, timestamp:{timestamp}"
124            )))
125    }
126}
127
128impl<'a> Row<'a> for Vec<u8> {
129    type ReadResult = &'a [u8];
130
131    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
132        writer.write_bytes(v);
133        Ok(())
134    }
135
136    fn cast(bytes: &'a [u8]) -> Self::ReadResult {
137        bytes
138    }
139}
140
141pub struct ArrayGetter<'a, T> {
142    array_data: ArrayViewer<'a>,
143    _marker: PhantomData<T>,
144}
145
146#[allow(clippy::needless_lifetimes)]
147impl<'a, T: Row<'a>> ArrayGetter<'a, T> {
148    pub fn size(&self) -> usize {
149        self.array_data.num_elements()
150    }
151
152    pub fn get(&self, idx: usize) -> T::ReadResult {
153        if idx >= self.array_data.num_elements() {
154            panic!("out of bound");
155        }
156        let bytes = self.array_data.get_field_bytes(idx);
157        <T as Row>::cast(bytes)
158    }
159}
160
161#[allow(clippy::needless_lifetimes)]
162impl<'a, T: Row<'a>> Row<'a> for Vec<T> {
163    type ReadResult = ArrayGetter<'a, T>;
164
165    fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
166        let mut array_writer = ArrayWriter::new(v.len(), writer)?;
167        for (idx, item) in v.iter().enumerate() {
168            let callback_info = array_writer.write_start(idx);
169            <T as Row>::write(item, array_writer.get_writer())?;
170            array_writer.write_end(callback_info);
171        }
172        Ok(())
173    }
174
175    fn cast(row: &'a [u8]) -> Self::ReadResult {
176        ArrayGetter {
177            array_data: ArrayViewer::new(row),
178            _marker: PhantomData::<T>,
179        }
180    }
181}
182
183pub struct MapGetter<'a, T1, T2>
184where
185    T1: Ord,
186    T2: Ord,
187{
188    map_data: MapViewer<'a>,
189    _key_marker: PhantomData<T1>,
190    _value_marker: PhantomData<T2>,
191}
192
193impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> MapGetter<'a, T1, T2> {
194    pub fn to_btree_map(&'a self) -> Result<BTreeMap<T1::ReadResult, T2::ReadResult>, Error>
195    where
196        <T1 as Row<'a>>::ReadResult: Ord,
197    {
198        let mut map = BTreeMap::new();
199        let keys = self.keys();
200        let values = self.values();
201
202        for i in 0..self.keys().size() {
203            map.insert(keys.get(i), values.get(i));
204        }
205        Ok(map)
206    }
207
208    pub fn keys(&'a self) -> ArrayGetter<'a, T1> {
209        ArrayGetter {
210            array_data: ArrayViewer::new(self.map_data.get_key_row()),
211            _marker: PhantomData::<T1>,
212        }
213    }
214
215    pub fn values(&'a self) -> ArrayGetter<'a, T2> {
216        ArrayGetter {
217            array_data: ArrayViewer::new(self.map_data.get_value_row()),
218            _marker: PhantomData::<T2>,
219        }
220    }
221}
222
223#[allow(clippy::needless_lifetimes)]
224impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> Row<'a> for BTreeMap<T1, T2> {
225    type ReadResult = MapGetter<'a, T1, T2>;
226
227    fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
228        let mut map_writer = MapWriter::new(writer);
229        {
230            let callback_info = map_writer.write_start(0);
231            let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
232            for (idx, item) in v.keys().enumerate() {
233                let callback_info = array_writer.write_start(idx);
234                <T1 as Row>::write(item, array_writer.get_writer())?;
235                array_writer.write_end(callback_info);
236            }
237            map_writer.write_end(callback_info);
238        }
239        {
240            let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
241            for (idx, item) in v.values().enumerate() {
242                let callback_info = array_writer.write_start(idx);
243                <T2 as Row>::write(item, array_writer.get_writer())?;
244                array_writer.write_end(callback_info);
245            }
246        }
247        Ok(())
248    }
249
250    fn cast(row: &'a [u8]) -> Self::ReadResult {
251        MapGetter {
252            map_data: MapViewer::new(row),
253            _key_marker: PhantomData::<T1>,
254            _value_marker: PhantomData::<T2>,
255        }
256    }
257}