lance_encoding/previous/encodings/physical/
packed_struct.rs1use std::sync::Arc;
5
6use arrow_schema::{DataType, Fields};
7use bytes::Bytes;
8use bytes::BytesMut;
9use futures::{future::BoxFuture, FutureExt};
10use lance_arrow::DataTypeExt;
11use lance_core::{Error, Result};
12use snafu::location;
13
14use crate::data::BlockInfo;
15use crate::data::FixedSizeListBlock;
16use crate::format::ProtobufUtils;
17use crate::{
18 buffer::LanceBuffer,
19 data::{DataBlock, FixedWidthDataBlock, StructDataBlock},
20 decoder::{PageScheduler, PrimitivePageDecoder},
21 previous::encoder::{ArrayEncoder, EncodedArray},
22 EncodingsIo,
23};
24
25#[derive(Debug)]
26pub struct PackedStructPageScheduler {
27 _inner_schedulers: Vec<Box<dyn PageScheduler>>,
31 fields: Fields,
32 buffer_offset: u64,
33}
34
35impl PackedStructPageScheduler {
36 pub fn new(
37 _inner_schedulers: Vec<Box<dyn PageScheduler>>,
38 struct_datatype: DataType,
39 buffer_offset: u64,
40 ) -> Self {
41 let DataType::Struct(fields) = struct_datatype else {
42 panic!("Struct datatype expected");
43 };
44 Self {
45 _inner_schedulers,
46 fields,
47 buffer_offset,
48 }
49 }
50}
51
52impl PageScheduler for PackedStructPageScheduler {
53 fn schedule_ranges(
54 &self,
55 ranges: &[std::ops::Range<u64>],
56 scheduler: &Arc<dyn EncodingsIo>,
57 top_level_row: u64,
58 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
59 let mut total_bytes_per_row: u64 = 0;
60
61 for field in &self.fields {
62 let bytes_per_field = field.data_type().byte_width() as u64;
63 total_bytes_per_row += bytes_per_field;
64 }
65
66 let byte_ranges = ranges
72 .iter()
73 .map(|range| {
74 let start = self.buffer_offset + (range.start * total_bytes_per_row);
75 let end = self.buffer_offset + (range.end * total_bytes_per_row);
76 start..end
77 })
78 .collect::<Vec<_>>();
79
80 let bytes = scheduler.submit_request(byte_ranges, top_level_row);
82
83 let copy_struct_fields = self.fields.clone();
84
85 tokio::spawn(async move {
86 let bytes = bytes.await?;
87
88 let mut combined_bytes = BytesMut::default();
89 for byte_slice in bytes {
90 combined_bytes.extend_from_slice(&byte_slice);
91 }
92
93 Ok(Box::new(PackedStructPageDecoder {
94 data: combined_bytes.freeze(),
95 fields: copy_struct_fields,
96 total_bytes_per_row: total_bytes_per_row as usize,
97 }) as Box<dyn PrimitivePageDecoder>)
98 })
99 .map(|join_handle| join_handle.unwrap())
100 .boxed()
101 }
102}
103
104struct PackedStructPageDecoder {
105 data: Bytes,
106 fields: Fields,
107 total_bytes_per_row: usize,
108}
109
110impl PrimitivePageDecoder for PackedStructPageDecoder {
111 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
112 let bytes_to_skip = (rows_to_skip as usize) * self.total_bytes_per_row;
127
128 let mut children = Vec::with_capacity(self.fields.len());
129
130 let mut start_index = 0;
131
132 for field in &self.fields {
133 let bytes_per_field = field.data_type().byte_width();
134 let mut field_bytes = Vec::with_capacity(bytes_per_field * num_rows as usize);
135
136 let mut byte_index = start_index;
137
138 for _ in 0..num_rows {
139 let start = bytes_to_skip + byte_index;
140 field_bytes.extend_from_slice(&self.data[start..(start + bytes_per_field)]);
141 byte_index += self.total_bytes_per_row;
142 }
143
144 start_index += bytes_per_field;
145 let child_block = FixedWidthDataBlock {
146 data: LanceBuffer::from(field_bytes),
147 bits_per_value: bytes_per_field as u64 * 8,
148 num_values: num_rows,
149 block_info: BlockInfo::new(),
150 };
151 let child_block = FixedSizeListBlock::from_flat(child_block, field.data_type());
152 children.push(child_block);
153 }
154 Ok(DataBlock::Struct(StructDataBlock {
155 children,
156 block_info: BlockInfo::default(),
157 validity: None,
158 }))
159 }
160}
161
162#[derive(Debug)]
163pub struct PackedStructEncoder {
164 inner_encoders: Vec<Box<dyn ArrayEncoder>>,
165}
166
167impl PackedStructEncoder {
168 pub fn new(inner_encoders: Vec<Box<dyn ArrayEncoder>>) -> Self {
169 Self { inner_encoders }
170 }
171}
172
173impl ArrayEncoder for PackedStructEncoder {
174 fn encode(
175 &self,
176 data: DataBlock,
177 data_type: &DataType,
178 buffer_index: &mut u32,
179 ) -> Result<EncodedArray> {
180 let struct_data = data.as_struct().unwrap();
181
182 let DataType::Struct(child_types) = data_type else {
183 panic!("Struct datatype expected");
184 };
185
186 let mut encoded_fields = Vec::with_capacity(struct_data.children.len());
188 for ((child, encoder), child_type) in struct_data
189 .children
190 .into_iter()
191 .zip(&self.inner_encoders)
192 .zip(child_types)
193 {
194 encoded_fields.push(encoder.encode(child, child_type.data_type(), &mut 0)?);
195 }
196
197 let (encoded_data_vec, child_encodings): (Vec<_>, Vec<_>) = encoded_fields
198 .into_iter()
199 .map(|field| (field.data, field.encoding))
200 .unzip();
201
202 let fixed_fields = encoded_data_vec
208 .into_iter()
209 .map(|child| match child {
210 DataBlock::FixedWidth(fixed) => Ok(fixed),
211 DataBlock::FixedSizeList(fixed_size_list) => {
212 let flattened = fixed_size_list.try_into_flat().ok_or_else(|| {
213 Error::invalid_input(
214 "Packed struct encoder cannot pack nullable fixed-width data blocks",
215 location!(),
216 )
217 })?;
218 Ok(flattened)
219 }
220 _ => Err(Error::invalid_input(
221 "Packed struct encoder currently only implemented for fixed-width data blocks",
222 location!(),
223 )),
224 })
225 .collect::<Result<Vec<_>>>()?;
226 let total_bits_per_value = fixed_fields.iter().map(|f| f.bits_per_value).sum::<u64>();
227
228 let num_values = fixed_fields[0].num_values;
229 debug_assert!(fixed_fields
230 .iter()
231 .all(|field| field.num_values == num_values));
232
233 let zipped_input = fixed_fields
234 .into_iter()
235 .map(|field| (field.data, field.bits_per_value))
236 .collect::<Vec<_>>();
237 let zipped = LanceBuffer::zip_into_one(zipped_input, num_values)?;
238
239 let index = *buffer_index;
241 *buffer_index += 1;
242
243 let packed_data = DataBlock::FixedWidth(FixedWidthDataBlock {
244 data: zipped,
245 bits_per_value: total_bits_per_value,
246 num_values,
247 block_info: BlockInfo::new(),
248 });
249
250 let encoding = ProtobufUtils::packed_struct(child_encodings, index);
251
252 Ok(EncodedArray {
253 data: packed_data,
254 encoding,
255 })
256 }
257}
258
#[cfg(test)]
pub mod tests {

    use arrow_array::{ArrayRef, Int32Array, StructArray, UInt64Array, UInt8Array};
    use arrow_schema::{DataType, Field, Fields};
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases};

    /// Field metadata that marks a struct as packed.
    fn packed_metadata() -> HashMap<String, String> {
        HashMap::from([("packed".to_string(), "true".to_string())])
    }

    /// Builds a non-nullable struct array with columns `x: u64`, `y: i32`
    /// and `z: u8`.
    fn xyz_struct(x: Vec<u64>, y: Vec<i32>, z: Vec<u8>) -> Arc<StructArray> {
        Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                Arc::new(UInt64Array::from(x)) as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                Arc::new(Int32Array::from(y)) as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                Arc::new(UInt8Array::from(z)) as ArrayRef,
            ),
        ]))
    }

    #[test_log::test(tokio::test)]
    async fn test_random_packed_struct() {
        let children = Fields::from(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::UInt32, false),
        ]);
        let field =
            Field::new("", DataType::Struct(children), false).with_metadata(packed_metadata());

        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_specific_packed_struct() {
        let first_batch = xyz_struct(vec![1, 2, 3, 4], vec![5, 6, 7, 8], vec![9, 10, 11, 12]);
        let second_batch = xyz_struct(
            vec![13, 14, 15, 16],
            vec![17, 18, 19, 20],
            vec![21, 22, 23, 24],
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..6)
            .with_range(1..4)
            .with_indices(vec![1, 3, 7]);

        check_round_trip_encoding_of_data(
            vec![first_batch, second_batch],
            &test_cases,
            packed_metadata(),
        )
        .await;
    }
}