1use std::{
5 collections::VecDeque,
6 ops::Range,
7 sync::{Arc, OnceLock},
8};
9
10use arrow_array::{
11 Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
12 cast::AsArray,
13 new_empty_array,
14 types::{Int32Type, Int64Type, UInt64Type},
15};
16use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
17use arrow_schema::{DataType, Field, Fields};
18use futures::{FutureExt, future::BoxFuture};
19use lance_core::{Error, Result, cache::LanceCache, utils::parse::str_is_truthy};
20use log::trace;
21use tokio::task::JoinHandle;
22
23use crate::{
24 EncodingsIo,
25 buffer::LanceBuffer,
26 data::{BlockInfo, DataBlock, FixedWidthDataBlock},
27 decoder::{
28 DecodeArrayTask, DecodeBatchScheduler, FilterExpression, ListPriorityRange, MessageType,
29 NextDecodeTask, PageEncoding, PriorityRange, ScheduledScanLine, SchedulerContext,
30 },
31 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
32 format::pb,
33 previous::{
34 decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob},
35 encoder::{ArrayEncoder, EncodedArray},
36 encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler},
37 },
38 repdef::RepDefBuilder,
39 utils::accumulation::AccumulationQueue,
40};
41
42static BYPASS_INDIRECT_IO_BACKPRESSURE: OnceLock<bool> = OnceLock::new();
47
48fn bypass_indirect_io_backpressure() -> bool {
49 *BYPASS_INDIRECT_IO_BACKPRESSURE.get_or_init(|| {
50 std::env::var("LANCE_BYPASS_INDIRECT_IO_BACKPRESSURE")
51 .map(|val| str_is_truthy(&val))
52 .unwrap_or(false)
53 })
54}
55
/// A request to decode a run of lists that lies entirely within one offsets page.
#[derive(Debug)]
struct ListRequest {
    /// How many lists are requested.
    num_lists: u64,
    /// Whether one extra (preceding) offset was scheduled so the start of the
    /// first list can be computed.  False when the run starts at a page
    /// boundary, where the implied start offset is 0.
    includes_extra_offset: bool,
    /// Page constant folded into stored offsets: a null list's offset has this
    /// value added, so `off % null_offset_adjustment` recovers the offset and
    /// `off >= null_offset_adjustment` marks the list as null.
    null_offset_adjustment: u64,
    /// Number of items referenced by earlier pages; added to item ranges so
    /// they are absolute within the items column.
    items_offset: u64,
}
91
/// Converts requested row ranges into per-page [`ListRequest`]s plus the
/// ranges of the offsets column that must be scheduled to satisfy them.
#[derive(Debug)]
struct ListRequestsIter {
    // Pending requests, drained in order by `next` as offsets get scheduled.
    list_requests: VecDeque<ListRequest>,
    // One range per requested row range (widened by one leading offset when
    // the range does not begin exactly at a page boundary).
    offsets_requests: Vec<Range<u64>>,
}
98
impl ListRequestsIter {
    // Walks the requested row ranges across the offset pages, pre-computing
    // both the offset ranges to schedule and the per-page ListRequests.
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        for range in row_ranges {
            let mut range = range.clone();

            // Skip any offset pages that end at or before this range's start.
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range does not begin exactly at a page boundary we also
            // need the offset immediately before it, to locate the first list.
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // Emit one ListRequest per offset page the range overlaps.
            while !range.is_empty() {
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                // Only the first chunk of a range carries the extra offset.
                includes_extra_offset = false;
                range.start = end;
                // Keep the current page if the range ended inside it; the next
                // requested range may still overlap this same page.
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    // Pops the ListRequests covered by the next `num_offsets` scheduled offset
    // values, splitting the front request if it is only partially covered.
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            // An extra offset consumes one scheduled offset but yields no list.
            if req.includes_extra_offset {
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                // Partial coverage: emit the covered prefix and shrink the
                // remainder (which no longer owns the extra offset).
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}
194
/// Decodes raw (u64) offsets into normalized offsets, a validity buffer, and
/// the item ranges needed to satisfy the given requests.
///
/// On disk a null list's offset has `null_offset_adjustment` added to it, so
/// `off % null_offset_adjustment` recovers the real offset and
/// `off >= null_offset_adjustment` marks the list as null.  The returned
/// offsets are "normalized": they begin at 0 and accumulate only list
/// lengths, so they index into the returned item ranges.
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    // Offsets are always stored as u64 on disk (see ListOffsetsEncoder).
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    // Normalized offsets always begin at zero.
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    let mut offsets_offset: u32 = 0;
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        let num_lists = req.num_lists;

        // Determine which slice of the raw offsets belongs to this request and
        // which items it references.  Without an extra offset the run starts a
        // page, where the implied start offset (and items start) is 0.
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // Validity comes from the offsets that terminate each list; a list is
        // null when its (end) offset carries the null marker.
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            validity_buffer.append(*off < null_offset_adjustment);
        }

        // At a page start the first stored offset is the end of list 0, so the
        // first list's length is pushed directly.
        if !req.includes_extra_offset {
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        // Accumulate list lengths (window end - window start, markers stripped)
        // onto the running normalized offset.
        normalized_offsets.extend(
            offsets_values
                .slice(offsets_to_norm_start, num_offsets_to_norm)
                .windows(2)
                .map(|w| {
                    let start = w[0] % null_offset_adjustment;
                    let end = w[1] % null_offset_adjustment;
                    if end < start {
                        panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                    }
                    let length = end - start;
                    last_normalized_offset += length;
                    last_normalized_offset
                }),
        );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists, items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        // Empty ranges (all lists empty/null) schedule no items.
        if !items_range.is_empty() {
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}
328
/// Spawned task that finishes scheduling a page of lists.
///
/// Waits for the offsets column to decode, converts the decoded offsets into
/// item ranges, and then schedules those ranges against the items column.
/// The items are wrapped in a single-field struct so the existing struct
/// scheduler/decoder machinery can drive them.
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // All offsets must be decoded before any items can be located.
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let (offsets, _) = decode_task.task.decode()?;

    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // Nothing to schedule: every requested list was empty or null.
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Wrap items in a single-field struct so we can reuse the struct
    // scheduling / decoding machinery for this indirect pass.
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
    #[allow(deprecated)]
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    // Items inherit the list's priority, translated through the offsets.
    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    // Feed every child decoder into our root struct decoder.
    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            // NOTE(review): an empty path appears to denote the root decoder
            // itself, which is why it is skipped here — confirm.
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}
413
/// In-flight state for one `schedule_ranges` call on a list field.
#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    /// Scheduling job for the offsets column, driven by `schedule_next`.
    offsets: Box<dyn SchedulingJob + 'a>,
    /// Total number of lists covered by this job.
    num_rows: u64,
    /// Remaining list requests, consumed as offsets are scheduled.
    list_requests_iter: ListRequestsIter,
}
421
422impl<'a> ListFieldSchedulingJob<'a> {
423 fn try_new(
424 scheduler: &'a ListFieldScheduler,
425 ranges: &[Range<u64>],
426 filter: &FilterExpression,
427 ) -> Result<Self> {
428 let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
429 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
430 let offsets = scheduler
431 .offsets_scheduler
432 .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
433 Ok(Self {
434 scheduler,
435 offsets,
436 list_requests_iter,
437 num_rows,
438 })
439 }
440}
441
impl SchedulingJob for ListFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        // Schedule the next chunk of the offsets column; the items are
        // scheduled later on a spawned task once these offsets decode.
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled, list_reqs
        );
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        // All requests in one scan line are expected to share a single page's
        // null_offset_adjustment.
        debug_assert!(
            list_reqs
                .iter()
                .all(|req| req.null_offset_adjustment == null_offset_adjustment)
        );
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // The offsets scheduler is expected to emit exactly one decoder here.
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let base_io = context.io().clone();
        // Optionally bypass I/O backpressure for the indirect task
        // (controlled by an environment variable, see
        // bypass_indirect_io_backpressure).
        let io = if bypass_indirect_io_backpressure() {
            base_io.with_bypass_backpressure().unwrap_or(base_io)
        } else {
            base_io
        };
        let cache = context.cache().clone();

        // Kick off indirect scheduling; the decoder below awaits this handle.
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // Placeholder state; wait_for_loaded fills these in from indirect_fut.
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        #[allow(deprecated)]
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
520
/// Scheduler for list fields.
///
/// The offsets column is scheduled directly; once it decodes (on a spawned
/// task) the item ranges it references are scheduled indirectly.
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    /// `Int32` for `List`, `Int64` for `LargeList`.
    offset_type: DataType,
    /// Arrow type of the decoded arrays (derived from `offset_type`).
    list_type: DataType,
    /// Per-page offset metadata, in page order.
    offset_page_info: Vec<OffsetPageInfo>,
}
544
/// Metadata describing one page of the offsets column.
#[derive(Debug)]
pub struct OffsetPageInfo {
    /// Number of offset values stored in the page (one per list).
    pub offsets_in_page: u64,
    /// Null-marker constant for this page (see [`ListRequest`]).
    pub null_offset_adjustment: u64,
    /// Number of items this page's offsets point into.
    pub num_items_referenced_by_page: u64,
}
554
555impl ListFieldScheduler {
556 pub fn new(
558 offsets_scheduler: Arc<dyn FieldScheduler>,
559 items_scheduler: Arc<dyn FieldScheduler>,
560 items_field: Arc<Field>,
561 offset_type: DataType,
563 offset_page_info: Vec<OffsetPageInfo>,
564 ) -> Self {
565 let list_type = match &offset_type {
566 DataType::Int32 => DataType::List(items_field.clone()),
567 DataType::Int64 => DataType::LargeList(items_field.clone()),
568 _ => panic!("Unexpected offset type {}", offset_type),
569 };
570 Self {
571 offsets_scheduler,
572 items_scheduler,
573 items_field,
574 offset_type,
575 offset_page_info,
576 list_type,
577 }
578 }
579}
580
581impl FieldScheduler for ListFieldScheduler {
582 fn schedule_ranges<'a>(
583 &'a self,
584 ranges: &[Range<u64>],
585 filter: &FilterExpression,
586 ) -> Result<Box<dyn SchedulingJob + 'a>> {
587 Ok(Box::new(ListFieldSchedulingJob::try_new(
588 self, ranges, filter,
589 )?))
590 }
591
592 fn num_rows(&self) -> u64 {
593 self.offsets_scheduler.num_rows()
594 }
595
596 fn initialize<'a>(
597 &'a self,
598 _filter: &'a FilterExpression,
599 _context: &'a SchedulerContext,
600 ) -> BoxFuture<'a, Result<()>> {
601 std::future::ready(Ok(())).boxed()
603 }
604}
605
/// Decoder for one scheduled page of lists.
///
/// Starts out mostly empty; `wait_for_loaded` joins the spawned indirect task
/// (`unloaded`) and fills in offsets, validity, and the item decoder.
#[derive(Debug)]
struct ListPageDecoder {
    /// Handle to the spawned indirect scheduling task; taken on first wait.
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // Normalized, zero-based offsets; num_rows + 1 entries once loaded.
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    /// `Int32` for `List`, `Int64` for `LargeList`.
    offset_type: DataType,
    data_type: DataType,
}
630
/// Decode task that assembles a `ListArray`/`LargeListArray` from drained
/// offsets, validity, and (optionally) an items decode task.
struct ListDecodeTask {
    // num_lists + 1 normalized offsets (not yet rebased to zero).
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    // None when the batch references no items (all lists empty or null).
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    /// `Int32` for `List`, `Int64` for `LargeList`.
    offset_type: DataType,
}
639
impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
        // Unwrap the items from their synthetic single-field struct (see
        // indirect_schedule_task); no items means all lists are empty/null.
        let items = self
            .items
            .map(|items| {
                let (wrapped_items, _) = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        let offsets = UInt64Array::from(self.offsets);
        // Drop the null buffer entirely when there are no nulls.
        let validity = NullBuffer::new(self.validity);
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        // Rebase offsets to start at zero for this batch, then narrow to the
        // list type's native offset width.
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        let array: ArrayRef = match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?)
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?)
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        };
        // Callers in this file ignore the second tuple element (see
        // indirect_schedule_task), so 0 is returned here.
        Ok((array, 0))
    }
}
698
/// Returns the index of the last entry in the sorted slice `to_search` (which
/// may contain duplicates) that is `<= target`.
///
/// The caller must guarantee `target >= to_search[0]` and a non-empty slice;
/// this maps a count of loaded items back to the number of loaded lists.
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    // partition_point yields the first index whose value exceeds `target`,
    // so the entry just before it is the last one that is <= target.
    let first_greater = to_search.partition_point(|&v| v <= target);
    (first_greater - 1) as u64
}
713
impl LogicalPageDecoder for ListPageDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            // On the first wait we join the spawned indirect task, which
            // supplies the offsets, validity, and the items decoder.
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if let Err(err) = indirectly_loaded {
                    // Re-raise panics from the spawned task; any other join
                    // failure (e.g. cancellation) also panics.
                    match err.try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            // `loaded_need` is an index: we need strictly more than that many
            // rows loaded before returning.
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            // offsets[boundary + 1] is the end of list `boundary`;
            // subtracting 1 gives the index of that list's last item.
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            // Translate loaded item count back into fully-loaded lists.
            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // An i32-offsets List batch cannot reference more than i32::MAX items;
        // probe how many of the requested rows would actually fit.
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                actual_num_rows -= 1;
            }
        }
        // Partial batches are not supported (yet), so this is an error.
        if actual_num_rows < num_rows {
            return Err(Error::not_supported_source(format!("loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches", num_rows).into()));
        }
        // Slice this batch's offsets (one extra entry for the final end) and
        // validity, then drain the matching items.
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
839
/// Result of the indirect scheduling task (see `indirect_schedule_task`).
struct IndirectlyLoaded {
    /// Normalized, zero-based list offsets.
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    /// None when no items needed to be scheduled (all lists empty/null).
    root_decoder: Option<SimpleStructDecoder>,
}
845
impl std::fmt::Debug for IndirectlyLoaded {
    // Hand-written impl; `root_decoder` is deliberately omitted from the
    // output (NOTE(review): presumably because SimpleStructDecoder does not
    // implement Debug — confirm).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndirectlyLoaded")
            .field("offsets", &self.offsets)
            .field("validity", &self.validity)
            .finish()
    }
}
854
/// Encoder for a list field's offsets-plus-validity column.
///
/// Validity is folded into the offsets: the stored offset of a null list has
/// `null_offset_adjustment` added to it (see `do_encode`).
#[derive(Debug)]
struct ListOffsetsEncoder {
    // Buffers (offsets, validity) array pairs until a page's worth is ready.
    accumulation_queue: AccumulationQueue,
    // Encoder applied to the merged u64 offsets.
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}
892
893impl ListOffsetsEncoder {
894 fn new(
895 cache_bytes: u64,
896 keep_original_array: bool,
897 column_index: u32,
898 inner_encoder: Arc<dyn ArrayEncoder>,
899 ) -> Self {
900 Self {
901 accumulation_queue: AccumulationQueue::new(
902 cache_bytes,
903 column_index,
904 keep_original_array,
905 ),
906 inner_encoder,
907 column_index,
908 }
909 }
910
911 fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
913 match list_arr.data_type() {
914 DataType::List(_) => {
915 let offsets = list_arr.as_list::<i32>().offsets().clone();
916 Arc::new(Int32Array::new(offsets.into_inner(), None))
917 }
918 DataType::LargeList(_) => {
919 let offsets = list_arr.as_list::<i64>().offsets().clone();
920 Arc::new(Int64Array::new(offsets.into_inner(), None))
921 }
922 _ => panic!(),
923 }
924 }
925
926 fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
929 if let Some(validity) = list_arr.nulls() {
930 Arc::new(BooleanArray::new(validity.inner().clone(), None))
931 } else {
932 new_empty_array(&DataType::Boolean)
935 }
936 }
937
938 fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
939 let inner_encoder = self.inner_encoder.clone();
940 let column_idx = self.column_index;
941 let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
944 let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();
945
946 tokio::task::spawn(async move {
947 let num_rows =
948 offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
949 let num_rows = num_rows as u64;
950 let mut buffer_index = 0;
951 let array = Self::do_encode(
952 offset_arrays,
953 validity_arrays,
954 &mut buffer_index,
955 num_rows,
956 inner_encoder,
957 )?;
958 let (data, description) = array.into_buffers();
959 Ok(EncodedPage {
960 data,
961 description: PageEncoding::Legacy(description),
962 num_rows,
963 column_idx,
964 row_number: 0, })
966 })
967 .map(|res_res| res_res.unwrap())
968 .boxed()
969 }
970
971 fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
972 let offsets = Self::extract_offsets(list_arr);
973 let validity = Self::extract_validity(list_arr);
974 let num_rows = offsets.len() as u64;
975 if let Some(mut arrays) = self
978 .accumulation_queue
979 .insert(offsets, 0, num_rows)
980 {
981 arrays.0.push(validity);
982 Some(self.make_encode_task(arrays.0))
983 } else if let Some(arrays) = self
984 .accumulation_queue
985 .insert(validity, 0, num_rows)
986 {
987 Some(self.make_encode_task(arrays.0))
988 } else {
989 None
990 }
991 }
992
993 fn flush(&mut self) -> Option<EncodeTask> {
994 if let Some(arrays) = self.accumulation_queue.flush() {
995 Some(self.make_encode_task(arrays.0))
996 } else {
997 None
998 }
999 }
1000
1001 fn get_offset_span(array: &dyn Array) -> u64 {
1004 match array.data_type() {
1005 DataType::Int32 => {
1006 let arr_i32 = array.as_primitive::<Int32Type>();
1007 (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
1008 }
1009 DataType::Int64 => {
1010 let arr_i64 = array.as_primitive::<Int64Type>();
1011 (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
1012 }
1013 _ => panic!(),
1014 }
1015 }
1016
1017 fn extend_offsets_vec_u64(
1020 dest: &mut Vec<u64>,
1021 offsets: &dyn Array,
1022 validity: Option<&BooleanArray>,
1023 base: u64,
1025 null_offset_adjustment: u64,
1026 ) {
1027 match offsets.data_type() {
1028 DataType::Int32 => {
1029 let offsets_i32 = offsets.as_primitive::<Int32Type>();
1030 let start = offsets_i32.value(0) as u64;
1031 let modifier = base as i64 - start as i64;
1035 if let Some(validity) = validity {
1036 dest.extend(
1037 offsets_i32
1038 .values()
1039 .iter()
1040 .skip(1)
1041 .zip(validity.values().iter())
1042 .map(|(&off, valid)| {
1043 (off as i64 + modifier) as u64
1044 + (!valid as u64 * null_offset_adjustment)
1045 }),
1046 );
1047 } else {
1048 dest.extend(
1049 offsets_i32
1050 .values()
1051 .iter()
1052 .skip(1)
1053 .map(|&v| (v as i64 + modifier) as u64),
1055 );
1056 }
1057 }
1058 DataType::Int64 => {
1059 let offsets_i64 = offsets.as_primitive::<Int64Type>();
1060 let start = offsets_i64.value(0) as u64;
1061 let modifier = base as i64 - start as i64;
1065 if let Some(validity) = validity {
1066 dest.extend(
1067 offsets_i64
1068 .values()
1069 .iter()
1070 .skip(1)
1071 .zip(validity.values().iter())
1072 .map(|(&off, valid)| {
1073 (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
1074 }),
1075 )
1076 } else {
1077 dest.extend(
1078 offsets_i64
1079 .values()
1080 .iter()
1081 .skip(1)
1082 .map(|&v| (v + modifier) as u64),
1083 );
1084 }
1085 }
1086 _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
1087 }
1088 }
1089
1090 fn do_encode_u64(
1091 offset_arrays: Vec<ArrayRef>,
1092 validity: Vec<Option<&BooleanArray>>,
1093 num_offsets: u64,
1094 null_offset_adjustment: u64,
1095 buffer_index: &mut u32,
1096 inner_encoder: Arc<dyn ArrayEncoder>,
1097 ) -> Result<EncodedArray> {
1098 let mut offsets = Vec::with_capacity(num_offsets as usize);
1099 for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
1100 let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
1101 Self::extend_offsets_vec_u64(
1102 &mut offsets,
1103 &offsets_arr,
1104 validity_arr,
1105 last_prev_offset,
1106 null_offset_adjustment,
1107 );
1108 }
1109 let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
1110 bits_per_value: 64,
1111 data: LanceBuffer::reinterpret_vec(offsets),
1112 num_values: num_offsets,
1113 block_info: BlockInfo::new(),
1114 });
1115 inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
1116 }
1117
1118 fn do_encode(
1119 offset_arrays: Vec<ArrayRef>,
1120 validity_arrays: Vec<ArrayRef>,
1121 buffer_index: &mut u32,
1122 num_offsets: u64,
1123 inner_encoder: Arc<dyn ArrayEncoder>,
1124 ) -> Result<EncodedArray> {
1125 let validity_arrays = validity_arrays
1126 .iter()
1127 .map(|v| {
1128 if v.is_empty() {
1129 None
1130 } else {
1131 Some(v.as_boolean())
1132 }
1133 })
1134 .collect::<Vec<_>>();
1135 debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
1136 let total_span = offset_arrays
1137 .iter()
1138 .map(|arr| Self::get_offset_span(arr.as_ref()))
1139 .sum::<u64>();
1140 let null_offset_adjustment = total_span + 1;
1142 let encoded_offsets = Self::do_encode_u64(
1143 offset_arrays,
1144 validity_arrays,
1145 num_offsets,
1146 null_offset_adjustment,
1147 buffer_index,
1148 inner_encoder,
1149 )?;
1150 Ok(EncodedArray {
1151 data: encoded_offsets.data,
1152 encoding: pb::ArrayEncoding {
1153 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
1154 pb::List {
1155 offsets: Some(Box::new(encoded_offsets.encoding)),
1156 null_offset_adjustment,
1157 num_items: total_span,
1158 },
1159 ))),
1160 },
1161 })
1162 }
1163}
1164
/// Field encoder for list fields: one offsets column plus the item columns.
pub struct ListFieldEncoder {
    /// Encodes the offsets-plus-validity column.
    offsets_encoder: ListOffsetsEncoder,
    /// Encodes the items referenced by the offsets.
    items_encoder: Box<dyn FieldEncoder>,
}
1169
1170impl ListFieldEncoder {
1171 pub fn new(
1172 items_encoder: Box<dyn FieldEncoder>,
1173 inner_offsets_encoder: Arc<dyn ArrayEncoder>,
1174 cache_bytes_per_columns: u64,
1175 keep_original_array: bool,
1176 column_index: u32,
1177 ) -> Self {
1178 Self {
1179 offsets_encoder: ListOffsetsEncoder::new(
1180 cache_bytes_per_columns,
1181 keep_original_array,
1182 column_index,
1183 inner_offsets_encoder,
1184 ),
1185 items_encoder,
1186 }
1187 }
1188
1189 fn combine_tasks(
1190 offsets_tasks: Vec<EncodeTask>,
1191 item_tasks: Vec<EncodeTask>,
1192 ) -> Result<Vec<EncodeTask>> {
1193 let mut all_tasks = offsets_tasks;
1194 let item_tasks = item_tasks;
1195 all_tasks.extend(item_tasks);
1196 Ok(all_tasks)
1197 }
1198}
1199
impl FieldEncoder for ListFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Slice out exactly the items this (possibly sliced/offset) list
        // array references; the offsets buffer itself is handled separately
        // by offsets_encoder.
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        // If the offsets flushed a page but the items didn't, force the items
        // to flush too so offsets and item pages stay aligned.
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn num_columns(&self) -> u32 {
        // One extra column for the offsets, in front of the item columns.
        self.items_encoder.num_columns() + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        // Emit a default (empty) column for the offsets, followed by the
        // item columns produced by the inner encoder.
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}