1use crate::client::WriteFormat;
19use crate::error::Result;
20use crate::metadata::RowType;
21use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader};
22use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
23use crate::row::{Decimal, GenericRow, InternalRow};
24use std::sync::{Arc, OnceLock};
25
26#[allow(dead_code)]
29pub struct CompactedRow<'a> {
30 arity: usize,
31 size_in_bytes: usize,
32 decoded_row: OnceLock<GenericRow<'a>>,
33 deserializer: Arc<CompactedRowDeserializer<'a>>,
34 reader: CompactedRowReader<'a>,
35 data: &'a [u8],
36}
37
/// Returns the number of bytes needed to store one bit per field for `arity` fields.
///
/// This is `arity / 8` rounded up: `0 -> 0`, `1..=8 -> 1`, `9..=16 -> 2`, and so on.
pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
    let full_bytes = arity / 8;
    // Any leftover bits (fewer than 8) still need one more byte; computed without `arity + 7`
    // so that very large arities cannot overflow.
    let needs_partial_byte = arity % 8 != 0;
    full_bytes + usize::from(needs_partial_byte)
}
41
42#[allow(dead_code)]
43impl<'a> CompactedRow<'a> {
44 pub fn from_bytes(row_type: &'a RowType, data: &'a [u8]) -> Self {
45 Self::deserialize(
46 Arc::new(CompactedRowDeserializer::new(row_type)),
47 row_type.fields().len(),
48 data,
49 )
50 }
51
52 pub fn deserialize(
53 deserializer: Arc<CompactedRowDeserializer<'a>>,
54 arity: usize,
55 data: &'a [u8],
56 ) -> Self {
57 Self {
58 arity,
59 size_in_bytes: data.len(),
60 decoded_row: OnceLock::new(),
61 deserializer: Arc::clone(&deserializer),
62 reader: CompactedRowReader::new(arity, data, 0, data.len()),
63 data,
64 }
65 }
66
67 pub fn get_size_in_bytes(&self) -> usize {
68 self.size_in_bytes
69 }
70
71 fn decoded_row(&self) -> &GenericRow<'_> {
72 self.decoded_row
73 .get_or_init(|| self.deserializer.deserialize(&self.reader))
74 }
75
76 pub fn as_bytes(&self) -> &[u8] {
77 self.data
78 }
79}
80
81impl<'a> InternalRow for CompactedRow<'a> {
82 fn get_field_count(&self) -> usize {
83 self.arity
84 }
85
86 fn is_null_at(&self, pos: usize) -> Result<bool> {
87 let fields = self.deserializer.get_row_type().fields();
88 if pos >= fields.len() {
89 return Err(crate::error::Error::IllegalArgument {
90 message: format!(
91 "position {pos} out of bounds (row has {} fields)",
92 fields.len()
93 ),
94 });
95 }
96 Ok(fields.as_slice()[pos].data_type.is_nullable() && self.reader.is_null_at(pos))
97 }
98
99 fn get_boolean(&self, pos: usize) -> Result<bool> {
100 self.decoded_row().get_boolean(pos)
101 }
102
103 fn get_byte(&self, pos: usize) -> Result<i8> {
104 self.decoded_row().get_byte(pos)
105 }
106
107 fn get_short(&self, pos: usize) -> Result<i16> {
108 self.decoded_row().get_short(pos)
109 }
110
111 fn get_int(&self, pos: usize) -> Result<i32> {
112 self.decoded_row().get_int(pos)
113 }
114
115 fn get_long(&self, pos: usize) -> Result<i64> {
116 self.decoded_row().get_long(pos)
117 }
118
119 fn get_float(&self, pos: usize) -> Result<f32> {
120 self.decoded_row().get_float(pos)
121 }
122
123 fn get_double(&self, pos: usize) -> Result<f64> {
124 self.decoded_row().get_double(pos)
125 }
126
127 fn get_char(&self, pos: usize, length: usize) -> Result<&str> {
128 self.decoded_row().get_char(pos, length)
129 }
130
131 fn get_string(&self, pos: usize) -> Result<&str> {
132 self.decoded_row().get_string(pos)
133 }
134
135 fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> Result<Decimal> {
136 self.decoded_row().get_decimal(pos, precision, scale)
137 }
138
139 fn get_date(&self, pos: usize) -> Result<Date> {
140 self.decoded_row().get_date(pos)
141 }
142
143 fn get_time(&self, pos: usize) -> Result<Time> {
144 self.decoded_row().get_time(pos)
145 }
146
147 fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> Result<TimestampNtz> {
148 self.decoded_row().get_timestamp_ntz(pos, precision)
149 }
150
151 fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> Result<TimestampLtz> {
152 self.decoded_row().get_timestamp_ltz(pos, precision)
153 }
154
155 fn get_binary(&self, pos: usize, length: usize) -> Result<&[u8]> {
156 self.decoded_row().get_binary(pos, length)
157 }
158
159 fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
160 self.decoded_row().get_bytes(pos)
161 }
162
163 fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> {
164 match write_format {
165 WriteFormat::CompactedKv => Some(self.as_bytes()),
166 WriteFormat::ArrowLog => None,
167 WriteFormat::CompactedLog => None,
168 }
169 }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::binary::BinaryWriter;

    use crate::metadata::{
        BigIntType, BooleanType, BytesType, DataType, DoubleType, FloatType, IntType, SmallIntType,
        StringType, TinyIntType,
    };
    use crate::row::compacted::compacted_row_writer::CompactedRowWriter;

    /// Round-trips primitive, string and bytes columns, then checks null handling.
    #[test]
    fn test_compacted_row() {
        // One column per type, written below in schema order.
        let row_type = RowType::with_data_types(vec![
            DataType::Boolean(BooleanType::new()),
            DataType::TinyInt(TinyIntType::new()),
            DataType::SmallInt(SmallIntType::new()),
            DataType::Int(IntType::new()),
            DataType::BigInt(BigIntType::new()),
            DataType::Float(FloatType::new()),
            DataType::Double(DoubleType::new()),
            DataType::String(StringType::new()),
            DataType::Bytes(BytesType::new()),
        ]);

        let mut writer = CompactedRowWriter::new(row_type.fields().len());

        writer.write_boolean(true);
        writer.write_byte(1);
        writer.write_short(100);
        writer.write_int(1000);
        writer.write_long(10000);
        writer.write_float(1.5);
        writer.write_double(2.5);
        writer.write_string("Hello World");
        writer.write_bytes(&[1, 2, 3, 4, 5]);

        let bytes = writer.to_bytes();
        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());

        assert_eq!(row.get_field_count(), 9);
        assert!(row.get_boolean(0).unwrap());
        assert_eq!(row.get_byte(1).unwrap(), 1);
        assert_eq!(row.get_short(2).unwrap(), 100);
        assert_eq!(row.get_int(3).unwrap(), 1000);
        assert_eq!(row.get_long(4).unwrap(), 10000);
        assert_eq!(row.get_float(5).unwrap(), 1.5);
        assert_eq!(row.get_double(6).unwrap(), 2.5);
        assert_eq!(row.get_string(7).unwrap(), "Hello World");
        assert_eq!(row.get_bytes(8).unwrap(), &[1, 2, 3, 4, 5]);

        // Second row: field 1 is null; the fields around it must still read back correctly.
        let row_type = RowType::with_data_types(vec![
            DataType::Int(IntType::new()),
            DataType::String(StringType::new()),
            DataType::Double(DoubleType::new()),
        ]);

        let mut writer = CompactedRowWriter::new(row_type.fields().len());
        writer.write_int(-42);
        writer.set_null_at(1);
        writer.write_double(2.71);

        let bytes = writer.to_bytes();
        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());

        assert!(!row.is_null_at(0).unwrap());
        assert!(row.is_null_at(1).unwrap());
        assert!(!row.is_null_at(2).unwrap());
        assert_eq!(row.get_int(0).unwrap(), -42);
        assert_eq!(row.get_double(2).unwrap(), 2.71);
        // A repeated read is served from the cached decoded row and must return the same value.
        assert_eq!(row.get_int(0).unwrap(), -42);
        // A position past the last field is reported as an error, not read from the null bits.
        assert!(row.is_null_at(3).is_err());
    }

    /// Only the compacted-KV format exposes the row's own bytes; the log formats do not.
    #[test]
    fn test_compacted_row_encoded_bytes() {
        let row_type = RowType::with_data_types(vec![
            DataType::Int(IntType::new()),
            DataType::String(StringType::new()),
        ]);

        let mut writer = CompactedRowWriter::new(row_type.fields().len());
        writer.write_int(7);
        writer.write_string("abc");

        let bytes = writer.to_bytes();
        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());

        assert_eq!(row.get_size_in_bytes(), row.as_bytes().len());
        assert_eq!(
            row.as_encoded_bytes(WriteFormat::CompactedKv),
            Some(row.as_bytes())
        );
        assert_eq!(row.as_encoded_bytes(WriteFormat::ArrowLog), None);
        assert_eq!(row.as_encoded_bytes(WriteFormat::CompactedLog), None);
    }

    /// Round-trips date, time, timestamp (NTZ and LTZ at several precisions) and decimal columns.
    #[test]
    fn test_compacted_row_temporal_and_decimal_types() {
        use crate::metadata::{DataTypes, DecimalType, TimestampLTzType, TimestampType};
        use crate::row::Decimal;
        use crate::row::datum::{TimestampLtz, TimestampNtz};
        use bigdecimal::{BigDecimal, num_bigint::BigInt};

        let row_type = RowType::with_data_types(vec![
            DataTypes::date(),
            DataTypes::time(),
            DataType::Timestamp(TimestampType::with_nullable(true, 3).unwrap()),
            DataType::TimestampLTz(TimestampLTzType::with_nullable(true, 3).unwrap()),
            DataType::Timestamp(TimestampType::with_nullable(true, 6).unwrap()),
            DataType::TimestampLTz(TimestampLTzType::with_nullable(true, 9).unwrap()),
            DataType::Decimal(DecimalType::new(10, 2).unwrap()),
            DataType::Decimal(DecimalType::new(28, 10).unwrap()),
        ]);

        let mut writer = CompactedRowWriter::new(row_type.fields().len());

        // Date column (written as an int) and time column.
        writer.write_int(19651);
        writer.write_time(34200000, 0);

        // Precision-3 timestamps: milliseconds only.
        writer.write_timestamp_ntz(&TimestampNtz::new(1698235273182), 3);
        writer.write_timestamp_ltz(&TimestampLtz::new(1698235273182), 3);

        // Higher-precision timestamps also carry a nano-of-millisecond component.
        let ts_ntz_high = TimestampNtz::from_millis_nanos(1698235273182, 123456).unwrap();
        let ts_ltz_high = TimestampLtz::from_millis_nanos(1698235273182, 987654).unwrap();
        writer.write_timestamp_ntz(&ts_ntz_high, 6);
        writer.write_timestamp_ltz(&ts_ltz_high, 9);

        // One decimal with a small precision and one with a large precision.
        let small_decimal =
            Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2), 10, 2).unwrap();
        let large_decimal = Decimal::from_big_decimal(
            BigDecimal::new(BigInt::from(999999999999999999i128), 10),
            28,
            10,
        )
        .unwrap();
        writer.write_decimal(&small_decimal, 10);
        writer.write_decimal(&large_decimal, 28);

        let bytes = writer.to_bytes();
        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());

        assert_eq!(row.get_date(0).unwrap().get_inner(), 19651);
        assert_eq!(row.get_time(1).unwrap().get_inner(), 34200000);
        assert_eq!(
            row.get_timestamp_ntz(2, 3).unwrap().get_millisecond(),
            1698235273182
        );
        assert_eq!(
            row.get_timestamp_ltz(3, 3).unwrap().get_epoch_millisecond(),
            1698235273182
        );
        let read_ts_ntz = row.get_timestamp_ntz(4, 6).unwrap();
        assert_eq!(read_ts_ntz.get_millisecond(), 1698235273182);
        assert_eq!(read_ts_ntz.get_nano_of_millisecond(), 123456);
        let read_ts_ltz = row.get_timestamp_ltz(5, 9).unwrap();
        assert_eq!(read_ts_ltz.get_epoch_millisecond(), 1698235273182);
        assert_eq!(read_ts_ltz.get_nano_of_millisecond(), 987654);
        assert_eq!(row.get_decimal(6, 10, 2).unwrap(), small_decimal);
        assert_eq!(row.get_decimal(7, 28, 10).unwrap(), large_decimal);

        let read_small_decimal = row.get_decimal(6, 10, 2).unwrap();
        assert_eq!(read_small_decimal.precision(), 10);
        assert_eq!(read_small_decimal.scale(), 2);
        assert_eq!(read_small_decimal.to_unscaled_long().unwrap(), 12345);

        let read_large_decimal = row.get_decimal(7, 28, 10).unwrap();
        assert_eq!(read_large_decimal.precision(), 28);
        assert_eq!(read_large_decimal.scale(), 10);
        assert_eq!(
            read_large_decimal.to_unscaled_long().unwrap(),
            999999999999999999i64
        );
    }
}