1use std::{collections::VecDeque, ops::Range, sync::Arc};
5
6use arrow_array::{
7 cast::AsArray,
8 new_empty_array,
9 types::{Int32Type, Int64Type, UInt64Type},
10 Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
11};
12use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
13use arrow_schema::{DataType, Field, Fields};
14use futures::{future::BoxFuture, FutureExt};
15use lance_arrow::deepcopy::deep_copy_nulls;
16use lance_arrow::list::ListArrayExt;
17use lance_core::{cache::FileMetadataCache, Error, Result};
18use log::trace;
19use snafu::location;
20use tokio::task::JoinHandle;
21
22use crate::{
23 buffer::LanceBuffer,
24 data::{BlockInfo, DataBlock, FixedWidthDataBlock},
25 decoder::{
26 DecodeArrayTask, DecodeBatchScheduler, DecodedArray, FieldScheduler, FilterExpression,
27 ListPriorityRange, LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding,
28 PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
29 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
30 StructuralSchedulingJob,
31 },
32 encoder::{
33 ArrayEncoder, EncodeTask, EncodedArray, EncodedColumn, EncodedPage, FieldEncoder,
34 OutOfLineBuffers,
35 },
36 encodings::logical::r#struct::SimpleStructScheduler,
37 format::pb,
38 repdef::RepDefBuilder,
39 EncodingsIo,
40};
41
42use super::{primitive::AccumulationQueue, r#struct::SimpleStructDecoder};
43
/// A request to decode a contiguous run of lists whose offsets live in a
/// single offsets page.
#[derive(Debug)]
struct ListRequest {
    /// How many lists (rows) to read from this page
    num_lists: u64,
    /// True if one extra offset must be read before the first list to
    /// establish the start position (the range does not begin exactly at a
    /// page boundary)
    includes_extra_offset: bool,
    /// The page's null marker: stored offsets >= this value encode a null list
    null_offset_adjustment: u64,
    /// Number of items referenced by all preceding offset pages; added to the
    /// decoded item ranges so they are absolute within the items column
    items_offset: u64,
}
79
/// Pre-computed plan mapping requested row ranges onto offset pages.
///
/// Built up-front by [`ListRequestsIter::new`] and then consumed page-by-page
/// via [`ListRequestsIter::next`] as offsets are scheduled.
#[derive(Debug)]
struct ListRequestsIter {
    // Per-page list requests, in scheduling order
    list_requests: VecDeque<ListRequest>,
    // The (possibly widened) ranges of offsets that must be read from the
    // offsets column to satisfy the list requests
    offsets_requests: Vec<Range<u64>>,
}
86
impl ListRequestsIter {
    /// Walks the requested `row_ranges` against the per-page `page_infos`,
    /// producing one `ListRequest` per (range, page) intersection plus the
    /// offset ranges that must be read from the offsets column.
    ///
    /// NOTE(review): assumes `row_ranges` are sorted and non-overlapping, and
    /// that every range is covered by `page_infos` (the `unwrap`s on the page
    /// iterator panic otherwise) — confirm against callers.
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        // Running totals of items / offsets consumed by pages already skipped
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        for range in row_ranges {
            let mut range = range.clone();

            // Fast-forward over pages that end before this range starts
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range doesn't begin exactly on a page boundary we must
            // also read the offset just before it to know where items start
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // Split the range across as many pages as it spans
            while !range.is_empty() {
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                // Only the first chunk of a range can carry the extra offset
                includes_extra_offset = false;
                range.start = end;
                if !last {
                    // Advance to the next page; `last` means the range ended
                    // inside the current page, so keep it for the next range
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    /// Consumes `num_offsets` scheduled offsets and returns the list requests
    /// they satisfy, splitting the final request if it is only partly covered.
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            if req.includes_extra_offset {
                // One of the scheduled offsets is the leading "extra" offset
                // and does not correspond to a list of its own
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                // The whole request is covered; take it as-is
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                // Partially covered: emit the covered prefix and shrink the
                // remaining request in place
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                // The extra offset (if any) was consumed by the prefix
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}
182
/// Converts raw stored offsets (u64, with nulls encoded as
/// `offset + null_offset_adjustment`) into:
///   * the item ranges that must be loaded from the items column,
///   * a normalized, zero-based, monotonically increasing offsets vector
///     (one leading 0 plus one entry per list), and
///   * a validity buffer (one bit per list; an offset below
///     `null_offset_adjustment` means the list is valid).
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    // Normalized offsets always start from zero
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    // Index into `offsets_values` of the first offset for the current request
    let mut offsets_offset: u32 = 0;
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        let num_lists = req.num_lists;

        // Determine the item range covered by this request and which slice of
        // `offsets_values` must be normalized.  The `% null_offset_adjustment`
        // strips the null marker from stored offsets.
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                // NOTE(review): this branch indexes from 0, which assumes a
                // request without an extra offset is the first in the batch
                // (items implicitly start at 0) — confirm against scheduler
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                // The extra leading offset tells us where the items start
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // Validity comes from the per-list offsets (skipping the extra
        // leading offset when present): >= null_offset_adjustment means null
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            validity_buffer.append(*off < null_offset_adjustment);
        }

        if !req.includes_extra_offset {
            // No leading offset stored, so the first list's end offset is the
            // first stored value; emit it directly
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        // Convert each adjacent pair of stored offsets into a length and
        // accumulate into a running, zero-based offset sequence
        normalized_offsets.extend(
            offsets_values
                .slice(offsets_to_norm_start, num_offsets_to_norm)
                .windows(2)
                .map(|w| {
                    let start = w[0] % null_offset_adjustment;
                    let end = w[1] % null_offset_adjustment;
                    if end < start {
                        panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                    }
                    let length = end - start;
                    last_normalized_offset += length;
                    last_normalized_offset
                }),
        );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists,
            items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        if !items_range.is_empty() {
            // Shift into absolute coordinates within the items column
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}
317
/// The "indirect" half of list scheduling, run as a spawned task.
///
/// Waits for the offsets column to decode, converts the raw offsets into item
/// ranges, and then schedules those ranges from the items column (wrapped in a
/// single-field struct so a `SimpleStructDecoder` can collect the children).
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<FileMetadataCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // Fully load and decode the offsets page before anything else
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let offsets = decode_task.task.decode()?;

    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // All lists were empty or null: nothing to schedule from the items column
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Wrap the items in a single-field struct so we can reuse the struct
    // scheduler/decoder machinery for the indirect load
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    // Items inherit the list's priority, translated through the offsets
    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    // Feed every scheduled child decoder into the root struct decoder
    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            // An empty path would be the root itself; only children are fed in
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}
401
/// In-flight scheduling state for one `schedule_ranges` call on a list field.
#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    // Job scheduling the offsets column; items are scheduled indirectly later
    offsets: Box<dyn SchedulingJob + 'a>,
    num_rows: u64,
    list_requests_iter: ListRequestsIter,
}
409
410impl<'a> ListFieldSchedulingJob<'a> {
411 fn try_new(
412 scheduler: &'a ListFieldScheduler,
413 ranges: &[Range<u64>],
414 filter: &FilterExpression,
415 ) -> Result<Self> {
416 let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
417 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
418 let offsets = scheduler
419 .offsets_scheduler
420 .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
421 Ok(Self {
422 scheduler,
423 offsets,
424 list_requests_iter,
425 num_rows,
426 })
427 }
428}
429
impl SchedulingJob for ListFieldSchedulingJob<'_> {
    /// Schedules the next batch of offsets and spawns the indirect task that
    /// will decode them and schedule the matching items.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        // Map the scheduled offsets back to the list requests they satisfy
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled,
            list_reqs
        );
        // All requests in one scan line come from pages sharing the same null
        // marker (asserted below); assumes at least one request was produced
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        debug_assert!(list_reqs
            .iter()
            .all(|req| req.null_offset_adjustment == null_offset_adjustment));
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // The offsets job yields exactly one decoder per scan line
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let io = context.io().clone();
        let cache = context.cache().clone();

        // Spawn the indirect load; the decoder below will await this handle
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // Hand out a decoder immediately; its offsets/validity/items are
        // placeholders until the indirect task completes
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
501
/// Scheduler for a list field, backed by separate offsets and items columns.
///
/// Offsets are scheduled directly; the items they reference are scheduled
/// indirectly once the offsets have been decoded.
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    // Int32 for List, Int64 for LargeList
    offset_type: DataType,
    list_type: DataType,
    // One entry per offsets page, in column order
    offset_page_info: Vec<OffsetPageInfo>,
}
525
/// Per-page metadata for the offsets column of a list field.
#[derive(Debug)]
pub struct OffsetPageInfo {
    /// Number of offsets stored in the page (== number of lists)
    pub offsets_in_page: u64,
    /// Stored offsets >= this value encode null lists
    pub null_offset_adjustment: u64,
    /// How many items in the items column this page's lists point at
    pub num_items_referenced_by_page: u64,
}
535
536impl ListFieldScheduler {
537 pub fn new(
539 offsets_scheduler: Arc<dyn FieldScheduler>,
540 items_scheduler: Arc<dyn FieldScheduler>,
541 items_field: Arc<Field>,
542 offset_type: DataType,
544 offset_page_info: Vec<OffsetPageInfo>,
545 ) -> Self {
546 let list_type = match &offset_type {
547 DataType::Int32 => DataType::List(items_field.clone()),
548 DataType::Int64 => DataType::LargeList(items_field.clone()),
549 _ => panic!("Unexpected offset type {}", offset_type),
550 };
551 Self {
552 offsets_scheduler,
553 items_scheduler,
554 items_field,
555 offset_type,
556 offset_page_info,
557 list_type,
558 }
559 }
560}
561
562impl FieldScheduler for ListFieldScheduler {
563 fn schedule_ranges<'a>(
564 &'a self,
565 ranges: &[Range<u64>],
566 filter: &FilterExpression,
567 ) -> Result<Box<dyn SchedulingJob + 'a>> {
568 Ok(Box::new(ListFieldSchedulingJob::try_new(
569 self, ranges, filter,
570 )?))
571 }
572
573 fn num_rows(&self) -> u64 {
574 self.offsets_scheduler.num_rows()
575 }
576
577 fn initialize<'a>(
578 &'a self,
579 _filter: &'a FilterExpression,
580 _context: &'a SchedulerContext,
581 ) -> BoxFuture<'a, Result<()>> {
582 std::future::ready(Ok(())).boxed()
584 }
585}
586
/// Decoder for a page of lists.
///
/// Created with placeholder offsets/validity; the real data arrives when the
/// `unloaded` indirect task completes (see `wait_for_loaded`).
#[derive(Debug)]
struct ListPageDecoder {
    // Handle to the spawned indirect load; taken on first wait_for_loaded
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // Normalized, zero-based offsets (num_rows + 1 entries once loaded)
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    // Decoder for the items, wrapped in a single-field struct; None when no
    // items were referenced
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    // Int32 for List, Int64 for LargeList
    offset_type: DataType,
    data_type: DataType,
}
611
/// CPU-side task that assembles a (Large)ListArray from decoded pieces.
struct ListDecodeTask {
    // Normalized offsets for the drained lists (len = num lists + 1); not yet
    // rebased to zero — decode() subtracts the first value
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    // Task decoding the items, or None when every list is empty/null
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    // Int32 for List, Int64 for LargeList
    offset_type: DataType,
}
620
impl DecodeArrayTask for ListDecodeTask {
    /// Decodes the items, rebases the offsets to start at zero, casts them to
    /// the target offset width, and assembles the final list array.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let items = self
            .items
            .map(|items| {
                // Items were decoded inside a single-field struct wrapper;
                // unwrap the sole child column
                let wrapped_items = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        // Arrow convention: omit the null buffer entirely when nothing is null
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        // Rebase offsets so the first list starts at zero
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Ok(Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Ok(Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}
676
/// Returns the index of the LAST element in `to_search` (which must be sorted,
/// non-decreasing) that is <= `target`.
///
/// If several elements equal `target`, the index of the last duplicate is
/// returned (`binary_search` may land on any of them).  If `target` falls
/// between two elements, the index of the smaller one is returned.  If
/// `target` is smaller than every element — or the slice is empty — this
/// returns 0 rather than underflowing (the previous `idx - 1` would panic in
/// debug builds and wrap in release builds).
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    let mut result = match to_search.binary_search(&target) {
        Ok(idx) => idx,
        // Err(idx) is the insertion point; the element before it is the
        // largest value <= target.  Clamp at 0 instead of underflowing when
        // target precedes everything.
        Err(idx) => idx.saturating_sub(1),
    };
    // Advance over duplicates so we always report the last match.  The
    // `result + 1 < len` form is also safe for an empty slice.
    while result + 1 < to_search.len() && to_search[result + 1] == target {
        result += 1;
    }
    result as u64
}
691
impl LogicalPageDecoder for ListPageDecoder {
    /// Waits until more than `loaded_need` rows (lists) are available.
    ///
    /// On first call this awaits the indirect task and installs the real
    /// offsets/validity/item decoder; afterwards it translates the row need
    /// into an item need via the offsets and waits on the item decoder.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        async move {
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if indirectly_loaded.is_err() {
                    // JoinError: re-raise a task panic, otherwise (e.g.
                    // cancellation) fail loudly
                    match indirectly_loaded.unwrap_err().try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            // Strictly greater: "loaded_need" rows must be exceeded
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            // Offsets has num_rows + 1 entries; offsets[boundary + 1] is the
            // end of list `boundary`, so we need more than that many items
            // loaded (hence the -1, saturating for the all-empty case)
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            // Translate loaded items back into fully-loaded lists
            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }

    /// Drains `num_rows` lists, producing a decode task for them and the
    /// items they reference.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        // 32-bit offsets cannot address more than i32::MAX items per batch;
        // shrink the batch until it fits (Int64 / LargeList has no such cap)
        if self.offset_type != DataType::Int64 {
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                actual_num_rows -= 1;
            }
        }
        // Partial batches are not supported yet, so a shrunken batch is an error
        if actual_num_rows < num_rows {
            return Err(Error::NotSupported { source: format!("loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches", num_rows).into(), location: location!() });
        }
        // num_rows lists need num_rows + 1 offsets
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        // Only pull from the item decoder when these lists reference items
        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
817
/// Result of the indirect scheduling task: decoded offsets/validity plus a
/// decoder for the referenced items (None when no items were referenced).
struct IndirectlyLoaded {
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    root_decoder: Option<SimpleStructDecoder>,
}
823
impl std::fmt::Debug for IndirectlyLoaded {
    // Manual impl: `root_decoder` is intentionally omitted from the output
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndirectlyLoaded")
            .field("offsets", &self.offsets)
            .field("validity", &self.validity)
            .finish()
    }
}
832
/// Encodes the offsets (and validity) column of a list field.
///
/// Offsets and validity are accumulated in pairs and flushed as a single page
/// once enough data has been buffered.
#[derive(Debug)]
struct ListOffsetsEncoder {
    // Buffers [offsets, validity, offsets, validity, ...] until a page's worth
    // of data has accumulated
    accumulation_queue: AccumulationQueue,
    // Encoder used for the final u64 offsets block
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}
870
impl ListOffsetsEncoder {
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }

    /// Pulls the raw offsets out of a (Large)List array as a plain
    /// Int32/Int64 array (validity stripped).
    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }

    /// Pulls the list validity out as a BooleanArray; an empty boolean array
    /// stands in for "no nulls" (see `do_encode`, which maps empty -> None).
    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            new_empty_array(&DataType::Boolean)
        }
    }

    /// Spawns the actual encode work for a flushed batch of accumulated
    /// arrays, which alternate [offsets, validity, offsets, validity, ...].
    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        // De-interleave the queue: even slots are offsets, odd are validity
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();

        tokio::task::spawn(async move {
            // Each offsets array has one more entry than it has lists
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                row_number: 0, })
        })
        .map(|res_res| res_res.unwrap())
        .boxed()
    }

    /// Queues this array's offsets and validity; returns an encode task if
    /// either insertion pushed the accumulation queue over its threshold.
    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        let num_rows = offsets.len() as u64;
        if let Some(mut arrays) = self
            .accumulation_queue
            .insert(offsets, 0, num_rows)
        {
            // Queue flushed on the offsets insert: tack the validity on so
            // the [offsets, validity] pairing stays intact
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self
            .accumulation_queue
            .insert(validity, 0, num_rows)
        {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    /// Flushes any accumulated offsets/validity into an encode task.
    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    /// Number of items spanned by an offsets array (last offset - first).
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }

    /// Appends one offsets array to `dest` as u64 values, rebased so it
    /// continues from `base`, skipping the redundant leading offset, and
    /// marking null lists by adding `null_offset_adjustment` to their offset.
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // Shift so the first (skipped) offset would map onto `base`
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }

    /// Concatenates all offsets arrays into one u64 vector (nulls encoded via
    /// `null_offset_adjustment`) and encodes it with the inner encoder.
    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            // Strip the null marker (%) so the next chunk chains onto the
            // true end position of the previous one
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }

    /// Top-level encode: derives the null marker (total item span + 1, so it
    /// exceeds every real offset), encodes the offsets, and wraps the result
    /// in the protobuf `List` encoding description.
    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        // An empty boolean array was used as the "no nulls" sentinel
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}
1142
/// Encoder for a list field: offsets/validity go to one column, items to the
/// child encoder's column(s).
pub struct ListFieldEncoder {
    offsets_encoder: ListOffsetsEncoder,
    items_encoder: Box<dyn FieldEncoder>,
}
1147
1148impl ListFieldEncoder {
1149 pub fn new(
1150 items_encoder: Box<dyn FieldEncoder>,
1151 inner_offsets_encoder: Arc<dyn ArrayEncoder>,
1152 cache_bytes_per_columns: u64,
1153 keep_original_array: bool,
1154 column_index: u32,
1155 ) -> Self {
1156 Self {
1157 offsets_encoder: ListOffsetsEncoder::new(
1158 cache_bytes_per_columns,
1159 keep_original_array,
1160 column_index,
1161 inner_offsets_encoder,
1162 ),
1163 items_encoder,
1164 }
1165 }
1166
1167 fn combine_tasks(
1168 offsets_tasks: Vec<EncodeTask>,
1169 item_tasks: Vec<EncodeTask>,
1170 ) -> Result<Vec<EncodeTask>> {
1171 let mut all_tasks = offsets_tasks;
1172 let item_tasks = item_tasks;
1173 all_tasks.extend(item_tasks);
1174 Ok(all_tasks)
1175 }
1176}
1177
impl FieldEncoder for ListFieldEncoder {
    /// Splits the incoming list array into offsets/validity (handled by the
    /// offsets encoder) and the referenced item values (handled by the child
    /// encoder), and returns any encode tasks either side produced.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Slice out exactly the items this (possibly offset/sliced) list
        // array references, using its first and last value offsets
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        // If offsets flushed a page but the items didn't, force the items out
        // too so the offsets page's item references stay within its pages
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    /// Flushes both the offsets encoder and the items encoder.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    /// One column for the offsets plus the child's columns.
    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns() + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            // First column (offsets) carries no column-level metadata here
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}
1262
/// Structural (2.1-style) list encoder: folds list offsets/nulls into the
/// repetition-definition builder and forwards the trimmed values to `child`.
pub struct ListStructuralEncoder {
    // When true, offsets/nulls are shared with the caller's array instead of
    // being deep-copied into the repdef builder
    keep_original_array: bool,
    child: Box<dyn FieldEncoder>,
}
1274
impl ListStructuralEncoder {
    /// Creates a structural list encoder wrapping the child items encoder.
    pub fn new(keep_original_array: bool, child: Box<dyn FieldEncoder>) -> Self {
        Self {
            keep_original_array,
            child,
        }
    }
}
1283
impl FieldEncoder for ListStructuralEncoder {
    /// Records the list's offsets and nulls in `repdef`, then forwards the
    /// trimmed item values to the child encoder.
    ///
    /// "Garbage" values are items referenced only by null lists; when
    /// `add_offsets` reports them they are filtered out before trimming.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let values = if let Some(list_arr) = array.as_list_opt::<i32>() {
            let has_garbage_values = if self.keep_original_array {
                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned())
            } else {
                // Deep-copy nulls so the repdef builder doesn't keep the
                // caller's buffers alive
                repdef.add_offsets(list_arr.offsets().clone(), deep_copy_nulls(array.nulls()))
            };
            if has_garbage_values {
                list_arr.filter_garbage_nulls().trimmed_values()
            } else {
                list_arr.trimmed_values()
            }
        } else if let Some(list_arr) = array.as_list_opt::<i64>() {
            // Same handling as above for LargeList (i64 offsets)
            let has_garbage_values = if self.keep_original_array {
                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned())
            } else {
                repdef.add_offsets(list_arr.offsets().clone(), deep_copy_nulls(array.nulls()))
            };
            if has_garbage_values {
                list_arr.filter_garbage_nulls().trimmed_values()
            } else {
                list_arr.trimmed_values()
            }
        } else {
            panic!("List encoder used for non-list data")
        };
        self.child
            .maybe_encode(values, external_buffers, repdef, row_number, num_rows)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.child.flush(external_buffers)
    }

    // The list itself adds no columns; structure lives in the repdef levels
    fn num_columns(&self) -> u32 {
        self.child.num_columns()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        self.child.finish(external_buffers)
    }
}
1338
/// Structural scheduler for list fields.
///
/// Lists need no scheduling of their own; everything is delegated to the child
/// (items) scheduler.
#[derive(Debug)]
pub struct StructuralListScheduler {
    child: Box<dyn StructuralFieldScheduler>,
}
1343
impl StructuralListScheduler {
    /// Wraps the child (items) scheduler.
    pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
        Self { child }
    }
}
1349
1350impl StructuralFieldScheduler for StructuralListScheduler {
1351 fn schedule_ranges<'a>(
1352 &'a self,
1353 ranges: &[Range<u64>],
1354 filter: &FilterExpression,
1355 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
1356 let child = self.child.schedule_ranges(ranges, filter)?;
1357
1358 Ok(Box::new(StructuralListSchedulingJob::new(child)))
1359 }
1360
1361 fn initialize<'a>(
1362 &'a mut self,
1363 filter: &'a FilterExpression,
1364 context: &'a SchedulerContext,
1365 ) -> BoxFuture<'a, Result<()>> {
1366 self.child.initialize(filter, context)
1367 }
1368}
1369
/// Scheduling job for list data.
///
/// Delegates entirely to the child's job (see `StructuralListScheduler`).
#[derive(Debug)]
struct StructuralListSchedulingJob<'a> {
    child: Box<dyn StructuralSchedulingJob + 'a>,
}
1378
impl<'a> StructuralListSchedulingJob<'a> {
    /// Wraps the child's scheduling job.
    fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
        Self { child }
    }
}
1384
impl StructuralSchedulingJob for StructuralListSchedulingJob<'_> {
    // Pure delegation: the child schedules the item pages.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        self.child.schedule_next(context)
    }
}
1393
/// Structural decoder for list fields.
///
/// Holds the target list `data_type` so the child's decoded items can later be
/// wrapped back into a `List`/`LargeList` array (see `StructuralListDecodeTask`).
#[derive(Debug)]
pub struct StructuralListDecoder {
    child: Box<dyn StructuralFieldDecoder>,
    data_type: DataType,
}
1399
impl StructuralListDecoder {
    /// Creates a decoder wrapping `child`; `data_type` must be the list type
    /// (`List` or `LargeList`) that `decode` will reconstruct.
    pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
        Self { child, data_type }
    }
}
1405
1406impl StructuralFieldDecoder for StructuralListDecoder {
1407 fn accept_page(&mut self, child: crate::decoder::LoadedPage) -> Result<()> {
1408 self.child.accept_page(child)
1409 }
1410
1411 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
1412 let child_task = self.child.drain(num_rows)?;
1413 Ok(Box::new(StructuralListDecodeTask::new(
1414 child_task,
1415 self.data_type.clone(),
1416 )))
1417 }
1418
1419 fn data_type(&self) -> &DataType {
1420 &self.data_type
1421 }
1422}
1423
/// Decode task that re-attaches list structure to decoded items.
///
/// `decode` unravels one layer of offsets/validity from the repdef and wraps the
/// child's array into `data_type` (a `List` or `LargeList`).
#[derive(Debug)]
struct StructuralListDecodeTask {
    child_task: Box<dyn StructuralDecodeArrayTask>,
    data_type: DataType,
}
1429
impl StructuralListDecodeTask {
    /// Pairs the child's decode task with the list type to reconstruct.
    fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
        Self {
            child_task,
            data_type,
        }
    }
}
1438
1439impl StructuralDecodeArrayTask for StructuralListDecodeTask {
1440 fn decode(self: Box<Self>) -> Result<DecodedArray> {
1441 let DecodedArray { array, mut repdef } = self.child_task.decode()?;
1442 match &self.data_type {
1443 DataType::List(child_field) => {
1444 let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
1445 let list_array = ListArray::try_new(child_field.clone(), offsets, array, validity)?;
1446 Ok(DecodedArray {
1447 array: Arc::new(list_array),
1448 repdef,
1449 })
1450 }
1451 DataType::LargeList(child_field) => {
1452 let (offsets, validity) = repdef.unravel_offsets::<i64>()?;
1453 let list_array =
1454 LargeListArray::try_new(child_field.clone(), offsets, array, validity)?;
1455 Ok(DecodedArray {
1456 array: Arc::new(list_array),
1457 repdef,
1458 })
1459 }
1460 _ => panic!("List decoder did not have a list field"),
1461 }
1462 }
1463}
1464
1465#[cfg(test)]
1466mod tests {
1467
1468 use std::{collections::HashMap, sync::Arc};
1469
1470 use arrow::array::{Int64Builder, LargeListBuilder, StringBuilder};
1471 use arrow_array::{
1472 builder::{Int32Builder, ListBuilder},
1473 Array, ArrayRef, BooleanArray, DictionaryArray, LargeStringArray, ListArray, StructArray,
1474 UInt64Array, UInt8Array,
1475 };
1476 use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
1477 use arrow_schema::{DataType, Field, Fields};
1478 use lance_core::datatypes::{
1479 STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
1480 };
1481 use rstest::rstest;
1482
1483 use crate::{
1484 testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
1485 version::LanceFileVersion,
1486 };
1487
    /// Helper: a `List<inner_type>` type with a nullable "item" child field.
    fn make_list_type(inner_type: DataType) -> DataType {
        DataType::List(Arc::new(Field::new("item", inner_type, true)))
    }
1491
    /// Helper: a `LargeList<inner_type>` type with a nullable "item" child field.
    fn make_large_list_type(inner_type: DataType) -> DataType {
        DataType::LargeList(Arc::new(Field::new("item", inner_type, true)))
    }
1495
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_list(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // Round-trips random List<Int32> data across both file versions and both
        // structural encodings (the metadata key selects miniblock vs. full-zip).
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field =
            Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, version).await;
    }
1512
1513 #[rstest]
1514 #[test_log::test(tokio::test)]
1515 async fn test_deeply_nested_lists(
1516 #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1517 structural_encoding: &str,
1518 ) {
1519 let mut field_metadata = HashMap::new();
1520 field_metadata.insert(
1521 STRUCTURAL_ENCODING_META_KEY.to_string(),
1522 structural_encoding.into(),
1523 );
1524 let field = Field::new("item", DataType::Int32, true).with_metadata(field_metadata);
1525 for _ in 0..5 {
1526 let field = Field::new("", make_list_type(field.data_type().clone()), true);
1527 check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1528 }
1529 }
1530
1531 #[test_log::test(tokio::test)]
1532 async fn test_large_list() {
1533 let field = Field::new("", make_large_list_type(DataType::Int32), true);
1534 check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1535 }
1536
1537 #[test_log::test(tokio::test)]
1538 async fn test_nested_strings() {
1539 let field = Field::new("", make_list_type(DataType::Utf8), true);
1540 check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1541 }
1542
1543 #[test_log::test(tokio::test)]
1544 async fn test_nested_list() {
1545 let field = Field::new("", make_list_type(make_list_type(DataType::Int32)), true);
1546 check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
1547 }
1548
    #[test_log::test(tokio::test)]
    async fn test_list_struct_list() {
        // Struct items nested inside a list: List<Struct<inner_str: Utf8>>.
        let struct_type = DataType::Struct(Fields::from(vec![Field::new(
            "inner_str",
            DataType::Utf8,
            false,
        )]));

        let field = Field::new("", make_list_type(struct_type), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }
1560
    #[test_log::test(tokio::test)]
    async fn test_list_struct_empty() {
        // 2 * 1024 * 1024 lists, every one empty (all offsets are 0), over a
        // struct items array with zero rows.
        let fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
        let items = UInt64Array::from(Vec::<u64>::new());
        let structs = StructArray::new(fields, vec![Arc::new(items)], None);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; 2 * 1024 * 1024 + 1]));
        let lists = ListArray::new(
            Arc::new(Field::new("item", structs.data_type().clone(), true)),
            offsets,
            Arc::new(structs),
            None,
        );

        check_round_trip_encoding_of_data(
            vec![Arc::new(lists)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }
1581
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_list(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // Four hand-built lists (the third is null) so range and index scheduling
        // can be verified against known values.
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        // Ranges that stop before, and straddle, the null list, plus single-row
        // index takes (including the null row itself).
        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3])
            .with_indices(vec![2])
            .with_file_version(version);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }
1613
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_nested_list_ends_with_null(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        use arrow_array::Int32Array;

        // List<List<Int32>> built by hand so that both the inner and the outer
        // list end with a null entry (the final offsets repeat, giving the null
        // a zero-length span).
        let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let inner_offsets = ScalarBuffer::<i32>::from(vec![0, 1, 2, 3, 4, 5, 5]);
        let inner_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let outer_offsets = ScalarBuffer::<i32>::from(vec![0, 1, 2, 3, 4, 5, 6, 6]);
        let outer_validity = BooleanBuffer::from(vec![true, true, true, true, true, true, false]);

        let inner_list = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            OffsetBuffer::new(inner_offsets),
            Arc::new(values),
            Some(NullBuffer::new(inner_validity)),
        );
        let outer_list = ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            )),
            OffsetBuffer::new(outer_offsets),
            Arc::new(inner_list),
            Some(NullBuffer::new(outer_validity)),
        );

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        // The 5..7 range and index 6 specifically cover the trailing null row.
        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(5..7)
            .with_indices(vec![1, 6])
            .with_indices(vec![6])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(outer_list)], &test_cases, field_metadata)
            .await;
    }
1661
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_string_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // List<Utf8> with a null item inside a list and one fully null list.
        let items_builder = StringBuilder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
        list_builder.append_value([Some("gh"), None]);
        list_builder.append_null();
        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
        let list_array = list_builder.finish();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }
1692
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_sliced_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        // Slice to rows 1..3 ([4,5] and the null list) so the encoder sees
        // offsets that do not start at zero.
        let list_array = list_array.slice(1, 2);

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(1..2)
            .with_indices(vec![0])
            .with_indices(vec![1])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }
1724
    #[test_log::test(tokio::test)]
    async fn test_simple_list_dict() {
        // Dictionary-encoded (u8 keys -> LargeUtf8 values) items inside a list.
        let values = LargeStringArray::from_iter_values(["a", "bb", "ccc"]);
        let indices = UInt8Array::from(vec![0, 1, 2, 0, 1, 2, 0, 1, 2]);
        let dict_array = DictionaryArray::new(indices, Arc::new(values));
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 5, 6, 9]));
        let list_array = ListArray::new(
            Arc::new(Field::new("item", dict_array.data_type().clone(), true)),
            offsets,
            Arc::new(dict_array),
            None,
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(1..3)
            .with_range(2..4)
            .with_indices(vec![1])
            .with_indices(vec![2]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(list_array)],
            &test_cases,
            HashMap::default(),
        )
        .await;
    }
1751
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_list_with_garbage_nulls(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // The middle list is null but its offsets (5..8) still span item values —
        // "garbage" items the encoder must filter out (see
        // `filter_garbage_nulls` in the encoder).
        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
        let offsets = OffsetBuffer::new(offsets);
        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let list_arr = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, true)),
            offsets,
            Arc::new(items),
            Some(list_validity),
        );

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(1..2)
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, field_metadata)
            .await;
    }
1786
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_two_page_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // Two batches of 512 lists each, values chosen so each row is unique.
        let items_builder = Int64Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for i in 0..512 {
            list_builder.append_value([Some(i), Some(i * 2)]);
        }
        let list_array_1 = list_builder.finish();

        let items_builder = Int64Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for i in 0..512 {
            let i = i + 512;
            list_builder.append_value([Some(i), Some(i * 2)]);
        }
        let list_array_2 = list_builder.finish();

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        // A tiny page size forces the data across multiple pages; the 800..900
        // range read then has to stitch rows together across page boundaries.
        let test_cases = TestCases::default()
            .with_file_version(LanceFileVersion::V2_1)
            .with_page_sizes(vec![100])
            .with_range(800..900);
        check_round_trip_encoding_of_data(
            vec![Arc::new(list_array_1), Arc::new(list_array_2)],
            &test_cases,
            metadata,
        )
        .await;
    }
1827
    #[test_log::test(tokio::test)]
    async fn test_simple_large_list() {
        // Same fixture as test_simple_list but built as LargeList (i64 offsets).
        let items_builder = Int32Builder::new();
        let mut list_builder = LargeListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3]);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }
1846
    #[test_log::test(tokio::test)]
    async fn test_empty_lists() {
        // Regression coverage for empty lists: a non-empty list, an empty list,
        // and a single-null-item list, in several positions.
        let values = [vec![Some(1), Some(2), Some(3)], vec![], vec![None]];
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let items_builder = Int32Builder::new();
            let mut list_builder = ListBuilder::new(items_builder);
            for idx in order {
                list_builder.append_value(values[idx].clone());
            }
            let list_array = Arc::new(list_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1]);
            check_round_trip_encoding_of_data(
                vec![list_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            // batch_size 1 forces each list into its own output batch
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
        }

        // Arrays that contain only empty and null lists (no item values at all),
        // first with fixed-width items...
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append(true);
        list_builder.append_null();
        list_builder.append(true);
        let list_array = Arc::new(list_builder.finish());

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![list_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;

        // ...and again with variable-width (string) items.
        let items_builder = StringBuilder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append(true);
        list_builder.append_null();
        list_builder.append(true);
        let list_array = Arc::new(list_builder.finish());

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![list_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
    }
1909
1910 #[test_log::test(tokio::test)]
1911 #[ignore] async fn test_jumbo_list() {
1913 let items = BooleanArray::new_null(1024 * 1024);
1917 let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1024 * 1024]));
1918 let list_arr = Arc::new(ListArray::new(
1919 Arc::new(Field::new("item", DataType::Boolean, true)),
1920 offsets,
1921 Arc::new(items),
1922 None,
1923 )) as ArrayRef;
1924 let arrs = vec![list_arr; 5000];
1925
1926 let test_cases = TestCases::default().without_validation();
1928 check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
1929 }
1930}