1use std::{ops::Range, sync::Arc};
11
12use arrow_array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, StructArray, cast::AsArray};
13use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer};
14use arrow_schema::DataType;
15use futures::future::BoxFuture;
16use lance_arrow::deepcopy::deep_copy_nulls;
17use lance_core::{Error, Result};
18
19use crate::{
20 decoder::{
21 DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext,
22 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
23 StructuralSchedulingJob,
24 },
25 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
26 repdef::RepDefBuilder,
27};
28
29pub struct FixedSizeListStructuralEncoder {
34 keep_original_array: bool,
35 child: Box<dyn FieldEncoder>,
36}
37
38impl FixedSizeListStructuralEncoder {
39 pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
40 Self {
41 keep_original_array,
42 child,
43 }
44 }
45}
46
47impl FieldEncoder for FixedSizeListStructuralEncoder {
48 fn maybe_encode(
49 &mut self,
50 array: ArrayRef,
51 external_buffers: &mut OutOfLineBuffers,
52 mut repdef: RepDefBuilder,
53 row_number: u64,
54 num_rows: u64,
55 ) -> Result<Vec<EncodeTask>> {
56 let fsl_arr = array.as_fixed_size_list_opt().ok_or_else(|| {
57 Error::internal("FixedSizeList encoder used for non-fixed-size-list data".to_string())
58 })?;
59
60 let dimension = fsl_arr.value_length() as usize;
61 let values = fsl_arr.values().clone();
62
63 let validity = if self.keep_original_array {
64 array.nulls().cloned()
65 } else {
66 deep_copy_nulls(array.nulls())
67 };
68 repdef.add_fsl(validity.clone(), dimension, num_rows as usize);
69
70 let values = if let Some(ref fsl_validity) = validity {
73 if needs_garbage_filtering(values.data_type()) {
74 let is_garbage =
75 expand_garbage_mask(&fsl_validity_to_garbage_mask(fsl_validity), dimension);
76 filter_fsl_child_garbage(values, &is_garbage)
77 } else {
78 values
79 }
80 } else {
81 values
82 };
83
84 self.child.maybe_encode(
85 values,
86 external_buffers,
87 repdef,
88 row_number,
89 num_rows * dimension as u64,
90 )
91 }
92
93 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
94 self.child.flush(external_buffers)
95 }
96
97 fn num_columns(&self) -> u32 {
98 self.child.num_columns()
99 }
100
101 fn finish(
102 &mut self,
103 external_buffers: &mut OutOfLineBuffers,
104 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
105 self.child.finish(external_buffers)
106 }
107}
108
109#[derive(Debug)]
114pub struct StructuralFixedSizeListScheduler {
115 child: Box<dyn StructuralFieldScheduler>,
116 dimension: u64,
117}
118
119impl StructuralFixedSizeListScheduler {
120 pub fn new(child: Box<dyn StructuralFieldScheduler>, dimension: i32) -> Self {
121 Self {
122 child,
123 dimension: dimension as u64,
124 }
125 }
126}
127
128impl StructuralFieldScheduler for StructuralFixedSizeListScheduler {
129 fn schedule_ranges<'a>(
130 &'a self,
131 ranges: &[Range<u64>],
132 filter: &FilterExpression,
133 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
134 let child_ranges: Vec<Range<u64>> = ranges
136 .iter()
137 .map(|r| (r.start * self.dimension)..(r.end * self.dimension))
138 .collect();
139 let child = self.child.schedule_ranges(&child_ranges, filter)?;
140 Ok(Box::new(StructuralFixedSizeListSchedulingJob::new(
141 child,
142 self.dimension,
143 )))
144 }
145
146 fn initialize<'a>(
147 &'a mut self,
148 filter: &'a FilterExpression,
149 context: &'a SchedulerContext,
150 ) -> BoxFuture<'a, Result<()>> {
151 self.child.initialize(filter, context)
152 }
153}
154
155#[derive(Debug)]
156struct StructuralFixedSizeListSchedulingJob<'a> {
157 child: Box<dyn StructuralSchedulingJob + 'a>,
158 dimension: u64,
159}
160
161impl<'a> StructuralFixedSizeListSchedulingJob<'a> {
162 fn new(child: Box<dyn StructuralSchedulingJob + 'a>, dimension: u64) -> Self {
163 Self { child, dimension }
164 }
165}
166
167impl StructuralSchedulingJob for StructuralFixedSizeListSchedulingJob<'_> {
168 fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
169 let child_scan_lines = self.child.schedule_next(context)?;
171
172 Ok(child_scan_lines
174 .into_iter()
175 .map(|scan_line| ScheduledScanLine {
176 decoders: scan_line.decoders,
177 rows_scheduled: scan_line.rows_scheduled / self.dimension,
178 })
179 .collect())
180 }
181}
182
183#[derive(Debug)]
188pub struct StructuralFixedSizeListDecoder {
189 child: Box<dyn StructuralFieldDecoder>,
190 data_type: DataType,
191}
192
193impl StructuralFixedSizeListDecoder {
194 pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
195 Self { child, data_type }
196 }
197}
198
199impl StructuralFieldDecoder for StructuralFixedSizeListDecoder {
200 fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> {
201 self.child.accept_page(child)
202 }
203
204 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
205 let dimension = match &self.data_type {
207 DataType::FixedSizeList(_, d) => *d as u64,
208 _ => {
209 return Err(Error::internal(
210 "FixedSizeListDecoder has non-FSL data type".to_string(),
211 ));
212 }
213 };
214 let child_task = self.child.drain(num_rows * dimension)?;
215 Ok(Box::new(StructuralFixedSizeListDecodeTask::new(
216 child_task,
217 self.data_type.clone(),
218 num_rows,
219 )))
220 }
221
222 fn data_type(&self) -> &DataType {
223 &self.data_type
224 }
225}
226
227#[derive(Debug)]
228struct StructuralFixedSizeListDecodeTask {
229 child_task: Box<dyn StructuralDecodeArrayTask>,
230 data_type: DataType,
231 num_rows: u64,
232}
233
234impl StructuralFixedSizeListDecodeTask {
235 fn new(
236 child_task: Box<dyn StructuralDecodeArrayTask>,
237 data_type: DataType,
238 num_rows: u64,
239 ) -> Self {
240 Self {
241 child_task,
242 data_type,
243 num_rows,
244 }
245 }
246}
247
248impl StructuralDecodeArrayTask for StructuralFixedSizeListDecodeTask {
249 fn decode(self: Box<Self>) -> Result<DecodedArray> {
250 let DecodedArray { array, mut repdef } = self.child_task.decode()?;
251 match &self.data_type {
252 DataType::FixedSizeList(child_field, dimension) => {
253 let num_rows = self.num_rows as usize;
254 let validity = repdef.unravel_fsl_validity(num_rows, *dimension as usize);
255 let fsl_array = arrow_array::FixedSizeListArray::try_new(
256 child_field.clone(),
257 *dimension,
258 array,
259 validity,
260 )?;
261 Ok(DecodedArray {
262 array: Arc::new(fsl_array),
263 repdef,
264 })
265 }
266 _ => Err(Error::internal(
267 "FixedSizeList decoder did not have a fixed-size list field".to_string(),
268 )),
269 }
270 }
271}
272
273fn needs_garbage_filtering(data_type: &DataType) -> bool {
280 match data_type {
281 DataType::List(_)
282 | DataType::LargeList(_)
283 | DataType::ListView(_)
284 | DataType::LargeListView(_)
285 | DataType::Map(_, _) => true,
286 DataType::Struct(fields) => fields
287 .iter()
288 .any(|f| needs_garbage_filtering(f.data_type())),
289 DataType::FixedSizeList(field, _) => needs_garbage_filtering(field.data_type()),
290 _ => false,
291 }
292}
293
294fn filter_fsl_child_garbage(array: ArrayRef, is_garbage: &[bool]) -> ArrayRef {
301 debug_assert_eq!(array.len(), is_garbage.len());
302
303 match array.data_type() {
304 DataType::List(_) => filter_list_garbage(array.as_list::<i32>(), is_garbage),
305 DataType::LargeList(_) => filter_list_garbage(array.as_list::<i64>(), is_garbage),
306 DataType::ListView(_) | DataType::LargeListView(_) => {
307 unimplemented!("ListView inside complex FSL is not yet supported")
308 }
309 DataType::Map(_, _) => filter_map_garbage(array.as_map(), is_garbage),
310 DataType::FixedSizeList(_, dim) => {
311 filter_nested_fsl_garbage(array.as_fixed_size_list(), is_garbage, *dim as usize)
312 }
313 DataType::Struct(_) => filter_struct_garbage(array.as_struct(), is_garbage),
314 _ => array,
315 }
316}
317
318fn filter_struct_garbage(struct_arr: &StructArray, is_garbage: &[bool]) -> ArrayRef {
319 let needs_filtering = struct_arr
320 .fields()
321 .iter()
322 .any(|f| needs_garbage_filtering(f.data_type()));
323
324 if !needs_filtering {
325 return Arc::new(struct_arr.clone());
326 }
327
328 let new_columns: Vec<ArrayRef> = struct_arr
329 .columns()
330 .iter()
331 .zip(struct_arr.fields().iter())
332 .map(|(col, field)| {
333 if needs_garbage_filtering(field.data_type()) {
334 filter_fsl_child_garbage(col.clone(), is_garbage)
335 } else {
336 col.clone()
337 }
338 })
339 .collect();
340
341 Arc::new(StructArray::new(
342 struct_arr.fields().clone(),
343 new_columns,
344 struct_arr.nulls().cloned(),
345 ))
346}
347
/// Repeats each flag of `is_garbage` `dimension` times. This maps per-row flags onto
/// the flattened child items of a fixed-size list.
fn expand_garbage_mask(is_garbage: &[bool], dimension: usize) -> Vec<bool> {
    let mut expanded = Vec::with_capacity(is_garbage.len() * dimension);
    for &flag in is_garbage {
        let grown_len = expanded.len() + dimension;
        expanded.resize(grown_len, flag);
    }
    expanded
}
357
358fn fsl_validity_to_garbage_mask(fsl_validity: &NullBuffer) -> Vec<bool> {
359 fsl_validity.iter().map(|valid| !valid).collect()
360}
361
362fn filter_list_garbage<O: OffsetSizeTrait>(
363 list_arr: &GenericListArray<O>,
364 is_garbage: &[bool],
365) -> ArrayRef {
366 debug_assert_eq!(
367 list_arr.len(),
368 is_garbage.len(),
369 "list length must match garbage mask length"
370 );
371
372 let old_offsets = list_arr.offsets();
373 let value_field = match list_arr.data_type() {
374 DataType::List(f) | DataType::LargeList(f) => f.clone(),
375 _ => unreachable!(),
376 };
377
378 let mut new_offsets: Vec<O> = Vec::with_capacity(list_arr.len() + 1);
379 let mut values_to_keep: Vec<usize> = Vec::new();
380 let mut validity_builder = BooleanBufferBuilder::new(list_arr.len());
381 let mut current_offset = O::usize_as(0);
382 new_offsets.push(current_offset);
383 let old_validity = list_arr.nulls();
384
385 for (i, &garbage) in is_garbage.iter().enumerate() {
386 if garbage {
387 new_offsets.push(current_offset);
388 validity_builder.append(false);
389 } else {
390 let start = old_offsets[i].as_usize();
391 let end = old_offsets[i + 1].as_usize();
392 values_to_keep.extend(start..end);
393 current_offset += O::usize_as(end - start);
394 new_offsets.push(current_offset);
395 validity_builder.append(old_validity.map(|v| v.is_valid(i)).unwrap_or(true));
396 }
397 }
398
399 let new_values = if values_to_keep.is_empty() {
400 list_arr.values().slice(0, 0)
401 } else {
402 let indices =
403 arrow_array::UInt64Array::from_iter_values(values_to_keep.iter().map(|&i| i as u64));
404 arrow_select::take::take(list_arr.values().as_ref(), &indices, None)
405 .expect("take should succeed")
406 };
407
408 let new_values = if needs_garbage_filtering(value_field.data_type()) && !new_values.is_empty() {
409 let len = new_values.len();
410 filter_fsl_child_garbage(new_values, &vec![false; len])
411 } else {
412 new_values
413 };
414
415 let new_validity = NullBuffer::new(validity_builder.finish());
416 Arc::new(GenericListArray::new(
417 value_field,
418 OffsetBuffer::new(ScalarBuffer::from(new_offsets)),
419 new_values,
420 Some(new_validity),
421 ))
422}
423
424fn filter_map_garbage(map_arr: &arrow_array::MapArray, is_garbage: &[bool]) -> ArrayRef {
425 debug_assert_eq!(map_arr.len(), is_garbage.len());
426
427 let old_offsets = map_arr.offsets();
428 let entries_field = match map_arr.data_type() {
429 DataType::Map(field, _) => field.clone(),
430 _ => unreachable!(),
431 };
432
433 let mut new_offsets: Vec<i32> = Vec::with_capacity(map_arr.len() + 1);
434 let mut values_to_keep: Vec<usize> = Vec::new();
435 let mut validity_builder = BooleanBufferBuilder::new(map_arr.len());
436 let mut current_offset: i32 = 0;
437 new_offsets.push(current_offset);
438 let old_validity = map_arr.nulls();
439
440 for (i, &garbage) in is_garbage.iter().enumerate() {
441 if garbage {
442 new_offsets.push(current_offset);
443 validity_builder.append(false);
444 } else {
445 let start = old_offsets[i] as usize;
446 let end = old_offsets[i + 1] as usize;
447 values_to_keep.extend(start..end);
448 current_offset += (end - start) as i32;
449 new_offsets.push(current_offset);
450 validity_builder.append(old_validity.map(|v| v.is_valid(i)).unwrap_or(true));
451 }
452 }
453
454 let new_entries: ArrayRef = if values_to_keep.is_empty() {
455 Arc::new(map_arr.entries().slice(0, 0))
456 } else {
457 let indices =
458 arrow_array::UInt64Array::from_iter_values(values_to_keep.iter().map(|&i| i as u64));
459 arrow_select::take::take(map_arr.entries(), &indices, None).expect("take should succeed")
460 };
461
462 let new_entries =
463 if needs_garbage_filtering(entries_field.data_type()) && !new_entries.is_empty() {
464 let len = new_entries.len();
465 filter_fsl_child_garbage(new_entries, &vec![false; len])
466 } else {
467 new_entries
468 };
469
470 let new_validity = NullBuffer::new(validity_builder.finish());
471 let keys_sorted = matches!(map_arr.data_type(), DataType::Map(_, true));
472
473 Arc::new(
474 arrow_array::MapArray::try_new(
475 entries_field,
476 OffsetBuffer::new(ScalarBuffer::from(new_offsets)),
477 new_entries.as_struct().clone(),
478 Some(new_validity),
479 keys_sorted,
480 )
481 .expect("MapArray construction should succeed"),
482 )
483}
484
485fn filter_nested_fsl_garbage(
487 fsl_arr: &arrow_array::FixedSizeListArray,
488 is_garbage: &[bool],
489 dimension: usize,
490) -> ArrayRef {
491 debug_assert_eq!(fsl_arr.len(), is_garbage.len());
492
493 let child_field = match fsl_arr.data_type() {
494 DataType::FixedSizeList(field, _) => field.clone(),
495 _ => unreachable!(),
496 };
497
498 if !needs_garbage_filtering(child_field.data_type()) {
499 return Arc::new(fsl_arr.clone());
500 }
501
502 let child_garbage = expand_garbage_mask(is_garbage, dimension);
503 let new_values = filter_fsl_child_garbage(fsl_arr.values().clone(), &child_garbage);
504
505 Arc::new(arrow_array::FixedSizeListArray::new(
506 child_field,
507 dimension as i32,
508 new_values,
509 fsl_arr.nulls().cloned(),
510 ))
511}
512
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        Array, FixedSizeListArray,
        builder::{Int32Builder, ListBuilder},
        cast::AsArray,
    };
    use arrow_schema::{DataType, Field, Fields};
    use rstest::rstest;

    use super::filter_nested_fsl_garbage;
    use crate::{
        constants::{
            STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
            STRUCTURAL_ENCODING_MINIBLOCK,
        },
        testing::{TestCases, check_specific_random},
        version::LanceFileVersion,
    };

    /// Builds `FixedSizeList<Struct<struct_fields>>` with a nullable `item` child and
    /// `dimension` entries per row.
    fn make_fsl_struct_type(struct_fields: Fields, dimension: i32) -> DataType {
        DataType::FixedSizeList(
            Arc::new(Field::new("item", DataType::Struct(struct_fields), true)),
            dimension,
        )
    }

    /// Two non-nullable `Float64` fields: no variable-length data.
    fn simple_struct_fields() -> Fields {
        Fields::from(vec![
            Field::new("x", DataType::Float64, false),
            Field::new("y", DataType::Float64, false),
        ])
    }

    /// A struct nested inside the struct, with only fixed-width leaves.
    fn nested_struct_fields() -> Fields {
        let inner = Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        Fields::from(vec![
            Field::new("outer_val", DataType::Float64, false),
            Field::new("inner", DataType::Struct(inner), true),
        ])
    }

    /// A nested struct whose inner struct holds a `List`, which exercises recursive
    /// garbage filtering through two struct levels.
    fn nested_struct_with_list_fields() -> Fields {
        let inner = Fields::from(vec![Field::new(
            "values",
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
            true,
        )]);
        Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("inner", DataType::Struct(inner), true),
        ])
    }

    /// A struct with a nullable `List<Int32>` column.
    fn struct_with_list_fields() -> Fields {
        Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "values",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
        ])
    }

    /// A struct with a nullable `LargeList<Int64>` column (i64 offsets path).
    fn struct_with_large_list_fields() -> Fields {
        Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "values",
                DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))),
                true,
            ),
        ])
    }

    /// A struct with a nested `FixedSizeList<Float32, 4>` column.
    fn struct_with_nested_fsl_fields() -> Fields {
        Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "vectors",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
                true,
            ),
        ])
    }

    /// A struct with a nullable `Map<Utf8, Int32>` column (unsorted keys).
    fn struct_with_map_fields() -> Fields {
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
            ])),
            false,
        ));
        Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("props", DataType::Map(entries_field, false), true),
        ])
    }

    // Each case runs once per structural encoding (mini-block and full-zip), selected
    // through the field metadata key. `check_specific_random` presumably generates
    // random data for the field and checks that it round-trips (see crate::testing).
    #[rstest]
    #[case::simple(simple_struct_fields(), 2, LanceFileVersion::V2_2)]
    #[case::nested_struct(nested_struct_fields(), 2, LanceFileVersion::V2_2)]
    #[case::struct_with_list(struct_with_list_fields(), 2, LanceFileVersion::V2_2)]
    #[case::struct_with_large_list(struct_with_large_list_fields(), 2, LanceFileVersion::V2_2)]
    #[case::nested_struct_with_list(nested_struct_with_list_fields(), 2, LanceFileVersion::V2_2)]
    #[case::struct_with_nested_fsl(struct_with_nested_fsl_fields(), 2, LanceFileVersion::V2_2)]
    #[case::struct_with_map(struct_with_map_fields(), 2, LanceFileVersion::V2_2)]
    #[test_log::test(tokio::test)]
    async fn test_fsl_struct_random(
        #[case] struct_fields: Fields,
        #[case] dimension: i32,
        #[case] min_version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let data_type = make_fsl_struct_type(struct_fields, dimension);
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        let test_cases = TestCases::basic().with_min_file_version(min_version);
        check_specific_random(field, test_cases).await;
    }

    // A list directly inside an FSL is expected to be rejected with a panic when the
    // Lance data type is computed from the converted field.
    #[test]
    #[should_panic(expected = "Unsupported logical type: list")]
    fn test_fsl_list_rejected() {
        let inner = Field::new(
            "item",
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
            true,
        );
        let data_type = DataType::FixedSizeList(Arc::new(inner), 2);
        let arrow_field = Field::new("test", data_type, true);
        let lance_field = lance_core::datatypes::Field::try_from(&arrow_field).unwrap();
        let _ = lance_field.data_type();
    }

    // Same as above, for a map directly inside an FSL.
    #[test]
    #[should_panic(expected = "Unsupported logical type: map")]
    fn test_fsl_map_rejected() {
        let inner = Field::new(
            "item",
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Int32, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        );
        let data_type = DataType::FixedSizeList(Arc::new(inner), 2);
        let arrow_field = Field::new("test", data_type, true);
        let lance_field = lance_core::datatypes::Field::try_from(&arrow_field).unwrap();
        let _ = lance_field.data_type();
    }

    #[test]
    fn test_filter_nested_fsl_garbage() {
        let mut list_builder = ListBuilder::new(Int32Builder::new());
        // Six single-element lists: [1], [2], ..., [6].
        for i in 1..=6 {
            list_builder.values().append_value(i);
            list_builder.append(true);
        }
        let list_arr = list_builder.finish();

        let fsl_field = Arc::new(Field::new(
            "item",
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
            true,
        ));
        // Three FSL rows of two lists each.
        let fsl = FixedSizeListArray::new(fsl_field, 2, Arc::new(list_arr), None);

        // Mark the middle row as garbage; its two lists (indices 2 and 3) should
        // become null.
        let result = filter_nested_fsl_garbage(&fsl, &[false, true, false], 2);
        let result = result.as_fixed_size_list();

        let child_list = result.values().as_list::<i32>();
        // Only the lists under the garbage row are invalid.
        assert_eq!(
            (0..6).map(|i| child_list.is_valid(i)).collect::<Vec<_>>(),
            vec![true, true, false, false, true, true]
        );
    }

    #[test]
    fn test_filter_nested_fsl_no_list_child() {
        let fsl_field = Arc::new(Field::new("item", DataType::Int32, true));
        // A primitive child needs no filtering; the array is returned as-is.
        let values = arrow_array::Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let fsl = FixedSizeListArray::new(fsl_field, 2, Arc::new(values), None);

        let result = filter_nested_fsl_garbage(&fsl, &[false, true, false], 2);
        assert_eq!(result.len(), 3);
    }
}