lance_encoding/encodings/physical/
fixed_size_list.rs1use std::sync::Arc;
5
6use arrow_schema::DataType;
7use futures::{future::BoxFuture, FutureExt};
8use lance_core::Result;
9use log::trace;
10
11use crate::{
12 data::{DataBlock, FixedSizeListBlock},
13 decoder::{PageScheduler, PrimitivePageDecoder},
14 encoder::{ArrayEncoder, EncodedArray},
15 format::ProtobufUtils,
16 EncodingsIo,
17};
18
19#[derive(Debug)]
23pub struct FixedListScheduler {
24 items_scheduler: Box<dyn PageScheduler>,
25 dimension: u32,
26}
27
28impl FixedListScheduler {
29 pub fn new(items_scheduler: Box<dyn PageScheduler>, dimension: u32) -> Self {
30 Self {
31 items_scheduler,
32 dimension,
33 }
34 }
35}
36
37impl PageScheduler for FixedListScheduler {
38 fn schedule_ranges(
39 &self,
40 ranges: &[std::ops::Range<u64>],
41 scheduler: &Arc<dyn EncodingsIo>,
42 top_level_row: u64,
43 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
44 let expanded_ranges = ranges
45 .iter()
46 .map(|range| (range.start * self.dimension as u64)..(range.end * self.dimension as u64))
47 .collect::<Vec<_>>();
48 trace!(
49 "Expanding {} fsl ranges across {}..{} to item ranges across {}..{}",
50 ranges.len(),
51 ranges[0].start,
52 ranges[ranges.len() - 1].end,
53 expanded_ranges[0].start,
54 expanded_ranges[expanded_ranges.len() - 1].end
55 );
56 let inner_page_decoder =
57 self.items_scheduler
58 .schedule_ranges(&expanded_ranges, scheduler, top_level_row);
59 let dimension = self.dimension;
60 async move {
61 let items_decoder = inner_page_decoder.await?;
62 Ok(Box::new(FixedListDecoder {
63 items_decoder,
64 dimension: dimension as u64,
65 }) as Box<dyn PrimitivePageDecoder>)
66 }
67 .boxed()
68 }
69}
70
71pub struct FixedListDecoder {
72 items_decoder: Box<dyn PrimitivePageDecoder>,
73 dimension: u64,
74}
75
76impl PrimitivePageDecoder for FixedListDecoder {
77 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
78 let rows_to_skip = rows_to_skip * self.dimension;
79 let num_child_rows = num_rows * self.dimension;
80 let child_data = self.items_decoder.decode(rows_to_skip, num_child_rows)?;
81 Ok(DataBlock::FixedSizeList(FixedSizeListBlock {
82 child: Box::new(child_data),
83 dimension: self.dimension,
84 }))
85 }
86}
87
88#[derive(Debug)]
89pub struct FslEncoder {
90 items_encoder: Box<dyn ArrayEncoder>,
91 dimension: u32,
92}
93
94impl FslEncoder {
95 pub fn new(items_encoder: Box<dyn ArrayEncoder>, dimension: u32) -> Self {
96 Self {
97 items_encoder,
98 dimension,
99 }
100 }
101}
102
103impl ArrayEncoder for FslEncoder {
104 fn encode(
105 &self,
106 data: DataBlock,
107 data_type: &DataType,
108 buffer_index: &mut u32,
109 ) -> Result<EncodedArray> {
110 let inner_type = match data_type {
111 DataType::FixedSizeList(inner_field, _) => inner_field.data_type().clone(),
112 _ => panic!("Expected fixed size list data type and got {}", data_type),
113 };
114 let data = data.as_fixed_size_list().unwrap();
115 let child = *data.child;
116
117 let encoded_data = self
118 .items_encoder
119 .encode(child, &inner_type, buffer_index)?;
120
121 let data = DataBlock::FixedSizeList(FixedSizeListBlock {
122 child: Box::new(encoded_data.data),
123 dimension: self.dimension as u64,
124 });
125
126 let encoding =
127 ProtobufUtils::fsl_encoding(self.dimension as u64, encoded_data.encoding, false);
128 Ok(EncodedArray { data, encoding })
129 }
130}
131
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow::datatypes::Int32Type;
    use arrow_array::{FixedSizeListArray, Int32Array};
    use arrow_buffer::{BooleanBuffer, NullBuffer};
    use arrow_schema::{DataType, Field};
    use lance_datagen::{array, gen_array, ArrayGeneratorExt, RowCount};
    use rstest::rstest;

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Item types exercised by the randomized round-trip test.
    const PRIMITIVE_TYPES: &[DataType] = &[DataType::Int8, DataType::Float32, DataType::Float64];

    /// Ranges and the all-rows take shared by every hand-built test below.
    fn base_test_cases() -> TestCases {
        TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
    }

    /// Builds a null buffer from per-row validity flags.
    fn validity(flags: Vec<bool>) -> NullBuffer {
        NullBuffer::new(BooleanBuffer::from(flags))
    }

    /// Round-trips randomly generated 16-wide lists of each primitive item type.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_fsl_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for item_type in PRIMITIVE_TYPES {
            let item_field = Arc::new(Field::new("item", item_type.clone(), true));
            let list_type = DataType::FixedSizeList(item_field, 16);
            let field = Field::new("", list_type, false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    /// Round-trips three 2-wide lists with a null item and a null list.
    #[test_log::test(tokio::test)]
    async fn test_simple_fsl() {
        let values = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));
        let value_field = Arc::new(Field::new("item", DataType::Int32, true));
        let list_validity = validity(vec![true, false, true]);
        let list = Arc::new(FixedSizeListArray::new(
            value_field,
            2,
            values,
            Some(list_validity),
        ));

        let test_cases = base_test_cases()
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![list], &test_cases, HashMap::default()).await;
    }

    /// Round-trips four 1024-wide lists of random, partially null items.
    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_simple_wide_fsl() {
        let values = gen_array(array::rand::<Int32Type>().with_random_nulls(0.1))
            .into_array_rows(RowCount::from(4096))
            .unwrap();
        let value_field = Arc::new(Field::new("item", DataType::Int32, true));
        let list_validity = validity(vec![true, false, true, false]);
        let list = Arc::new(FixedSizeListArray::new(
            value_field,
            1024,
            values,
            Some(list_validity),
        ));

        let test_cases = base_test_cases()
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![list], &test_cases, HashMap::default()).await;
    }

    /// Round-trips a 2-wide list of 2-wide lists with nulls at every level.
    #[test_log::test(tokio::test)]
    async fn test_nested_fsl() {
        let values = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            None,
            None,
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let value_field = Arc::new(Field::new("item", DataType::Int32, true));
        let inner_validity = validity(vec![true, false, false, false, true, true]);
        let inner_list = Arc::new(FixedSizeListArray::new(
            value_field.clone(),
            2,
            values,
            Some(inner_validity),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(value_field, 2),
            true,
        ));
        let outer_validity = validity(vec![true, false, true]);
        let outer_list = Arc::new(FixedSizeListArray::new(
            inner_list_field,
            2,
            inner_list,
            Some(outer_validity),
        ));

        let test_cases = base_test_cases()
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![outer_list], &test_cases, HashMap::default()).await;
    }
}