// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18use crate::types::{Date, Duration, Timestamp};
19use crate::{buffer::Writer, error::Error};
20use byteorder::{ByteOrder, LittleEndian};
21use std::collections::BTreeMap;
22use std::marker::PhantomData;
23
24use super::{
25    reader::{ArrayViewer, MapViewer},
26    writer::{ArrayWriter, MapWriter},
27};
28
29pub trait Row<'a> {
30    type ReadResult;
31
32    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error>;
33
34    fn cast(bytes: &'a [u8]) -> Self::ReadResult;
35}
36
/// Decodes the first byte of `bytes` as a signed 8-bit integer (two's complement).
///
/// # Panics
/// Panics if `bytes` is empty.
fn read_i8_from_bytes(bytes: &[u8]) -> i8 {
    let first = bytes[0];
    i8::from_le_bytes([first])
}
40
41macro_rules! impl_row_for_number {
42    ($tt: tt, $writer: expr ,$visitor: expr) => {
43        impl<'a> Row<'a> for $tt {
44            type ReadResult = Self;
45
46            fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
47                $writer(writer, *v);
48                Ok(())
49            }
50
51            fn cast(bytes: &[u8]) -> Self::ReadResult {
52                $visitor(bytes)
53            }
54        }
55    };
56}
57impl_row_for_number!(i8, Writer::write_i8, read_i8_from_bytes);
58impl_row_for_number!(i16, Writer::write_i16, LittleEndian::read_i16);
59impl_row_for_number!(i32, Writer::write_i32, LittleEndian::read_i32);
60impl_row_for_number!(i64, Writer::write_i64, LittleEndian::read_i64);
61impl_row_for_number!(f32, Writer::write_f32, LittleEndian::read_f32);
62impl_row_for_number!(f64, Writer::write_f64, LittleEndian::read_f64);
63
64impl<'a> Row<'a> for String {
65    type ReadResult = &'a str;
66
67    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
68        writer.write_bytes(v.as_bytes());
69        Ok(())
70    }
71
72    fn cast(bytes: &'a [u8]) -> Self::ReadResult {
73        unsafe { std::str::from_utf8_unchecked(bytes) }
74    }
75}
76
77impl Row<'_> for bool {
78    type ReadResult = Self;
79
80    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
81        writer.write_u8(if *v { 1 } else { 0 });
82        Ok(())
83    }
84
85    fn cast(bytes: &[u8]) -> Self::ReadResult {
86        bytes[0] == 1
87    }
88}
89
90/// ArrayGetter for fixed-size arrays, wrapping the underlying ArrayViewer
91pub struct FixedArrayGetter<'a, T, const N: usize> {
92    array_data: ArrayViewer<'a>,
93    _marker: PhantomData<T>,
94}
95
96impl<'a, T: Row<'a>, const N: usize> FixedArrayGetter<'a, T, N> {
97    pub fn size(&self) -> usize {
98        self.array_data.num_elements()
99    }
100
101    pub fn get(&self, idx: usize) -> Result<T::ReadResult, Error> {
102        if idx >= self.array_data.num_elements() {
103            return Err(Error::buffer_out_of_bound(
104                idx,
105                1,
106                self.array_data.num_elements(),
107            ));
108        }
109        let bytes = self.array_data.get_field_bytes(idx);
110        Ok(<T as Row>::cast(bytes))
111    }
112}
113
114impl<'a, T: Row<'a>, const N: usize> Row<'a> for [T; N] {
115    type ReadResult = FixedArrayGetter<'a, T, N>;
116
117    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
118        let mut array_writer = ArrayWriter::new(N, writer)?;
119        for (idx, item) in v.iter().enumerate() {
120            let callback_info = array_writer.write_start(idx);
121            <T as Row>::write(item, array_writer.get_writer())?;
122            array_writer.write_end(callback_info);
123        }
124        Ok(())
125    }
126
127    fn cast(row: &'a [u8]) -> Self::ReadResult {
128        FixedArrayGetter {
129            array_data: ArrayViewer::new(row),
130            _marker: PhantomData::<T>,
131        }
132    }
133}
134
135impl Row<'_> for Date {
136    type ReadResult = Result<Date, Error>;
137
138    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
139        let days = i32::try_from(v.epoch_days()).map_err(|_| {
140            Error::invalid_data(format!(
141                "row date day count {} exceeds date32 range",
142                v.epoch_days()
143            ))
144        })?;
145        writer.write_i32(days);
146        Ok(())
147    }
148
149    fn cast(bytes: &[u8]) -> Self::ReadResult {
150        Ok(Date::from_epoch_days(i64::from(LittleEndian::read_i32(
151            bytes,
152        ))))
153    }
154}
155
156impl Row<'_> for Timestamp {
157    type ReadResult = Result<Timestamp, Error>;
158
159    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
160        writer.write_i64(v.to_epoch_micros()?);
161        Ok(())
162    }
163
164    fn cast(bytes: &[u8]) -> Self::ReadResult {
165        Ok(Timestamp::from_epoch_micros(LittleEndian::read_i64(bytes)))
166    }
167}
168
169impl Row<'_> for Duration {
170    type ReadResult = Result<Duration, Error>;
171
172    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
173        writer.write_i64(v.to_micros()?);
174        Ok(())
175    }
176
177    fn cast(bytes: &[u8]) -> Self::ReadResult {
178        Ok(Duration::from_micros(LittleEndian::read_i64(bytes)))
179    }
180}
181
182impl<'a> Row<'a> for Vec<u8> {
183    type ReadResult = &'a [u8];
184
185    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
186        writer.write_bytes(v);
187        Ok(())
188    }
189
190    fn cast(bytes: &'a [u8]) -> Self::ReadResult {
191        bytes
192    }
193}
194
195pub struct ArrayGetter<'a, T> {
196    array_data: ArrayViewer<'a>,
197    _marker: PhantomData<T>,
198}
199
200#[allow(clippy::needless_lifetimes)]
201impl<'a, T: Row<'a>> ArrayGetter<'a, T> {
202    pub fn size(&self) -> usize {
203        self.array_data.num_elements()
204    }
205
206    pub fn get(&self, idx: usize) -> Result<T::ReadResult, Error> {
207        if idx >= self.array_data.num_elements() {
208            return Err(Error::buffer_out_of_bound(
209                idx,
210                1,
211                self.array_data.num_elements(),
212            ));
213        }
214        let bytes = self.array_data.get_field_bytes(idx);
215        Ok(<T as Row>::cast(bytes))
216    }
217}
218
219#[allow(clippy::needless_lifetimes)]
220impl<'a, T: Row<'a>> Row<'a> for Vec<T> {
221    type ReadResult = ArrayGetter<'a, T>;
222
223    fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
224        let mut array_writer = ArrayWriter::new(v.len(), writer)?;
225        for (idx, item) in v.iter().enumerate() {
226            let callback_info = array_writer.write_start(idx);
227            <T as Row>::write(item, array_writer.get_writer())?;
228            array_writer.write_end(callback_info);
229        }
230        Ok(())
231    }
232
233    fn cast(row: &'a [u8]) -> Self::ReadResult {
234        ArrayGetter {
235            array_data: ArrayViewer::new(row),
236            _marker: PhantomData::<T>,
237        }
238    }
239}
240
241pub struct MapGetter<'a, T1, T2>
242where
243    T1: Ord,
244    T2: Ord,
245{
246    map_data: MapViewer<'a>,
247    _key_marker: PhantomData<T1>,
248    _value_marker: PhantomData<T2>,
249}
250
251impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> MapGetter<'a, T1, T2> {
252    pub fn to_btree_map(&'a self) -> Result<BTreeMap<T1::ReadResult, T2::ReadResult>, Error>
253    where
254        <T1 as Row<'a>>::ReadResult: Ord,
255    {
256        let mut map = BTreeMap::new();
257        let keys = self.keys();
258        let values = self.values();
259
260        for i in 0..self.keys().size() {
261            map.insert(keys.get(i)?, values.get(i)?);
262        }
263        Ok(map)
264    }
265
266    pub fn keys(&'a self) -> ArrayGetter<'a, T1> {
267        ArrayGetter {
268            array_data: ArrayViewer::new(self.map_data.get_key_row()),
269            _marker: PhantomData::<T1>,
270        }
271    }
272
273    pub fn values(&'a self) -> ArrayGetter<'a, T2> {
274        ArrayGetter {
275            array_data: ArrayViewer::new(self.map_data.get_value_row()),
276            _marker: PhantomData::<T2>,
277        }
278    }
279}
280
281#[allow(clippy::needless_lifetimes)]
282impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> Row<'a> for BTreeMap<T1, T2> {
283    type ReadResult = MapGetter<'a, T1, T2>;
284
285    fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
286        let mut map_writer = MapWriter::new(writer);
287        {
288            let callback_info = map_writer.write_start(0);
289            let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
290            for (idx, item) in v.keys().enumerate() {
291                let callback_info = array_writer.write_start(idx);
292                <T1 as Row>::write(item, array_writer.get_writer())?;
293                array_writer.write_end(callback_info);
294            }
295            map_writer.write_end(callback_info);
296        }
297        {
298            let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
299            for (idx, item) in v.values().enumerate() {
300                let callback_info = array_writer.write_start(idx);
301                <T2 as Row>::write(item, array_writer.get_writer())?;
302                array_writer.write_end(callback_info);
303            }
304        }
305        Ok(())
306    }
307
308    fn cast(row: &'a [u8]) -> Self::ReadResult {
309        MapGetter {
310            map_data: MapViewer::new(row),
311            _key_marker: PhantomData::<T1>,
312            _value_marker: PhantomData::<T2>,
313        }
314    }
315}