Skip to main content

fory_core/row/
row.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::util::EPOCH;
19use crate::{buffer::Writer, error::Error};
20use byteorder::{ByteOrder, LittleEndian};
21use chrono::{DateTime, Days, NaiveDate, NaiveDateTime};
22use std::collections::BTreeMap;
23use std::marker::PhantomData;
24
25use super::{
26    reader::{ArrayViewer, MapViewer},
27    writer::{ArrayWriter, MapWriter},
28};
29
30pub trait Row<'a> {
31    type ReadResult;
32
33    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error>;
34
35    fn cast(bytes: &'a [u8]) -> Self::ReadResult;
36}
37
/// Reads the first byte of `bytes` as a two's-complement `i8`.
///
/// # Panics
/// Panics if `bytes` is empty.
fn read_i8_from_bytes(bytes: &[u8]) -> i8 {
    let first = bytes[0];
    i8::from_le_bytes([first])
}
41
42macro_rules! impl_row_for_number {
43    ($tt: tt, $writer: expr ,$visitor: expr) => {
44        impl<'a> Row<'a> for $tt {
45            type ReadResult = Self;
46
47            fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
48                $writer(writer, *v);
49                Ok(())
50            }
51
52            fn cast(bytes: &[u8]) -> Self::ReadResult {
53                $visitor(bytes)
54            }
55        }
56    };
57}
58impl_row_for_number!(i8, Writer::write_i8, read_i8_from_bytes);
59impl_row_for_number!(i16, Writer::write_i16, LittleEndian::read_i16);
60impl_row_for_number!(i32, Writer::write_i32, LittleEndian::read_i32);
61impl_row_for_number!(i64, Writer::write_i64, LittleEndian::read_i64);
62impl_row_for_number!(f32, Writer::write_f32, LittleEndian::read_f32);
63impl_row_for_number!(f64, Writer::write_f64, LittleEndian::read_f64);
64
65impl<'a> Row<'a> for String {
66    type ReadResult = &'a str;
67
68    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
69        writer.write_bytes(v.as_bytes());
70        Ok(())
71    }
72
73    fn cast(bytes: &'a [u8]) -> Self::ReadResult {
74        unsafe { std::str::from_utf8_unchecked(bytes) }
75    }
76}
77
78impl Row<'_> for bool {
79    type ReadResult = Self;
80
81    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
82        writer.write_u8(if *v { 1 } else { 0 });
83        Ok(())
84    }
85
86    fn cast(bytes: &[u8]) -> Self::ReadResult {
87        bytes[0] == 1
88    }
89}
90
91/// ArrayGetter for fixed-size arrays, wrapping the underlying ArrayViewer
92pub struct FixedArrayGetter<'a, T, const N: usize> {
93    array_data: ArrayViewer<'a>,
94    _marker: PhantomData<T>,
95}
96
97impl<'a, T: Row<'a>, const N: usize> FixedArrayGetter<'a, T, N> {
98    pub fn size(&self) -> usize {
99        self.array_data.num_elements()
100    }
101
102    pub fn get(&self, idx: usize) -> T::ReadResult {
103        if idx >= self.array_data.num_elements() {
104            panic!("out of bound");
105        }
106        let bytes = self.array_data.get_field_bytes(idx);
107        <T as Row>::cast(bytes)
108    }
109}
110
111impl<'a, T: Row<'a>, const N: usize> Row<'a> for [T; N] {
112    type ReadResult = FixedArrayGetter<'a, T, N>;
113
114    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
115        let mut array_writer = ArrayWriter::new(N, writer)?;
116        for (idx, item) in v.iter().enumerate() {
117            let callback_info = array_writer.write_start(idx);
118            <T as Row>::write(item, array_writer.get_writer())?;
119            array_writer.write_end(callback_info);
120        }
121        Ok(())
122    }
123
124    fn cast(row: &'a [u8]) -> Self::ReadResult {
125        FixedArrayGetter {
126            array_data: ArrayViewer::new(row),
127            _marker: PhantomData::<T>,
128        }
129    }
130}
131
132impl Row<'_> for NaiveDate {
133    type ReadResult = Result<NaiveDate, Error>;
134
135    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
136        let days_since_epoch = v.signed_duration_since(EPOCH).num_days();
137        writer.write_u32(days_since_epoch as u32);
138        Ok(())
139    }
140
141    fn cast(bytes: &[u8]) -> Self::ReadResult {
142        let days = LittleEndian::read_u32(bytes);
143        EPOCH
144            .checked_add_days(Days::new(days.into()))
145            .ok_or(Error::invalid_data(format!(
146                "Date out of range, {days} days since epoch"
147            )))
148    }
149}
150
151impl Row<'_> for NaiveDateTime {
152    type ReadResult = Result<NaiveDateTime, Error>;
153
154    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
155        writer.write_i64(v.and_utc().timestamp_millis());
156        Ok(())
157    }
158
159    fn cast(bytes: &[u8]) -> Self::ReadResult {
160        let timestamp = LittleEndian::read_u64(bytes);
161        DateTime::from_timestamp_millis(timestamp as i64)
162            .map(|dt| dt.naive_utc())
163            .ok_or(Error::invalid_data(format!(
164                "Date out of range, timestamp:{timestamp}"
165            )))
166    }
167}
168
169impl<'a> Row<'a> for Vec<u8> {
170    type ReadResult = &'a [u8];
171
172    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
173        writer.write_bytes(v);
174        Ok(())
175    }
176
177    fn cast(bytes: &'a [u8]) -> Self::ReadResult {
178        bytes
179    }
180}
181
182pub struct ArrayGetter<'a, T> {
183    array_data: ArrayViewer<'a>,
184    _marker: PhantomData<T>,
185}
186
187#[allow(clippy::needless_lifetimes)]
188impl<'a, T: Row<'a>> ArrayGetter<'a, T> {
189    pub fn size(&self) -> usize {
190        self.array_data.num_elements()
191    }
192
193    pub fn get(&self, idx: usize) -> T::ReadResult {
194        if idx >= self.array_data.num_elements() {
195            panic!("out of bound");
196        }
197        let bytes = self.array_data.get_field_bytes(idx);
198        <T as Row>::cast(bytes)
199    }
200}
201
202#[allow(clippy::needless_lifetimes)]
203impl<'a, T: Row<'a>> Row<'a> for Vec<T> {
204    type ReadResult = ArrayGetter<'a, T>;
205
206    fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
207        let mut array_writer = ArrayWriter::new(v.len(), writer)?;
208        for (idx, item) in v.iter().enumerate() {
209            let callback_info = array_writer.write_start(idx);
210            <T as Row>::write(item, array_writer.get_writer())?;
211            array_writer.write_end(callback_info);
212        }
213        Ok(())
214    }
215
216    fn cast(row: &'a [u8]) -> Self::ReadResult {
217        ArrayGetter {
218            array_data: ArrayViewer::new(row),
219            _marker: PhantomData::<T>,
220        }
221    }
222}
223
224pub struct MapGetter<'a, T1, T2>
225where
226    T1: Ord,
227    T2: Ord,
228{
229    map_data: MapViewer<'a>,
230    _key_marker: PhantomData<T1>,
231    _value_marker: PhantomData<T2>,
232}
233
234impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> MapGetter<'a, T1, T2> {
235    pub fn to_btree_map(&'a self) -> Result<BTreeMap<T1::ReadResult, T2::ReadResult>, Error>
236    where
237        <T1 as Row<'a>>::ReadResult: Ord,
238    {
239        let mut map = BTreeMap::new();
240        let keys = self.keys();
241        let values = self.values();
242
243        for i in 0..self.keys().size() {
244            map.insert(keys.get(i), values.get(i));
245        }
246        Ok(map)
247    }
248
249    pub fn keys(&'a self) -> ArrayGetter<'a, T1> {
250        ArrayGetter {
251            array_data: ArrayViewer::new(self.map_data.get_key_row()),
252            _marker: PhantomData::<T1>,
253        }
254    }
255
256    pub fn values(&'a self) -> ArrayGetter<'a, T2> {
257        ArrayGetter {
258            array_data: ArrayViewer::new(self.map_data.get_value_row()),
259            _marker: PhantomData::<T2>,
260        }
261    }
262}
263
264#[allow(clippy::needless_lifetimes)]
265impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> Row<'a> for BTreeMap<T1, T2> {
266    type ReadResult = MapGetter<'a, T1, T2>;
267
268    fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
269        let mut map_writer = MapWriter::new(writer);
270        {
271            let callback_info = map_writer.write_start(0);
272            let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
273            for (idx, item) in v.keys().enumerate() {
274                let callback_info = array_writer.write_start(idx);
275                <T1 as Row>::write(item, array_writer.get_writer())?;
276                array_writer.write_end(callback_info);
277            }
278            map_writer.write_end(callback_info);
279        }
280        {
281            let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
282            for (idx, item) in v.values().enumerate() {
283                let callback_info = array_writer.write_start(idx);
284                <T2 as Row>::write(item, array_writer.get_writer())?;
285                array_writer.write_end(callback_info);
286            }
287        }
288        Ok(())
289    }
290
291    fn cast(row: &'a [u8]) -> Self::ReadResult {
292        MapGetter {
293            map_data: MapViewer::new(row),
294            _key_marker: PhantomData::<T1>,
295            _value_marker: PhantomData::<T2>,
296        }
297    }
298}