use std::{collections::VecDeque, ops::Range, sync::Arc};

use arrow_array::{
    cast::AsArray,
    new_empty_array,
    types::{Int32Type, Int64Type, UInt64Type},
    Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
use arrow_schema::{DataType, Field, Fields};
use futures::{future::BoxFuture, FutureExt};
use lance_core::{cache::LanceCache, Error, Result};
use log::trace;
use snafu::location;
use tokio::task::JoinHandle;

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlock, FixedWidthDataBlock},
    decoder::{
        DecodeArrayTask, DecodeBatchScheduler, FilterExpression, ListPriorityRange, MessageType,
        NextDecodeTask, PageEncoding, PriorityRange, ScheduledScanLine, SchedulerContext,
    },
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    format::pb,
    previous::{
        decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob},
        encoder::{ArrayEncoder, EncodedArray},
        encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler},
    },
    repdef::RepDefBuilder,
    utils::accumulation::AccumulationQueue,
    EncodingsIo,
};

/// A request to decode a contiguous run of lists whose offsets all live in a
/// single offsets page
#[derive(Debug)]
struct ListRequest {
    /// How many lists this request covers
    num_lists: u64,
    /// Whether an extra offset (the end of the preceding list) was scheduled so
    /// that the start of the first list can be recovered
    includes_extra_offset: bool,
    /// The null offset adjustment for the page the offsets come from
    null_offset_adjustment: u64,
    /// The number of items that precede this page's items in the items column
    items_offset: u64,
}

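/// Splits a set of row ranges into per-page [`ListRequest`]s, walking the
/// [`OffsetPageInfo`]s to find which offsets page each range of lists falls in.
/// A range that does not start at a page boundary needs one extra offset (the
/// end of the previous list) so the start of its first list can be computed.
///
/// A minimal sketch of the splitting arithmetic with hypothetical page sizes
/// (the real iterator also tracks `items_offset` and the extra-offset flag):
///
/// ```
/// // Two offset pages holding 3 lists each; a request for rows 2..5 is split at
/// // the page boundary into 1 list from page 0 and 2 lists from page 1.
/// let offsets_in_page = [3u64, 3];
/// let range = 2u64..5;
/// let page0_end = offsets_in_page[0];
/// let lists_from_page0 = page0_end - range.start;
/// let lists_from_page1 = range.end - page0_end;
/// assert_eq!((lists_from_page0, lists_from_page1), (1, 2));
/// // Row 2 is not a page boundary, so the request for page 0 also needs the
/// // offset at index 1 (the end of list 1) to locate the start of list 2.
/// ```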
#[derive(Debug)]
struct ListRequestsIter {
    // The list requests, in the order they will be decoded
    list_requests: VecDeque<ListRequest>,
    // The ranges of offsets that need to be scheduled from the offsets column
    offsets_requests: Vec<Range<u64>>,
}

impl ListRequestsIter {
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        // Keep track of how far we are into the offsets and items columns
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        for range in row_ranges {
            // Each row range may span several offsets pages
            let mut range = range.clone();

            // Skip any pages that end before this range starts
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range doesn't start on a page boundary we need one extra
            // offset to find where the first list begins
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // Create a ListRequest for each offsets page the range touches
            while !range.is_empty() {
                // The end of the current page, clamped to the end of the range
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                includes_extra_offset = false;
                range.start = end;
                // If the range continues into the next page, advance to it
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    /// Given a number of offsets that were just scheduled, pop the list requests
    /// that those offsets satisfy (splitting the last request if needed)
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            // An extra offset consumes one of the scheduled offsets but does not
            // correspond to a list
            if req.includes_extra_offset {
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}

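/// Decodes the scheduled offsets into zero-based offsets, per-list validity, and
/// the ranges of items that need to be scheduled.
///
/// Offsets are stored as cumulative item counts within a page, and a null list is
/// recorded by pushing its offset past `null_offset_adjustment`.  Both pieces are
/// recovered with a comparison and a modulo.  A minimal sketch with illustrative
/// numbers (the real function also re-bases each request and accumulates validity
/// into a buffer):
///
/// ```
/// // A page with 5 items, so null_offset_adjustment = 6.  The second list is null.
/// let null_offset_adjustment = 6u64;
/// let stored = [2u64, 11, 5];
/// let validity: Vec<bool> = stored.iter().map(|&o| o < null_offset_adjustment).collect();
/// let offsets: Vec<u64> = stored.iter().map(|&o| o % null_offset_adjustment).collect();
/// assert_eq!(validity, vec![true, false, true]);
/// assert_eq!(offsets, vec![2, 5, 5]);
/// ```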
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    // The output has one offset per list plus one leading zero
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    let mut offsets_offset: u32 = 0;
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        let num_lists = req.num_lists;

        // Figure out which stored offsets belong to this request and which range
        // of items (relative to the page) they refer to
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                // The request starts at the page boundary so the first list's items
                // implicitly start at zero
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                // The extra offset tells us where the first list's items start
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // A stored offset at or beyond null_offset_adjustment marks a null list.
        // The extra offset (if any) carries no validity information and is skipped.
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            validity_buffer.append(*off < null_offset_adjustment);
        }

        // Without an extra offset the first normalized offset is not produced by
        // the windows(2) pass below and must be pushed manually
        if !req.includes_extra_offset {
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        // Convert the stored offsets into lengths and accumulate them so that the
        // normalized offsets are zero-based and monotonically increasing
        normalized_offsets.extend(
            offsets_values
                .slice(offsets_to_norm_start, num_offsets_to_norm)
                .windows(2)
                .map(|w| {
                    let start = w[0] % null_offset_adjustment;
                    let end = w[1] % null_offset_adjustment;
                    if end < start {
                        panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                    }
                    let length = end - start;
                    last_normalized_offset += length;
                    last_normalized_offset
                }),
        );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists,
            items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        // Shift the item range by the number of items in earlier pages
        if !items_range.is_empty() {
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}

/// Async task that waits for the offsets to be decoded, computes the item ranges
/// that need to be read, and then schedules and drives the indirect decode of the
/// items column.
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // First, wait for all scheduled offsets to arrive and decode them
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let offsets = decode_task.task.decode()?;

    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // All requested lists are empty or null, there are no items to schedule
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Create a new root scheduler with a single column (the items data)
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
    #[allow(deprecated)]
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        // Filters are not pushed down into the list items
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}

#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    offsets: Box<dyn SchedulingJob + 'a>,
    num_rows: u64,
    list_requests_iter: ListRequestsIter,
}

impl<'a> ListFieldSchedulingJob<'a> {
    fn try_new(
        scheduler: &'a ListFieldScheduler,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Self> {
        let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let offsets = scheduler
            .offsets_scheduler
            .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
        Ok(Self {
            scheduler,
            offsets,
            list_requests_iter,
            num_rows,
        })
    }
}

impl SchedulingJob for ListFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        // Schedule the next batch of offsets and figure out which list requests
        // those offsets satisfy
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled,
            list_reqs
        );
        // All requests in this batch should share the same null_offset_adjustment
        // because they come from the same offsets page
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        debug_assert!(list_reqs
            .iter()
            .all(|req| req.null_offset_adjustment == null_offset_adjustment));
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // There is a single decoder for the scheduled offsets
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let io = context.io().clone();
        let cache = context.cache().clone();

        // Spawn the indirect scheduling of the items as a background task
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // The decoder is created immediately; its offsets, validity, and item
        // decoder are filled in once the indirect task completes
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        #[allow(deprecated)]
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// A field scheduler for list fields
///
/// Lists require indirect I/O: the ranges of items to read are not known until the
/// offsets have been loaded and decoded, so scheduling happens in two phases.
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    offset_type: DataType,
    list_type: DataType,
    offset_page_info: Vec<OffsetPageInfo>,
}

/// The offsets are stored in their own column and this information is needed for
/// each page of offsets in order to decode it
#[derive(Debug)]
pub struct OffsetPageInfo {
    pub offsets_in_page: u64,
    pub null_offset_adjustment: u64,
    pub num_items_referenced_by_page: u64,
}

impl ListFieldScheduler {
    pub fn new(
        offsets_scheduler: Arc<dyn FieldScheduler>,
        items_scheduler: Arc<dyn FieldScheduler>,
        items_field: Arc<Field>,
        offset_type: DataType,
        offset_page_info: Vec<OffsetPageInfo>,
    ) -> Self {
        let list_type = match &offset_type {
            DataType::Int32 => DataType::List(items_field.clone()),
            DataType::Int64 => DataType::LargeList(items_field.clone()),
            _ => panic!("Unexpected offset type {}", offset_type),
        };
        Self {
            offsets_scheduler,
            items_scheduler,
            items_field,
            offset_type,
            offset_page_info,
            list_type,
        }
    }
}

impl FieldScheduler for ListFieldScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(ListFieldSchedulingJob::try_new(
            self, ranges, filter,
        )?))
    }

    fn num_rows(&self) -> u64 {
        self.offsets_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // No initialization is needed
        std::future::ready(Ok(())).boxed()
    }
}

/// A page decoder for list fields
///
/// The decoder is created while the indirect I/O (which loads the offsets and
/// schedules the items) is still in flight.  Until that task completes, the
/// offsets, validity, and item decoder below are empty placeholders.
#[derive(Debug)]
struct ListPageDecoder {
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // The normalized offsets, zero-based, one per list plus a leading zero
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    offset_type: DataType,
    data_type: DataType,
}

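/// A task that decodes one drained batch of lists, combining the normalized
/// offsets and validity with the decoded items to build a `ListArray` or
/// `LargeListArray`.
///
/// The offsets handed to the task may not start at zero (they are a slice of the
/// batch's normalized offsets), so they are re-based before being cast to the
/// target offset type.  A minimal sketch of that re-basing with illustrative
/// numbers:
///
/// ```
/// // A drained batch whose first list starts at item 100.
/// let offsets = [100u64, 103, 103, 107];
/// let min = offsets[0];
/// let rebased: Vec<u64> = offsets.iter().map(|&o| o - min).collect();
/// assert_eq!(rebased, vec![0, 3, 3, 7]);
/// ```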
struct ListDecodeTask {
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    // Will be None if there are no items (all lists are empty or null)
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    offset_type: DataType,
}

impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let items = self
            .items
            .map(|items| {
                // The items were wrapped in a single-column struct by the indirect
                // scheduling and are unwrapped here
                let wrapped_items = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        // Re-base the offsets to start at zero and convert them into the target
        // offset type (i32 for List, i64 for LargeList)
        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Ok(Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Ok(Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}

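/// Finds the index of the last occurrence of `target` in `to_search`, or the index
/// of the last value less than `target` if it is absent.
///
/// `slice::binary_search` may return any matching index when the value repeats
/// (which happens whenever there are empty lists), so we scan forward to the last
/// match.  A self-contained sketch of the same logic:
///
/// ```
/// // Offsets [0, 3, 3, 3, 7] mean rows 1..=3 all end at item 3.  If 3 items are
/// // loaded we want to report 3 rows loaded, i.e. land on index 3.
/// let to_search = [0u64, 3, 3, 3, 7];
/// let target = 3u64;
/// let mut idx = match to_search.binary_search(&target) {
///     Ok(idx) => idx,
///     Err(idx) => idx - 1,
/// };
/// while idx < to_search.len() - 1 && to_search[idx + 1] == target {
///     idx += 1;
/// }
/// assert_eq!(idx, 3);
/// ```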
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    let mut result = match to_search.binary_search(&target) {
        Ok(idx) => idx,
        Err(idx) => idx - 1,
    };
    while result < (to_search.len() - 1) && to_search[result + 1] == target {
        result += 1;
    }
    result as u64
}

impl LogicalPageDecoder for ListPageDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            // If the indirect I/O task hasn't finished yet, wait for it.  It delivers
            // the offsets, validity, and the item decoder.
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if let Err(err) = indirectly_loaded {
                    match err.try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            // Translate the requested number of rows into a number of items using
            // the offsets
            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            // Convert the number of loaded items back into a number of loaded rows
            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // If the offsets are i32 then make sure the drained batch won't reference
        // more than i32::MAX items
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                // Shrink the batch until it fits
                actual_num_rows -= 1;
            }
        }
        if actual_num_rows < num_rows {
            return Err(Error::NotSupported {
                source: format!(
                    "loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches",
                    num_rows
                )
                .into(),
                location: location!(),
            });
        }
        // Slice out the offsets and validity for this batch and drain the
        // corresponding items
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

struct IndirectlyLoaded {
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    root_decoder: Option<SimpleStructDecoder>,
}

impl std::fmt::Debug for IndirectlyLoaded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndirectlyLoaded")
            .field("offsets", &self.offsets)
            .field("validity", &self.validity)
            .finish()
    }
}

/// An encoder for the offsets of a list field
///
/// The offsets and the validity of the lists are folded into a single u64 column.
/// The leading zero is dropped, offsets are stored as cumulative item counts, and
/// a null list is recorded by pushing its offset past `null_offset_adjustment`
/// (which is the total number of items in the page plus one).
#[derive(Debug)]
struct ListOffsetsEncoder {
    // An accumulation queue of offsets (and validity) arrays waiting to be encoded
    accumulation_queue: AccumulationQueue,
    // The encoder used to encode the normalized u64 offsets
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}

impl ListOffsetsEncoder {
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }

    // Grab the offsets from the list array as a plain Int32/Int64 array
    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }

    // Grab the validity of the list array as a boolean array (an empty array
    // means there are no nulls)
    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            new_empty_array(&DataType::Boolean)
        }
    }

    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        // The accumulation queue interleaves offsets and validity arrays, so the
        // even entries are offsets and the odd entries are validity
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();

        tokio::task::spawn(async move {
            // Each offsets array has one more offset than it has lists
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                row_number: 0,
            })
        })
        .map(|res_res| res_res.unwrap())
        .boxed()
    }

    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        let num_rows = offsets.len() as u64;
        // Either insert (offsets or validity) could be the one that fills the
        // accumulation queue and triggers an encode task
        if let Some(mut arrays) = self.accumulation_queue.insert(offsets, 0, num_rows) {
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self.accumulation_queue.insert(validity, 0, num_rows) {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    // Returns the total number of items referenced by the given offsets array
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }

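    /// Normalizes one offsets array into the accumulated u64 offsets, re-basing it
    /// on top of the offsets already collected and folding the validity in by
    /// pushing null lists' offsets past `null_offset_adjustment`.
    ///
    /// A minimal sketch with illustrative numbers (5 items total, so the adjustment
    /// is 6; the middle list is null):
    ///
    /// ```
    /// let offsets = [0u64, 2, 5, 5];
    /// let validity = [true, false, true];
    /// let null_offset_adjustment = 6u64;
    /// let encoded: Vec<u64> = offsets[1..]
    ///     .iter()
    ///     .zip(validity.iter())
    ///     .map(|(&off, &valid)| off + (!valid as u64) * null_offset_adjustment)
    ///     .collect();
    /// // The leading zero is implicit and never stored
    /// assert_eq!(encoded, vec![2, 11, 5]);
    /// ```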
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        // The number of items already referenced by earlier arrays in this page
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // Shift the offsets so they continue from `base` (the incoming array
                // may itself be a slice that does not start at zero)
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }

    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }

    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        // An empty validity array means the corresponding offsets array had no nulls
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        // The total span is the total number of items, so any encoded offset at or
        // above total_span + 1 must belong to a null list
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}

pub struct ListFieldEncoder {
    offsets_encoder: ListOffsetsEncoder,
    items_encoder: Box<dyn FieldEncoder>,
}

impl ListFieldEncoder {
    pub fn new(
        items_encoder: Box<dyn FieldEncoder>,
        inner_offsets_encoder: Arc<dyn ArrayEncoder>,
        cache_bytes_per_columns: u64,
        keep_original_array: bool,
        column_index: u32,
    ) -> Self {
        Self {
            offsets_encoder: ListOffsetsEncoder::new(
                cache_bytes_per_columns,
                keep_original_array,
                column_index,
                inner_offsets_encoder,
            ),
            items_encoder,
        }
    }

    fn combine_tasks(
        offsets_tasks: Vec<EncodeTask>,
        item_tasks: Vec<EncodeTask>,
    ) -> Result<Vec<EncodeTask>> {
        let mut all_tasks = offsets_tasks;
        all_tasks.extend(item_tasks);
        Ok(all_tasks)
    }
}

impl FieldEncoder for ListFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Slice out just the items referenced by this array (the array itself may
        // be a slice of a larger list array)
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            // If the offsets encoder flushed a page then flush the items encoder as
            // well so the items referenced by that page are written out with it
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns() + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            // One default column for the offsets followed by the columns produced by
            // the items encoder
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}