Skip to main content

fluss/row/compacted/
compacted_row.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::client::WriteFormat;
19use crate::error::Result;
20use crate::metadata::RowType;
21use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader};
22use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
23use crate::row::{Decimal, GenericRow, InternalRow};
24use std::sync::{Arc, OnceLock};
25
26// Reference implementation:
27// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
28#[allow(dead_code)]
29pub struct CompactedRow<'a> {
30    arity: usize,
31    size_in_bytes: usize,
32    decoded_row: OnceLock<GenericRow<'a>>,
33    deserializer: Arc<CompactedRowDeserializer<'a>>,
34    reader: CompactedRowReader<'a>,
35    data: &'a [u8],
36}
37
/// Returns the number of whole bytes required to hold `arity` bits,
/// i.e. `arity / 8` rounded up.
pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
    const BITS_PER_BYTE: usize = 8;

    let full_bytes = arity / BITS_PER_BYTE;
    // Any leftover bits need one extra, partially used byte.
    let has_partial_byte = arity % BITS_PER_BYTE != 0;
    full_bytes + usize::from(has_partial_byte)
}
41
42#[allow(dead_code)]
43impl<'a> CompactedRow<'a> {
44    pub fn from_bytes(row_type: &'a RowType, data: &'a [u8]) -> Self {
45        Self::deserialize(
46            Arc::new(CompactedRowDeserializer::new(row_type)),
47            row_type.fields().len(),
48            data,
49        )
50    }
51
52    pub fn deserialize(
53        deserializer: Arc<CompactedRowDeserializer<'a>>,
54        arity: usize,
55        data: &'a [u8],
56    ) -> Self {
57        Self {
58            arity,
59            size_in_bytes: data.len(),
60            decoded_row: OnceLock::new(),
61            deserializer: Arc::clone(&deserializer),
62            reader: CompactedRowReader::new(arity, data, 0, data.len()),
63            data,
64        }
65    }
66
67    pub fn get_size_in_bytes(&self) -> usize {
68        self.size_in_bytes
69    }
70
71    fn decoded_row(&self) -> &GenericRow<'_> {
72        self.decoded_row
73            .get_or_init(|| self.deserializer.deserialize(&self.reader))
74    }
75
76    pub fn as_bytes(&self) -> &[u8] {
77        self.data
78    }
79}
80
81impl<'a> InternalRow for CompactedRow<'a> {
82    fn get_field_count(&self) -> usize {
83        self.arity
84    }
85
86    fn is_null_at(&self, pos: usize) -> Result<bool> {
87        let fields = self.deserializer.get_row_type().fields();
88        if pos >= fields.len() {
89            return Err(crate::error::Error::IllegalArgument {
90                message: format!(
91                    "position {pos} out of bounds (row has {} fields)",
92                    fields.len()
93                ),
94            });
95        }
96        Ok(fields.as_slice()[pos].data_type.is_nullable() && self.reader.is_null_at(pos))
97    }
98
99    fn get_boolean(&self, pos: usize) -> Result<bool> {
100        self.decoded_row().get_boolean(pos)
101    }
102
103    fn get_byte(&self, pos: usize) -> Result<i8> {
104        self.decoded_row().get_byte(pos)
105    }
106
107    fn get_short(&self, pos: usize) -> Result<i16> {
108        self.decoded_row().get_short(pos)
109    }
110
111    fn get_int(&self, pos: usize) -> Result<i32> {
112        self.decoded_row().get_int(pos)
113    }
114
115    fn get_long(&self, pos: usize) -> Result<i64> {
116        self.decoded_row().get_long(pos)
117    }
118
119    fn get_float(&self, pos: usize) -> Result<f32> {
120        self.decoded_row().get_float(pos)
121    }
122
123    fn get_double(&self, pos: usize) -> Result<f64> {
124        self.decoded_row().get_double(pos)
125    }
126
127    fn get_char(&self, pos: usize, length: usize) -> Result<&str> {
128        self.decoded_row().get_char(pos, length)
129    }
130
131    fn get_string(&self, pos: usize) -> Result<&str> {
132        self.decoded_row().get_string(pos)
133    }
134
135    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> Result<Decimal> {
136        self.decoded_row().get_decimal(pos, precision, scale)
137    }
138
139    fn get_date(&self, pos: usize) -> Result<Date> {
140        self.decoded_row().get_date(pos)
141    }
142
143    fn get_time(&self, pos: usize) -> Result<Time> {
144        self.decoded_row().get_time(pos)
145    }
146
147    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> Result<TimestampNtz> {
148        self.decoded_row().get_timestamp_ntz(pos, precision)
149    }
150
151    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> Result<TimestampLtz> {
152        self.decoded_row().get_timestamp_ltz(pos, precision)
153    }
154
155    fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]> {
156        self.decoded_row().get_binary(pos, length)
157    }
158
159    fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
160        self.decoded_row().get_bytes(pos)
161    }
162
163    fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> {
164        match write_format {
165            WriteFormat::CompactedKv => Some(self.as_bytes()),
166            WriteFormat::ArrowLog => None,
167            WriteFormat::CompactedLog => None,
168        }
169    }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::binary::BinaryWriter;

    use crate::metadata::{
        BigIntType, BooleanType, BytesType, DataType, DoubleType, FloatType, IntType, SmallIntType,
        StringType, TinyIntType,
    };
    use crate::row::compacted::compacted_row_writer::CompactedRowWriter;

    /// Writes rows with `CompactedRowWriter` and reads them back through
    /// `CompactedRow`: first one value of each primitive type, then a row
    /// containing a null and a negative value.
    #[test]
    fn test_compacted_row() {
        // Scenario 1: one column per primitive type.
        {
            let schema = RowType::with_data_types(vec![
                DataType::Boolean(BooleanType::new()),
                DataType::TinyInt(TinyIntType::new()),
                DataType::SmallInt(SmallIntType::new()),
                DataType::Int(IntType::new()),
                DataType::BigInt(BigIntType::new()),
                DataType::Float(FloatType::new()),
                DataType::Double(DoubleType::new()),
                DataType::String(StringType::new()),
                DataType::Bytes(BytesType::new()),
            ]);

            let mut row_writer = CompactedRowWriter::new(schema.fields().len());

            row_writer.write_boolean(true);
            row_writer.write_byte(1);
            row_writer.write_short(100);
            row_writer.write_int(1000);
            row_writer.write_long(10000);
            row_writer.write_float(1.5);
            row_writer.write_double(2.5);
            row_writer.write_string("Hello World");
            row_writer.write_bytes(&[1, 2, 3, 4, 5]);

            let encoded = row_writer.to_bytes();
            let decoded = CompactedRow::from_bytes(&schema, encoded.as_ref());

            assert_eq!(decoded.get_field_count(), 9);
            assert!(decoded.get_boolean(0).unwrap());
            assert_eq!(decoded.get_byte(1).unwrap(), 1);
            assert_eq!(decoded.get_short(2).unwrap(), 100);
            assert_eq!(decoded.get_int(3).unwrap(), 1000);
            assert_eq!(decoded.get_long(4).unwrap(), 10000);
            assert_eq!(decoded.get_float(5).unwrap(), 1.5);
            assert_eq!(decoded.get_double(6).unwrap(), 2.5);
            assert_eq!(decoded.get_string(7).unwrap(), "Hello World");
            assert_eq!(decoded.get_bytes(8).unwrap(), &[1, 2, 3, 4, 5]);
        }

        // Scenario 2: a null in the middle column and a negative integer.
        {
            let schema = RowType::with_data_types(vec![
                DataType::Int(IntType::new()),
                DataType::String(StringType::new()),
                DataType::Double(DoubleType::new()),
            ]);

            let mut row_writer = CompactedRowWriter::new(schema.fields().len());
            row_writer.write_int(-42);
            row_writer.set_null_at(1);
            row_writer.write_double(2.71);

            let encoded = row_writer.to_bytes();
            let decoded = CompactedRow::from_bytes(&schema, encoded.as_ref());

            assert!(!decoded.is_null_at(0).unwrap());
            assert!(decoded.is_null_at(1).unwrap());
            assert!(!decoded.is_null_at(2).unwrap());
            assert_eq!(decoded.get_int(0).unwrap(), -42);
            assert_eq!(decoded.get_double(2).unwrap(), 2.71);
            // A second read of the same field must give the same value
            // (this goes through the cached decoded row).
            assert_eq!(decoded.get_int(0).unwrap(), -42);
        }
    }

    /// Round-trips DATE, TIME, TIMESTAMP / TIMESTAMP_LTZ (compact and
    /// non-compact precisions) and DECIMAL (compact and non-compact
    /// precisions) through the writer and `CompactedRow`.
    #[test]
    fn test_compacted_row_temporal_and_decimal_types() {
        use crate::metadata::{DataTypes, DecimalType, TimestampLTzType, TimestampType};
        use crate::row::Decimal;
        use crate::row::datum::{TimestampLtz, TimestampNtz};
        use bigdecimal::{BigDecimal, num_bigint::BigInt};

        let schema = RowType::with_data_types(vec![
            DataTypes::date(),
            DataTypes::time(),
            // Compact timestamps (precision <= 3).
            DataType::Timestamp(TimestampType::with_nullable(true, 3).unwrap()),
            DataType::TimestampLTz(TimestampLTzType::with_nullable(true, 3).unwrap()),
            // Non-compact timestamps (precision > 3).
            DataType::Timestamp(TimestampType::with_nullable(true, 6).unwrap()),
            DataType::TimestampLTz(TimestampLTzType::with_nullable(true, 9).unwrap()),
            // Compact decimal (precision <= 18).
            DataType::Decimal(DecimalType::new(10, 2).unwrap()),
            // Non-compact decimal (precision > 18).
            DataType::Decimal(DecimalType::new(28, 10).unwrap()),
        ]);

        // Values with sub-millisecond nanos for the non-compact timestamps.
        let ts_ntz_high = TimestampNtz::from_millis_nanos(1698235273182, 123456).unwrap();
        let ts_ltz_high = TimestampLtz::from_millis_nanos(1698235273182, 987654).unwrap();

        // Compact decimal: 123.45
        let small_decimal =
            Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2), 10, 2).unwrap();
        // Non-compact decimal.
        let large_decimal = Decimal::from_big_decimal(
            BigDecimal::new(BigInt::from(999999999999999999i128), 10),
            28,
            10,
        )
        .unwrap();

        let mut row_writer = CompactedRowWriter::new(schema.fields().len());

        row_writer.write_int(19651); // Date: 2023-10-25
        row_writer.write_time(34200000, 0); // Time: 09:30:00.0
        row_writer.write_timestamp_ntz(&TimestampNtz::new(1698235273182), 3);
        row_writer.write_timestamp_ltz(&TimestampLtz::new(1698235273182), 3);
        row_writer.write_timestamp_ntz(&ts_ntz_high, 6);
        row_writer.write_timestamp_ltz(&ts_ltz_high, 9);
        row_writer.write_decimal(&small_decimal, 10);
        row_writer.write_decimal(&large_decimal, 28);

        let encoded = row_writer.to_bytes();
        let decoded = CompactedRow::from_bytes(&schema, encoded.as_ref());

        // Date and time.
        assert_eq!(decoded.get_date(0).unwrap().get_inner(), 19651);
        assert_eq!(decoded.get_time(1).unwrap().get_inner(), 34200000);

        // Compact timestamps.
        assert_eq!(
            decoded.get_timestamp_ntz(2, 3).unwrap().get_millisecond(),
            1698235273182
        );
        assert_eq!(
            decoded
                .get_timestamp_ltz(3, 3)
                .unwrap()
                .get_epoch_millisecond(),
            1698235273182
        );

        // Non-compact timestamps keep their nanos-of-millisecond part.
        let ntz_back = decoded.get_timestamp_ntz(4, 6).unwrap();
        assert_eq!(ntz_back.get_millisecond(), 1698235273182);
        assert_eq!(ntz_back.get_nano_of_millisecond(), 123456);
        let ltz_back = decoded.get_timestamp_ltz(5, 9).unwrap();
        assert_eq!(ltz_back.get_epoch_millisecond(), 1698235273182);
        assert_eq!(ltz_back.get_nano_of_millisecond(), 987654);

        // Decimals compare equal to what was written.
        assert_eq!(decoded.get_decimal(6, 10, 2).unwrap(), small_decimal);
        assert_eq!(decoded.get_decimal(7, 28, 10).unwrap(), large_decimal);

        // Check the individual components as well to catch regressions.
        let small_back = decoded.get_decimal(6, 10, 2).unwrap();
        assert_eq!(small_back.precision(), 10);
        assert_eq!(small_back.scale(), 2);
        assert_eq!(small_back.to_unscaled_long().unwrap(), 12345);

        let large_back = decoded.get_decimal(7, 28, 10).unwrap();
        assert_eq!(large_back.precision(), 28);
        assert_eq!(large_back.scale(), 10);
        assert_eq!(
            large_back.to_unscaled_long().unwrap(),
            999999999999999999i64
        );
    }
}