lance_encoding/previous/encodings/physical/packed_struct.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_schema::{DataType, Fields};
7use bytes::Bytes;
8use bytes::BytesMut;
9use futures::{future::BoxFuture, FutureExt};
10use lance_arrow::DataTypeExt;
11use lance_core::{Error, Result};
12use snafu::location;
13
14use crate::data::BlockInfo;
15use crate::data::FixedSizeListBlock;
16use crate::format::ProtobufUtils;
17use crate::{
18    buffer::LanceBuffer,
19    data::{DataBlock, FixedWidthDataBlock, StructDataBlock},
20    decoder::{PageScheduler, PrimitivePageDecoder},
21    previous::encoder::{ArrayEncoder, EncodedArray},
22    EncodingsIo,
23};
24
/// Schedules I/O for a page that stores a struct of fixed-width fields packed
/// row-by-row into a single buffer.
#[derive(Debug)]
pub struct PackedStructPageScheduler {
    // We don't actually need these schedulers right now since we decode all the field bytes directly
    // But they can be useful if we actually need to use the decoders for the inner fields later
    // e.g. once bitpacking is added
    _inner_schedulers: Vec<Box<dyn PageScheduler>>,
    // The struct's fields; each field's byte width is summed to get the row stride
    fields: Fields,
    // Byte position of the packed buffer within the page (taken from the protobuf)
    buffer_offset: u64,
}
34
35impl PackedStructPageScheduler {
36    pub fn new(
37        _inner_schedulers: Vec<Box<dyn PageScheduler>>,
38        struct_datatype: DataType,
39        buffer_offset: u64,
40    ) -> Self {
41        let DataType::Struct(fields) = struct_datatype else {
42            panic!("Struct datatype expected");
43        };
44        Self {
45            _inner_schedulers,
46            fields,
47            buffer_offset,
48        }
49    }
50}
51
52impl PageScheduler for PackedStructPageScheduler {
53    fn schedule_ranges(
54        &self,
55        ranges: &[std::ops::Range<u64>],
56        scheduler: &Arc<dyn EncodingsIo>,
57        top_level_row: u64,
58    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
59        let mut total_bytes_per_row: u64 = 0;
60
61        for field in &self.fields {
62            let bytes_per_field = field.data_type().byte_width() as u64;
63            total_bytes_per_row += bytes_per_field;
64        }
65
66        // Parts of the arrays in a page may be encoded in different encoding tasks
67        // In that case decoding two different sets of rows can result in the same ranges parameter being passed in
68        // e.g. we may get ranges[0..2] and ranges[0..2] to decode 4 rows through 2 tasks
69        // So to get the correct byte ranges we need to know the position of the buffer in the page (i.e. the buffer offset)
70        // This is computed directly from the buffer stored in the protobuf
71        let byte_ranges = ranges
72            .iter()
73            .map(|range| {
74                let start = self.buffer_offset + (range.start * total_bytes_per_row);
75                let end = self.buffer_offset + (range.end * total_bytes_per_row);
76                start..end
77            })
78            .collect::<Vec<_>>();
79
80        // Directly creates a future to decode the bytes
81        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
82
83        let copy_struct_fields = self.fields.clone();
84
85        tokio::spawn(async move {
86            let bytes = bytes.await?;
87
88            let mut combined_bytes = BytesMut::default();
89            for byte_slice in bytes {
90                combined_bytes.extend_from_slice(&byte_slice);
91            }
92
93            Ok(Box::new(PackedStructPageDecoder {
94                data: combined_bytes.freeze(),
95                fields: copy_struct_fields,
96                total_bytes_per_row: total_bytes_per_row as usize,
97            }) as Box<dyn PrimitivePageDecoder>)
98        })
99        .map(|join_handle| join_handle.unwrap())
100        .boxed()
101    }
102}
103
/// Decodes per-field values back out of a row-major packed buffer.
struct PackedStructPageDecoder {
    // The packed bytes for the requested ranges, concatenated in request order
    data: Bytes,
    // The struct's fields; each field's byte width determines its slice per row
    fields: Fields,
    // Sum of the fields' byte widths, i.e. the stride between consecutive rows
    total_bytes_per_row: usize,
}
109
110impl PrimitivePageDecoder for PackedStructPageDecoder {
111    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
112        // Decoding workflow:
113        // rows 0-2: {x: [1, 2, 3], y: [4, 5, 6], z: [7, 8, 9]}
114        // rows 3-5: {x: [10, 11, 12], y: [13, 14, 15], z: [16, 17, 18]}
115        // packed encoding: [
116        // [1, 4, 7, 2, 5, 8, 3, 6, 9],
117        // [10, 13, 16, 11, 14, 17, 12, 15, 18]
118        // ]
119        // suppose bytes_per_field=1, 4, 8 for fields x, y, and z, respectively.
120        // Then total_bytes_per_row = 13
121        // Suppose rows_to_skip=1 and num_rows=2. Then we will slice bytes 13 to 39.
122        // Now we have [2, 5, 8, 3, 6, 9]
123        // We rearrange this to get [BytesMut(2, 3), BytesMut(5, 6), BytesMut(8, 9)] as a Vec<BytesMut>
124        // This is used to reconstruct the struct array later
125
126        let bytes_to_skip = (rows_to_skip as usize) * self.total_bytes_per_row;
127
128        let mut children = Vec::with_capacity(self.fields.len());
129
130        let mut start_index = 0;
131
132        for field in &self.fields {
133            let bytes_per_field = field.data_type().byte_width();
134            let mut field_bytes = Vec::with_capacity(bytes_per_field * num_rows as usize);
135
136            let mut byte_index = start_index;
137
138            for _ in 0..num_rows {
139                let start = bytes_to_skip + byte_index;
140                field_bytes.extend_from_slice(&self.data[start..(start + bytes_per_field)]);
141                byte_index += self.total_bytes_per_row;
142            }
143
144            start_index += bytes_per_field;
145            let child_block = FixedWidthDataBlock {
146                data: LanceBuffer::from(field_bytes),
147                bits_per_value: bytes_per_field as u64 * 8,
148                num_values: num_rows,
149                block_info: BlockInfo::new(),
150            };
151            let child_block = FixedSizeListBlock::from_flat(child_block, field.data_type());
152            children.push(child_block);
153        }
154        Ok(DataBlock::Struct(StructDataBlock {
155            children,
156            block_info: BlockInfo::default(),
157            validity: None,
158        }))
159    }
160}
161
/// Encodes a struct of fixed-width fields by zipping the encoded field values
/// row-by-row into a single packed buffer.
#[derive(Debug)]
pub struct PackedStructEncoder {
    // One encoder per struct field, in field order
    inner_encoders: Vec<Box<dyn ArrayEncoder>>,
}
166
167impl PackedStructEncoder {
168    pub fn new(inner_encoders: Vec<Box<dyn ArrayEncoder>>) -> Self {
169        Self { inner_encoders }
170    }
171}
172
173impl ArrayEncoder for PackedStructEncoder {
174    fn encode(
175        &self,
176        data: DataBlock,
177        data_type: &DataType,
178        buffer_index: &mut u32,
179    ) -> Result<EncodedArray> {
180        let struct_data = data.as_struct().unwrap();
181
182        let DataType::Struct(child_types) = data_type else {
183            panic!("Struct datatype expected");
184        };
185
186        // Encode individual fields
187        let mut encoded_fields = Vec::with_capacity(struct_data.children.len());
188        for ((child, encoder), child_type) in struct_data
189            .children
190            .into_iter()
191            .zip(&self.inner_encoders)
192            .zip(child_types)
193        {
194            encoded_fields.push(encoder.encode(child, child_type.data_type(), &mut 0)?);
195        }
196
197        let (encoded_data_vec, child_encodings): (Vec<_>, Vec<_>) = encoded_fields
198            .into_iter()
199            .map(|field| (field.data, field.encoding))
200            .unzip();
201
202        // Zip together encoded data
203        //
204        // We can currently encode both FixedWidth and FixedSizeList.  In order
205        // to encode the latter we "flatten" it converting a FixedSizeList into
206        // a FixedWidth with very wide items.
207        let fixed_fields = encoded_data_vec
208            .into_iter()
209            .map(|child| match child {
210                DataBlock::FixedWidth(fixed) => Ok(fixed),
211                DataBlock::FixedSizeList(fixed_size_list) => {
212                    let flattened = fixed_size_list.try_into_flat().ok_or_else(|| {
213                        Error::invalid_input(
214                            "Packed struct encoder cannot pack nullable fixed-width data blocks",
215                            location!(),
216                        )
217                    })?;
218                    Ok(flattened)
219                }
220                _ => Err(Error::invalid_input(
221                    "Packed struct encoder currently only implemented for fixed-width data blocks",
222                    location!(),
223                )),
224            })
225            .collect::<Result<Vec<_>>>()?;
226        let total_bits_per_value = fixed_fields.iter().map(|f| f.bits_per_value).sum::<u64>();
227
228        let num_values = fixed_fields[0].num_values;
229        debug_assert!(fixed_fields
230            .iter()
231            .all(|field| field.num_values == num_values));
232
233        let zipped_input = fixed_fields
234            .into_iter()
235            .map(|field| (field.data, field.bits_per_value))
236            .collect::<Vec<_>>();
237        let zipped = LanceBuffer::zip_into_one(zipped_input, num_values)?;
238
239        // Create encoding protobuf
240        let index = *buffer_index;
241        *buffer_index += 1;
242
243        let packed_data = DataBlock::FixedWidth(FixedWidthDataBlock {
244            data: zipped,
245            bits_per_value: total_bits_per_value,
246            num_values,
247            block_info: BlockInfo::new(),
248        });
249
250        let encoding = ProtobufUtils::packed_struct(child_encodings, index);
251
252        Ok(EncodedArray {
253            data: packed_data,
254            encoding,
255        })
256    }
257}
258
#[cfg(test)]
pub mod tests {

    use arrow_array::{ArrayRef, Int32Array, StructArray, UInt64Array, UInt8Array};
    use arrow_schema::{DataType, Field, Fields};
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases};

    /// Round-trips randomly generated data through the packed-struct encoding.
    #[test_log::test(tokio::test)]
    async fn test_random_packed_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::UInt32, false),
        ]));
        // The "packed" metadata key opts the struct field into packed encoding
        let mut metadata = HashMap::new();
        metadata.insert("packed".to_string(), "true".to_string());

        let field = Field::new("", data_type, false).with_metadata(metadata);

        check_basic_random(field).await;
    }

    /// Round-trips two hand-built struct arrays and exercises range and
    /// index-based decodes across the page boundary.
    #[test_log::test(tokio::test)]
    async fn test_specific_packed_struct() {
        let array1 = Arc::new(UInt64Array::from(vec![1, 2, 3, 4]));
        let array2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8]));
        let array3 = Arc::new(UInt8Array::from(vec![9, 10, 11, 12]));

        // The arrays are moved directly into the struct arrays; the bindings
        // are never used again so cloning the Arcs (as the previous version
        // did) was redundant (clippy::redundant_clone).
        let struct_array1 = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                array1 as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                array2 as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                array3 as ArrayRef,
            ),
        ]));

        let array4 = Arc::new(UInt64Array::from(vec![13, 14, 15, 16]));
        let array5 = Arc::new(Int32Array::from(vec![17, 18, 19, 20]));
        let array6 = Arc::new(UInt8Array::from(vec![21, 22, 23, 24]));

        let struct_array2 = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                array4 as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                array5 as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                array6 as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..6)
            .with_range(1..4)
            .with_indices(vec![1, 3, 7]);

        let mut metadata = HashMap::new();
        metadata.insert("packed".to_string(), "true".to_string());

        check_round_trip_encoding_of_data(
            vec![struct_array1, struct_array2],
            &test_cases,
            metadata,
        )
        .await;
    }
}