use std::{collections::VecDeque, ops::Range, sync::Arc};

use arrow_array::{
    cast::AsArray,
    new_empty_array,
    types::{Int32Type, Int64Type, UInt64Type},
    Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
use arrow_schema::{DataType, Field, Fields};
use futures::{future::BoxFuture, FutureExt};
use lance_arrow::list::ListArrayExt;
use log::trace;
use snafu::location;
use tokio::task::JoinHandle;

use lance_core::{cache::FileMetadataCache, Error, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlock, FixedWidthDataBlock},
    decoder::{
        DecodeArrayTask, DecodeBatchScheduler, DecodedArray, FieldScheduler, FilterExpression,
        ListPriorityRange, LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding,
        PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralSchedulingJob,
    },
    encoder::{
        ArrayEncoder, EncodeTask, EncodedArray, EncodedColumn, EncodedPage, FieldEncoder,
        OutOfLineBuffers,
    },
    encodings::logical::r#struct::SimpleStructScheduler,
    format::pb,
    repdef::RepDefBuilder,
    EncodingsIo,
};

use super::{primitive::AccumulationQueue, r#struct::SimpleStructDecoder};

/// A request for a range of lists, aligned to a single offsets page
#[derive(Debug)]
struct ListRequest {
    /// How many lists the request covers
    num_lists: u64,
    /// True if the request needs the offset immediately before its first list
    /// (i.e. it does not start at the beginning of an offsets page)
    includes_extra_offset: bool,
    /// The null_offset_adjustment of the offsets page this request maps to
    null_offset_adjustment: u64,
    /// The number of items that precede this page's items in the items column
    items_offset: u64,
}

/// Breaks row ranges into per-page [`ListRequest`]s and calculates which
/// ranges of the offsets column need to be scheduled to satisfy them
#[derive(Debug)]
struct ListRequestsIter {
    // The list requests, in scheduling order
    list_requests: VecDeque<ListRequest>,
    // The ranges of offsets that need to be scheduled
    offsets_requests: Vec<Range<u64>>,
}

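// Illustrative sketch (hypothetical numbers, not from the original comments):
// given two offsets pages with offsets_in_page = 10 each, where the first page
// references 25 items, `ListRequestsIter::new(&[3..12], ...)` yields
//   ListRequest { num_lists: 7, includes_extra_offset: true,  items_offset: 0,  .. }
//   ListRequest { num_lists: 2, includes_extra_offset: false, items_offset: 25, .. }
// and a single offsets request 2..12 (the extra offset at index 2 is needed to
// find where list 3 begins).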
impl ListRequestsIter {
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        for range in row_ranges {
            let mut range = range.clone();

            // Skip any offsets pages that end before this range begins
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range does not start on a page boundary we also need the
            // offset just before it to know where the first list begins
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // Split the range into one request per offsets page it touches
            while !range.is_empty() {
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                includes_extra_offset = false;
                range.start = end;
                // If this was not the last page for the range, advance to the next page
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    // Given a number of offsets that were just scheduled, return the list
    // requests those offsets satisfy, splitting the front request if it is
    // only partially covered
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            // The extra offset consumes one scheduled offset but does not
            // correspond to a whole list
            if req.includes_extra_offset {
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                // Partially consume the front request
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}

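/// Decodes the raw u64 offsets into item ranges to schedule, normalized
/// offsets, and list validity.
///
/// Offsets are stored as "offset plus validity": a list's end offset is stored
/// modulo `null_offset_adjustment`, and null lists have `null_offset_adjustment`
/// added to their stored value. Since `null_offset_adjustment` is one larger
/// than the number of items in the page, real offsets never reach it, so
/// `stored % null_offset_adjustment` recovers the offset and
/// `stored < null_offset_adjustment` recovers validity.
///
/// Illustrative sketch (hypothetical numbers, not from the original comments):
/// with `null_offset_adjustment = 8`, stored offsets `[3, 11, 5]` decode to end
/// offsets `[3, 3, 5]` and validity `[true, false, true]` (the second list is
/// null because 11 >= 8, and 11 % 8 = 3 leaves it with length 0).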
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    // The normalized offsets always begin with the implicit zero
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    let mut offsets_offset: u32 = 0;
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        let num_lists = req.num_lists;

        // If the request starts at a page boundary the start of its first list
        // is the implicit zero; otherwise the request carries one extra
        // leading offset that tells us where the first list begins
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // An offset encodes the validity of the list that ends at it, so skip
        // the extra leading offset (if any) when collecting validity
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            validity_buffer.append(*off < null_offset_adjustment);
        }

        // With no extra offset, the first stored offset is itself the end of
        // the first list and can be pushed directly
        if !req.includes_extra_offset {
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        // Convert the remaining offsets into lengths and accumulate them onto
        // the running normalized offset
        normalized_offsets.extend(
            offsets_values
                .slice(offsets_to_norm_start, num_offsets_to_norm)
                .windows(2)
                .map(|w| {
                    let start = w[0] % null_offset_adjustment;
                    let end = w[1] % null_offset_adjustment;
                    if end < start {
                        panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                    }
                    let length = end - start;
                    last_normalized_offset += length;
                    last_normalized_offset
                }),
        );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists,
            items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        if !items_range.is_empty() {
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}

/// Runs in the background: waits for the offsets to decode, then schedules the
/// item ranges those offsets reference
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<FileMetadataCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // wait_for_loaded uses "more than" semantics, so this waits for all offsets
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let offsets = decode_task.task.decode()?;

    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // All requested lists are empty or null; there are no items to schedule
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Create a new root scheduler with a single column: the items data
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone());
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}

#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    offsets: Box<dyn SchedulingJob + 'a>,
    num_rows: u64,
    list_requests_iter: ListRequestsIter,
}

impl<'a> ListFieldSchedulingJob<'a> {
    fn try_new(
        scheduler: &'a ListFieldScheduler,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Self> {
        let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let offsets = scheduler
            .offsets_scheduler
            .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
        Ok(Self {
            scheduler,
            offsets,
            list_requests_iter,
            num_rows,
        })
    }
}

impl SchedulingJob for ListFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        // Schedule the offsets first; the items are scheduled indirectly once
        // the offsets have been decoded
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled,
            list_reqs
        );
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        // All requests in a single scan line come from the same offsets page
        // and must therefore share the same null_offset_adjustment
        debug_assert!(list_reqs
            .iter()
            .all(|req| req.null_offset_adjustment == null_offset_adjustment));
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // Take the (single) offsets decoder that was just scheduled
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let io = context.io().clone();
        let cache = context.cache().clone();

        // Spawn the indirect scheduling of the items as a background task
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // The decoder starts out empty and fills itself in when the indirect
        // task finishes
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// A scheduler for list fields. The offsets are stored in a separate column
/// from the items.
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    offset_type: DataType,
    list_type: DataType,
    offset_page_info: Vec<OffsetPageInfo>,
}

/// Per-page metadata for the offsets column, used to map list ranges to item
/// ranges without first reading the offsets themselves
#[derive(Debug)]
pub struct OffsetPageInfo {
    pub offsets_in_page: u64,
    pub null_offset_adjustment: u64,
    pub num_items_referenced_by_page: u64,
}
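// Illustrative sketch (hypothetical numbers, not from the original comments):
// a page holding the three lists [10, 20], null, [30] stores one u64 offset per
// list and would be described by OffsetPageInfo { offsets_in_page: 3,
// null_offset_adjustment: 4, num_items_referenced_by_page: 3 }, with stored
// offsets [2, 2 + 4, 3] = [2, 6, 3].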

impl ListFieldScheduler {
    pub fn new(
        offsets_scheduler: Arc<dyn FieldScheduler>,
        items_scheduler: Arc<dyn FieldScheduler>,
        items_field: Arc<Field>,
        offset_type: DataType,
        offset_page_info: Vec<OffsetPageInfo>,
    ) -> Self {
        let list_type = match &offset_type {
            DataType::Int32 => DataType::List(items_field.clone()),
            DataType::Int64 => DataType::LargeList(items_field.clone()),
            _ => panic!("Unexpected offset type {}", offset_type),
        };
        Self {
            offsets_scheduler,
            items_scheduler,
            items_field,
            offset_type,
            offset_page_info,
            list_type,
        }
    }
}

impl FieldScheduler for ListFieldScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(ListFieldSchedulingJob::try_new(
            self, ranges, filter,
        )?))
    }

    fn num_rows(&self) -> u64 {
        self.offsets_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // Nothing to initialize
        std::future::ready(Ok(())).boxed()
    }
}

/// A page decoder for lists. It is created in an unloaded state; the offsets,
/// validity, and item decoder are filled in once the indirect scheduling task
/// (which first decodes the offsets) completes.
#[derive(Debug)]
struct ListPageDecoder {
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // The decoded offsets and validity
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    offset_type: DataType,
    data_type: DataType,
}

struct ListDecodeTask {
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    // None if there are no items to decode (all lists empty or null)
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    offset_type: DataType,
}

impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let items = self
            .items
            .map(|items| {
                // The items are wrapped in a single-field struct; unwrap them
                let wrapped_items = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        // The offsets are u64 but the list array needs i32/i64 offsets that
        // start at zero, so subtract the first offset and cast
        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Ok(Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Ok(Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}

/// Finds the index of the last occurrence of `target` in the sorted slice
/// `to_search`, or the index of the largest value less than `target` if it is
/// not present
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    let mut result = match to_search.binary_search(&target) {
        Ok(idx) => idx,
        Err(idx) => idx - 1,
    };
    // binary_search may land on any of several equal values; walk to the last
    while result < (to_search.len() - 1) && to_search[result + 1] == target {
        result += 1;
    }
    result as u64
}
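// For example (illustrative): binary_search_to_end(&[0, 2, 2, 5], 2) == 2 (the
// last occurrence of 2) and binary_search_to_end(&[0, 2, 2, 5], 3) == 2 (2 is
// the largest value not exceeding 3).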

impl LogicalPageDecoder for ListPageDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        async move {
            // Wait for the indirect task to finish, if we haven't already
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if indirectly_loaded.is_err() {
                    match indirectly_loaded.unwrap_err().try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            // To load more than `loaded_need` lists we need the items strictly
            // before offsets[boundary + 1]
            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // If the lists are large enough, a drained batch could overflow i32
        // offsets; shrink the batch until it fits (not yet supported, see below)
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                actual_num_rows -= 1;
            }
        }
        if actual_num_rows < num_rows {
            return Err(Error::NotSupported {
                source: format!(
                    "loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches",
                    num_rows
                )
                .into(),
                location: location!(),
            });
        }
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

struct IndirectlyLoaded {
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    root_decoder: Option<SimpleStructDecoder>,
}

impl std::fmt::Debug for IndirectlyLoaded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndirectlyLoaded")
            .field("offsets", &self.offsets)
            .field("validity", &self.validity)
            .finish()
    }
}

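// Encoding side: a list column is written as two columns, a u64 offsets column
// (with list validity folded into the offset values, as described above
// decode_offsets) and the flattened items column. The offsets encoder below
// accumulates offsets/validity pairs until enough bytes have been buffered to
// justify writing a page.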
#[derive(Debug)]
struct ListOffsetsEncoder {
    // An accumulation queue; both offset arrays and validity arrays are inserted here
    accumulation_queue: AccumulationQueue,
    // The inner encoder for the offset values
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}

impl ListOffsetsEncoder {
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }

    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }

    // Extract the validity as a boolean array (an empty array means no nulls)
    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            new_empty_array(&DataType::Boolean)
        }
    }

    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        // The accumulation queue interleaves offset arrays and validity arrays
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();

        tokio::task::spawn(async move {
            // Each array's offsets include a leading zero, which is not a row
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                row_number: 0,
            })
        })
        .map(|res_res| res_res.unwrap())
        .boxed()
    }

    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        let num_rows = offsets.len() as u64;
        // Either insert could fill the queue and trigger an encode task
        if let Some(mut arrays) = self.accumulation_queue.insert(offsets, 0, num_rows) {
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self.accumulation_queue.insert(validity, 0, num_rows) {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    // Returns the total number of items the given offsets array references
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }

    // Rebases the offsets onto `base`, folds validity into the offset values,
    // and appends them (skipping the leading offset) to `dest`
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // The offsets may not start at zero (e.g. a sliced array)
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }

    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            // Strip any null marker from the previous array's last offset
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }

    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        // See decode_offsets: valid offsets always stay below this value
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}
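
// Illustrative sketch (hypothetical numbers, not from the original comments):
// suppose the queue flushes two List<i32> arrays, the first with offsets
// [0, 3, 3] and validity [true, false], the second with offsets [0, 2] and no
// nulls. Then total_span = (3 - 0) + (2 - 0) = 5 and null_offset_adjustment = 6.
// The first array contributes [3, 3 + 6] = [3, 9] (its second list is null) and
// the second array, rebased onto 9 % 6 = 3, contributes [5]. The page stores
// [3, 9, 5], which decode_offsets turns back into offsets [0, 3, 3, 5] and
// validity [true, false, true].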

pub struct ListFieldEncoder {
    offsets_encoder: ListOffsetsEncoder,
    items_encoder: Box<dyn FieldEncoder>,
}

impl ListFieldEncoder {
    pub fn new(
        items_encoder: Box<dyn FieldEncoder>,
        inner_offsets_encoder: Arc<dyn ArrayEncoder>,
        cache_bytes_per_column: u64,
        keep_original_array: bool,
        column_index: u32,
    ) -> Self {
        Self {
            offsets_encoder: ListOffsetsEncoder::new(
                cache_bytes_per_column,
                keep_original_array,
                column_index,
                inner_offsets_encoder,
            ),
            items_encoder,
        }
    }

    fn combine_tasks(
        offsets_tasks: Vec<EncodeTask>,
        item_tasks: Vec<EncodeTask>,
    ) -> Result<Vec<EncodeTask>> {
        let mut all_tasks = offsets_tasks;
        all_tasks.extend(item_tasks);
        Ok(all_tasks)
    }
}

impl FieldEncoder for ListFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Slice out exactly the items this (possibly sliced) list array references
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        // If the offsets flushed a page then flush the items as well, so the
        // two columns stay aligned page by page
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns() + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            // The offsets column contributes a default (empty) column description
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}

/// The structural (2.1) encoder for lists: the list offsets are folded into the
/// repetition/definition levels via [`RepDefBuilder::add_offsets`] and the
/// flattened items are delegated to the child encoder
pub struct ListStructuralEncoder {
    child: Box<dyn FieldEncoder>,
}

impl ListStructuralEncoder {
    pub fn new(child: Box<dyn FieldEncoder>) -> Self {
        Self { child }
    }
}

impl FieldEncoder for ListStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let values = if let Some(list_arr) = array.as_list_opt::<i32>() {
            let has_garbage_values =
                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned());
            // Null lists that still cover item ranges ("garbage" values) are
            // filtered out before the items are encoded
            if has_garbage_values {
                list_arr.filter_garbage_nulls().trimmed_values()
            } else {
                list_arr.trimmed_values()
            }
        } else if let Some(list_arr) = array.as_list_opt::<i64>() {
            let has_garbage_values =
                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned());
            if has_garbage_values {
                list_arr.filter_garbage_nulls().trimmed_values()
            } else {
                list_arr.trimmed_values()
            }
        } else {
            panic!("List encoder used for non-list data")
        };
        self.child
            .maybe_encode(values, external_buffers, repdef, row_number, num_rows)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.child.flush(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.child.num_columns()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        self.child.finish(external_buffers)
    }
}

#[derive(Debug)]
pub struct StructuralListScheduler {
    child: Box<dyn StructuralFieldScheduler>,
}

impl StructuralListScheduler {
    pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
        Self { child }
    }
}

impl StructuralFieldScheduler for StructuralListScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let child = self.child.schedule_ranges(ranges, filter)?;

        Ok(Box::new(StructuralListSchedulingJob::new(child)))
    }

    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        self.child.initialize(filter, context)
    }
}

/// Scheduling job for structural list data; scheduling is handled entirely by
/// the child scheduler and nothing list-specific happens here
#[derive(Debug)]
struct StructuralListSchedulingJob<'a> {
    child: Box<dyn StructuralSchedulingJob + 'a>,
}

impl<'a> StructuralListSchedulingJob<'a> {
    fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
        Self { child }
    }
}

impl StructuralSchedulingJob for StructuralListSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        self.child.schedule_next(context)
    }
}

#[derive(Debug)]
pub struct StructuralListDecoder {
    child: Box<dyn StructuralFieldDecoder>,
    data_type: DataType,
}

impl StructuralListDecoder {
    pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
        Self { child, data_type }
    }
}

impl StructuralFieldDecoder for StructuralListDecoder {
    fn accept_page(&mut self, child: crate::decoder::LoadedPage) -> Result<()> {
        self.child.accept_page(child)
    }

    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let child_task = self.child.drain(num_rows)?;
        Ok(Box::new(StructuralListDecodeTask::new(
            child_task,
            self.data_type.clone(),
        )))
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

#[derive(Debug)]
struct StructuralListDecodeTask {
    child_task: Box<dyn StructuralDecodeArrayTask>,
    data_type: DataType,
}

impl StructuralListDecodeTask {
    fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
        Self {
            child_task,
            data_type,
        }
    }
}

impl StructuralDecodeArrayTask for StructuralListDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let DecodedArray { array, mut repdef } = self.child_task.decode()?;
        match &self.data_type {
            DataType::List(child_field) => {
                // Rebuild the offsets and validity from the repetition/definition levels
                let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
                let list_array = ListArray::try_new(child_field.clone(), offsets, array, validity)?;
                Ok(DecodedArray {
                    array: Arc::new(list_array),
                    repdef,
                })
            }
            DataType::LargeList(child_field) => {
                let (offsets, validity) = repdef.unravel_offsets::<i64>()?;
                let list_array =
                    LargeListArray::try_new(child_field.clone(), offsets, array, validity)?;
                Ok(DecodedArray {
                    array: Arc::new(list_array),
                    repdef,
                })
            }
            _ => panic!("List decoder did not have a list field"),
        }
    }
}

#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow::array::{Int64Builder, LargeListBuilder, StringBuilder};
    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, BooleanArray, ListArray, StructArray, UInt64Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};
    use lance_core::datatypes::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    };
    use rstest::rstest;

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    fn make_list_type(inner_type: DataType) -> DataType {
        DataType::List(Arc::new(Field::new("item", inner_type, true)))
    }

    fn make_large_list_type(inner_type: DataType) -> DataType {
        DataType::LargeList(Arc::new(Field::new("item", inner_type, true)))
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_list(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field =
            Field::new("", make_list_type(DataType::Int32), true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, version).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_list() {
        let field = Field::new("", make_large_list_type(DataType::Int32), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nested_strings() {
        let field = Field::new("", make_list_type(DataType::Utf8), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nested_list() {
        let field = Field::new("", make_list_type(make_list_type(DataType::Int32)), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_list_struct_list() {
        let struct_type = DataType::Struct(Fields::from(vec![Field::new(
            "inner_str",
            DataType::Utf8,
            false,
        )]));

        let field = Field::new("", make_list_type(struct_type), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_list_struct_empty() {
        let fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
        let items = UInt64Array::from(Vec::<u64>::new());
        let structs = StructArray::new(fields, vec![Arc::new(items)], None);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; 2 * 1024 * 1024 + 1]));
        let lists = ListArray::new(
            Arc::new(Field::new("item", structs.data_type().clone(), true)),
            offsets,
            Arc::new(structs),
            None,
        );

        check_round_trip_encoding_of_data(
            vec![Arc::new(lists)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_list(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3])
            .with_indices(vec![2])
            .with_file_version(version);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_string_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let items_builder = StringBuilder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
        list_builder.append_value([Some("gh"), None]);
        list_builder.append_null();
        list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
        let list_array = list_builder.finish();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_sliced_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        // Slice off the first and last lists
        let list_array = list_array.slice(1, 2);

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(1..2)
            .with_indices(vec![0])
            .with_indices(vec![1])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
            .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_list_with_garbage_nulls(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // The middle list is null but its offsets still cover items 5..8
        // ("garbage" values)
        let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
        let offsets = OffsetBuffer::new(offsets);
        let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let list_arr = ListArray::new(
            Arc::new(Field::new("item", DataType::UInt64, true)),
            offsets,
            Arc::new(items),
            Some(list_validity),
        );

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(1..2)
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, field_metadata)
            .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_two_page_list(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        // Build two list arrays; the small page size below forces the data to
        // span multiple pages
        let items_builder = Int64Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for i in 0..512 {
            list_builder.append_value([Some(i), Some(i * 2)]);
        }
        let list_array_1 = list_builder.finish();

        let items_builder = Int64Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for i in 0..512 {
            let i = i + 512;
            list_builder.append_value([Some(i), Some(i * 2)]);
        }
        let list_array_2 = list_builder.finish();

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_file_version(LanceFileVersion::V2_1)
            .with_page_sizes(vec![100])
            .with_range(800..900);
        check_round_trip_encoding_of_data(
            vec![Arc::new(list_array_1), Arc::new(list_array_2)],
            &test_cases,
            metadata,
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_large_list() {
        let items_builder = Int32Builder::new();
        let mut list_builder = LargeListBuilder::new(items_builder);
        list_builder.append_value([Some(1), Some(2), Some(3)]);
        list_builder.append_value([Some(4), Some(5)]);
        list_builder.append_null();
        list_builder.append_value([Some(6), Some(7), Some(8)]);
        let list_array = list_builder.finish();

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![1, 3]);
        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_lists() {
        // Test various orderings of empty / null / non-empty lists
        let values = [vec![Some(1), Some(2), Some(3)], vec![], vec![None]];
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let items_builder = Int32Builder::new();
            let mut list_builder = ListBuilder::new(items_builder);
            for idx in order {
                list_builder.append_value(values[idx].clone());
            }
            let list_array = Arc::new(list_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1]);
            check_round_trip_encoding_of_data(
                vec![list_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
        }

        // Test empty and null lists when there are no items at all
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append(true);
        list_builder.append_null();
        list_builder.append(true);
        let list_array = Arc::new(list_builder.finish());

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![list_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;

        // Same as above, but with a variable-width (string) items type
        let items_builder = StringBuilder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        list_builder.append(true);
        list_builder.append_null();
        list_builder.append(true);
        let list_array = Arc::new(list_builder.finish());

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![list_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
    }
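
    #[test]
    fn test_binary_search_to_end() {
        // Added sanity check (not part of the original suite) for the
        // binary_search_to_end helper: it should return the index of the last
        // occurrence of the target, or of the largest smaller value.
        assert_eq!(super::binary_search_to_end(&[0, 2, 2, 5], 0), 0);
        assert_eq!(super::binary_search_to_end(&[0, 2, 2, 5], 2), 2);
        assert_eq!(super::binary_search_to_end(&[0, 2, 2, 5], 3), 2);
        assert_eq!(super::binary_search_to_end(&[0, 2, 2, 5], 5), 3);
    }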

    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_jumbo_list() {
        // Too slow to run by default
        let items = BooleanArray::new_null(1024 * 1024);
        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1024 * 1024]));
        let list_arr = Arc::new(ListArray::new(
            Arc::new(Field::new("item", DataType::Boolean, true)),
            offsets,
            Arc::new(items),
            None,
        )) as ArrayRef;
        let arrs = vec![list_arr; 5000];

        // Skip full validation to keep the (huge) round trip manageable
        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }
}