lance_encoding/encodings/physical/
fixed_size_list.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_schema::DataType;
7use futures::{future::BoxFuture, FutureExt};
8use lance_core::Result;
9use log::trace;
10
11use crate::{
12    data::{DataBlock, FixedSizeListBlock},
13    decoder::{PageScheduler, PrimitivePageDecoder},
14    encoder::{ArrayEncoder, EncodedArray},
15    format::ProtobufUtils,
16    EncodingsIo,
17};
18
19/// A scheduler for fixed size lists of primitive values
20///
21/// This scheduler is, itself, primitive
22#[derive(Debug)]
23pub struct FixedListScheduler {
24    items_scheduler: Box<dyn PageScheduler>,
25    dimension: u32,
26}
27
28impl FixedListScheduler {
29    pub fn new(items_scheduler: Box<dyn PageScheduler>, dimension: u32) -> Self {
30        Self {
31            items_scheduler,
32            dimension,
33        }
34    }
35}
36
37impl PageScheduler for FixedListScheduler {
38    fn schedule_ranges(
39        &self,
40        ranges: &[std::ops::Range<u64>],
41        scheduler: &Arc<dyn EncodingsIo>,
42        top_level_row: u64,
43    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
44        let expanded_ranges = ranges
45            .iter()
46            .map(|range| (range.start * self.dimension as u64)..(range.end * self.dimension as u64))
47            .collect::<Vec<_>>();
48        trace!(
49            "Expanding {} fsl ranges across {}..{} to item ranges across {}..{}",
50            ranges.len(),
51            ranges[0].start,
52            ranges[ranges.len() - 1].end,
53            expanded_ranges[0].start,
54            expanded_ranges[expanded_ranges.len() - 1].end
55        );
56        let inner_page_decoder =
57            self.items_scheduler
58                .schedule_ranges(&expanded_ranges, scheduler, top_level_row);
59        let dimension = self.dimension;
60        async move {
61            let items_decoder = inner_page_decoder.await?;
62            Ok(Box::new(FixedListDecoder {
63                items_decoder,
64                dimension: dimension as u64,
65            }) as Box<dyn PrimitivePageDecoder>)
66        }
67        .boxed()
68    }
69}
70
71pub struct FixedListDecoder {
72    items_decoder: Box<dyn PrimitivePageDecoder>,
73    dimension: u64,
74}
75
76impl PrimitivePageDecoder for FixedListDecoder {
77    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
78        let rows_to_skip = rows_to_skip * self.dimension;
79        let num_child_rows = num_rows * self.dimension;
80        let child_data = self.items_decoder.decode(rows_to_skip, num_child_rows)?;
81        Ok(DataBlock::FixedSizeList(FixedSizeListBlock {
82            child: Box::new(child_data),
83            dimension: self.dimension,
84        }))
85    }
86}
87
88#[derive(Debug)]
89pub struct FslEncoder {
90    items_encoder: Box<dyn ArrayEncoder>,
91    dimension: u32,
92}
93
94impl FslEncoder {
95    pub fn new(items_encoder: Box<dyn ArrayEncoder>, dimension: u32) -> Self {
96        Self {
97            items_encoder,
98            dimension,
99        }
100    }
101}
102
103impl ArrayEncoder for FslEncoder {
104    fn encode(
105        &self,
106        data: DataBlock,
107        data_type: &DataType,
108        buffer_index: &mut u32,
109    ) -> Result<EncodedArray> {
110        let inner_type = match data_type {
111            DataType::FixedSizeList(inner_field, _) => inner_field.data_type().clone(),
112            _ => panic!("Expected fixed size list data type and got {}", data_type),
113        };
114        let data = data.as_fixed_size_list().unwrap();
115        let child = *data.child;
116
117        let encoded_data = self
118            .items_encoder
119            .encode(child, &inner_type, buffer_index)?;
120
121        let data = DataBlock::FixedSizeList(FixedSizeListBlock {
122            child: Box::new(encoded_data.data),
123            dimension: self.dimension as u64,
124        });
125
126        let encoding =
127            ProtobufUtils::fsl_encoding(self.dimension as u64, encoded_data.encoding, false);
128        Ok(EncodedArray { data, encoding })
129    }
130}
131
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow::datatypes::Int32Type;
    use arrow_array::{FixedSizeListArray, Int32Array};
    use arrow_buffer::{BooleanBuffer, NullBuffer};
    use arrow_schema::{DataType, Field};
    use lance_datagen::{array, gen_array, ArrayGeneratorExt, RowCount};
    use rstest::rstest;

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Item types exercised by the randomized round trip test
    const PRIMITIVE_TYPES: &[DataType] = &[DataType::Int8, DataType::Float32, DataType::Float64];

    /// A nullable Int32 field named "item"
    fn nullable_int32_item() -> Arc<Field> {
        Arc::new(Field::new("item", DataType::Int32, true))
    }

    /// Builds a validity buffer from per-row flags (`true` marks a valid row)
    fn validity(flags: Vec<bool>) -> NullBuffer {
        NullBuffer::new(BooleanBuffer::from(flags))
    }

    /// Ranges and indices shared by the non-nested list tests
    fn simple_fsl_cases() -> TestCases {
        TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1)
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_fsl_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for item_type in PRIMITIVE_TYPES.iter() {
            let item_field = Arc::new(Field::new("item", item_type.clone(), true));
            let list_field = Field::new("", DataType::FixedSizeList(item_field, 16), false);
            check_round_trip_encoding_random(list_field, version).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_fsl() {
        // Three lists of two items each: [0, NULL], NULL, [4, 5]
        let values = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));
        let fsl = Arc::new(FixedSizeListArray::new(
            nullable_int32_item(),
            2,
            values,
            Some(validity(vec![true, false, true])),
        ));

        let cases = simple_fsl_cases();
        check_round_trip_encoding_of_data(vec![fsl], &cases, HashMap::default()).await;
    }

    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_simple_wide_fsl() {
        // Four lists of 1024 random items each, every other list is null
        let values = gen_array(array::rand::<Int32Type>().with_random_nulls(0.1))
            .into_array_rows(RowCount::from(4096))
            .unwrap();
        let fsl = Arc::new(FixedSizeListArray::new(
            nullable_int32_item(),
            1024,
            values,
            Some(validity(vec![true, false, true, false])),
        ));

        let cases = simple_fsl_cases();
        check_round_trip_encoding_of_data(vec![fsl], &cases, HashMap::default()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nested_fsl() {
        // [[0, 1], NULL], NULL, [[8, 9], [NULL, 11]]
        let values = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            None,
            None,
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let leaf_field = nullable_int32_item();
        let inner_validity = validity(vec![true, false, false, false, true, true]);
        let inner = Arc::new(FixedSizeListArray::new(
            leaf_field.clone(),
            2,
            values,
            Some(inner_validity),
        ));

        let inner_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(leaf_field, 2),
            true,
        ));
        let outer = Arc::new(FixedSizeListArray::new(
            inner_field,
            2,
            inner,
            Some(validity(vec![true, false, true])),
        ));

        let cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![outer], &cases, HashMap::default()).await;
    }
}