1use std::{collections::VecDeque, ops::Range, sync::Arc};
5
6use arrow_array::{
7 Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
8 cast::AsArray,
9 new_empty_array,
10 types::{Int32Type, Int64Type, UInt64Type},
11};
12use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
13use arrow_schema::{DataType, Field, Fields};
14use futures::{FutureExt, future::BoxFuture};
15use lance_core::{Error, Result, cache::LanceCache};
16use log::trace;
17use tokio::task::JoinHandle;
18
19use crate::{
20 EncodingsIo,
21 buffer::LanceBuffer,
22 data::{BlockInfo, DataBlock, FixedWidthDataBlock},
23 decoder::{
24 DecodeArrayTask, DecodeBatchScheduler, FilterExpression, ListPriorityRange, MessageType,
25 NextDecodeTask, PageEncoding, PriorityRange, ScheduledScanLine, SchedulerContext,
26 },
27 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
28 format::pb,
29 previous::{
30 decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob},
31 encoder::{ArrayEncoder, EncodedArray},
32 encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler},
33 },
34 repdef::RepDefBuilder,
35 utils::accumulation::AccumulationQueue,
36};
37
/// A request for a contiguous run of lists, confined to a single offsets page.
#[derive(Debug)]
struct ListRequest {
    /// How many lists this request covers
    num_lists: u64,
    /// True if the request does not start exactly on a page boundary and so
    /// needs the offset immediately preceding its first list (to know where
    /// that first list's items begin)
    includes_extra_offset: bool,
    /// Per-page constant; raw offsets >= this value mark their list as null
    null_offset_adjustment: u64,
    /// Number of items referenced by all pages prior to this request's page;
    /// used to rebase page-relative item ranges to global positions
    items_offset: u64,
}
73
/// Pre-computed list requests, plus the ranges of the offsets column that must
/// be scheduled to satisfy them, derived from the caller's row ranges.
#[derive(Debug)]
struct ListRequestsIter {
    /// Queue of pending requests, consumed by `next` as offsets are scheduled
    list_requests: VecDeque<ListRequest>,
    /// Ranges into the offsets column to schedule (one per input row range,
    /// widened by one on the left when an extra offset is needed)
    offsets_requests: Vec<Range<u64>>,
}
80
impl ListRequestsIter {
    /// Builds the queue of [`ListRequest`]s (and the offset ranges to read) for
    /// the given row ranges by walking the per-page [`OffsetPageInfo`]s.
    ///
    /// Assumes `row_ranges` are sorted and non-overlapping, and that the pages
    /// collectively cover them (the `unwrap`s on the page iterator rely on it).
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        // Running totals of offsets/items covered by fully-skipped pages
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        for range in row_ranges {
            let mut range = range.clone();

            // Skip any offsets pages that end at or before this range's start
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range does not begin exactly on a page boundary we must
            // also read the offset just before range.start, so widen the
            // offsets request by one on the left
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // Emit one ListRequest per offsets page this range spans
            while !range.is_empty() {
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                // Only the first request of a range carries the extra offset
                includes_extra_offset = false;
                range.start = end;
                // Advance to the next page unless this request finished the range
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    /// Pops the list requests covered by the next `num_offsets` scheduled
    /// offsets, splitting the request at the front of the queue when it is only
    /// partially covered.
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            // The extra (preceding) offset consumes one scheduled offset but
            // does not correspond to a list
            if req.includes_extra_offset {
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                // Request fully covered; take it whole
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                // Partially covered; split off the covered prefix
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                // The remainder no longer needs the extra offset (consumed above)
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}
176
/// Converts decoded raw offsets (u64 values where an offset that is
/// `>= null_offset_adjustment` marks its list as null) into:
///  * the global ranges of items that must be scheduled (empty ranges dropped)
///  * normalized offsets, starting at 0, null markers stripped, contiguous
///    across all requests
///  * one validity bit per list
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    // Offsets are always decoded as u64 regardless of the logical offset type
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    // Normalized offsets always begin at zero
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    let mut offsets_offset: u32 = 0;
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        let num_lists = req.num_lists;

        // Work out which decoded offsets belong to this request and which items
        // they reference.  Without an extra offset the request starts at the
        // beginning of a page, where the first list implicitly starts at item 0.
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                // `% null_offset_adjustment` strips the null marker, if any
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                // The extra preceding offset gives the first list's start
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // A raw offset >= null_offset_adjustment marks its list as null.  When
        // there is an extra offset we skip it: its null marker belongs to the
        // preceding list, not to any list in this request.
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            validity_buffer.append(*off < null_offset_adjustment);
        }

        // Without an extra offset the first list's end offset is pushed directly
        // (its start is implicitly 0)
        if !req.includes_extra_offset {
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        // Re-accumulate per-list lengths so normalized offsets stay contiguous
        // across requests (and across pages with different bases)
        normalized_offsets.extend(
            offsets_values
                .slice(offsets_to_norm_start, num_offsets_to_norm)
                .windows(2)
                .map(|w| {
                    let start = w[0] % null_offset_adjustment;
                    let end = w[1] % null_offset_adjustment;
                    if end < start {
                        panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                    }
                    let length = end - start;
                    last_normalized_offset += length;
                    last_normalized_offset
                }),
        );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists, items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        // Empty item ranges (all lists empty or null) need no item I/O
        if !items_range.is_empty() {
            // Rebase from page-relative to global item positions
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}
310
/// Waits for a batch of offsets to decode, then schedules the item ranges those
/// offsets reference.  This is the "indirect" half of list scheduling: items
/// cannot be scheduled until the offsets have actually been read.
///
/// Returns the normalized offsets, list validity, and (when any items were
/// referenced) a struct decoder wrapping the item decoders.
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // Wait for all offsets (index of last row is num_offsets - 1), then decode
    // them in one shot
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let offsets = decode_task.task.decode()?;

    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // All lists were empty or null; no item I/O is needed
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Wrap the items column in a single-field struct so the existing struct
    // scheduler/decoder machinery can drive the nested scheduling
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
    #[allow(deprecated)]
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    // Item priorities are derived from the list priorities through the offsets
    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    // Feed every produced child decoder into the root struct decoder; the root
    // itself (empty path) is skipped
    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}
395
/// A scheduling job for a list column; drives the offsets column's job and
/// spawns indirect item scheduling for each batch of scheduled offsets.
#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    /// The underlying job scheduling the offsets column
    offsets: Box<dyn SchedulingJob + 'a>,
    num_rows: u64,
    /// Yields the list requests covered by each batch of scheduled offsets
    list_requests_iter: ListRequestsIter,
}
403
404impl<'a> ListFieldSchedulingJob<'a> {
405 fn try_new(
406 scheduler: &'a ListFieldScheduler,
407 ranges: &[Range<u64>],
408 filter: &FilterExpression,
409 ) -> Result<Self> {
410 let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
411 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
412 let offsets = scheduler
413 .offsets_scheduler
414 .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
415 Ok(Self {
416 scheduler,
417 offsets,
418 list_requests_iter,
419 num_rows,
420 })
421 }
422}
423
impl SchedulingJob for ListFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        // Schedule the next chunk of the offsets column; everything below is
        // driven by how many offsets were actually scheduled
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled, list_reqs
        );
        // All requests in a scheduled batch are expected to share the same
        // null_offset_adjustment (checked by the debug_assert below)
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        debug_assert!(
            list_reqs
                .iter()
                .all(|req| req.null_offset_adjustment == null_offset_adjustment)
        );
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // Exactly one offsets decoder is expected per scheduled batch
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let io = context.io().clone();
        let cache = context.cache().clone();

        // Spawn the indirect task: once the offsets are decoded it schedules
        // the item ranges they reference
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // Hand back a placeholder decoder right away; its real offsets,
        // validity, and item decoder arrive via `unloaded` when the indirect
        // task completes (see ListPageDecoder::wait_for_loaded)
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        #[allow(deprecated)]
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
497
/// A field scheduler for list columns.  The offsets column is scheduled
/// directly; the items column is scheduled indirectly, since the item ranges
/// are only known after the offsets have been decoded.
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    /// Int32 (List) or Int64 (LargeList)
    offset_type: DataType,
    /// Derived from offset_type in `new`
    list_type: DataType,
    /// One entry per offsets page, in page order
    offset_page_info: Vec<OffsetPageInfo>,
}
521
/// Metadata describing a single page of list offsets, needed to map row ranges
/// onto offset and item ranges.
#[derive(Debug)]
pub struct OffsetPageInfo {
    /// Number of offsets stored in the page
    pub offsets_in_page: u64,
    /// Raw offsets >= this value encode a null list
    pub null_offset_adjustment: u64,
    /// How many items the lists in this page reference
    pub num_items_referenced_by_page: u64,
}
531
532impl ListFieldScheduler {
533 pub fn new(
535 offsets_scheduler: Arc<dyn FieldScheduler>,
536 items_scheduler: Arc<dyn FieldScheduler>,
537 items_field: Arc<Field>,
538 offset_type: DataType,
540 offset_page_info: Vec<OffsetPageInfo>,
541 ) -> Self {
542 let list_type = match &offset_type {
543 DataType::Int32 => DataType::List(items_field.clone()),
544 DataType::Int64 => DataType::LargeList(items_field.clone()),
545 _ => panic!("Unexpected offset type {}", offset_type),
546 };
547 Self {
548 offsets_scheduler,
549 items_scheduler,
550 items_field,
551 offset_type,
552 offset_page_info,
553 list_type,
554 }
555 }
556}
557
558impl FieldScheduler for ListFieldScheduler {
559 fn schedule_ranges<'a>(
560 &'a self,
561 ranges: &[Range<u64>],
562 filter: &FilterExpression,
563 ) -> Result<Box<dyn SchedulingJob + 'a>> {
564 Ok(Box::new(ListFieldSchedulingJob::try_new(
565 self, ranges, filter,
566 )?))
567 }
568
569 fn num_rows(&self) -> u64 {
570 self.offsets_scheduler.num_rows()
571 }
572
573 fn initialize<'a>(
574 &'a self,
575 _filter: &'a FilterExpression,
576 _context: &'a SchedulerContext,
577 ) -> BoxFuture<'a, Result<()>> {
578 std::future::ready(Ok(())).boxed()
580 }
581}
582
/// A logical page decoder for list columns.  Created with placeholder state;
/// once the spawned indirect task (`unloaded`) completes, `offsets`,
/// `validity`, and `item_decoder` are populated from its output.
#[derive(Debug)]
struct ListPageDecoder {
    /// Handle to the indirect scheduling task; taken and awaited on the first
    /// call to `wait_for_loaded`
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    /// Normalized offsets (one more entry than there are lists)
    offsets: Arc<[u64]>,
    /// One validity bit per list
    validity: BooleanBuffer,
    /// Decoder for the items, wrapped in a single-field struct; None when the
    /// scheduled lists referenced no items
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    /// Int32 (List) or Int64 (LargeList)
    offset_type: DataType,
    data_type: DataType,
}
607
/// Decode task assembling a List/LargeList array from drained offsets,
/// validity, and an optional items decode task.
struct ListDecodeTask {
    /// Normalized offsets for the drained lists (length = num lists + 1); may
    /// not start at zero — they are rebased during decode
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    /// Decode task for the items; None when no items are referenced
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    /// Int32 (List) or Int64 (LargeList)
    offset_type: DataType,
}
616
impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        // Items were decoded wrapped in a single-column struct; unwrap that
        // column.  When there are no items, substitute an empty array.
        let items = self
            .items
            .map(|items| {
                let wrapped_items = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        // Arrow convention: omit the null buffer entirely when nothing is null
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        // The drained offsets are a slice of the page's offsets and may not
        // start at zero; rebase them so the first offset becomes 0
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Ok(Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Ok(Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}
672
/// Returns the index of the last element in `to_search` that is `<= target`.
///
/// `to_search` must be sorted (non-decreasing) and contain at least one value
/// that is `<= target`.  When `target` occurs multiple times, the index of the
/// final occurrence is returned; when it is absent, the index of the largest
/// smaller value is returned.
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    // `partition_point` yields the first index whose value exceeds `target`;
    // stepping back one lands on the last value `<= target`, which is exactly
    // what a binary search followed by a walk over duplicates would find.
    let first_greater = to_search.partition_point(|&val| val <= target);
    (first_greater - 1) as u64
}
687
impl LogicalPageDecoder for ListPageDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            // First await the indirect task (if still pending) to obtain the
            // real offsets, validity, and item decoder
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if let Err(err) = indirectly_loaded {
                    // Re-raise panics from the spawned task; any other join
                    // error (e.g. cancellation) is unexpected here
                    match err.try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            // Items needed to cover lists 0..=boundary, derived from the offset
            // after the boundary; saturating_sub guards the zero-items case.
            // NOTE(review): `- 1` matches the "more than N" semantics of the
            // trace below — confirm against the LogicalPageDecoder contract.
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            // Translate "items loaded" back into "lists loaded" via the offsets
            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // With 32-bit offsets a single batch cannot reference more than
        // i32::MAX items; shrink the batch until it would fit
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                actual_num_rows -= 1;
            }
        }
        // Returning a smaller-than-requested batch is not supported yet, so a
        // shrink is reported as an error instead
        if actual_num_rows < num_rows {
            return Err(Error::not_supported_source(format!("loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches", num_rows).into()));
        }
        // Slice out the offsets (one extra entry) and validity for these lists
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        // Only drain the item decoder when these lists actually reference items
        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
813
/// Output of the indirect scheduling task: normalized offsets, list validity,
/// and (when any items were referenced) the root items decoder.
struct IndirectlyLoaded {
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    /// None when the scheduled lists referenced no items (all empty or null)
    root_decoder: Option<SimpleStructDecoder>,
}
819
820impl std::fmt::Debug for IndirectlyLoaded {
821 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822 f.debug_struct("IndirectlyLoaded")
823 .field("offsets", &self.offsets)
824 .field("validity", &self.validity)
825 .finish()
826 }
827}
828
/// Encodes the offsets (and list validity) of a list column into a dedicated
/// offsets column.  Offsets are flattened to u64 values where anything
/// `>= null_offset_adjustment` marks its list as null.
#[derive(Debug)]
struct ListOffsetsEncoder {
    /// Buffers incoming offsets/validity arrays until enough bytes accumulate
    /// to justify writing a page
    accumulation_queue: AccumulationQueue,
    /// Encoder applied to the flattened u64 offsets
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}
866
impl ListOffsetsEncoder {
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }

    /// Clones the raw offsets out of a list array as a plain Int32/Int64 array
    /// with no validity attached.
    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }

    /// Clones the list validity into a BooleanArray; an *empty* boolean array
    /// stands in for "no nulls" (see the matching check in `do_encode`).
    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            new_empty_array(&DataType::Boolean)
        }
    }

    /// Spawns a background task that encodes the accumulated arrays into one
    /// offsets page.
    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        // `arrays` holds interleaved (offsets, validity) pairs; de-interleave
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();

        tokio::task::spawn(async move {
            // Each offsets array has one more value than it has lists, hence
            // the subtraction of the array count
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                // NOTE(review): presumably unused in the legacy path — confirm
                row_number: 0,
            })
        })
        .map(|res_res| res_res.unwrap())
        .boxed()
    }

    /// Pushes this list array's offsets and validity into the accumulation
    /// queue, returning an encode task if either insert triggers a flush.
    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        let num_rows = offsets.len() as u64;
        // Offsets and validity are inserted back-to-back so they remain paired
        // in the queue (make_encode_task de-interleaves them)
        if let Some(mut arrays) = self
            .accumulation_queue
            .insert(offsets, 0, num_rows)
        {
            // Flush triggered on the offsets insert; append the validity so the
            // final pair stays complete
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self
            .accumulation_queue
            .insert(validity, 0, num_rows)
        {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    /// Number of items spanned by an offsets array (last offset minus first).
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }

    /// Appends `offsets` (skipping its first value) onto `dest`, rebased so the
    /// appended run continues from `base`, converting to u64 and adding
    /// `null_offset_adjustment` to each offset whose list is null.
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // Shift so the (skipped) first offset would land on `base`
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    // Validity bit i pairs with offset i+1 (each list's end offset)
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }

    /// Flattens all offset arrays into one contiguous u64 offsets vector and
    /// encodes it with the inner encoder.
    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            // Strip any null marker from the previous batch's final offset
            // before using it as the new base
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }

    /// Encodes the accumulated offsets/validity pairs into one offsets page.
    /// `null_offset_adjustment` is chosen as (total item span + 1) so that
    /// valid offsets and null-marked offsets can never collide.
    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        // An empty validity array means "no nulls" (see extract_validity)
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}
1138
/// A field encoder for list columns: writes a dedicated offsets column plus the
/// child items column(s).
pub struct ListFieldEncoder {
    /// Encodes offsets and list validity into the offsets column
    offsets_encoder: ListOffsetsEncoder,
    /// Encodes the flattened items
    items_encoder: Box<dyn FieldEncoder>,
}
1143
1144impl ListFieldEncoder {
1145 pub fn new(
1146 items_encoder: Box<dyn FieldEncoder>,
1147 inner_offsets_encoder: Arc<dyn ArrayEncoder>,
1148 cache_bytes_per_columns: u64,
1149 keep_original_array: bool,
1150 column_index: u32,
1151 ) -> Self {
1152 Self {
1153 offsets_encoder: ListOffsetsEncoder::new(
1154 cache_bytes_per_columns,
1155 keep_original_array,
1156 column_index,
1157 inner_offsets_encoder,
1158 ),
1159 items_encoder,
1160 }
1161 }
1162
1163 fn combine_tasks(
1164 offsets_tasks: Vec<EncodeTask>,
1165 item_tasks: Vec<EncodeTask>,
1166 ) -> Result<Vec<EncodeTask>> {
1167 let mut all_tasks = offsets_tasks;
1168 let item_tasks = item_tasks;
1169 all_tasks.extend(item_tasks);
1170 Ok(all_tasks)
1171 }
1172}
1173
impl FieldEncoder for ListFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Slice out exactly the items this (possibly sliced) list array
        // references: value_offsets/values ignore slicing, so index with the
        // array's own offset/len
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        // Offsets/validity are accumulated and encoded separately from items
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        // When an offsets page is being written but the items encoder has not
        // flushed, force-flush the items so the items referenced by that page
        // are written out too
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn num_columns(&self) -> u32 {
        // The items columns plus one offsets column
        self.items_encoder.num_columns() + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            // The offsets column contributes a default (empty) column entry,
            // followed by the item encoder's columns
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}
1257}