Skip to main content

fluss/row/compacted/
compacted_row_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::metadata::RowType;
19use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
20use crate::{
21    metadata::DataType,
22    row::{Datum, Decimal, GenericRow, compacted::compacted_row_writer::CompactedRowWriter},
23    util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
24};
25use std::borrow::Cow;
26use std::str::from_utf8;
27
28#[allow(dead_code)]
29#[derive(Clone)]
30pub struct CompactedRowDeserializer<'a> {
31    row_type: Cow<'a, RowType>,
32}
33
34#[allow(dead_code)]
35impl<'a> CompactedRowDeserializer<'a> {
36    pub fn new(row_type: &'a RowType) -> Self {
37        Self {
38            row_type: Cow::Borrowed(row_type),
39        }
40    }
41
42    pub fn new_from_owned(row_type: RowType) -> Self {
43        Self {
44            row_type: Cow::Owned(row_type),
45        }
46    }
47
48    pub fn get_row_type(&self) -> &RowType {
49        self.row_type.as_ref()
50    }
51
52    pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> {
53        let mut row = GenericRow::new(self.row_type.fields().len());
54        let mut cursor = reader.initial_position();
55        for (col_pos, data_field) in self.row_type.fields().iter().enumerate() {
56            let dtype = &data_field.data_type;
57            if dtype.is_nullable() && reader.is_null_at(col_pos) {
58                row.set_field(col_pos, Datum::Null);
59                continue;
60            }
61            let (datum, next_cursor) = match dtype {
62                DataType::Boolean(_) => {
63                    let (val, next) = reader.read_boolean(cursor);
64                    (Datum::Bool(val), next)
65                }
66                DataType::TinyInt(_) => {
67                    let (val, next) = reader.read_byte(cursor);
68                    (Datum::Int8(val as i8), next)
69                }
70                DataType::SmallInt(_) => {
71                    let (val, next) = reader.read_short(cursor);
72                    (Datum::Int16(val), next)
73                }
74                DataType::Int(_) => {
75                    let (val, next) = reader.read_int(cursor);
76                    (Datum::Int32(val), next)
77                }
78                DataType::BigInt(_) => {
79                    let (val, next) = reader.read_long(cursor);
80                    (Datum::Int64(val), next)
81                }
82                DataType::Float(_) => {
83                    let (val, next) = reader.read_float(cursor);
84                    (Datum::Float32(val.into()), next)
85                }
86                DataType::Double(_) => {
87                    let (val, next) = reader.read_double(cursor);
88                    (Datum::Float64(val.into()), next)
89                }
90                // TODO: use read_char(length) in the future, but need to keep compatibility
91                DataType::Char(_) | DataType::String(_) => {
92                    let (val, next) = reader.read_string(cursor);
93                    (Datum::String(val.into()), next)
94                }
95                // TODO: use read_binary(length) in the future, but need to keep compatibility
96                DataType::Bytes(_) | DataType::Binary(_) => {
97                    let (val, next) = reader.read_bytes(cursor);
98                    (Datum::Blob(val.into()), next)
99                }
100                DataType::Decimal(decimal_type) => {
101                    let precision = decimal_type.precision();
102                    let scale = decimal_type.scale();
103                    if Decimal::is_compact_precision(precision) {
104                        // Compact: stored as i64
105                        let (val, next) = reader.read_long(cursor);
106                        let decimal = Decimal::from_unscaled_long(val, precision, scale)
107                            .expect("Failed to create decimal from unscaled long");
108                        (Datum::Decimal(decimal), next)
109                    } else {
110                        // Non-compact: stored as minimal big-endian bytes
111                        let (bytes, next) = reader.read_bytes(cursor);
112                        let decimal = Decimal::from_unscaled_bytes(bytes, precision, scale)
113                            .expect("Failed to create decimal from unscaled bytes");
114                        (Datum::Decimal(decimal), next)
115                    }
116                }
117                DataType::Date(_) => {
118                    let (val, next) = reader.read_int(cursor);
119                    (Datum::Date(crate::row::datum::Date::new(val)), next)
120                }
121                DataType::Time(_) => {
122                    let (val, next) = reader.read_int(cursor);
123                    (Datum::Time(crate::row::datum::Time::new(val)), next)
124                }
125                DataType::Timestamp(timestamp_type) => {
126                    let precision = timestamp_type.precision();
127                    if crate::row::datum::TimestampNtz::is_compact(precision) {
128                        // Compact: only milliseconds
129                        let (millis, next) = reader.read_long(cursor);
130                        (
131                            Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(millis)),
132                            next,
133                        )
134                    } else {
135                        // Non-compact: milliseconds + nanos
136                        let (millis, mid) = reader.read_long(cursor);
137                        let (nanos, next) = reader.read_int(mid);
138                        let timestamp =
139                            crate::row::datum::TimestampNtz::from_millis_nanos(millis, nanos)
140                                .expect("Invalid nano_of_millisecond value in compacted row");
141                        (Datum::TimestampNtz(timestamp), next)
142                    }
143                }
144                DataType::TimestampLTz(timestamp_ltz_type) => {
145                    let precision = timestamp_ltz_type.precision();
146                    if crate::row::datum::TimestampLtz::is_compact(precision) {
147                        // Compact: only epoch milliseconds
148                        let (epoch_millis, next) = reader.read_long(cursor);
149                        (
150                            Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(epoch_millis)),
151                            next,
152                        )
153                    } else {
154                        // Non-compact: epoch milliseconds + nanos
155                        let (epoch_millis, mid) = reader.read_long(cursor);
156                        let (nanos, next) = reader.read_int(mid);
157                        let timestamp_ltz =
158                            crate::row::datum::TimestampLtz::from_millis_nanos(epoch_millis, nanos)
159                                .expect("Invalid nano_of_millisecond value in compacted row");
160                        (Datum::TimestampLtz(timestamp_ltz), next)
161                    }
162                }
163                _ => {
164                    panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}");
165                }
166            };
167            cursor = next_cursor;
168            row.set_field(col_pos, datum);
169        }
170        row
171    }
172}
173
174// Reference implementation:
175// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java
176#[allow(dead_code)]
177pub struct CompactedRowReader<'a> {
178    segment: &'a [u8],
179    offset: usize,
180    limit: usize,
181    header_size_in_bytes: usize,
182}
183
184#[allow(dead_code)]
185impl<'a> CompactedRowReader<'a> {
186    pub fn new(field_count: usize, data: &'a [u8], offset: usize, length: usize) -> Self {
187        let header_size_in_bytes = calculate_bit_set_width_in_bytes(field_count);
188        let limit = offset + length;
189        let position = offset + header_size_in_bytes;
190        debug_assert!(limit <= data.len());
191        debug_assert!(position <= limit);
192
193        CompactedRowReader {
194            segment: data,
195            offset,
196            limit,
197            header_size_in_bytes,
198        }
199    }
200
201    fn initial_position(&self) -> usize {
202        self.offset + self.header_size_in_bytes
203    }
204
205    pub fn is_null_at(&self, col_pos: usize) -> bool {
206        let byte_index = col_pos >> 3;
207        let bit = col_pos & 7;
208        debug_assert!(byte_index < self.header_size_in_bytes);
209        let idx = self.offset + byte_index;
210        (self.segment[idx] & (1u8 << bit)) != 0
211    }
212
213    pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
214        let (val, next) = self.read_byte(pos);
215        (val != 0, next)
216    }
217
218    pub fn read_byte(&self, pos: usize) -> (u8, usize) {
219        debug_assert!(pos < self.limit);
220        (self.segment[pos], pos + 1)
221    }
222
223    pub fn read_short(&self, pos: usize) -> (i16, usize) {
224        let next_pos = pos + 2;
225        debug_assert!(next_pos <= self.limit);
226        let bytes_slice = &self.segment[pos..pos + 2];
227        let val = i16::from_ne_bytes(
228            bytes_slice
229                .try_into()
230                .expect("Slice must be exactly 2 bytes long"),
231        );
232        (val, next_pos)
233    }
234
235    pub fn read_int(&self, pos: usize) -> (i32, usize) {
236        match read_unsigned_varint_at(self.segment, pos, CompactedRowWriter::MAX_INT_SIZE) {
237            Ok((value, next_pos)) => (value as i32, next_pos),
238            Err(_) => panic!("Invalid VarInt32 input stream."),
239        }
240    }
241
242    pub fn read_long(&self, pos: usize) -> (i64, usize) {
243        match read_unsigned_varint_u64_at(self.segment, pos, CompactedRowWriter::MAX_LONG_SIZE) {
244            Ok((value, next_pos)) => (value as i64, next_pos),
245            Err(_) => panic!("Invalid VarInt64 input stream."),
246        }
247    }
248
249    pub fn read_float(&self, pos: usize) -> (f32, usize) {
250        let next_pos = pos + 4;
251        debug_assert!(next_pos <= self.limit);
252        let val = f32::from_ne_bytes(
253            self.segment[pos..pos + 4]
254                .try_into()
255                .expect("Slice must be exactly 4 bytes long"),
256        );
257        (val, next_pos)
258    }
259
260    pub fn read_double(&self, pos: usize) -> (f64, usize) {
261        let next_pos = pos + 8;
262        debug_assert!(next_pos <= self.limit);
263        let val = f64::from_ne_bytes(
264            self.segment[pos..pos + 8]
265                .try_into()
266                .expect("Slice must be exactly 8 bytes long"),
267        );
268        (val, next_pos)
269    }
270
271    pub fn read_binary(&self, pos: usize) -> (&'a [u8], usize) {
272        self.read_bytes(pos)
273    }
274
275    pub fn read_bytes(&self, pos: usize) -> (&'a [u8], usize) {
276        let (len, data_pos) = self.read_int(pos);
277        let len = len as usize;
278        let next_pos = data_pos + len;
279        debug_assert!(next_pos <= self.limit);
280        (&self.segment[data_pos..next_pos], next_pos)
281    }
282
283    pub fn read_string(&self, pos: usize) -> (&'a str, usize) {
284        let (bytes, next_pos) = self.read_bytes(pos);
285        let s = from_utf8(bytes).expect("Invalid UTF-8 when reading string");
286        (s, next_pos)
287    }
288}