1use std::collections::VecDeque;
216use std::sync::{LazyLock, Once};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use lance_core::utils::futures::FinallyStreamExt;
230use log::{debug, trace, warn};
231use snafu::location;
232use tokio::sync::mpsc::error::SendError;
233use tokio::sync::mpsc::{self, unbounded_channel};
234
235use lance_core::{ArrowResult, Error, Result};
236use tracing::instrument;
237
238use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
239use crate::data::DataBlock;
240use crate::encoder::EncodedBatch;
241use crate::encodings::logical::list::StructuralListScheduler;
242use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
243use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
244use crate::format::pb::{self, column_encoding};
245use crate::format::pb21;
246use crate::previous::decoder::LogicalPageDecoder;
247use crate::previous::encodings::logical::list::OffsetPageInfo;
248use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
249use crate::previous::encodings::logical::{
250 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
251 primitive::PrimitiveFieldScheduler,
252};
253use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
254use crate::version::LanceFileVersion;
255use crate::{BufferScheduler, EncodingsIo};
256
/// Size threshold in bytes (10 MiB) associated with the one-time oversized-batch
/// warning (see `emitted_batch_size_warning`); the comparison site is not in this
/// part of the file.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 << 20;
259
260#[derive(Debug)]
267pub enum PageEncoding {
268 Legacy(pb::ArrayEncoding),
269 Structural(pb21::PageLayout),
270}
271
272impl PageEncoding {
273 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
274 match self {
275 Self::Legacy(enc) => enc,
276 Self::Structural(_) => panic!("Expected a legacy encoding"),
277 }
278 }
279
280 pub fn as_structural(&self) -> &pb21::PageLayout {
281 match self {
282 Self::Structural(enc) => enc,
283 Self::Legacy(_) => panic!("Expected a structural encoding"),
284 }
285 }
286
287 pub fn is_structural(&self) -> bool {
288 matches!(self, Self::Structural(_))
289 }
290}
291
/// Metadata describing a single page of a column.
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows stored in the page.
    pub num_rows: u64,
    /// Scheduling priority of the page. Its interpretation is up to the schedulers;
    /// the synthetic root and list-offset pages built in this file use 0.
    pub priority: u64,
    /// How the page's data is encoded (legacy or structural).
    pub encoding: PageEncoding,
    /// `(offset, size)` pairs locating the page's buffers.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
308
309#[derive(Debug, Clone)]
313pub struct ColumnInfo {
314 pub index: u32,
316 pub page_infos: Arc<[PageInfo]>,
318 pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
320 pub encoding: pb::ColumnEncoding,
321}
322
323impl ColumnInfo {
324 pub fn new(
326 index: u32,
327 page_infos: Arc<[PageInfo]>,
328 buffer_offsets_and_sizes: Vec<(u64, u64)>,
329 encoding: pb::ColumnEncoding,
330 ) -> Self {
331 Self {
332 index,
333 page_infos,
334 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
335 encoding,
336 }
337 }
338
339 pub fn is_structural(&self) -> bool {
340 self.page_infos
341 .first()
343 .map(|page| page.encoding.is_structural())
344 .unwrap_or(false)
345 }
346}
347
348enum RootScheduler {
349 Structural(Box<dyn StructuralFieldScheduler>),
350 Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
351}
352
353impl RootScheduler {
354 fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
355 match self {
356 Self::Structural(_) => panic!("Expected a legacy scheduler"),
357 Self::Legacy(s) => s,
358 }
359 }
360
361 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
362 match self {
363 Self::Structural(s) => s.as_ref(),
364 Self::Legacy(_) => panic!("Expected a structural scheduler"),
365 }
366 }
367}
368
/// Schedules the I/O needed to decode ranges of rows from a file.
///
/// Holds the root field scheduler (structural or legacy), the top-level Arrow
/// fields of the decoded batches, and the cache handed to each scheduling context.
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    /// The top-level Arrow fields of the batches this scheduler produces.
    pub root_fields: Fields,
    cache: Arc<LanceCache>,
}
395
396pub struct ColumnInfoIter<'a> {
397 column_infos: Vec<Arc<ColumnInfo>>,
398 column_indices: &'a [u32],
399 column_info_pos: usize,
400 column_indices_pos: usize,
401}
402
403impl<'a> ColumnInfoIter<'a> {
404 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
405 let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
406 Self {
407 column_infos,
408 column_indices,
409 column_info_pos: initial_pos,
410 column_indices_pos: 0,
411 }
412 }
413
414 pub fn peek(&self) -> &Arc<ColumnInfo> {
415 &self.column_infos[self.column_info_pos]
416 }
417
418 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
419 let column_info = self.column_infos[self.column_info_pos].clone();
420 let transformed = transform(column_info);
421 self.column_infos[self.column_info_pos] = transformed;
422 }
423
424 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
425 self.next().ok_or_else(|| {
426 Error::invalid_input(
427 "there were more fields in the schema than provided column indices / infos",
428 location!(),
429 )
430 })
431 }
432
433 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
434 if self.column_info_pos < self.column_infos.len() {
435 let info = &self.column_infos[self.column_info_pos];
436 self.column_info_pos += 1;
437 Some(info)
438 } else {
439 None
440 }
441 }
442
443 pub(crate) fn next_top_level(&mut self) {
444 self.column_indices_pos += 1;
445 if self.column_indices_pos < self.column_indices.len() {
446 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
447 } else {
448 self.column_info_pos = self.column_infos.len();
449 }
450 }
451}
452
/// Buffers belonging to the file as a whole, as `(position, size)` pairs.
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}
458
/// Buffers belonging to a single column, together with the file-level buffers.
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    /// `(position, size)` pairs of the column-level buffers.
    pub positions_and_sizes: &'b [(u64, u64)],
}
465
/// Buffers belonging to a single page, together with its column's (and the file's) buffers.
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    /// `(position, size)` pairs of the page-level buffers.
    pub positions_and_sizes: &'c [(u64, u64)],
}
472
473#[derive(Debug)]
475pub struct CoreFieldDecoderStrategy {
476 pub validate_data: bool,
477 pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
478 pub cache_repetition_index: bool,
479}
480
481impl Default for CoreFieldDecoderStrategy {
482 fn default() -> Self {
483 Self {
484 validate_data: false,
485 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
486 cache_repetition_index: false,
487 }
488 }
489}
490
491impl CoreFieldDecoderStrategy {
492 pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
494 self.cache_repetition_index = cache_repetition_index;
495 self
496 }
497
498 pub fn from_decoder_config(config: &DecoderConfig) -> Self {
500 Self {
501 validate_data: config.validate_on_decode,
502 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
503 cache_repetition_index: config.cache_repetition_index,
504 }
505 }
506
507 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
510 let column_encoding = column_info
511 .encoding
512 .column_encoding
513 .as_ref()
514 .ok_or_else(|| {
515 Error::invalid_input(
516 format!(
517 "the column at index {} was missing a ColumnEncoding",
518 column_info.index
519 ),
520 location!(),
521 )
522 })?;
523 if matches!(
524 column_encoding,
525 pb::column_encoding::ColumnEncoding::Values(_)
526 ) {
527 Ok(())
528 } else {
529 Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
530 }
531 }
532
533 fn is_structural_primitive(data_type: &DataType) -> bool {
534 if data_type.is_primitive() {
535 true
536 } else {
537 match data_type {
538 DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
540 DataType::Boolean
541 | DataType::Null
542 | DataType::FixedSizeBinary(_)
543 | DataType::Binary
544 | DataType::LargeBinary
545 | DataType::Utf8
546 | DataType::LargeUtf8 => true,
547 DataType::FixedSizeList(inner, _) => {
548 Self::is_structural_primitive(inner.data_type())
549 }
550 _ => false,
551 }
552 }
553 }
554
555 fn is_primitive_legacy(data_type: &DataType) -> bool {
556 if data_type.is_primitive() {
557 true
558 } else {
559 match data_type {
560 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
562 DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
563 _ => false,
564 }
565 }
566 }
567
568 fn create_primitive_scheduler(
569 &self,
570 field: &Field,
571 column: &ColumnInfo,
572 buffers: FileBuffers,
573 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
574 Self::ensure_values_encoded(column, &field.name)?;
575 let column_buffers = ColumnBuffers {
577 file_buffers: buffers,
578 positions_and_sizes: &column.buffer_offsets_and_sizes,
579 };
580 Ok(Box::new(PrimitiveFieldScheduler::new(
581 column.index,
582 field.data_type(),
583 column.page_infos.clone(),
584 column_buffers,
585 self.validate_data,
586 )))
587 }
588
589 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
591 Self::ensure_values_encoded(column_info, field_name)?;
592 if column_info.page_infos.len() != 1 {
593 return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
594 }
595 let encoding = &column_info.page_infos[0].encoding;
596 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
597 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
598 _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
599 }
600 }
601
602 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
603 let encoding = &column_info.page_infos[0].encoding;
604 matches!(
605 encoding.as_legacy().array_encoding.as_ref().unwrap(),
606 pb::array_encoding::ArrayEncoding::PackedStruct(_)
607 )
608 }
609
610 fn create_list_scheduler(
611 &self,
612 list_field: &Field,
613 column_infos: &mut ColumnInfoIter,
614 buffers: FileBuffers,
615 offsets_column: &ColumnInfo,
616 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
617 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
618 let offsets_column_buffers = ColumnBuffers {
619 file_buffers: buffers,
620 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
621 };
622 let items_scheduler =
623 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
624
625 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
626 .page_infos
627 .iter()
628 .filter(|offsets_page| offsets_page.num_rows > 0)
629 .map(|offsets_page| {
630 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
631 &offsets_page.encoding.as_legacy().array_encoding
632 {
633 let inner = PageInfo {
634 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
635 encoding: PageEncoding::Legacy(
636 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
637 ),
638 num_rows: offsets_page.num_rows,
639 priority: 0,
640 };
641 (
642 inner,
643 OffsetPageInfo {
644 offsets_in_page: offsets_page.num_rows,
645 null_offset_adjustment: list_encoding.null_offset_adjustment,
646 num_items_referenced_by_page: list_encoding.num_items,
647 },
648 )
649 } else {
650 panic!("Expected a list column");
652 }
653 })
654 .unzip();
655 let inner = Arc::new(PrimitiveFieldScheduler::new(
656 offsets_column.index,
657 DataType::UInt64,
658 Arc::from(inner_infos.into_boxed_slice()),
659 offsets_column_buffers,
660 self.validate_data,
661 )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
662 let items_field = match list_field.data_type() {
663 DataType::List(inner) => inner,
664 DataType::LargeList(inner) => inner,
665 _ => unreachable!(),
666 };
667 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
668 DataType::Int32
669 } else {
670 DataType::Int64
671 };
672 Ok(Box::new(ListFieldScheduler::new(
673 inner,
674 items_scheduler.into(),
675 items_field,
676 offset_type,
677 null_offset_adjustments,
678 )))
679 }
680
681 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
682 if let column_encoding::ColumnEncoding::Blob(blob) =
683 column_info.encoding.column_encoding.as_ref().unwrap()
684 {
685 let mut column_info = column_info.clone();
686 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
687 Some(column_info)
688 } else {
689 None
690 }
691 }
692
693 fn create_structural_field_scheduler(
694 &self,
695 field: &Field,
696 column_infos: &mut ColumnInfoIter,
697 ) -> Result<Box<dyn StructuralFieldScheduler>> {
698 let data_type = field.data_type();
699 if Self::is_structural_primitive(&data_type) {
700 let column_info = column_infos.expect_next()?;
701 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
702 column_info.as_ref(),
703 self.decompressor_strategy.as_ref(),
704 self.cache_repetition_index,
705 field,
706 )?);
707
708 column_infos.next_top_level();
710
711 return Ok(scheduler);
712 }
713 match &data_type {
714 DataType::Struct(fields) => {
715 if field.is_packed_struct() {
716 let column_info = column_infos.expect_next()?;
718 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
719 column_info.as_ref(),
720 self.decompressor_strategy.as_ref(),
721 self.cache_repetition_index,
722 field,
723 )?);
724
725 column_infos.next_top_level();
727
728 return Ok(scheduler);
729 }
730 if field.is_blob() {
732 let column_info = column_infos.peek();
733 if column_info.page_infos.iter().any(|page| {
734 matches!(
735 page.encoding,
736 PageEncoding::Structural(pb21::PageLayout {
737 layout: Some(pb21::page_layout::Layout::BlobLayout(_))
738 })
739 )
740 }) {
741 let column_info = column_infos.expect_next()?;
742 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
743 column_info.as_ref(),
744 self.decompressor_strategy.as_ref(),
745 self.cache_repetition_index,
746 field,
747 )?);
748 column_infos.next_top_level();
749 return Ok(scheduler);
750 }
751 }
752
753 let mut child_schedulers = Vec::with_capacity(field.children.len());
754 for field in field.children.iter() {
755 let field_scheduler =
756 self.create_structural_field_scheduler(field, column_infos)?;
757 child_schedulers.push(field_scheduler);
758 }
759
760 let fields = fields.clone();
761 Ok(
762 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
763 as Box<dyn StructuralFieldScheduler>,
764 )
765 }
766 DataType::List(_) | DataType::LargeList(_) => {
767 let child = field
768 .children
769 .first()
770 .expect("List field must have a child");
771 let child_scheduler =
772 self.create_structural_field_scheduler(child, column_infos)?;
773 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
774 as Box<dyn StructuralFieldScheduler>)
775 }
776 _ => todo!("create_structural_field_scheduler for {}", data_type),
777 }
778 }
779
780 fn create_legacy_field_scheduler(
781 &self,
782 field: &Field,
783 column_infos: &mut ColumnInfoIter,
784 buffers: FileBuffers,
785 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
786 let data_type = field.data_type();
787 if Self::is_primitive_legacy(&data_type) {
788 let column_info = column_infos.expect_next()?;
789 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
790 return Ok(scheduler);
791 } else if data_type.is_binary_like() {
792 let column_info = column_infos.next().unwrap().clone();
793 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
795 let desc_scheduler =
796 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
797 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
798 return Ok(blob_scheduler);
799 }
800 if let Some(page_info) = column_info.page_infos.first() {
801 if matches!(
802 page_info.encoding.as_legacy(),
803 pb::ArrayEncoding {
804 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
805 }
806 ) {
807 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
808 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
809 } else {
810 DataType::LargeList(Arc::new(ArrowField::new(
811 "item",
812 DataType::UInt8,
813 false,
814 )))
815 };
816 let list_field = Field::try_from(ArrowField::new(
817 field.name.clone(),
818 list_type,
819 field.nullable,
820 ))
821 .unwrap();
822 let list_scheduler = self.create_list_scheduler(
823 &list_field,
824 column_infos,
825 buffers,
826 &column_info,
827 )?;
828 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
829 list_scheduler.into(),
830 field.data_type(),
831 ));
832 return Ok(binary_scheduler);
833 } else {
834 let scheduler =
835 self.create_primitive_scheduler(field, &column_info, buffers)?;
836 return Ok(scheduler);
837 }
838 } else {
839 return self.create_primitive_scheduler(field, &column_info, buffers);
840 }
841 }
842 match &data_type {
843 DataType::FixedSizeList(inner, _dimension) => {
844 if Self::is_primitive_legacy(inner.data_type()) {
847 let primitive_col = column_infos.expect_next()?;
848 let scheduler =
849 self.create_primitive_scheduler(field, primitive_col, buffers)?;
850 Ok(scheduler)
851 } else {
852 todo!()
853 }
854 }
855 DataType::Dictionary(_key_type, value_type) => {
856 if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
857 let primitive_col = column_infos.expect_next()?;
858 let scheduler =
859 self.create_primitive_scheduler(field, primitive_col, buffers)?;
860 Ok(scheduler)
861 } else {
862 Err(Error::NotSupported {
863 source: format!(
864 "No way to decode into a dictionary field of type {}",
865 value_type
866 )
867 .into(),
868 location: location!(),
869 })
870 }
871 }
872 DataType::List(_) | DataType::LargeList(_) => {
873 let offsets_column = column_infos.expect_next()?.clone();
874 column_infos.next_top_level();
875 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
876 }
877 DataType::Struct(fields) => {
878 let column_info = column_infos.expect_next()?;
879
880 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
882 return self.create_primitive_scheduler(field, &blob_col, buffers);
884 }
885
886 if Self::check_packed_struct(column_info) {
887 self.create_primitive_scheduler(field, column_info, buffers)
889 } else {
890 Self::check_simple_struct(column_info, &field.name).unwrap();
892 let num_rows = column_info
893 .page_infos
894 .iter()
895 .map(|page| page.num_rows)
896 .sum();
897 let mut child_schedulers = Vec::with_capacity(field.children.len());
898 for field in &field.children {
899 column_infos.next_top_level();
900 let field_scheduler =
901 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
902 child_schedulers.push(Arc::from(field_scheduler));
903 }
904
905 let fields = fields.clone();
906 Ok(Box::new(SimpleStructScheduler::new(
907 child_schedulers,
908 fields,
909 num_rows,
910 )))
911 }
912 }
913 _ => todo!(),
915 }
916 }
917}
918
919fn root_column(num_rows: u64) -> ColumnInfo {
921 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
922 let final_page_num_rows = num_rows % (u32::MAX as u64);
923 let root_pages = (0..num_root_pages)
924 .map(|i| PageInfo {
925 num_rows: if i == num_root_pages - 1 {
926 final_page_num_rows
927 } else {
928 u64::MAX
929 },
930 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
931 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
932 pb::SimpleStruct {},
933 )),
934 }),
935 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
937 })
938 .collect::<Vec<_>>();
939 ColumnInfo {
940 buffer_offsets_and_sizes: Arc::new([]),
941 encoding: pb::ColumnEncoding {
942 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
943 },
944 index: u32::MAX,
945 page_infos: Arc::from(root_pages),
946 }
947}
948
949pub enum RootDecoder {
950 Structural(StructuralStructDecoder),
951 Legacy(SimpleStructDecoder),
952}
953
954impl RootDecoder {
955 pub fn into_structural(self) -> StructuralStructDecoder {
956 match self {
957 Self::Structural(decoder) => decoder,
958 Self::Legacy(_) => panic!("Expected a structural decoder"),
959 }
960 }
961
962 pub fn into_legacy(self) -> SimpleStructDecoder {
963 match self {
964 Self::Legacy(decoder) => decoder,
965 Self::Structural(_) => panic!("Expected a legacy decoder"),
966 }
967 }
968}
969
970impl DecodeBatchScheduler {
971 #[allow(clippy::too_many_arguments)]
974 pub async fn try_new<'a>(
975 schema: &'a Schema,
976 column_indices: &[u32],
977 column_infos: &[Arc<ColumnInfo>],
978 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
979 num_rows: u64,
980 _decoder_plugins: Arc<DecoderPlugins>,
981 io: Arc<dyn EncodingsIo>,
982 cache: Arc<LanceCache>,
983 filter: &FilterExpression,
984 decoder_config: &DecoderConfig,
985 ) -> Result<Self> {
986 assert!(num_rows > 0);
987 let buffers = FileBuffers {
988 positions_and_sizes: file_buffer_positions_and_sizes,
989 };
990 let arrow_schema = ArrowSchema::from(schema);
991 let root_fields = arrow_schema.fields().clone();
992 let root_type = DataType::Struct(root_fields.clone());
993 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
994 root_field.children.clone_from(&schema.fields);
998 root_field
999 .metadata
1000 .insert("__lance_decoder_root".to_string(), "true".to_string());
1001
1002 if column_infos.is_empty() || column_infos[0].is_structural() {
1003 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1004
1005 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1006 let mut root_scheduler =
1007 strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
1008
1009 let context = SchedulerContext::new(io, cache.clone());
1010 root_scheduler.initialize(filter, &context).await?;
1011
1012 Ok(Self {
1013 root_scheduler: RootScheduler::Structural(root_scheduler),
1014 root_fields,
1015 cache,
1016 })
1017 } else {
1018 let mut columns = Vec::with_capacity(column_infos.len() + 1);
1021 columns.push(Arc::new(root_column(num_rows)));
1022 columns.extend(column_infos.iter().cloned());
1023
1024 let adjusted_column_indices = [0_u32]
1025 .into_iter()
1026 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1027 .collect::<Vec<_>>();
1028 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1029 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1030 let root_scheduler =
1031 strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1032
1033 let context = SchedulerContext::new(io, cache.clone());
1034 root_scheduler.initialize(filter, &context).await?;
1035
1036 Ok(Self {
1037 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1038 root_fields,
1039 cache,
1040 })
1041 }
1042 }
1043
1044 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1045 pub fn from_scheduler(
1046 root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1047 root_fields: Fields,
1048 cache: Arc<LanceCache>,
1049 ) -> Self {
1050 Self {
1051 root_scheduler: RootScheduler::Legacy(root_scheduler),
1052 root_fields,
1053 cache,
1054 }
1055 }
1056
1057 fn do_schedule_ranges_structural(
1058 &mut self,
1059 ranges: &[Range<u64>],
1060 filter: &FilterExpression,
1061 io: Arc<dyn EncodingsIo>,
1062 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1063 ) {
1064 let root_scheduler = self.root_scheduler.as_structural();
1065 let mut context = SchedulerContext::new(io, self.cache.clone());
1066 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1067 if let Err(schedule_ranges_err) = maybe_root_job {
1068 schedule_action(Err(schedule_ranges_err));
1069 return;
1070 }
1071 let mut root_job = maybe_root_job.unwrap();
1072 let mut num_rows_scheduled = 0;
1073 loop {
1074 let maybe_next_scan_lines = root_job.schedule_next(&mut context);
1075 if let Err(err) = maybe_next_scan_lines {
1076 schedule_action(Err(err));
1077 return;
1078 }
1079 let next_scan_lines = maybe_next_scan_lines.unwrap();
1080 if next_scan_lines.is_empty() {
1081 return;
1082 }
1083 for next_scan_line in next_scan_lines {
1084 trace!(
1085 "Scheduled scan line of {} rows and {} decoders",
1086 next_scan_line.rows_scheduled,
1087 next_scan_line.decoders.len()
1088 );
1089 num_rows_scheduled += next_scan_line.rows_scheduled;
1090 if !schedule_action(Ok(DecoderMessage {
1091 scheduled_so_far: num_rows_scheduled,
1092 decoders: next_scan_line.decoders,
1093 })) {
1094 return;
1096 }
1097 }
1098 }
1099 }
1100
1101 fn do_schedule_ranges_legacy(
1102 &mut self,
1103 ranges: &[Range<u64>],
1104 filter: &FilterExpression,
1105 io: Arc<dyn EncodingsIo>,
1106 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1107 priority: Option<Box<dyn PriorityRange>>,
1111 ) {
1112 let root_scheduler = self.root_scheduler.as_legacy();
1113 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1114 trace!(
1115 "Scheduling {} ranges across {}..{} ({} rows){}",
1116 ranges.len(),
1117 ranges.first().unwrap().start,
1118 ranges.last().unwrap().end,
1119 rows_requested,
1120 priority
1121 .as_ref()
1122 .map(|p| format!(" (priority={:?})", p))
1123 .unwrap_or_default()
1124 );
1125
1126 let mut context = SchedulerContext::new(io, self.cache.clone());
1127 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1128 if let Err(schedule_ranges_err) = maybe_root_job {
1129 schedule_action(Err(schedule_ranges_err));
1130 return;
1131 }
1132 let mut root_job = maybe_root_job.unwrap();
1133 let mut num_rows_scheduled = 0;
1134 let mut rows_to_schedule = root_job.num_rows();
1135 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1136 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1137 while rows_to_schedule > 0 {
1138 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1139 if let Err(schedule_next_err) = maybe_next_scan_line {
1140 schedule_action(Err(schedule_next_err));
1141 return;
1142 }
1143 let next_scan_line = maybe_next_scan_line.unwrap();
1144 priority.advance(next_scan_line.rows_scheduled);
1145 num_rows_scheduled += next_scan_line.rows_scheduled;
1146 rows_to_schedule -= next_scan_line.rows_scheduled;
1147 trace!(
1148 "Scheduled scan line of {} rows and {} decoders",
1149 next_scan_line.rows_scheduled,
1150 next_scan_line.decoders.len()
1151 );
1152 if !schedule_action(Ok(DecoderMessage {
1153 scheduled_so_far: num_rows_scheduled,
1154 decoders: next_scan_line.decoders,
1155 })) {
1156 return;
1158 }
1159
1160 trace!("Finished scheduling {} ranges", ranges.len());
1161 }
1162 }
1163
1164 fn do_schedule_ranges(
1165 &mut self,
1166 ranges: &[Range<u64>],
1167 filter: &FilterExpression,
1168 io: Arc<dyn EncodingsIo>,
1169 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1170 priority: Option<Box<dyn PriorityRange>>,
1174 ) {
1175 match &self.root_scheduler {
1176 RootScheduler::Legacy(_) => {
1177 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1178 }
1179 RootScheduler::Structural(_) => {
1180 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1181 }
1182 }
1183 }
1184
1185 pub fn schedule_ranges_to_vec(
1188 &mut self,
1189 ranges: &[Range<u64>],
1190 filter: &FilterExpression,
1191 io: Arc<dyn EncodingsIo>,
1192 priority: Option<Box<dyn PriorityRange>>,
1193 ) -> Result<Vec<DecoderMessage>> {
1194 let mut decode_messages = Vec::new();
1195 self.do_schedule_ranges(
1196 ranges,
1197 filter,
1198 io,
1199 |msg| {
1200 decode_messages.push(msg);
1201 true
1202 },
1203 priority,
1204 );
1205 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1206 }
1207
1208 #[instrument(skip_all)]
1218 pub fn schedule_ranges(
1219 &mut self,
1220 ranges: &[Range<u64>],
1221 filter: &FilterExpression,
1222 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1223 scheduler: Arc<dyn EncodingsIo>,
1224 ) {
1225 self.do_schedule_ranges(
1226 ranges,
1227 filter,
1228 scheduler,
1229 |msg| {
1230 match sink.send(msg) {
1231 Ok(_) => true,
1232 Err(SendError { .. }) => {
1233 debug!(
1236 "schedule_ranges aborting early since decoder appears to have been dropped"
1237 );
1238 false
1239 }
1240 }
1241 },
1242 None,
1243 )
1244 }
1245
1246 #[instrument(skip_all)]
1254 pub fn schedule_range(
1255 &mut self,
1256 range: Range<u64>,
1257 filter: &FilterExpression,
1258 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1259 scheduler: Arc<dyn EncodingsIo>,
1260 ) {
1261 self.schedule_ranges(&[range], filter, sink, scheduler)
1262 }
1263
1264 pub fn schedule_take(
1272 &mut self,
1273 indices: &[u64],
1274 filter: &FilterExpression,
1275 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1276 scheduler: Arc<dyn EncodingsIo>,
1277 ) {
1278 debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1279 if indices.is_empty() {
1280 return;
1281 }
1282 trace!("Scheduling take of {} rows", indices.len());
1283 let ranges = Self::indices_to_ranges(indices);
1284 self.schedule_ranges(&ranges, filter, sink, scheduler)
1285 }
1286
1287 fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1289 let mut ranges = Vec::new();
1290 let mut start = indices[0];
1291
1292 for window in indices.windows(2) {
1293 if window[1] != window[0] + 1 {
1294 ranges.push(start..window[0] + 1);
1295 start = window[1];
1296 }
1297 }
1298
1299 ranges.push(start..*indices.last().unwrap() + 1);
1300 ranges
1301 }
1302}
1303
/// A pending batch decode.
pub struct ReadBatchTask {
    /// Future that resolves to the decoded batch.
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// Number of rows the task is expected to produce (0 when the drain step itself failed).
    pub num_rows: u32,
}
1308
/// Turns scheduled legacy decoders into batch decode tasks of at most
/// `rows_per_batch` rows each.
pub struct BatchDecodeStream {
    /// Wraps the receiving end of the scheduler's message channel (`context.source`).
    context: DecoderContext,
    /// Legacy root decoder; scheduled child decoders are routed into it.
    root_decoder: SimpleStructDecoder,
    /// Rows not yet assigned to any batch task.
    rows_remaining: u64,
    /// Upper bound on the number of rows in a single batch.
    rows_per_batch: u32,
    /// The latest `scheduled_so_far` value received from the scheduler.
    rows_scheduled: u64,
    /// Rows already drained into batch tasks.
    rows_drained: u64,
    /// Set once the scheduler channel returns no more messages.
    scheduler_exhausted: bool,
    /// Shared with each batch task, presumably so the oversized-batch warning fires only once.
    emitted_batch_size_warning: Arc<Once>,
}
1320
impl BatchDecodeStream {
    /// Creates a decode stream for a legacy (pre-2.1) file.
    ///
    /// # Arguments
    ///
    /// * `scheduled` - receiver for the messages (or errors) sent by the scheduler
    /// * `rows_per_batch` - maximum number of rows in each batch
    /// * `num_rows` - total number of rows to decode
    /// * `root_decoder` - decoder for the top-level struct; scheduled child decoders are
    ///   forwarded to it
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Forwards a scheduled decoder to the root decoder.
    ///
    /// Decoders with an empty path (scheduled at the root of the field tree) are ignored. The
    /// root decoder is the one passed to [`BatchDecodeStream::new`].
    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
        if decoder.path.is_empty() {
            Ok(())
        } else {
            self.root_decoder.accept_child(decoder)
        }
    }

    /// Receives scheduler messages until at least `scheduled_need` rows are scheduled.
    ///
    /// Every decoder in a received message is forwarded via `accept_decoder`.
    ///
    /// Returns `scheduled_need` once enough rows are scheduled. If the channel closes first,
    /// it marks the scheduler as exhausted and returns the number of rows actually scheduled.
    /// Later calls then return that count immediately.
    ///
    /// # Errors
    ///
    /// Returns an error sent by the scheduler, or any error from accepting a decoder.
    ///
    /// # Panics
    ///
    /// Panics (via `into_legacy`) if the scheduler sends an `UnloadedPage` message.
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    // `recv` returned `None`: every sender was dropped, so nothing more will be
                    // scheduled.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Builds the decode task for the next batch.
    ///
    /// It waits until enough rows are scheduled and the rows of this batch are loaded, then
    /// drains up to `rows_per_batch` rows from the root decoder.
    ///
    /// Returns `None` once all rows have been handed out, or when the scheduler closed without
    /// scheduling any further rows.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // How many more rows must be scheduled before `to_take` rows can be drained.
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                // The scheduler closed early; shrink the batch to what was actually scheduled.
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        // Index of the last row in this batch; everything up to it must be loaded.
        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this decoder into a stream of [`ReadBatchTask`]s.
    ///
    /// Decode tasks are produced in order. Each task's decode work runs on a `tokio::spawn`ed
    /// task when the [`ReadBatchTask::task`] future is awaited.
    ///
    /// A failure to produce a task becomes a `ReadBatchTask` with `num_rows == 0` whose future
    /// returns that error. A join error from the spawned task is returned as `Error::Wrapped`.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                // Row count is known before decoding; 0 for an error.
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    // Decode on a separate tokio task rather than on the task polling this stream.
                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
                        .await
                        .map_err(|err| Error::Wrapped {
                            error: err.into(),
                            location: location!(),
                        })?
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                // Batches are bounded by `rows_per_batch: u32`, so the cast cannot truncate.
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1465
/// A scheduled item handed to a [`RootDecoderType`] by [`BatchDecodeIterator`].
enum RootDecoderMessage {
    /// An already-loaded page, for the structural (2.1+) root decoder.
    LoadedPage(LoadedPageShard),
    /// A child decoder, for the legacy root decoder.
    LegacyPage(crate::previous::decoder::DecoderReady),
}
/// Abstraction over the two root decoders, so the blocking [`BatchDecodeIterator`] can drive
/// both structural and legacy files.
trait RootDecoderType {
    /// Hands a scheduled page or decoder to the root decoder.
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Drains the next `num_rows` rows into a decode task.
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Blocks on `runtime` until the row at index `loaded_need` is loaded.
    ///
    /// May be a no-op when pages are already loaded on arrival.
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1477impl RootDecoderType for StructuralStructDecoder {
1478 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1479 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1480 unreachable!()
1481 };
1482 self.accept_page(loaded_page)
1483 }
1484 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1485 self.drain_batch_task(num_rows)
1486 }
1487 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1488 Ok(())
1490 }
1491}
1492impl RootDecoderType for SimpleStructDecoder {
1493 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1494 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1495 unreachable!()
1496 };
1497 self.accept_child(legacy_page)
1498 }
1499 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1500 self.drain(num_rows)
1501 }
1502 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1503 runtime.block_on(self.wait_for_loaded(loaded_need))
1504 }
1505}
1506
/// A blocking [`RecordBatchReader`] over scheduler messages that were collected up front.
///
/// It works for both structural and legacy files through [`RootDecoderType`]. Outstanding I/O
/// is waited on with a private current-thread tokio runtime.
struct BatchDecodeIterator<T: RootDecoderType> {
    /// Scheduler messages, consumed from the front as more rows are needed.
    messages: VecDeque<Result<DecoderMessage>>,
    /// Root decoder that receives pages/decoders and drains batches.
    root_decoder: T,
    /// Rows that have not yet been handed out in a batch.
    rows_remaining: u64,
    /// Maximum number of rows per batch.
    rows_per_batch: u32,
    /// Latest `scheduled_so_far` seen in a consumed message.
    rows_scheduled: u64,
    /// Rows already drained into batches.
    rows_drained: u64,
    /// Ensures the large-batch message is logged at most once.
    emitted_batch_size_warning: Arc<Once>,
    /// Runtime used to block on page loads and legacy `wait_for_loaded` futures.
    wait_for_io_runtime: tokio::runtime::Runtime,
    /// Schema reported by `RecordBatchReader::schema`.
    schema: Arc<ArrowSchema>,
}
1522
1523impl<T: RootDecoderType> BatchDecodeIterator<T> {
1524 pub fn new(
1526 messages: VecDeque<Result<DecoderMessage>>,
1527 rows_per_batch: u32,
1528 num_rows: u64,
1529 root_decoder: T,
1530 schema: Arc<ArrowSchema>,
1531 ) -> Self {
1532 Self {
1533 messages,
1534 root_decoder,
1535 rows_remaining: num_rows,
1536 rows_per_batch,
1537 rows_scheduled: 0,
1538 rows_drained: 0,
1539 wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1540 .build()
1541 .unwrap(),
1542 emitted_batch_size_warning: Arc::new(Once::new()),
1543 schema,
1544 }
1545 }
1546
1547 fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
1552 match maybe_done(unloaded_page.0) {
1553 MaybeDone::Done(loaded_page) => loaded_page,
1555 MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1557 MaybeDone::Gone => unreachable!(),
1558 }
1559 }
1560
1561 #[instrument(skip_all)]
1566 fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
1567 while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1568 let message = self.messages.pop_front().unwrap()?;
1569 self.rows_scheduled = message.scheduled_so_far;
1570 for decoder_message in message.decoders {
1571 match decoder_message {
1572 MessageType::UnloadedPage(unloaded_page) => {
1573 let loaded_page = self.wait_for_page(unloaded_page)?;
1574 self.root_decoder
1575 .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1576 }
1577 MessageType::DecoderReady(decoder_ready) => {
1578 if !decoder_ready.path.is_empty() {
1580 self.root_decoder
1581 .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1582 }
1583 }
1584 }
1585 }
1586 }
1587
1588 let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1589
1590 self.root_decoder
1591 .wait(loaded_need, &self.wait_for_io_runtime)?;
1592 Ok(self.rows_scheduled)
1593 }
1594
1595 #[instrument(level = "debug", skip_all)]
1596 fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1597 trace!(
1598 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1599 self.rows_remaining,
1600 self.rows_drained,
1601 self.rows_scheduled,
1602 );
1603 if self.rows_remaining == 0 {
1604 return Ok(None);
1605 }
1606
1607 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1608 self.rows_remaining -= to_take;
1609
1610 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1611 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1612 if scheduled_need > 0 {
1613 let desired_scheduled = scheduled_need + self.rows_scheduled;
1614 trace!(
1615 "Draining from scheduler (desire at least {} scheduled rows)",
1616 desired_scheduled
1617 );
1618 let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
1619 if actually_scheduled < desired_scheduled {
1620 let under_scheduled = desired_scheduled - actually_scheduled;
1621 to_take -= under_scheduled;
1622 }
1623 }
1624
1625 if to_take == 0 {
1626 return Ok(None);
1627 }
1628
1629 let next_task = self.root_decoder.drain_batch(to_take)?;
1630
1631 self.rows_drained += to_take;
1632
1633 let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1634
1635 Ok(Some(batch))
1636 }
1637}
1638
1639impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1640 type Item = ArrowResult<RecordBatch>;
1641
1642 fn next(&mut self) -> Option<Self::Item> {
1643 self.next_batch_task()
1644 .transpose()
1645 .map(|r| r.map_err(ArrowError::from))
1646 }
1647}
1648
1649impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1650 fn schema(&self) -> Arc<ArrowSchema> {
1651 self.schema.clone()
1652 }
1653}
1654
/// Asynchronous decode stream for structural (2.1+) files.
///
/// Receives scheduler messages, awaits each scheduled page, hands the loaded pages to the
/// root [`StructuralStructDecoder`], and drains batches of at most `rows_per_batch` rows.
pub struct StructuralBatchDecodeStream {
    /// Receiving end of the scheduler channel.
    context: DecoderContext,
    /// Decoder for the top-level struct.
    root_decoder: StructuralStructDecoder,
    /// Rows that have not yet been handed out in a batch.
    rows_remaining: u64,
    /// Maximum number of rows per batch.
    rows_per_batch: u32,
    /// Latest `scheduled_so_far` reported by the scheduler.
    rows_scheduled: u64,
    /// Rows already drained into decode tasks.
    rows_drained: u64,
    /// Set once the scheduler channel has closed.
    scheduler_exhausted: bool,
    /// Ensures the large-batch message is logged at most once for this stream.
    emitted_batch_size_warning: Arc<Once>,
}
1666
impl StructuralBatchDecodeStream {
    /// Creates a decode stream for a structural (2.1+) file.
    ///
    /// # Arguments
    ///
    /// * `scheduled` - receiver for the messages (or errors) sent by the scheduler
    /// * `rows_per_batch` - maximum number of rows in each batch
    /// * `num_rows` - total number of rows to decode
    /// * `root_decoder` - decoder for the top-level struct; loaded pages are handed to it
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Receives scheduler messages until at least `scheduled_need` rows are scheduled.
    ///
    /// Each page in a received message is awaited, one at a time and in order. The loaded page
    /// is then handed to the root decoder, so every accepted page is already loaded.
    ///
    /// Returns `scheduled_need` once enough rows are scheduled. If the channel closes first,
    /// it marks the scheduler as exhausted and returns the number of rows actually scheduled.
    ///
    /// # Errors
    ///
    /// Returns an error sent by the scheduler, a page-load error, or an error from accepting a
    /// page.
    ///
    /// # Panics
    ///
    /// Panics (via `into_structural`) if the scheduler sends a `DecoderReady` message.
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        let unloaded_page = message.into_structural();
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    // `recv` returned `None`: every sender was dropped, so nothing more will be
                    // scheduled.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Builds the decode task for the next batch.
    ///
    /// Unlike the legacy stream, there is no separate wait for I/O: pages are already loaded
    /// when `wait_for_scheduled` accepts them.
    ///
    /// Returns `None` once all rows have been handed out, or when the scheduler closed without
    /// scheduling any further rows.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // How many more rows must be scheduled before `to_take` rows can be drained.
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                // The scheduler closed early; shrink the batch to what was actually scheduled.
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this decoder into a stream of [`ReadBatchTask`]s.
    ///
    /// Decode tasks are produced in order. Each task's decode work runs on a `tokio::spawn`ed
    /// task when the [`ReadBatchTask::task`] future is awaited.
    ///
    /// A failure to produce a task becomes a `ReadBatchTask` with `num_rows == 0` whose future
    /// returns that error. A join error from the spawned task is returned as `Error::Wrapped`.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                // Row count is known before decoding; 0 for an error.
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    // Decode on a separate tokio task rather than on the task polling this stream.
                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
                        .await
                        .map_err(|err| Error::Wrapped {
                            error: err.into(),
                            location: location!(),
                        })?
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                // Batches are bounded by `rows_per_batch: u32`, so the cast cannot truncate.
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1796
/// The rows a caller wants to read.
#[derive(Debug)]
pub enum RequestedRows {
    /// Half-open row ranges (`start..end`).
    Ranges(Vec<Range<u64>>),
    /// Individual row offsets, one per requested row.
    Indices(Vec<u64>),
}

impl RequestedRows {
    /// Total number of requested rows (the summed range lengths, or the number of indices).
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Indices(indices) => indices.len() as u64,
            Self::Ranges(ranges) => ranges
                .iter()
                .fold(0, |total, range| total + (range.end - range.start)),
        }
    }

    /// Drops empty ranges; an `Indices` request is returned unchanged.
    pub fn trim_empty_ranges(self) -> Self {
        match self {
            Self::Ranges(mut ranges) => {
                ranges.retain(|range| range.start < range.end);
                Self::Ranges(ranges)
            }
            indices @ Self::Indices(_) => indices,
        }
    }
}
1818
/// Options that control decoding.
#[derive(Debug, Clone, Default)]
pub struct DecoderConfig {
    /// Whether to cache repetition indices.
    ///
    /// Presumably this means the `LanceCache`; TODO confirm with the schedulers that read
    /// this flag.
    pub cache_repetition_index: bool,
    /// Whether to validate data while decoding.
    ///
    /// Forwarded as `should_validate` when the decode stream is created.
    pub validate_on_decode: bool,
}

/// Everything needed to schedule and decode a read.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Decoder extensions, passed to `DecodeBatchScheduler::try_new`.
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum number of rows per emitted batch.
    pub batch_size: u32,
    /// I/O scheduler used for reads.
    pub io: Arc<dyn EncodingsIo>,
    /// Cache passed to `DecodeBatchScheduler::try_new`.
    pub cache: Arc<LanceCache>,
    /// Decoding options.
    pub decoder_config: DecoderConfig,
}
1837
1838fn check_scheduler_on_drop(
1839 stream: BoxStream<'static, ReadBatchTask>,
1840 scheduler_handle: tokio::task::JoinHandle<()>,
1841) -> BoxStream<'static, ReadBatchTask> {
1842 let mut scheduler_handle = Some(scheduler_handle);
1846 let check_scheduler = stream::unfold((), move |_| {
1847 let handle = scheduler_handle.take();
1848 async move {
1849 if let Some(handle) = handle {
1850 handle.await.unwrap();
1851 }
1852 None
1853 }
1854 });
1855 stream.chain(check_scheduler).boxed()
1856}
1857
1858pub fn create_decode_stream(
1859 schema: &Schema,
1860 num_rows: u64,
1861 batch_size: u32,
1862 is_structural: bool,
1863 should_validate: bool,
1864 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1865) -> BoxStream<'static, ReadBatchTask> {
1866 if is_structural {
1867 let arrow_schema = ArrowSchema::from(schema);
1868 let structural_decoder = StructuralStructDecoder::new(
1869 arrow_schema.fields,
1870 should_validate,
1871 true,
1872 );
1873 StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1874 } else {
1875 let arrow_schema = ArrowSchema::from(schema);
1876 let root_fields = arrow_schema.fields;
1877
1878 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1879 BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1880 }
1881}
1882
1883pub fn create_decode_iterator(
1887 schema: &Schema,
1888 num_rows: u64,
1889 batch_size: u32,
1890 should_validate: bool,
1891 is_structural: bool,
1892 messages: VecDeque<Result<DecoderMessage>>,
1893) -> Box<dyn RecordBatchReader + Send + 'static> {
1894 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1895 let root_fields = arrow_schema.fields.clone();
1896 if is_structural {
1897 let simple_struct_decoder =
1898 StructuralStructDecoder::new(root_fields, should_validate, true);
1899 Box::new(BatchDecodeIterator::new(
1900 messages,
1901 batch_size,
1902 num_rows,
1903 simple_struct_decoder,
1904 arrow_schema,
1905 ))
1906 } else {
1907 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1908 Box::new(BatchDecodeIterator::new(
1909 messages,
1910 batch_size,
1911 num_rows,
1912 root_decoder,
1913 arrow_schema,
1914 ))
1915 }
1916}
1917
1918fn create_scheduler_decoder(
1919 column_infos: Vec<Arc<ColumnInfo>>,
1920 requested_rows: RequestedRows,
1921 filter: FilterExpression,
1922 column_indices: Vec<u32>,
1923 target_schema: Arc<Schema>,
1924 config: SchedulerDecoderConfig,
1925) -> Result<BoxStream<'static, ReadBatchTask>> {
1926 let num_rows = requested_rows.num_rows();
1927
1928 let is_structural = column_infos[0].is_structural();
1929
1930 let (tx, rx) = mpsc::unbounded_channel();
1931
1932 let decode_stream = create_decode_stream(
1933 &target_schema,
1934 num_rows,
1935 config.batch_size,
1936 is_structural,
1937 config.decoder_config.validate_on_decode,
1938 rx,
1939 );
1940
1941 let scheduler_handle = tokio::task::spawn(async move {
1942 let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1943 target_schema.as_ref(),
1944 &column_indices,
1945 &column_infos,
1946 &vec![],
1947 num_rows,
1948 config.decoder_plugins,
1949 config.io.clone(),
1950 config.cache,
1951 &filter,
1952 &config.decoder_config,
1953 )
1954 .await
1955 {
1956 Ok(scheduler) => scheduler,
1957 Err(e) => {
1958 let _ = tx.send(Err(e));
1959 return;
1960 }
1961 };
1962
1963 match requested_rows {
1964 RequestedRows::Ranges(ranges) => {
1965 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1966 }
1967 RequestedRows::Indices(indices) => {
1968 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1969 }
1970 }
1971 });
1972
1973 Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1974}
1975
1976pub fn schedule_and_decode(
1982 column_infos: Vec<Arc<ColumnInfo>>,
1983 requested_rows: RequestedRows,
1984 filter: FilterExpression,
1985 column_indices: Vec<u32>,
1986 target_schema: Arc<Schema>,
1987 config: SchedulerDecoderConfig,
1988) -> BoxStream<'static, ReadBatchTask> {
1989 if requested_rows.num_rows() == 0 {
1990 return stream::empty().boxed();
1991 }
1992
1993 let requested_rows = requested_rows.trim_empty_ranges();
1996
1997 let io = config.io.clone();
1998
1999 match create_scheduler_decoder(
2003 column_infos,
2004 requested_rows,
2005 filter,
2006 column_indices,
2007 target_schema,
2008 config,
2009 ) {
2010 Ok(stream) => stream.finally(move || drop(io)).boxed(),
2013 Err(e) => stream::once(std::future::ready(ReadBatchTask {
2015 num_rows: 0,
2016 task: std::future::ready(Err(e)).boxed(),
2017 }))
2018 .boxed(),
2019 }
2020}
2021
2022pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
2023 tokio::runtime::Builder::new_current_thread()
2024 .build()
2025 .unwrap()
2026});
2027
2028pub fn schedule_and_decode_blocking(
2043 column_infos: Vec<Arc<ColumnInfo>>,
2044 requested_rows: RequestedRows,
2045 filter: FilterExpression,
2046 column_indices: Vec<u32>,
2047 target_schema: Arc<Schema>,
2048 config: SchedulerDecoderConfig,
2049) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2050 if requested_rows.num_rows() == 0 {
2051 let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2052 return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2053 }
2054
2055 let num_rows = requested_rows.num_rows();
2056 let is_structural = column_infos[0].is_structural();
2057
2058 let (tx, mut rx) = mpsc::unbounded_channel();
2059
2060 let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2063 target_schema.as_ref(),
2064 &column_indices,
2065 &column_infos,
2066 &vec![],
2067 num_rows,
2068 config.decoder_plugins,
2069 config.io.clone(),
2070 config.cache,
2071 &filter,
2072 &config.decoder_config,
2073 ))?;
2074
2075 match requested_rows {
2077 RequestedRows::Ranges(ranges) => {
2078 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2079 }
2080 RequestedRows::Indices(indices) => {
2081 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2082 }
2083 }
2084
2085 let mut messages = Vec::new();
2087 while rx
2088 .recv_many(&mut messages, usize::MAX)
2089 .now_or_never()
2090 .unwrap()
2091 != 0
2092 {}
2093
2094 let decode_iterator = create_decode_iterator(
2096 &target_schema,
2097 num_rows,
2098 config.batch_size,
2099 config.decoder_config.validate_on_decode,
2100 is_structural,
2101 messages.into(),
2102 );
2103
2104 Ok(decode_iterator)
2105}
2106
/// Decodes rows out of a single page of primitive data.
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes `num_rows` rows into a [`DataBlock`].
    ///
    /// Presumably the first `rows_to_skip` rows of the page are skipped first; TODO confirm
    /// against implementations.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}

/// Schedules the reads for a page and produces a [`PrimitivePageDecoder`] for it.
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules I/O for `ranges` through `scheduler`.
    ///
    /// Returns a future that resolves to a decoder for the scheduled rows.
    ///
    /// NOTE(review): `ranges` are presumably row ranges relative to the page, and
    /// `top_level_row` the top-level row used for I/O priority. Confirm with implementations.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2180
/// Tracks a scheduling position and reports its I/O priority.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Moves the position forward by `num_rows` rows.
    fn advance(&mut self, num_rows: u64);
    /// Returns the priority of the current position.
    fn current_priority(&self) -> u64;
    /// Clones this range into a new box (an object-safe stand-in for `Clone`).
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

/// A [`PriorityRange`] whose priority is simply its current row position.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    /// Starts a range at position `priority`.
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self::new(self.priority))
    }
}

/// A [`PriorityRange`] for the items of a list, wrapping the range of the list rows.
///
/// Advancing moves through item rows. Each time the position reaches the next end offset in
/// `offsets`, a list is complete and `base` advances by one row. The reported priority is
/// always `base`'s.
pub struct ListPriorityRange {
    /// Priority range of the list rows.
    base: Box<dyn PriorityRange>,
    /// List offsets into the item rows (assumed non-decreasing).
    offsets: Arc<[u64]>,
    /// Index of the last offset the position has reached.
    cur_index_into_offsets: usize,
    /// Item rows advanced so far.
    cur_position: u64,
}

impl ListPriorityRange {
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}

impl std::fmt::Debug for ListPriorityRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the number of offsets is printed, not the offsets themselves.
        let mut out = f.debug_struct("ListPriorityRange");
        out.field("base", &self.base);
        out.field("offsets.len()", &self.offsets.len());
        out.field("cur_index_into_offsets", &self.cur_index_into_offsets);
        out.field("cur_position", &self.cur_position);
        out.finish()
    }
}

impl PriorityRange for ListPriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.cur_position += num_rows;
        let position = self.cur_position;
        // Count the consecutive end offsets, after the current one, that the new position has
        // reached; each one is a completed list.
        let lists_completed = self
            .offsets
            .iter()
            .skip(self.cur_index_into_offsets + 1)
            .take_while(|&&end| end <= position)
            .count();
        self.cur_index_into_offsets += lists_completed;
        self.base.advance(lists_completed as u64);
    }

    fn current_priority(&self) -> u64 {
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            base: self.base.box_clone(),
            offsets: Arc::clone(&self.offsets),
            cur_index_into_offsets: self.cur_index_into_offsets,
            cur_position: self.cur_position,
        })
    }
}
2287
/// Context threaded through scheduling.
///
/// Holds the I/O scheduler, the cache, and the current position (path) in the field tree.
pub struct SchedulerContext {
    /// Set to `None` by `new`. When it is `Some`, `path_name` uses a `TEMP(..)` prefix
    /// instead of `ROOT`.
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    /// I/O scheduler for reads.
    io: Arc<dyn EncodingsIo>,
    /// Cache available to schedulers.
    cache: Arc<LanceCache>,
    /// Name shown by `path_name` when `recv` is set.
    name: String,
    /// Child indices from the root to the current field (maintained by `push`/`pop`).
    path: Vec<u32>,
    /// Field names, parallel to `path`.
    path_names: Vec<String>,
}

/// Handle returned by [`SchedulerContext::push`].
///
/// Call [`ScopedSchedulerContext::pop`] to remove the pushed path entry and get the context
/// back.
pub struct ScopedSchedulerContext<'a> {
    pub context: &'a mut SchedulerContext,
}

impl<'a> ScopedSchedulerContext<'a> {
    /// Pops the path entry added by [`SchedulerContext::push`] and returns the context.
    pub fn pop(self) -> &'a mut SchedulerContext {
        self.context.pop();
        self.context
    }
}
2308
2309impl SchedulerContext {
2310 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2311 Self {
2312 io,
2313 cache,
2314 recv: None,
2315 name: "".to_string(),
2316 path: Vec::new(),
2317 path_names: Vec::new(),
2318 }
2319 }
2320
2321 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2322 &self.io
2323 }
2324
2325 pub fn cache(&self) -> &Arc<LanceCache> {
2326 &self.cache
2327 }
2328
2329 pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2330 self.path.push(index);
2331 self.path_names.push(name.to_string());
2332 ScopedSchedulerContext { context: self }
2333 }
2334
2335 pub fn pop(&mut self) {
2336 self.path.pop();
2337 self.path_names.pop();
2338 }
2339
2340 pub fn path_name(&self) -> String {
2341 let path = self.path_names.join("/");
2342 if self.recv.is_some() {
2343 format!("TEMP({}){}", self.name, path)
2344 } else {
2345 format!("ROOT{}", path)
2346 }
2347 }
2348
2349 pub fn current_path(&self) -> VecDeque<u32> {
2350 VecDeque::from_iter(self.path.iter().copied())
2351 }
2352
2353 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2354 pub fn locate_decoder(
2355 &mut self,
2356 decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2357 ) -> crate::previous::decoder::DecoderReady {
2358 trace!(
2359 "Scheduling decoder of type {:?} for {:?}",
2360 decoder.data_type(),
2361 self.path,
2362 );
2363 crate::previous::decoder::DecoderReady {
2364 decoder,
2365 path: self.current_path(),
2366 }
2367 }
2368}
2369
2370pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2371
2372impl std::fmt::Debug for UnloadedPageShard {
2373 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2374 f.debug_struct("UnloadedPage").finish()
2375 }
2376}
2377
/// The output of one structural scheduling step.
#[derive(Debug)]
pub struct ScheduledScanLine {
    /// Row count reported by this step.
    ///
    /// NOTE(review): unclear from here whether this is per-line or cumulative; confirm with
    /// the scheduler.
    pub rows_scheduled: u64,
    /// Pages or decoders produced by this step.
    pub decoders: Vec<MessageType>,
}

/// A stateful job that schedules a structural field a step at a time.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Schedules the next step, returning the scan lines it produced.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
}
2393
2394pub struct FilterExpression(pub Bytes);
2402
2403impl FilterExpression {
2404 pub fn no_filter() -> Self {
2409 Self(Bytes::new())
2410 }
2411
2412 pub fn is_noop(&self) -> bool {
2414 self.0.is_empty()
2415 }
2416}
2417
/// Schedules reads for a field of a structural (2.1+) file.
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// Performs any asynchronous setup needed before scheduling, given the filter and context.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Creates a job that schedules the given row `ranges`.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}

/// A deferred decode that produces an Arrow array.
pub trait DecodeArrayTask: Send {
    /// Runs the decode, consuming the task.
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}
2436
2437impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2438 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2439 StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2440 }
2441}
2442
/// One batch's worth of decode work, as drained from a root decoder.
pub struct NextDecodeTask {
    /// Task that decodes the batch.
    ///
    /// `into_batch` expects it to yield a struct array of the top-level columns.
    pub task: Box<dyn DecodeArrayTask>,
    /// Number of rows the task decodes.
    pub num_rows: u64,
}
2453
2454impl NextDecodeTask {
2455 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2460 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2461 let struct_arr = self.task.decode();
2462 match struct_arr {
2463 Ok(struct_arr) => {
2464 let batch = RecordBatch::from(struct_arr.as_struct());
2465 let size_bytes = batch.get_array_memory_size() as u64;
2466 if size_bytes > BATCH_SIZE_BYTES_WARNING {
2467 emitted_batch_size_warning.call_once(|| {
2468 let size_mb = size_bytes / 1024 / 1024;
2469 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2470 });
2471 }
2472 Ok(batch)
2473 }
2474 Err(e) => {
2475 let e = Error::Internal {
2476 message: format!("Error decoding batch: {}", e),
2477 location: location!(),
2478 };
2479 Err(e)
2480 }
2481 }
2482 }
2483}
2484
/// A scheduled item sent from the scheduler to the decode stream.
#[derive(Debug)]
pub enum MessageType {
    /// A ready-to-use decoder; consumed by the legacy (pre-2.1) decode path.
    DecoderReady(crate::previous::decoder::DecoderReady),
    /// A page whose load may still be in flight; consumed by the structural (2.1+) path.
    UnloadedPage(UnloadedPageShard),
}
2500
2501impl MessageType {
2502 pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2503 match self {
2504 Self::DecoderReady(decoder) => decoder,
2505 Self::UnloadedPage(_) => {
2506 panic!("Expected DecoderReady but got UnloadedPage")
2507 }
2508 }
2509 }
2510
2511 pub fn into_structural(self) -> UnloadedPageShard {
2512 match self {
2513 Self::UnloadedPage(unloaded) => unloaded,
2514 Self::DecoderReady(_) => {
2515 panic!("Expected UnloadedPage but got DecoderReady")
2516 }
2517 }
2518 }
2519}
2520
/// A message from the scheduler to a decode stream.
pub struct DecoderMessage {
    /// Total rows scheduled so far.
    ///
    /// The decode streams store it as `rows_scheduled`.
    pub scheduled_so_far: u64,
    /// Pages or decoders scheduled by this message.
    pub decoders: Vec<MessageType>,
}

/// The receiving end of the scheduler channel, owned by a decode stream.
pub struct DecoderContext {
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    /// Wraps the receiver that scheduler messages (or errors) arrive on.
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}
2535
/// The result of decoding a page: its data plus an unraveler for its rep/def information.
pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}

/// A deferred page decode.
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Runs the decode, consuming the task.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}

/// Decoder for a loaded page in a structural (2.1+) file.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Takes the next `num_rows` rows as a deferred decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// Number of rows in the page (presumably the total, not the remainder; TODO confirm).
    fn num_rows(&self) -> u64;
}

/// A page whose data has been loaded, ready to be accepted by a field decoder.
#[derive(Debug)]
pub struct LoadedPageShard {
    /// Decoder for the page's rows.
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Position of the page's field in the field tree.
    ///
    /// Presumably the same kind of path as `SchedulerContext::current_path`; TODO confirm.
    pub path: VecDeque<u32>,
}

/// The result of decoding an array: the Arrow array plus its rep/def information.
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}

/// A deferred array decode in a structural (2.1+) file.
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Runs the decode, consuming the task.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}

/// Decoder for one field of a structural (2.1+) file.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Accepts a loaded page for this field (or one of its children).
    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
    /// Takes the next `num_rows` rows as a deferred decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The Arrow data type this decoder produces.
    fn data_type(&self) -> &DataType;
}

/// Extension point for decoders; currently has no fields.
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2599
2600pub async fn decode_batch(
2602 batch: &EncodedBatch,
2603 filter: &FilterExpression,
2604 decoder_plugins: Arc<DecoderPlugins>,
2605 should_validate: bool,
2606 version: LanceFileVersion,
2607 cache: Option<Arc<LanceCache>>,
2608) -> Result<RecordBatch> {
2609 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2614 let cache = if let Some(cache) = cache {
2615 cache
2616 } else {
2617 Arc::new(lance_core::cache::LanceCache::with_capacity(
2618 128 * 1024 * 1024,
2619 ))
2620 };
2621 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2622 batch.schema.as_ref(),
2623 &batch.top_level_columns,
2624 &batch.page_table,
2625 &vec![],
2626 batch.num_rows,
2627 decoder_plugins,
2628 io_scheduler.clone(),
2629 cache,
2630 filter,
2631 &DecoderConfig::default(),
2632 )
2633 .await?;
2634 let (tx, rx) = unbounded_channel();
2635 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2636 let is_structural = version >= LanceFileVersion::V2_1;
2637 let mut decode_stream = create_decode_stream(
2638 &batch.schema,
2639 batch.num_rows,
2640 batch.num_rows as u32,
2641 is_structural,
2642 should_validate,
2643 rx,
2644 );
2645 decode_stream.next().await.unwrap().task.await
2646}
2647
#[cfg(test)]
mod tests {
    use super::*;

    /// A lone index becomes a single one-row range.
    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let lone_index = vec![1];
        assert_eq!(
            DecodeBatchScheduler::indices_to_ranges(&lone_index),
            vec![1..2]
        );
    }

    /// A fully contiguous run collapses into one range.
    #[test]
    fn test_coalesce_indices_to_ranges() {
        let contiguous = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        assert_eq!(
            DecodeBatchScheduler::indices_to_ranges(&contiguous),
            vec![1..10]
        );
    }

    /// Gaps split the indices into separate ranges.
    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let gappy = vec![1, 2, 3, 5, 6, 7, 9];
        assert_eq!(
            DecodeBatchScheduler::indices_to_ranges(&gappy),
            vec![1..4, 5..8, 9..10]
        );
    }
}
2673}