1use std::{ops::Range, sync::Arc};
11
12use arrow_array::{cast::AsArray, Array, ArrayRef, GenericListArray, OffsetSizeTrait, StructArray};
13use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer};
14use arrow_schema::DataType;
15use futures::future::BoxFuture;
16use lance_arrow::deepcopy::deep_copy_nulls;
17use lance_core::{Error, Result};
18use snafu::location;
19
20use crate::{
21 decoder::{
22 DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext,
23 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
24 StructuralSchedulingJob,
25 },
26 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
27 repdef::RepDefBuilder,
28};
29
30pub struct FixedSizeListStructuralEncoder {
35 keep_original_array: bool,
36 child: Box<dyn FieldEncoder>,
37}
38
39impl FixedSizeListStructuralEncoder {
40 pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
41 Self {
42 keep_original_array,
43 child,
44 }
45 }
46}
47
48impl FieldEncoder for FixedSizeListStructuralEncoder {
49 fn maybe_encode(
50 &mut self,
51 array: ArrayRef,
52 external_buffers: &mut OutOfLineBuffers,
53 mut repdef: RepDefBuilder,
54 row_number: u64,
55 num_rows: u64,
56 ) -> Result<Vec<EncodeTask>> {
57 let fsl_arr = array
58 .as_fixed_size_list_opt()
59 .ok_or_else(|| Error::Internal {
60 message: "FixedSizeList encoder used for non-fixed-size-list data".to_string(),
61 location: location!(),
62 })?;
63
64 let dimension = fsl_arr.value_length() as usize;
65 let values = fsl_arr.values().clone();
66
67 let validity = if self.keep_original_array {
68 array.nulls().cloned()
69 } else {
70 deep_copy_nulls(array.nulls())
71 };
72 repdef.add_fsl(validity.clone(), dimension, num_rows as usize);
73
74 let values = if let Some(ref fsl_validity) = validity {
77 if needs_garbage_filtering(values.data_type()) {
78 let is_garbage =
79 expand_garbage_mask(&fsl_validity_to_garbage_mask(fsl_validity), dimension);
80 filter_fsl_child_garbage(values, &is_garbage)
81 } else {
82 values
83 }
84 } else {
85 values
86 };
87
88 self.child.maybe_encode(
89 values,
90 external_buffers,
91 repdef,
92 row_number,
93 num_rows * dimension as u64,
94 )
95 }
96
97 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
98 self.child.flush(external_buffers)
99 }
100
101 fn num_columns(&self) -> u32 {
102 self.child.num_columns()
103 }
104
105 fn finish(
106 &mut self,
107 external_buffers: &mut OutOfLineBuffers,
108 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
109 self.child.finish(external_buffers)
110 }
111}
112
113#[derive(Debug)]
118pub struct StructuralFixedSizeListScheduler {
119 child: Box<dyn StructuralFieldScheduler>,
120 dimension: u64,
121}
122
123impl StructuralFixedSizeListScheduler {
124 pub fn new(child: Box<dyn StructuralFieldScheduler>, dimension: i32) -> Self {
125 Self {
126 child,
127 dimension: dimension as u64,
128 }
129 }
130}
131
132impl StructuralFieldScheduler for StructuralFixedSizeListScheduler {
133 fn schedule_ranges<'a>(
134 &'a self,
135 ranges: &[Range<u64>],
136 filter: &FilterExpression,
137 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
138 let child_ranges: Vec<Range<u64>> = ranges
140 .iter()
141 .map(|r| (r.start * self.dimension)..(r.end * self.dimension))
142 .collect();
143 let child = self.child.schedule_ranges(&child_ranges, filter)?;
144 Ok(Box::new(StructuralFixedSizeListSchedulingJob::new(
145 child,
146 self.dimension,
147 )))
148 }
149
150 fn initialize<'a>(
151 &'a mut self,
152 filter: &'a FilterExpression,
153 context: &'a SchedulerContext,
154 ) -> BoxFuture<'a, Result<()>> {
155 self.child.initialize(filter, context)
156 }
157}
158
159#[derive(Debug)]
160struct StructuralFixedSizeListSchedulingJob<'a> {
161 child: Box<dyn StructuralSchedulingJob + 'a>,
162 dimension: u64,
163}
164
165impl<'a> StructuralFixedSizeListSchedulingJob<'a> {
166 fn new(child: Box<dyn StructuralSchedulingJob + 'a>, dimension: u64) -> Self {
167 Self { child, dimension }
168 }
169}
170
171impl StructuralSchedulingJob for StructuralFixedSizeListSchedulingJob<'_> {
172 fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
173 let child_scan_lines = self.child.schedule_next(context)?;
175
176 Ok(child_scan_lines
178 .into_iter()
179 .map(|scan_line| ScheduledScanLine {
180 decoders: scan_line.decoders,
181 rows_scheduled: scan_line.rows_scheduled / self.dimension,
182 })
183 .collect())
184 }
185}
186
187#[derive(Debug)]
192pub struct StructuralFixedSizeListDecoder {
193 child: Box<dyn StructuralFieldDecoder>,
194 data_type: DataType,
195}
196
197impl StructuralFixedSizeListDecoder {
198 pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
199 Self { child, data_type }
200 }
201}
202
203impl StructuralFieldDecoder for StructuralFixedSizeListDecoder {
204 fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> {
205 self.child.accept_page(child)
206 }
207
208 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
209 let dimension = match &self.data_type {
211 DataType::FixedSizeList(_, d) => *d as u64,
212 _ => {
213 return Err(Error::Internal {
214 message: "FixedSizeListDecoder has non-FSL data type".to_string(),
215 location: location!(),
216 });
217 }
218 };
219 let child_task = self.child.drain(num_rows * dimension)?;
220 Ok(Box::new(StructuralFixedSizeListDecodeTask::new(
221 child_task,
222 self.data_type.clone(),
223 num_rows,
224 )))
225 }
226
227 fn data_type(&self) -> &DataType {
228 &self.data_type
229 }
230}
231
232#[derive(Debug)]
233struct StructuralFixedSizeListDecodeTask {
234 child_task: Box<dyn StructuralDecodeArrayTask>,
235 data_type: DataType,
236 num_rows: u64,
237}
238
239impl StructuralFixedSizeListDecodeTask {
240 fn new(
241 child_task: Box<dyn StructuralDecodeArrayTask>,
242 data_type: DataType,
243 num_rows: u64,
244 ) -> Self {
245 Self {
246 child_task,
247 data_type,
248 num_rows,
249 }
250 }
251}
252
253impl StructuralDecodeArrayTask for StructuralFixedSizeListDecodeTask {
254 fn decode(self: Box<Self>) -> Result<DecodedArray> {
255 let DecodedArray { array, mut repdef } = self.child_task.decode()?;
256 match &self.data_type {
257 DataType::FixedSizeList(child_field, dimension) => {
258 let num_rows = self.num_rows as usize;
259 let validity = repdef.unravel_fsl_validity(num_rows, *dimension as usize);
260 let fsl_array = arrow_array::FixedSizeListArray::try_new(
261 child_field.clone(),
262 *dimension,
263 array,
264 validity,
265 )?;
266 Ok(DecodedArray {
267 array: Arc::new(fsl_array),
268 repdef,
269 })
270 }
271 _ => Err(Error::Internal {
272 message: "FixedSizeList decoder did not have a fixed-size list field".to_string(),
273 location: location!(),
274 }),
275 }
276 }
277}
278
279fn needs_garbage_filtering(data_type: &DataType) -> bool {
286 match data_type {
287 DataType::List(_)
288 | DataType::LargeList(_)
289 | DataType::ListView(_)
290 | DataType::LargeListView(_)
291 | DataType::Map(_, _) => true,
292 DataType::Struct(fields) => fields
293 .iter()
294 .any(|f| needs_garbage_filtering(f.data_type())),
295 DataType::FixedSizeList(field, _) => needs_garbage_filtering(field.data_type()),
296 _ => false,
297 }
298}
299
300fn filter_fsl_child_garbage(array: ArrayRef, is_garbage: &[bool]) -> ArrayRef {
307 debug_assert_eq!(array.len(), is_garbage.len());
308
309 match array.data_type() {
310 DataType::List(_) => filter_list_garbage(array.as_list::<i32>(), is_garbage),
311 DataType::LargeList(_) => filter_list_garbage(array.as_list::<i64>(), is_garbage),
312 DataType::ListView(_) | DataType::LargeListView(_) => {
313 unimplemented!("ListView inside complex FSL is not yet supported")
314 }
315 DataType::Map(_, _) => filter_map_garbage(array.as_map(), is_garbage),
316 DataType::FixedSizeList(_, dim) => {
317 filter_nested_fsl_garbage(array.as_fixed_size_list(), is_garbage, *dim as usize)
318 }
319 DataType::Struct(_) => filter_struct_garbage(array.as_struct(), is_garbage),
320 _ => array,
321 }
322}
323
324fn filter_struct_garbage(struct_arr: &StructArray, is_garbage: &[bool]) -> ArrayRef {
325 let needs_filtering = struct_arr
326 .fields()
327 .iter()
328 .any(|f| needs_garbage_filtering(f.data_type()));
329
330 if !needs_filtering {
331 return Arc::new(struct_arr.clone());
332 }
333
334 let new_columns: Vec<ArrayRef> = struct_arr
335 .columns()
336 .iter()
337 .zip(struct_arr.fields().iter())
338 .map(|(col, field)| {
339 if needs_garbage_filtering(field.data_type()) {
340 filter_fsl_child_garbage(col.clone(), is_garbage)
341 } else {
342 col.clone()
343 }
344 })
345 .collect();
346
347 Arc::new(StructArray::new(
348 struct_arr.fields().clone(),
349 new_columns,
350 struct_arr.nulls().cloned(),
351 ))
352}
353
/// Turns a per-row mask into a per-item mask by repeating each flag
/// `dimension` times, keeping row order.
///
/// For example `[true, false]` with `dimension == 2` becomes
/// `[true, true, false, false]`. A `dimension` of zero yields an empty mask.
fn expand_garbage_mask(is_garbage: &[bool], dimension: usize) -> Vec<bool> {
    let mut per_item = Vec::with_capacity(is_garbage.len() * dimension);
    per_item.extend(
        is_garbage
            .iter()
            .flat_map(|&row_is_garbage| std::iter::repeat(row_is_garbage).take(dimension)),
    );
    per_item
}
363
364fn fsl_validity_to_garbage_mask(fsl_validity: &NullBuffer) -> Vec<bool> {
365 fsl_validity.iter().map(|valid| !valid).collect()
366}
367
368fn filter_list_garbage<O: OffsetSizeTrait>(
369 list_arr: &GenericListArray<O>,
370 is_garbage: &[bool],
371) -> ArrayRef {
372 debug_assert_eq!(
373 list_arr.len(),
374 is_garbage.len(),
375 "list length must match garbage mask length"
376 );
377
378 let old_offsets = list_arr.offsets();
379 let value_field = match list_arr.data_type() {
380 DataType::List(f) | DataType::LargeList(f) => f.clone(),
381 _ => unreachable!(),
382 };
383
384 let mut new_offsets: Vec<O> = Vec::with_capacity(list_arr.len() + 1);
385 let mut values_to_keep: Vec<usize> = Vec::new();
386 let mut validity_builder = BooleanBufferBuilder::new(list_arr.len());
387 let mut current_offset = O::usize_as(0);
388 new_offsets.push(current_offset);
389 let old_validity = list_arr.nulls();
390
391 for (i, &garbage) in is_garbage.iter().enumerate() {
392 if garbage {
393 new_offsets.push(current_offset);
394 validity_builder.append(false);
395 } else {
396 let start = old_offsets[i].as_usize();
397 let end = old_offsets[i + 1].as_usize();
398 values_to_keep.extend(start..end);
399 current_offset += O::usize_as(end - start);
400 new_offsets.push(current_offset);
401 validity_builder.append(old_validity.map(|v| v.is_valid(i)).unwrap_or(true));
402 }
403 }
404
405 let new_values = if values_to_keep.is_empty() {
406 list_arr.values().slice(0, 0)
407 } else {
408 let indices =
409 arrow_array::UInt64Array::from_iter_values(values_to_keep.iter().map(|&i| i as u64));
410 arrow_select::take::take(list_arr.values().as_ref(), &indices, None)
411 .expect("take should succeed")
412 };
413
414 let new_values = if needs_garbage_filtering(value_field.data_type()) && !new_values.is_empty() {
415 let len = new_values.len();
416 filter_fsl_child_garbage(new_values, &vec![false; len])
417 } else {
418 new_values
419 };
420
421 let new_validity = NullBuffer::new(validity_builder.finish());
422 Arc::new(GenericListArray::new(
423 value_field,
424 OffsetBuffer::new(ScalarBuffer::from(new_offsets)),
425 new_values,
426 Some(new_validity),
427 ))
428}
429
430fn filter_map_garbage(map_arr: &arrow_array::MapArray, is_garbage: &[bool]) -> ArrayRef {
431 debug_assert_eq!(map_arr.len(), is_garbage.len());
432
433 let old_offsets = map_arr.offsets();
434 let entries_field = match map_arr.data_type() {
435 DataType::Map(field, _) => field.clone(),
436 _ => unreachable!(),
437 };
438
439 let mut new_offsets: Vec<i32> = Vec::with_capacity(map_arr.len() + 1);
440 let mut values_to_keep: Vec<usize> = Vec::new();
441 let mut validity_builder = BooleanBufferBuilder::new(map_arr.len());
442 let mut current_offset: i32 = 0;
443 new_offsets.push(current_offset);
444 let old_validity = map_arr.nulls();
445
446 for (i, &garbage) in is_garbage.iter().enumerate() {
447 if garbage {
448 new_offsets.push(current_offset);
449 validity_builder.append(false);
450 } else {
451 let start = old_offsets[i] as usize;
452 let end = old_offsets[i + 1] as usize;
453 values_to_keep.extend(start..end);
454 current_offset += (end - start) as i32;
455 new_offsets.push(current_offset);
456 validity_builder.append(old_validity.map(|v| v.is_valid(i)).unwrap_or(true));
457 }
458 }
459
460 let new_entries: ArrayRef = if values_to_keep.is_empty() {
461 Arc::new(map_arr.entries().slice(0, 0))
462 } else {
463 let indices =
464 arrow_array::UInt64Array::from_iter_values(values_to_keep.iter().map(|&i| i as u64));
465 arrow_select::take::take(map_arr.entries(), &indices, None).expect("take should succeed")
466 };
467
468 let new_entries =
469 if needs_garbage_filtering(entries_field.data_type()) && !new_entries.is_empty() {
470 let len = new_entries.len();
471 filter_fsl_child_garbage(new_entries, &vec![false; len])
472 } else {
473 new_entries
474 };
475
476 let new_validity = NullBuffer::new(validity_builder.finish());
477 let keys_sorted = matches!(map_arr.data_type(), DataType::Map(_, true));
478
479 Arc::new(
480 arrow_array::MapArray::try_new(
481 entries_field,
482 OffsetBuffer::new(ScalarBuffer::from(new_offsets)),
483 new_entries.as_struct().clone(),
484 Some(new_validity),
485 keys_sorted,
486 )
487 .expect("MapArray construction should succeed"),
488 )
489}
490
491fn filter_nested_fsl_garbage(
493 fsl_arr: &arrow_array::FixedSizeListArray,
494 is_garbage: &[bool],
495 dimension: usize,
496) -> ArrayRef {
497 debug_assert_eq!(fsl_arr.len(), is_garbage.len());
498
499 let child_field = match fsl_arr.data_type() {
500 DataType::FixedSizeList(field, _) => field.clone(),
501 _ => unreachable!(),
502 };
503
504 if !needs_garbage_filtering(child_field.data_type()) {
505 return Arc::new(fsl_arr.clone());
506 }
507
508 let child_garbage = expand_garbage_mask(is_garbage, dimension);
509 let new_values = filter_fsl_child_garbage(fsl_arr.values().clone(), &child_garbage);
510
511 Arc::new(arrow_array::FixedSizeListArray::new(
512 child_field,
513 dimension as i32,
514 new_values,
515 fsl_arr.nulls().cloned(),
516 ))
517}
518
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        cast::AsArray,
        Array, FixedSizeListArray,
    };
    use arrow_schema::{DataType, Field, Fields};
    use rstest::rstest;

    use super::filter_nested_fsl_garbage;
    use crate::{
        constants::{
            STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
            STRUCTURAL_ENCODING_MINIBLOCK,
        },
        testing::{check_specific_random, TestCases},
        version::LanceFileVersion,
    };

    /// `List<Int32>` with a nullable child named `item`.
    fn int32_list_type() -> DataType {
        DataType::List(Arc::new(Field::new("item", DataType::Int32, true)))
    }

    /// Fixed-size list of `dimension` nullable `item`s of type `item_type`.
    fn fsl_type(item_type: DataType, dimension: i32) -> DataType {
        DataType::FixedSizeList(Arc::new(Field::new("item", item_type, true)), dimension)
    }

    /// Unsorted `Map<Utf8, Int32>` whose entry struct uses the given field names.
    fn utf8_int32_map_type(key_name: &str, value_name: &str) -> DataType {
        let entry_fields = Fields::from(vec![
            Field::new(key_name, DataType::Utf8, false),
            Field::new(value_name, DataType::Int32, true),
        ]);
        let entries = Field::new("entries", DataType::Struct(entry_fields), false);
        DataType::Map(Arc::new(entries), false)
    }

    /// Converts `FixedSizeList<item_type, 2>` to a Lance field and asks for its
    /// data type; the rejection tests expect this to panic.
    fn round_trip_fsl_through_lance(item_type: DataType) {
        let arrow_field = Field::new("test", fsl_type(item_type, 2), true);
        let lance_field = lance_core::datatypes::Field::try_from(&arrow_field).unwrap();
        let _ = lance_field.data_type();
    }

    fn make_fsl_struct_type(struct_fields: Fields, dimension: i32) -> DataType {
        fsl_type(DataType::Struct(struct_fields), dimension)
    }

    fn simple_struct_fields() -> Fields {
        let x = Field::new("x", DataType::Float64, false);
        let y = Field::new("y", DataType::Float64, false);
        Fields::from(vec![x, y])
    }

    fn nested_struct_fields() -> Fields {
        let inner = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        Fields::from(vec![
            Field::new("outer_val", DataType::Float64, false),
            Field::new("inner", inner, true),
        ])
    }

    fn nested_struct_with_list_fields() -> Fields {
        let inner = DataType::Struct(Fields::from(vec![Field::new(
            "values",
            int32_list_type(),
            true,
        )]));
        Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("inner", inner, true),
        ])
    }

    fn struct_with_list_fields() -> Fields {
        let id = Field::new("id", DataType::Int32, false);
        let values = Field::new("values", int32_list_type(), true);
        Fields::from(vec![id, values])
    }

    fn struct_with_large_list_fields() -> Fields {
        let large_list = DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true)));
        let id = Field::new("id", DataType::Int32, false);
        let values = Field::new("values", large_list, true);
        Fields::from(vec![id, values])
    }

    fn struct_with_nested_fsl_fields() -> Fields {
        let id = Field::new("id", DataType::Int32, false);
        let vectors = Field::new("vectors", fsl_type(DataType::Float32, 4), true);
        Fields::from(vec![id, vectors])
    }

    fn struct_with_map_fields() -> Fields {
        let id = Field::new("id", DataType::Int32, false);
        let props = Field::new("props", utf8_int32_map_type("keys", "values"), true);
        Fields::from(vec![id, props])
    }

    /// Runs the random check for a fixed-size list of structs under both
    /// structural encodings.
    #[rstest]
    #[case::simple(simple_struct_fields(), 2, LanceFileVersion::V2_2)]
    #[case::nested_struct(nested_struct_fields(), 2, LanceFileVersion::V2_2)]
    #[case::struct_with_list(struct_with_list_fields(), 2, LanceFileVersion::V2_2)]
    #[case::struct_with_large_list(struct_with_large_list_fields(), 2, LanceFileVersion::V2_2)]
    #[case::nested_struct_with_list(nested_struct_with_list_fields(), 2, LanceFileVersion::V2_2)]
    #[case::struct_with_nested_fsl(struct_with_nested_fsl_fields(), 2, LanceFileVersion::V2_2)]
    #[case::struct_with_map(struct_with_map_fields(), 2, LanceFileVersion::V2_2)]
    #[test_log::test(tokio::test)]
    async fn test_fsl_struct_random(
        #[case] struct_fields: Fields,
        #[case] dimension: i32,
        #[case] min_version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let field_metadata = HashMap::from([(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.to_string(),
        )]);
        let field = Field::new("", make_fsl_struct_type(struct_fields, dimension), true)
            .with_metadata(field_metadata);
        let test_cases = TestCases::basic().with_min_file_version(min_version);
        check_specific_random(field, test_cases).await;
    }

    /// A fixed-size list whose items are lists is expected to be rejected.
    #[test]
    #[should_panic(expected = "Unsupported logical type: list")]
    fn test_fsl_list_rejected() {
        round_trip_fsl_through_lance(int32_list_type());
    }

    /// A fixed-size list whose items are maps is expected to be rejected.
    #[test]
    #[should_panic(expected = "Unsupported logical type: map")]
    fn test_fsl_map_rejected() {
        round_trip_fsl_through_lance(utf8_int32_map_type("key", "value"));
    }

    /// Three rows of two single-element lists each; marking the middle row as
    /// garbage must null out exactly its two child lists.
    #[test]
    fn test_filter_nested_fsl_garbage() {
        let mut builder = ListBuilder::new(Int32Builder::new());
        (1..=6).for_each(|value| {
            builder.values().append_value(value);
            builder.append(true);
        });
        let lists = builder.finish();

        let item_field = Arc::new(Field::new("item", int32_list_type(), true));
        let fsl = FixedSizeListArray::new(item_field, 2, Arc::new(lists), None);

        let filtered = filter_nested_fsl_garbage(&fsl, &[false, true, false], 2);
        let filtered = filtered.as_fixed_size_list();
        let child_lists = filtered.values().as_list::<i32>();

        let child_validity: Vec<bool> = (0..6).map(|i| child_lists.is_valid(i)).collect();
        assert_eq!(child_validity, vec![true, true, false, false, true, true]);
    }

    /// With a primitive item type nothing needs filtering; the row count is
    /// unchanged.
    #[test]
    fn test_filter_nested_fsl_no_list_child() {
        let item_field = Arc::new(Field::new("item", DataType::Int32, true));
        let items = arrow_array::Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let fsl = FixedSizeListArray::new(item_field, 2, Arc::new(items), None);

        let filtered = filter_nested_fsl_garbage(&fsl, &[false, true, false], 2);
        assert_eq!(filtered.len(), 3);
    }
}