Skip to main content

fory_core/row/
row.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::util::EPOCH;
19use crate::{buffer::Writer, error::Error};
20use byteorder::{ByteOrder, LittleEndian};
21use chrono::{DateTime, Days, NaiveDate, NaiveDateTime};
22use std::collections::BTreeMap;
23use std::marker::PhantomData;
24
25use super::{
26    reader::{ArrayViewer, MapViewer},
27    writer::{ArrayWriter, MapWriter},
28};
29
30pub trait Row<'a> {
31    type ReadResult;
32
33    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error>;
34
35    fn cast(bytes: &'a [u8]) -> Self::ReadResult;
36}
37
/// Decodes the first byte of `bytes` as a two's-complement `i8`; any further bytes are ignored.
///
/// # Panics
///
/// Panics if `bytes` is empty.
fn read_i8_from_bytes(bytes: &[u8]) -> i8 {
    let first = bytes[0];
    i8::from_le_bytes([first])
}
41
42macro_rules! impl_row_for_number {
43    ($tt: tt, $writer: expr ,$visitor: expr) => {
44        impl<'a> Row<'a> for $tt {
45            type ReadResult = Self;
46
47            fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
48                $writer(writer, *v);
49                Ok(())
50            }
51
52            fn cast(bytes: &[u8]) -> Self::ReadResult {
53                $visitor(bytes)
54            }
55        }
56    };
57}
58impl_row_for_number!(i8, Writer::write_i8, read_i8_from_bytes);
59impl_row_for_number!(i16, Writer::write_i16, LittleEndian::read_i16);
60impl_row_for_number!(i32, Writer::write_i32, LittleEndian::read_i32);
61impl_row_for_number!(i64, Writer::write_i64, LittleEndian::read_i64);
62impl_row_for_number!(f32, Writer::write_f32, LittleEndian::read_f32);
63impl_row_for_number!(f64, Writer::write_f64, LittleEndian::read_f64);
64
65impl<'a> Row<'a> for String {
66    type ReadResult = &'a str;
67
68    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
69        writer.write_bytes(v.as_bytes());
70        Ok(())
71    }
72
73    fn cast(bytes: &'a [u8]) -> Self::ReadResult {
74        unsafe { std::str::from_utf8_unchecked(bytes) }
75    }
76}
77
78impl Row<'_> for bool {
79    type ReadResult = Self;
80
81    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
82        writer.write_u8(if *v { 1 } else { 0 });
83        Ok(())
84    }
85
86    fn cast(bytes: &[u8]) -> Self::ReadResult {
87        bytes[0] == 1
88    }
89}
90
91/// ArrayGetter for fixed-size arrays, wrapping the underlying ArrayViewer
92pub struct FixedArrayGetter<'a, T, const N: usize> {
93    array_data: ArrayViewer<'a>,
94    _marker: PhantomData<T>,
95}
96
97impl<'a, T: Row<'a>, const N: usize> FixedArrayGetter<'a, T, N> {
98    pub fn size(&self) -> usize {
99        self.array_data.num_elements()
100    }
101
102    pub fn get(&self, idx: usize) -> Result<T::ReadResult, Error> {
103        if idx >= self.array_data.num_elements() {
104            return Err(Error::buffer_out_of_bound(
105                idx,
106                1,
107                self.array_data.num_elements(),
108            ));
109        }
110        let bytes = self.array_data.get_field_bytes(idx);
111        Ok(<T as Row>::cast(bytes))
112    }
113}
114
115impl<'a, T: Row<'a>, const N: usize> Row<'a> for [T; N] {
116    type ReadResult = FixedArrayGetter<'a, T, N>;
117
118    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
119        let mut array_writer = ArrayWriter::new(N, writer)?;
120        for (idx, item) in v.iter().enumerate() {
121            let callback_info = array_writer.write_start(idx);
122            <T as Row>::write(item, array_writer.get_writer())?;
123            array_writer.write_end(callback_info);
124        }
125        Ok(())
126    }
127
128    fn cast(row: &'a [u8]) -> Self::ReadResult {
129        FixedArrayGetter {
130            array_data: ArrayViewer::new(row),
131            _marker: PhantomData::<T>,
132        }
133    }
134}
135
136impl Row<'_> for NaiveDate {
137    type ReadResult = Result<NaiveDate, Error>;
138
139    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
140        let days_since_epoch = v.signed_duration_since(EPOCH).num_days();
141        writer.write_u32(days_since_epoch as u32);
142        Ok(())
143    }
144
145    fn cast(bytes: &[u8]) -> Self::ReadResult {
146        let days = LittleEndian::read_u32(bytes);
147        EPOCH
148            .checked_add_days(Days::new(days.into()))
149            .ok_or(Error::invalid_data(format!(
150                "Date out of range, {days} days since epoch"
151            )))
152    }
153}
154
155impl Row<'_> for NaiveDateTime {
156    type ReadResult = Result<NaiveDateTime, Error>;
157
158    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
159        writer.write_i64(v.and_utc().timestamp_millis());
160        Ok(())
161    }
162
163    fn cast(bytes: &[u8]) -> Self::ReadResult {
164        let timestamp = LittleEndian::read_u64(bytes);
165        DateTime::from_timestamp_millis(timestamp as i64)
166            .map(|dt| dt.naive_utc())
167            .ok_or(Error::invalid_data(format!(
168                "Date out of range, timestamp:{timestamp}"
169            )))
170    }
171}
172
173impl<'a> Row<'a> for Vec<u8> {
174    type ReadResult = &'a [u8];
175
176    fn write(v: &Self, writer: &mut Writer) -> Result<(), Error> {
177        writer.write_bytes(v);
178        Ok(())
179    }
180
181    fn cast(bytes: &'a [u8]) -> Self::ReadResult {
182        bytes
183    }
184}
185
186pub struct ArrayGetter<'a, T> {
187    array_data: ArrayViewer<'a>,
188    _marker: PhantomData<T>,
189}
190
191#[allow(clippy::needless_lifetimes)]
192impl<'a, T: Row<'a>> ArrayGetter<'a, T> {
193    pub fn size(&self) -> usize {
194        self.array_data.num_elements()
195    }
196
197    pub fn get(&self, idx: usize) -> Result<T::ReadResult, Error> {
198        if idx >= self.array_data.num_elements() {
199            return Err(Error::buffer_out_of_bound(
200                idx,
201                1,
202                self.array_data.num_elements(),
203            ));
204        }
205        let bytes = self.array_data.get_field_bytes(idx);
206        Ok(<T as Row>::cast(bytes))
207    }
208}
209
210#[allow(clippy::needless_lifetimes)]
211impl<'a, T: Row<'a>> Row<'a> for Vec<T> {
212    type ReadResult = ArrayGetter<'a, T>;
213
214    fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
215        let mut array_writer = ArrayWriter::new(v.len(), writer)?;
216        for (idx, item) in v.iter().enumerate() {
217            let callback_info = array_writer.write_start(idx);
218            <T as Row>::write(item, array_writer.get_writer())?;
219            array_writer.write_end(callback_info);
220        }
221        Ok(())
222    }
223
224    fn cast(row: &'a [u8]) -> Self::ReadResult {
225        ArrayGetter {
226            array_data: ArrayViewer::new(row),
227            _marker: PhantomData::<T>,
228        }
229    }
230}
231
232pub struct MapGetter<'a, T1, T2>
233where
234    T1: Ord,
235    T2: Ord,
236{
237    map_data: MapViewer<'a>,
238    _key_marker: PhantomData<T1>,
239    _value_marker: PhantomData<T2>,
240}
241
242impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> MapGetter<'a, T1, T2> {
243    pub fn to_btree_map(&'a self) -> Result<BTreeMap<T1::ReadResult, T2::ReadResult>, Error>
244    where
245        <T1 as Row<'a>>::ReadResult: Ord,
246    {
247        let mut map = BTreeMap::new();
248        let keys = self.keys();
249        let values = self.values();
250
251        for i in 0..self.keys().size() {
252            map.insert(keys.get(i)?, values.get(i)?);
253        }
254        Ok(map)
255    }
256
257    pub fn keys(&'a self) -> ArrayGetter<'a, T1> {
258        ArrayGetter {
259            array_data: ArrayViewer::new(self.map_data.get_key_row()),
260            _marker: PhantomData::<T1>,
261        }
262    }
263
264    pub fn values(&'a self) -> ArrayGetter<'a, T2> {
265        ArrayGetter {
266            array_data: ArrayViewer::new(self.map_data.get_value_row()),
267            _marker: PhantomData::<T2>,
268        }
269    }
270}
271
/// Row encoding for `BTreeMap`: two row-format arrays, all keys first, then all values.
///
/// Both arrays are written in the map's ascending key order (`keys()` / `values()`), so the
/// `i`-th key pairs with the `i`-th value. The map is read back lazily as a [`MapGetter`].
#[allow(clippy::needless_lifetimes)]
impl<'a, T1: Row<'a> + Ord, T2: Row<'a> + Ord> Row<'a> for BTreeMap<T1, T2> {
    type ReadResult = MapGetter<'a, T1, T2>;

    fn write<'b>(v: &Self, writer: &mut Writer<'b>) -> Result<(), Error> {
        let mut map_writer = MapWriter::new(writer);
        // Key array. Unlike the value array below, it is bracketed by
        // `map_writer.write_start(0)` / `map_writer.write_end(..)`.
        // NOTE(review): presumably this records the key array's size so readers can locate
        // the value section — confirm against `MapWriter`.
        {
            let callback_info = map_writer.write_start(0);
            let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
            for (idx, item) in v.keys().enumerate() {
                // Shadows the outer `callback_info` only inside the loop body; the outer one is
                // still what `map_writer.write_end` receives below.
                let callback_info = array_writer.write_start(idx);
                <T1 as Row>::write(item, array_writer.get_writer())?;
                array_writer.write_end(callback_info);
            }
            map_writer.write_end(callback_info);
        }
        // Value array: written next, with the same element count and no map-level bracket.
        {
            let mut array_writer = ArrayWriter::new(v.len(), map_writer.get_writer())?;
            for (idx, item) in v.values().enumerate() {
                let callback_info = array_writer.write_start(idx);
                <T2 as Row>::write(item, array_writer.get_writer())?;
                array_writer.write_end(callback_info);
            }
        }
        Ok(())
    }

    /// Wraps `row` in a lazily-decoding [`MapGetter`].
    fn cast(row: &'a [u8]) -> Self::ReadResult {
        MapGetter {
            map_data: MapViewer::new(row),
            _key_marker: PhantomData::<T1>,
            _value_marker: PhantomData::<T2>,
        }
    }
}