1use crate::metadata::RowType;
19use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
20use crate::{
21 metadata::DataType,
22 row::{Datum, Decimal, GenericRow, compacted::compacted_row_writer::CompactedRowWriter},
23 util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
24};
25use std::borrow::Cow;
26use std::str::from_utf8;
27
28#[allow(dead_code)]
29#[derive(Clone)]
30pub struct CompactedRowDeserializer<'a> {
31 row_type: Cow<'a, RowType>,
32}
33
34#[allow(dead_code)]
35impl<'a> CompactedRowDeserializer<'a> {
36 pub fn new(row_type: &'a RowType) -> Self {
37 Self {
38 row_type: Cow::Borrowed(row_type),
39 }
40 }
41
42 pub fn new_from_owned(row_type: RowType) -> Self {
43 Self {
44 row_type: Cow::Owned(row_type),
45 }
46 }
47
48 pub fn get_row_type(&self) -> &RowType {
49 self.row_type.as_ref()
50 }
51
52 pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> {
53 let mut row = GenericRow::new(self.row_type.fields().len());
54 let mut cursor = reader.initial_position();
55 for (col_pos, data_field) in self.row_type.fields().iter().enumerate() {
56 let dtype = &data_field.data_type;
57 if dtype.is_nullable() && reader.is_null_at(col_pos) {
58 row.set_field(col_pos, Datum::Null);
59 continue;
60 }
61 let (datum, next_cursor) = match dtype {
62 DataType::Boolean(_) => {
63 let (val, next) = reader.read_boolean(cursor);
64 (Datum::Bool(val), next)
65 }
66 DataType::TinyInt(_) => {
67 let (val, next) = reader.read_byte(cursor);
68 (Datum::Int8(val as i8), next)
69 }
70 DataType::SmallInt(_) => {
71 let (val, next) = reader.read_short(cursor);
72 (Datum::Int16(val), next)
73 }
74 DataType::Int(_) => {
75 let (val, next) = reader.read_int(cursor);
76 (Datum::Int32(val), next)
77 }
78 DataType::BigInt(_) => {
79 let (val, next) = reader.read_long(cursor);
80 (Datum::Int64(val), next)
81 }
82 DataType::Float(_) => {
83 let (val, next) = reader.read_float(cursor);
84 (Datum::Float32(val.into()), next)
85 }
86 DataType::Double(_) => {
87 let (val, next) = reader.read_double(cursor);
88 (Datum::Float64(val.into()), next)
89 }
90 DataType::Char(_) | DataType::String(_) => {
92 let (val, next) = reader.read_string(cursor);
93 (Datum::String(val.into()), next)
94 }
95 DataType::Bytes(_) | DataType::Binary(_) => {
97 let (val, next) = reader.read_bytes(cursor);
98 (Datum::Blob(val.into()), next)
99 }
100 DataType::Decimal(decimal_type) => {
101 let precision = decimal_type.precision();
102 let scale = decimal_type.scale();
103 if Decimal::is_compact_precision(precision) {
104 let (val, next) = reader.read_long(cursor);
106 let decimal = Decimal::from_unscaled_long(val, precision, scale)
107 .expect("Failed to create decimal from unscaled long");
108 (Datum::Decimal(decimal), next)
109 } else {
110 let (bytes, next) = reader.read_bytes(cursor);
112 let decimal = Decimal::from_unscaled_bytes(bytes, precision, scale)
113 .expect("Failed to create decimal from unscaled bytes");
114 (Datum::Decimal(decimal), next)
115 }
116 }
117 DataType::Date(_) => {
118 let (val, next) = reader.read_int(cursor);
119 (Datum::Date(crate::row::datum::Date::new(val)), next)
120 }
121 DataType::Time(_) => {
122 let (val, next) = reader.read_int(cursor);
123 (Datum::Time(crate::row::datum::Time::new(val)), next)
124 }
125 DataType::Timestamp(timestamp_type) => {
126 let precision = timestamp_type.precision();
127 if crate::row::datum::TimestampNtz::is_compact(precision) {
128 let (millis, next) = reader.read_long(cursor);
130 (
131 Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(millis)),
132 next,
133 )
134 } else {
135 let (millis, mid) = reader.read_long(cursor);
137 let (nanos, next) = reader.read_int(mid);
138 let timestamp =
139 crate::row::datum::TimestampNtz::from_millis_nanos(millis, nanos)
140 .expect("Invalid nano_of_millisecond value in compacted row");
141 (Datum::TimestampNtz(timestamp), next)
142 }
143 }
144 DataType::TimestampLTz(timestamp_ltz_type) => {
145 let precision = timestamp_ltz_type.precision();
146 if crate::row::datum::TimestampLtz::is_compact(precision) {
147 let (epoch_millis, next) = reader.read_long(cursor);
149 (
150 Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(epoch_millis)),
151 next,
152 )
153 } else {
154 let (epoch_millis, mid) = reader.read_long(cursor);
156 let (nanos, next) = reader.read_int(mid);
157 let timestamp_ltz =
158 crate::row::datum::TimestampLtz::from_millis_nanos(epoch_millis, nanos)
159 .expect("Invalid nano_of_millisecond value in compacted row");
160 (Datum::TimestampLtz(timestamp_ltz), next)
161 }
162 }
163 _ => {
164 panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}");
165 }
166 };
167 cursor = next_cursor;
168 row.set_field(col_pos, datum);
169 }
170 row
171 }
172}
173
/// Positional reader over the bytes of a single compacted row.
///
/// The reader keeps no cursor of its own: each `read_*` method takes the
/// position to read from and returns the decoded value together with the
/// position that follows it.
#[allow(dead_code)]
pub struct CompactedRowReader<'a> {
    /// Buffer that contains the row (and possibly other data around it).
    segment: &'a [u8],
    /// Index in `segment` of the row's first byte, where the null bitset starts.
    offset: usize,
    /// Exclusive end of the row in `segment` (`offset + length`).
    limit: usize,
    /// Width of the leading null bitset, derived from the field count.
    header_size_in_bytes: usize,
}
183
184#[allow(dead_code)]
185impl<'a> CompactedRowReader<'a> {
186 pub fn new(field_count: usize, data: &'a [u8], offset: usize, length: usize) -> Self {
187 let header_size_in_bytes = calculate_bit_set_width_in_bytes(field_count);
188 let limit = offset + length;
189 let position = offset + header_size_in_bytes;
190 debug_assert!(limit <= data.len());
191 debug_assert!(position <= limit);
192
193 CompactedRowReader {
194 segment: data,
195 offset,
196 limit,
197 header_size_in_bytes,
198 }
199 }
200
201 fn initial_position(&self) -> usize {
202 self.offset + self.header_size_in_bytes
203 }
204
205 pub fn is_null_at(&self, col_pos: usize) -> bool {
206 let byte_index = col_pos >> 3;
207 let bit = col_pos & 7;
208 debug_assert!(byte_index < self.header_size_in_bytes);
209 let idx = self.offset + byte_index;
210 (self.segment[idx] & (1u8 << bit)) != 0
211 }
212
213 pub fn read_boolean(&self, pos: usize) -> (bool, usize) {
214 let (val, next) = self.read_byte(pos);
215 (val != 0, next)
216 }
217
218 pub fn read_byte(&self, pos: usize) -> (u8, usize) {
219 debug_assert!(pos < self.limit);
220 (self.segment[pos], pos + 1)
221 }
222
223 pub fn read_short(&self, pos: usize) -> (i16, usize) {
224 let next_pos = pos + 2;
225 debug_assert!(next_pos <= self.limit);
226 let bytes_slice = &self.segment[pos..pos + 2];
227 let val = i16::from_ne_bytes(
228 bytes_slice
229 .try_into()
230 .expect("Slice must be exactly 2 bytes long"),
231 );
232 (val, next_pos)
233 }
234
235 pub fn read_int(&self, pos: usize) -> (i32, usize) {
236 match read_unsigned_varint_at(self.segment, pos, CompactedRowWriter::MAX_INT_SIZE) {
237 Ok((value, next_pos)) => (value as i32, next_pos),
238 Err(_) => panic!("Invalid VarInt32 input stream."),
239 }
240 }
241
242 pub fn read_long(&self, pos: usize) -> (i64, usize) {
243 match read_unsigned_varint_u64_at(self.segment, pos, CompactedRowWriter::MAX_LONG_SIZE) {
244 Ok((value, next_pos)) => (value as i64, next_pos),
245 Err(_) => panic!("Invalid VarInt64 input stream."),
246 }
247 }
248
249 pub fn read_float(&self, pos: usize) -> (f32, usize) {
250 let next_pos = pos + 4;
251 debug_assert!(next_pos <= self.limit);
252 let val = f32::from_ne_bytes(
253 self.segment[pos..pos + 4]
254 .try_into()
255 .expect("Slice must be exactly 4 bytes long"),
256 );
257 (val, next_pos)
258 }
259
260 pub fn read_double(&self, pos: usize) -> (f64, usize) {
261 let next_pos = pos + 8;
262 debug_assert!(next_pos <= self.limit);
263 let val = f64::from_ne_bytes(
264 self.segment[pos..pos + 8]
265 .try_into()
266 .expect("Slice must be exactly 8 bytes long"),
267 );
268 (val, next_pos)
269 }
270
271 pub fn read_binary(&self, pos: usize) -> (&'a [u8], usize) {
272 self.read_bytes(pos)
273 }
274
275 pub fn read_bytes(&self, pos: usize) -> (&'a [u8], usize) {
276 let (len, data_pos) = self.read_int(pos);
277 let len = len as usize;
278 let next_pos = data_pos + len;
279 debug_assert!(next_pos <= self.limit);
280 (&self.segment[data_pos..next_pos], next_pos)
281 }
282
283 pub fn read_string(&self, pos: usize) -> (&'a str, usize) {
284 let (bytes, next_pos) = self.read_bytes(pos);
285 let s = from_utf8(bytes).expect("Invalid UTF-8 when reading string");
286 (s, next_pos)
287 }
288}