1use std::sync::Arc;
5
6use arrow_schema::{DataType, Fields};
7use bytes::Bytes;
8use bytes::BytesMut;
9use futures::{future::BoxFuture, FutureExt};
10use lance_arrow::DataTypeExt;
11use lance_core::{Error, Result};
12use snafu::location;
13
14use crate::data::BlockInfo;
15use crate::data::FixedSizeListBlock;
16use crate::format::ProtobufUtils;
17use crate::{
18 buffer::LanceBuffer,
19 data::{DataBlock, FixedWidthDataBlock, StructDataBlock},
20 decoder::{PageScheduler, PrimitivePageDecoder},
21 previous::encoder::{ArrayEncoder, EncodedArray},
22 EncodingsIo,
23};
24
/// Page scheduler for the packed-struct encoding, where each row of the page
/// stores the bytes of every (fixed-width) child field back to back in a
/// single buffer.
#[derive(Debug)]
pub struct PackedStructPageScheduler {
    // Child schedulers handed to `new`; kept but not consulted when scheduling
    // (byte ranges are computed from `fields` and `buffer_offset` instead).
    _inner_schedulers: Vec<Box<dyn PageScheduler>>,
    // The struct's child fields in packed order; their byte widths define the
    // layout of one row.
    fields: Fields,
    // Byte offset added to every row-derived offset when requesting I/O.
    buffer_offset: u64,
}
34
35impl PackedStructPageScheduler {
36 pub fn new(
37 _inner_schedulers: Vec<Box<dyn PageScheduler>>,
38 struct_datatype: DataType,
39 buffer_offset: u64,
40 ) -> Self {
41 let DataType::Struct(fields) = struct_datatype else {
42 panic!("Struct datatype expected");
43 };
44 Self {
45 _inner_schedulers,
46 fields,
47 buffer_offset,
48 }
49 }
50}
51
52impl PageScheduler for PackedStructPageScheduler {
53 fn schedule_ranges(
54 &self,
55 ranges: &[std::ops::Range<u64>],
56 scheduler: &Arc<dyn EncodingsIo>,
57 top_level_row: u64,
58 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
59 let mut total_bytes_per_row: u64 = 0;
60
61 for field in &self.fields {
62 let bytes_per_field = field.data_type().byte_width() as u64;
63 total_bytes_per_row += bytes_per_field;
64 }
65
66 let byte_ranges = ranges
72 .iter()
73 .map(|range| {
74 let start = self.buffer_offset + (range.start * total_bytes_per_row);
75 let end = self.buffer_offset + (range.end * total_bytes_per_row);
76 start..end
77 })
78 .collect::<Vec<_>>();
79
80 let bytes = scheduler.submit_request(byte_ranges, top_level_row);
82
83 let copy_struct_fields = self.fields.clone();
84
85 tokio::spawn(async move {
86 let bytes = bytes.await?;
87
88 let mut combined_bytes = BytesMut::default();
89 for byte_slice in bytes {
90 combined_bytes.extend_from_slice(&byte_slice);
91 }
92
93 Ok(Box::new(PackedStructPageDecoder {
94 data: combined_bytes.freeze(),
95 fields: copy_struct_fields,
96 total_bytes_per_row: total_bytes_per_row as usize,
97 }) as Box<dyn PrimitivePageDecoder>)
98 })
99 .map(|join_handle| join_handle.unwrap())
100 .boxed()
101 }
102}
103
/// Decoder over the bytes fetched for a packed-struct page; splits the
/// row-major buffer back into one block per child field.
struct PackedStructPageDecoder {
    // Bytes of the scheduled rows, concatenated in request order.
    data: Bytes,
    // The struct's child fields in packed order.
    fields: Fields,
    // Sum of the fields' byte widths, i.e. the stride between consecutive rows.
    total_bytes_per_row: usize,
}
109
110impl PrimitivePageDecoder for PackedStructPageDecoder {
111 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
112 let bytes_to_skip = (rows_to_skip as usize) * self.total_bytes_per_row;
127
128 let mut children = Vec::with_capacity(self.fields.len());
129
130 let mut start_index = 0;
131
132 for field in &self.fields {
133 let bytes_per_field = field.data_type().byte_width();
134 let mut field_bytes = Vec::with_capacity(bytes_per_field * num_rows as usize);
135
136 let mut byte_index = start_index;
137
138 for _ in 0..num_rows {
139 let start = bytes_to_skip + byte_index;
140 field_bytes.extend_from_slice(&self.data[start..(start + bytes_per_field)]);
141 byte_index += self.total_bytes_per_row;
142 }
143
144 start_index += bytes_per_field;
145 let child_block = FixedWidthDataBlock {
146 data: LanceBuffer::from(field_bytes),
147 bits_per_value: bytes_per_field as u64 * 8,
148 num_values: num_rows,
149 block_info: BlockInfo::new(),
150 };
151 let child_block = FixedSizeListBlock::from_flat(child_block, field.data_type());
152 children.push(child_block);
153 }
154 Ok(DataBlock::Struct(StructDataBlock {
155 children,
156 block_info: BlockInfo::default(),
157 validity: None,
158 }))
159 }
160}
161
/// Encoder that packs a struct of fixed-width children into a single buffer,
/// interleaving the children's values row by row.
#[derive(Debug)]
pub struct PackedStructEncoder {
    // One encoder per struct child; zipped with the children in field order.
    inner_encoders: Vec<Box<dyn ArrayEncoder>>,
}
166
167impl PackedStructEncoder {
168 pub fn new(inner_encoders: Vec<Box<dyn ArrayEncoder>>) -> Self {
169 Self { inner_encoders }
170 }
171}
172
173impl ArrayEncoder for PackedStructEncoder {
174 fn encode(
175 &self,
176 data: DataBlock,
177 data_type: &DataType,
178 buffer_index: &mut u32,
179 ) -> Result<EncodedArray> {
180 let struct_data = data.as_struct().unwrap();
181
182 let DataType::Struct(child_types) = data_type else {
183 panic!("Struct datatype expected");
184 };
185
186 let mut encoded_fields = Vec::with_capacity(struct_data.children.len());
188 for ((child, encoder), child_type) in struct_data
189 .children
190 .into_iter()
191 .zip(&self.inner_encoders)
192 .zip(child_types)
193 {
194 encoded_fields.push(encoder.encode(child, child_type.data_type(), &mut 0)?);
195 }
196
197 let (encoded_data_vec, child_encodings): (Vec<_>, Vec<_>) = encoded_fields
198 .into_iter()
199 .map(|field| (field.data, field.encoding))
200 .unzip();
201
202 let fixed_fields = encoded_data_vec
208 .into_iter()
209 .map(|child| match child {
210 DataBlock::FixedWidth(fixed) => Ok(fixed),
211 DataBlock::FixedSizeList(fixed_size_list) => {
212 let flattened = fixed_size_list.try_into_flat().ok_or_else(|| {
213 Error::invalid_input(
214 "Packed struct encoder cannot pack nullable fixed-width data blocks",
215 location!(),
216 )
217 })?;
218 Ok(flattened)
219 }
220 _ => Err(Error::invalid_input(
221 "Packed struct encoder currently only implemented for fixed-width data blocks",
222 location!(),
223 )),
224 })
225 .collect::<Result<Vec<_>>>()?;
226 let total_bits_per_value = fixed_fields.iter().map(|f| f.bits_per_value).sum::<u64>();
227
228 let num_values = fixed_fields[0].num_values;
229 debug_assert!(fixed_fields
230 .iter()
231 .all(|field| field.num_values == num_values));
232
233 let zipped_input = fixed_fields
234 .into_iter()
235 .map(|field| (field.data, field.bits_per_value))
236 .collect::<Vec<_>>();
237 let zipped = LanceBuffer::zip_into_one(zipped_input, num_values)?;
238
239 let index = *buffer_index;
241 *buffer_index += 1;
242
243 let packed_data = DataBlock::FixedWidth(FixedWidthDataBlock {
244 data: zipped,
245 bits_per_value: total_bits_per_value,
246 num_values,
247 block_info: BlockInfo::new(),
248 });
249
250 let encoding = ProtobufUtils::packed_struct(child_encodings, index);
251
252 Ok(EncodedArray {
253 data: packed_data,
254 encoding,
255 })
256 }
257}
258
#[cfg(test)]
pub mod tests {

    use arrow_array::{
        Array, ArrayRef, FixedSizeListArray, Int32Array, StructArray, UInt64Array, UInt8Array,
    };
    use arrow_data::ArrayData;
    use arrow_schema::{DataType, Field, Fields};
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };
    use rstest::rstest;

    /// Round-trips randomly generated data through a packed struct of two
    /// non-nullable integer fields.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_random_packed_struct(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::UInt32, false),
        ]));
        let mut metadata = HashMap::new();
        metadata.insert("packed".to_string(), "true".to_string());

        let field = Field::new("", data_type, false).with_metadata(metadata);

        check_round_trip_encoding_random(field, version).await;
    }

    /// Round-trips two known struct batches with mixed-width fields and checks
    /// several ranges and an index selection.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_specific_packed_struct(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let array1 = Arc::new(UInt64Array::from(vec![1, 2, 3, 4]));
        let array2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8]));
        let array3 = Arc::new(UInt8Array::from(vec![9, 10, 11, 12]));

        let struct_array1 = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                array1.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                array2.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                array3.clone() as ArrayRef,
            ),
        ]));

        let array4 = Arc::new(UInt64Array::from(vec![13, 14, 15, 16]));
        let array5 = Arc::new(Int32Array::from(vec![17, 18, 19, 20]));
        let array6 = Arc::new(UInt8Array::from(vec![21, 22, 23, 24]));

        let struct_array2 = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                array4.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                array5.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                array6.clone() as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..6)
            .with_range(1..4)
            .with_indices(vec![1, 3, 7])
            .with_file_version(version);

        let mut metadata = HashMap::new();
        metadata.insert("packed".to_string(), "true".to_string());

        check_round_trip_encoding_of_data(
            vec![struct_array1, struct_array2],
            &test_cases,
            metadata,
        )
        .await;
    }

    /// Round-trips a packed struct whose first child is a fixed-size list.
    // An empty `#[values()]` list would never run this test for any version,
    // so it covers the same versions as the tests above.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_fsl_packed_struct(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let int_array = Arc::new(Int32Array::from(vec![12, 13, 14, 15]));

        let list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3);
        let inner_array = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
        let list_data = ArrayData::builder(list_data_type.clone())
            .len(4)
            .add_child_data(inner_array.into_data())
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        // Sibling children get distinct names so field lookup by name is unambiguous.
        let struct_array = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", list_data_type.clone(), false)),
                Arc::new(list_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                int_array as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_range(1..3)
            .with_range(0..1)
            .with_range(2..4)
            .with_indices(vec![0, 2, 3])
            .with_file_version(version);

        let mut metadata = HashMap::new();
        metadata.insert("packed".to_string(), "true".to_string());

        check_round_trip_encoding_of_data(vec![struct_array], &test_cases, metadata).await;
    }
}