1use std::collections::VecDeque;
216use std::sync::{LazyLock, Once};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use log::{debug, trace, warn};
230use snafu::location;
231use tokio::sync::mpsc::error::SendError;
232use tokio::sync::mpsc::{self, unbounded_channel};
233
234use lance_core::{ArrowResult, Error, Result};
235use tracing::instrument;
236
237use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
238use crate::data::DataBlock;
239use crate::encoder::EncodedBatch;
240use crate::encodings::logical::list::StructuralListScheduler;
241use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
242use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
243use crate::format::pb::{self, column_encoding};
244use crate::format::pb21;
245use crate::previous::decoder::LogicalPageDecoder;
246use crate::previous::encodings::logical::list::OffsetPageInfo;
247use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
248use crate::previous::encodings::logical::{
249 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
250 primitive::PrimitiveFieldScheduler,
251};
252use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
253use crate::version::LanceFileVersion;
254use crate::{BufferScheduler, EncodingsIo};
255
/// Threshold, in bytes (10 MiB), for the decoded-batch-size warning.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 << 20;
258
259#[derive(Debug)]
266pub enum PageEncoding {
267 Legacy(pb::ArrayEncoding),
268 Structural(pb21::PageLayout),
269}
270
271impl PageEncoding {
272 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
273 match self {
274 Self::Legacy(enc) => enc,
275 Self::Structural(_) => panic!("Expected a legacy encoding"),
276 }
277 }
278
279 pub fn as_structural(&self) -> &pb21::PageLayout {
280 match self {
281 Self::Structural(enc) => enc,
282 Self::Legacy(_) => panic!("Expected a structural encoding"),
283 }
284 }
285
286 pub fn is_structural(&self) -> bool {
287 matches!(self, Self::Structural(_))
288 }
289}
290
/// Metadata describing one page of a column.
#[derive(Debug)]
pub struct PageInfo {
    /// Number of rows stored in the page.
    pub num_rows: u64,
    /// Scheduling priority of the page (the synthetic legacy root pages use 0).
    pub priority: u64,
    /// How the page is encoded (legacy array encoding or structural layout).
    pub encoding: PageEncoding,
    /// Offset/size pairs of the buffers belonging to this page.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
307
/// Metadata describing one column: its pages, its column-level buffers and its encoding.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// Index of the column (the synthetic legacy root column uses `u32::MAX`).
    pub index: u32,
    /// Metadata for each page of the column.
    pub page_infos: Arc<[PageInfo]>,
    /// Offset/size pairs of the column-level buffers.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// Column-level encoding (for example plain values or a blob wrapper).
    pub encoding: pb::ColumnEncoding,
}
321
322impl ColumnInfo {
323 pub fn new(
325 index: u32,
326 page_infos: Arc<[PageInfo]>,
327 buffer_offsets_and_sizes: Vec<(u64, u64)>,
328 encoding: pb::ColumnEncoding,
329 ) -> Self {
330 Self {
331 index,
332 page_infos,
333 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
334 encoding,
335 }
336 }
337
338 pub fn is_structural(&self) -> bool {
339 self.page_infos
340 .first()
342 .map(|page| page.encoding.is_structural())
343 .unwrap_or(false)
344 }
345}
346
347enum RootScheduler {
348 Structural(Box<dyn StructuralFieldScheduler>),
349 Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
350}
351
352impl RootScheduler {
353 fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
354 match self {
355 Self::Structural(_) => panic!("Expected a legacy scheduler"),
356 Self::Legacy(s) => s,
357 }
358 }
359
360 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
361 match self {
362 Self::Structural(s) => s.as_ref(),
363 Self::Legacy(_) => panic!("Expected a structural scheduler"),
364 }
365 }
366}
367
/// Turns requested row ranges into `DecoderMessage`s by driving the root field scheduler.
pub struct DecodeBatchScheduler {
    /// Structural or legacy scheduler for the synthetic root struct field.
    root_scheduler: RootScheduler,
    /// Top-level Arrow fields of the schema being read.
    pub root_fields: Fields,
    /// Cache handed to every `SchedulerContext` created by this scheduler.
    cache: Arc<LanceCache>,
}
394
395pub struct ColumnInfoIter<'a> {
396 column_infos: Vec<Arc<ColumnInfo>>,
397 column_indices: &'a [u32],
398 column_info_pos: usize,
399 column_indices_pos: usize,
400}
401
402impl<'a> ColumnInfoIter<'a> {
403 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
404 let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
405 Self {
406 column_infos,
407 column_indices,
408 column_info_pos: initial_pos,
409 column_indices_pos: 0,
410 }
411 }
412
413 pub fn peek(&self) -> &Arc<ColumnInfo> {
414 &self.column_infos[self.column_info_pos]
415 }
416
417 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
418 let column_info = self.column_infos[self.column_info_pos].clone();
419 let transformed = transform(column_info);
420 self.column_infos[self.column_info_pos] = transformed;
421 }
422
423 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
424 self.next().ok_or_else(|| {
425 Error::invalid_input(
426 "there were more fields in the schema than provided column indices / infos",
427 location!(),
428 )
429 })
430 }
431
432 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
433 if self.column_info_pos < self.column_infos.len() {
434 let info = &self.column_infos[self.column_info_pos];
435 self.column_info_pos += 1;
436 Some(info)
437 } else {
438 None
439 }
440 }
441
442 pub(crate) fn next_top_level(&mut self) {
443 self.column_indices_pos += 1;
444 if self.column_indices_pos < self.column_indices.len() {
445 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
446 } else {
447 self.column_info_pos = self.column_infos.len();
448 }
449 }
450}
451
/// Positions and sizes of the file-level buffers.
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}

/// Positions and sizes of a column's buffers, together with the file-level buffers.
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}

/// Positions and sizes of a page's buffers, together with its column's (and the file's) buffers.
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}
471
472#[derive(Debug)]
474pub struct CoreFieldDecoderStrategy {
475 pub validate_data: bool,
476 pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
477 pub cache_repetition_index: bool,
478}
479
480impl Default for CoreFieldDecoderStrategy {
481 fn default() -> Self {
482 Self {
483 validate_data: false,
484 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
485 cache_repetition_index: false,
486 }
487 }
488}
489
490impl CoreFieldDecoderStrategy {
491 pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
493 self.cache_repetition_index = cache_repetition_index;
494 self
495 }
496
497 pub fn from_decoder_config(config: &DecoderConfig) -> Self {
499 Self {
500 validate_data: config.validate_on_decode,
501 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
502 cache_repetition_index: config.cache_repetition_index,
503 }
504 }
505
506 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
509 let column_encoding = column_info
510 .encoding
511 .column_encoding
512 .as_ref()
513 .ok_or_else(|| {
514 Error::invalid_input(
515 format!(
516 "the column at index {} was missing a ColumnEncoding",
517 column_info.index
518 ),
519 location!(),
520 )
521 })?;
522 if matches!(
523 column_encoding,
524 pb::column_encoding::ColumnEncoding::Values(_)
525 ) {
526 Ok(())
527 } else {
528 Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
529 }
530 }
531
532 fn is_structural_primitive(data_type: &DataType) -> bool {
533 if data_type.is_primitive() {
534 true
535 } else {
536 match data_type {
537 DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
539 DataType::Boolean
540 | DataType::Null
541 | DataType::FixedSizeBinary(_)
542 | DataType::Binary
543 | DataType::LargeBinary
544 | DataType::Utf8
545 | DataType::LargeUtf8 => true,
546 DataType::FixedSizeList(inner, _) => {
547 Self::is_structural_primitive(inner.data_type())
548 }
549 _ => false,
550 }
551 }
552 }
553
554 fn is_primitive_legacy(data_type: &DataType) -> bool {
555 if data_type.is_primitive() {
556 true
557 } else {
558 match data_type {
559 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
561 DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
562 _ => false,
563 }
564 }
565 }
566
567 fn create_primitive_scheduler(
568 &self,
569 field: &Field,
570 column: &ColumnInfo,
571 buffers: FileBuffers,
572 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
573 Self::ensure_values_encoded(column, &field.name)?;
574 let column_buffers = ColumnBuffers {
576 file_buffers: buffers,
577 positions_and_sizes: &column.buffer_offsets_and_sizes,
578 };
579 Ok(Box::new(PrimitiveFieldScheduler::new(
580 column.index,
581 field.data_type(),
582 column.page_infos.clone(),
583 column_buffers,
584 self.validate_data,
585 )))
586 }
587
588 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
590 Self::ensure_values_encoded(column_info, field_name)?;
591 if column_info.page_infos.len() != 1 {
592 return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
593 }
594 let encoding = &column_info.page_infos[0].encoding;
595 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
596 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
597 _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
598 }
599 }
600
601 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
602 let encoding = &column_info.page_infos[0].encoding;
603 matches!(
604 encoding.as_legacy().array_encoding.as_ref().unwrap(),
605 pb::array_encoding::ArrayEncoding::PackedStruct(_)
606 )
607 }
608
609 fn create_list_scheduler(
610 &self,
611 list_field: &Field,
612 column_infos: &mut ColumnInfoIter,
613 buffers: FileBuffers,
614 offsets_column: &ColumnInfo,
615 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
616 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
617 let offsets_column_buffers = ColumnBuffers {
618 file_buffers: buffers,
619 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
620 };
621 let items_scheduler =
622 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
623
624 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
625 .page_infos
626 .iter()
627 .filter(|offsets_page| offsets_page.num_rows > 0)
628 .map(|offsets_page| {
629 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
630 &offsets_page.encoding.as_legacy().array_encoding
631 {
632 let inner = PageInfo {
633 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
634 encoding: PageEncoding::Legacy(
635 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
636 ),
637 num_rows: offsets_page.num_rows,
638 priority: 0,
639 };
640 (
641 inner,
642 OffsetPageInfo {
643 offsets_in_page: offsets_page.num_rows,
644 null_offset_adjustment: list_encoding.null_offset_adjustment,
645 num_items_referenced_by_page: list_encoding.num_items,
646 },
647 )
648 } else {
649 panic!("Expected a list column");
651 }
652 })
653 .unzip();
654 let inner = Arc::new(PrimitiveFieldScheduler::new(
655 offsets_column.index,
656 DataType::UInt64,
657 Arc::from(inner_infos.into_boxed_slice()),
658 offsets_column_buffers,
659 self.validate_data,
660 )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
661 let items_field = match list_field.data_type() {
662 DataType::List(inner) => inner,
663 DataType::LargeList(inner) => inner,
664 _ => unreachable!(),
665 };
666 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
667 DataType::Int32
668 } else {
669 DataType::Int64
670 };
671 Ok(Box::new(ListFieldScheduler::new(
672 inner,
673 items_scheduler.into(),
674 items_field,
675 offset_type,
676 null_offset_adjustments,
677 )))
678 }
679
680 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
681 if let column_encoding::ColumnEncoding::Blob(blob) =
682 column_info.encoding.column_encoding.as_ref().unwrap()
683 {
684 let mut column_info = column_info.clone();
685 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
686 Some(column_info)
687 } else {
688 None
689 }
690 }
691
692 fn create_structural_field_scheduler(
693 &self,
694 field: &Field,
695 column_infos: &mut ColumnInfoIter,
696 ) -> Result<Box<dyn StructuralFieldScheduler>> {
697 let data_type = field.data_type();
698 if Self::is_structural_primitive(&data_type) {
699 let column_info = column_infos.expect_next()?;
700 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
701 column_info.as_ref(),
702 self.decompressor_strategy.as_ref(),
703 self.cache_repetition_index,
704 field,
705 )?);
706
707 column_infos.next_top_level();
709
710 return Ok(scheduler);
711 }
712 match &data_type {
713 DataType::Struct(fields) => {
714 if field.is_packed_struct() {
715 let column_info = column_infos.expect_next()?;
717 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
718 column_info.as_ref(),
719 self.decompressor_strategy.as_ref(),
720 self.cache_repetition_index,
721 field,
722 )?);
723
724 column_infos.next_top_level();
726
727 return Ok(scheduler);
728 }
729 if field.is_blob() {
731 let column_info = column_infos.peek();
732 if column_info.page_infos.iter().any(|page| {
733 matches!(
734 page.encoding,
735 PageEncoding::Structural(pb21::PageLayout {
736 layout: Some(pb21::page_layout::Layout::BlobLayout(_))
737 })
738 )
739 }) {
740 let column_info = column_infos.expect_next()?;
741 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
742 column_info.as_ref(),
743 self.decompressor_strategy.as_ref(),
744 self.cache_repetition_index,
745 field,
746 )?);
747 column_infos.next_top_level();
748 return Ok(scheduler);
749 }
750 }
751
752 let mut child_schedulers = Vec::with_capacity(field.children.len());
753 for field in field.children.iter() {
754 let field_scheduler =
755 self.create_structural_field_scheduler(field, column_infos)?;
756 child_schedulers.push(field_scheduler);
757 }
758
759 let fields = fields.clone();
760 Ok(
761 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
762 as Box<dyn StructuralFieldScheduler>,
763 )
764 }
765 DataType::List(_) | DataType::LargeList(_) => {
766 let child = field
767 .children
768 .first()
769 .expect("List field must have a child");
770 let child_scheduler =
771 self.create_structural_field_scheduler(child, column_infos)?;
772 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
773 as Box<dyn StructuralFieldScheduler>)
774 }
775 _ => todo!("create_structural_field_scheduler for {}", data_type),
776 }
777 }
778
779 fn create_legacy_field_scheduler(
780 &self,
781 field: &Field,
782 column_infos: &mut ColumnInfoIter,
783 buffers: FileBuffers,
784 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
785 let data_type = field.data_type();
786 if Self::is_primitive_legacy(&data_type) {
787 let column_info = column_infos.expect_next()?;
788 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
789 return Ok(scheduler);
790 } else if data_type.is_binary_like() {
791 let column_info = column_infos.next().unwrap().clone();
792 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
794 let desc_scheduler =
795 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
796 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
797 return Ok(blob_scheduler);
798 }
799 if let Some(page_info) = column_info.page_infos.first() {
800 if matches!(
801 page_info.encoding.as_legacy(),
802 pb::ArrayEncoding {
803 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
804 }
805 ) {
806 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
807 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
808 } else {
809 DataType::LargeList(Arc::new(ArrowField::new(
810 "item",
811 DataType::UInt8,
812 false,
813 )))
814 };
815 let list_field = Field::try_from(ArrowField::new(
816 field.name.clone(),
817 list_type,
818 field.nullable,
819 ))
820 .unwrap();
821 let list_scheduler = self.create_list_scheduler(
822 &list_field,
823 column_infos,
824 buffers,
825 &column_info,
826 )?;
827 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
828 list_scheduler.into(),
829 field.data_type(),
830 ));
831 return Ok(binary_scheduler);
832 } else {
833 let scheduler =
834 self.create_primitive_scheduler(field, &column_info, buffers)?;
835 return Ok(scheduler);
836 }
837 } else {
838 return self.create_primitive_scheduler(field, &column_info, buffers);
839 }
840 }
841 match &data_type {
842 DataType::FixedSizeList(inner, _dimension) => {
843 if Self::is_primitive_legacy(inner.data_type()) {
846 let primitive_col = column_infos.expect_next()?;
847 let scheduler =
848 self.create_primitive_scheduler(field, primitive_col, buffers)?;
849 Ok(scheduler)
850 } else {
851 todo!()
852 }
853 }
854 DataType::Dictionary(_key_type, value_type) => {
855 if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
856 let primitive_col = column_infos.expect_next()?;
857 let scheduler =
858 self.create_primitive_scheduler(field, primitive_col, buffers)?;
859 Ok(scheduler)
860 } else {
861 Err(Error::NotSupported {
862 source: format!(
863 "No way to decode into a dictionary field of type {}",
864 value_type
865 )
866 .into(),
867 location: location!(),
868 })
869 }
870 }
871 DataType::List(_) | DataType::LargeList(_) => {
872 let offsets_column = column_infos.expect_next()?.clone();
873 column_infos.next_top_level();
874 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
875 }
876 DataType::Struct(fields) => {
877 let column_info = column_infos.expect_next()?;
878
879 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
881 return self.create_primitive_scheduler(field, &blob_col, buffers);
883 }
884
885 if Self::check_packed_struct(column_info) {
886 self.create_primitive_scheduler(field, column_info, buffers)
888 } else {
889 Self::check_simple_struct(column_info, &field.name).unwrap();
891 let num_rows = column_info
892 .page_infos
893 .iter()
894 .map(|page| page.num_rows)
895 .sum();
896 let mut child_schedulers = Vec::with_capacity(field.children.len());
897 for field in &field.children {
898 column_infos.next_top_level();
899 let field_scheduler =
900 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
901 child_schedulers.push(Arc::from(field_scheduler));
902 }
903
904 let fields = fields.clone();
905 Ok(Box::new(SimpleStructScheduler::new(
906 child_schedulers,
907 fields,
908 num_rows,
909 )))
910 }
911 }
912 _ => todo!(),
914 }
915 }
916}
917
918fn root_column(num_rows: u64) -> ColumnInfo {
920 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
921 let final_page_num_rows = num_rows % (u32::MAX as u64);
922 let root_pages = (0..num_root_pages)
923 .map(|i| PageInfo {
924 num_rows: if i == num_root_pages - 1 {
925 final_page_num_rows
926 } else {
927 u64::MAX
928 },
929 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
930 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
931 pb::SimpleStruct {},
932 )),
933 }),
934 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
936 })
937 .collect::<Vec<_>>();
938 ColumnInfo {
939 buffer_offsets_and_sizes: Arc::new([]),
940 encoding: pb::ColumnEncoding {
941 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
942 },
943 index: u32::MAX,
944 page_infos: Arc::from(root_pages),
945 }
946}
947
948pub enum RootDecoder {
949 Structural(StructuralStructDecoder),
950 Legacy(SimpleStructDecoder),
951}
952
953impl RootDecoder {
954 pub fn into_structural(self) -> StructuralStructDecoder {
955 match self {
956 Self::Structural(decoder) => decoder,
957 Self::Legacy(_) => panic!("Expected a structural decoder"),
958 }
959 }
960
961 pub fn into_legacy(self) -> SimpleStructDecoder {
962 match self {
963 Self::Legacy(decoder) => decoder,
964 Self::Structural(_) => panic!("Expected a legacy decoder"),
965 }
966 }
967}
968
969impl DecodeBatchScheduler {
970 #[allow(clippy::too_many_arguments)]
973 pub async fn try_new<'a>(
974 schema: &'a Schema,
975 column_indices: &[u32],
976 column_infos: &[Arc<ColumnInfo>],
977 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
978 num_rows: u64,
979 _decoder_plugins: Arc<DecoderPlugins>,
980 io: Arc<dyn EncodingsIo>,
981 cache: Arc<LanceCache>,
982 filter: &FilterExpression,
983 decoder_config: &DecoderConfig,
984 ) -> Result<Self> {
985 assert!(num_rows > 0);
986 let buffers = FileBuffers {
987 positions_and_sizes: file_buffer_positions_and_sizes,
988 };
989 let arrow_schema = ArrowSchema::from(schema);
990 let root_fields = arrow_schema.fields().clone();
991 let root_type = DataType::Struct(root_fields.clone());
992 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
993 root_field.children.clone_from(&schema.fields);
997 root_field
998 .metadata
999 .insert("__lance_decoder_root".to_string(), "true".to_string());
1000
1001 if column_infos.is_empty() || column_infos[0].is_structural() {
1002 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1003
1004 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1005 let mut root_scheduler =
1006 strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
1007
1008 let context = SchedulerContext::new(io, cache.clone());
1009 root_scheduler.initialize(filter, &context).await?;
1010
1011 Ok(Self {
1012 root_scheduler: RootScheduler::Structural(root_scheduler),
1013 root_fields,
1014 cache,
1015 })
1016 } else {
1017 let mut columns = Vec::with_capacity(column_infos.len() + 1);
1020 columns.push(Arc::new(root_column(num_rows)));
1021 columns.extend(column_infos.iter().cloned());
1022
1023 let adjusted_column_indices = [0_u32]
1024 .into_iter()
1025 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1026 .collect::<Vec<_>>();
1027 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1028 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1029 let root_scheduler =
1030 strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1031
1032 let context = SchedulerContext::new(io, cache.clone());
1033 root_scheduler.initialize(filter, &context).await?;
1034
1035 Ok(Self {
1036 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1037 root_fields,
1038 cache,
1039 })
1040 }
1041 }
1042
1043 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1044 pub fn from_scheduler(
1045 root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1046 root_fields: Fields,
1047 cache: Arc<LanceCache>,
1048 ) -> Self {
1049 Self {
1050 root_scheduler: RootScheduler::Legacy(root_scheduler),
1051 root_fields,
1052 cache,
1053 }
1054 }
1055
1056 fn do_schedule_ranges_structural(
1057 &mut self,
1058 ranges: &[Range<u64>],
1059 filter: &FilterExpression,
1060 io: Arc<dyn EncodingsIo>,
1061 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1062 ) {
1063 let root_scheduler = self.root_scheduler.as_structural();
1064 let mut context = SchedulerContext::new(io, self.cache.clone());
1065 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1066 if let Err(schedule_ranges_err) = maybe_root_job {
1067 schedule_action(Err(schedule_ranges_err));
1068 return;
1069 }
1070 let mut root_job = maybe_root_job.unwrap();
1071 let mut num_rows_scheduled = 0;
1072 loop {
1073 let maybe_next_scan_lines = root_job.schedule_next(&mut context);
1074 if let Err(err) = maybe_next_scan_lines {
1075 schedule_action(Err(err));
1076 return;
1077 }
1078 let next_scan_lines = maybe_next_scan_lines.unwrap();
1079 if next_scan_lines.is_empty() {
1080 return;
1081 }
1082 for next_scan_line in next_scan_lines {
1083 trace!(
1084 "Scheduled scan line of {} rows and {} decoders",
1085 next_scan_line.rows_scheduled,
1086 next_scan_line.decoders.len()
1087 );
1088 num_rows_scheduled += next_scan_line.rows_scheduled;
1089 if !schedule_action(Ok(DecoderMessage {
1090 scheduled_so_far: num_rows_scheduled,
1091 decoders: next_scan_line.decoders,
1092 })) {
1093 return;
1095 }
1096 }
1097 }
1098 }
1099
1100 fn do_schedule_ranges_legacy(
1101 &mut self,
1102 ranges: &[Range<u64>],
1103 filter: &FilterExpression,
1104 io: Arc<dyn EncodingsIo>,
1105 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1106 priority: Option<Box<dyn PriorityRange>>,
1110 ) {
1111 let root_scheduler = self.root_scheduler.as_legacy();
1112 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1113 trace!(
1114 "Scheduling {} ranges across {}..{} ({} rows){}",
1115 ranges.len(),
1116 ranges.first().unwrap().start,
1117 ranges.last().unwrap().end,
1118 rows_requested,
1119 priority
1120 .as_ref()
1121 .map(|p| format!(" (priority={:?})", p))
1122 .unwrap_or_default()
1123 );
1124
1125 let mut context = SchedulerContext::new(io, self.cache.clone());
1126 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1127 if let Err(schedule_ranges_err) = maybe_root_job {
1128 schedule_action(Err(schedule_ranges_err));
1129 return;
1130 }
1131 let mut root_job = maybe_root_job.unwrap();
1132 let mut num_rows_scheduled = 0;
1133 let mut rows_to_schedule = root_job.num_rows();
1134 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1135 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1136 while rows_to_schedule > 0 {
1137 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1138 if let Err(schedule_next_err) = maybe_next_scan_line {
1139 schedule_action(Err(schedule_next_err));
1140 return;
1141 }
1142 let next_scan_line = maybe_next_scan_line.unwrap();
1143 priority.advance(next_scan_line.rows_scheduled);
1144 num_rows_scheduled += next_scan_line.rows_scheduled;
1145 rows_to_schedule -= next_scan_line.rows_scheduled;
1146 trace!(
1147 "Scheduled scan line of {} rows and {} decoders",
1148 next_scan_line.rows_scheduled,
1149 next_scan_line.decoders.len()
1150 );
1151 if !schedule_action(Ok(DecoderMessage {
1152 scheduled_so_far: num_rows_scheduled,
1153 decoders: next_scan_line.decoders,
1154 })) {
1155 return;
1157 }
1158
1159 trace!("Finished scheduling {} ranges", ranges.len());
1160 }
1161 }
1162
1163 fn do_schedule_ranges(
1164 &mut self,
1165 ranges: &[Range<u64>],
1166 filter: &FilterExpression,
1167 io: Arc<dyn EncodingsIo>,
1168 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1169 priority: Option<Box<dyn PriorityRange>>,
1173 ) {
1174 match &self.root_scheduler {
1175 RootScheduler::Legacy(_) => {
1176 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1177 }
1178 RootScheduler::Structural(_) => {
1179 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1180 }
1181 }
1182 }
1183
1184 pub fn schedule_ranges_to_vec(
1187 &mut self,
1188 ranges: &[Range<u64>],
1189 filter: &FilterExpression,
1190 io: Arc<dyn EncodingsIo>,
1191 priority: Option<Box<dyn PriorityRange>>,
1192 ) -> Result<Vec<DecoderMessage>> {
1193 let mut decode_messages = Vec::new();
1194 self.do_schedule_ranges(
1195 ranges,
1196 filter,
1197 io,
1198 |msg| {
1199 decode_messages.push(msg);
1200 true
1201 },
1202 priority,
1203 );
1204 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1205 }
1206
1207 #[instrument(skip_all)]
1217 pub fn schedule_ranges(
1218 &mut self,
1219 ranges: &[Range<u64>],
1220 filter: &FilterExpression,
1221 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1222 scheduler: Arc<dyn EncodingsIo>,
1223 ) {
1224 self.do_schedule_ranges(
1225 ranges,
1226 filter,
1227 scheduler,
1228 |msg| {
1229 match sink.send(msg) {
1230 Ok(_) => true,
1231 Err(SendError { .. }) => {
1232 debug!(
1235 "schedule_ranges aborting early since decoder appears to have been dropped"
1236 );
1237 false
1238 }
1239 }
1240 },
1241 None,
1242 )
1243 }
1244
1245 #[instrument(skip_all)]
1253 pub fn schedule_range(
1254 &mut self,
1255 range: Range<u64>,
1256 filter: &FilterExpression,
1257 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1258 scheduler: Arc<dyn EncodingsIo>,
1259 ) {
1260 self.schedule_ranges(&[range], filter, sink, scheduler)
1261 }
1262
1263 pub fn schedule_take(
1271 &mut self,
1272 indices: &[u64],
1273 filter: &FilterExpression,
1274 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1275 scheduler: Arc<dyn EncodingsIo>,
1276 ) {
1277 debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1278 if indices.is_empty() {
1279 return;
1280 }
1281 trace!("Scheduling take of {} rows", indices.len());
1282 let ranges = Self::indices_to_ranges(indices);
1283 self.schedule_ranges(&ranges, filter, sink, scheduler)
1284 }
1285
1286 fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1288 let mut ranges = Vec::new();
1289 let mut start = indices[0];
1290
1291 for window in indices.windows(2) {
1292 if window[1] != window[0] + 1 {
1293 ranges.push(start..window[0] + 1);
1294 start = window[1];
1295 }
1296 }
1297
1298 ranges.push(start..*indices.last().unwrap() + 1);
1299 ranges
1300 }
1301}
1302
/// A pending batch: a future resolving to the decoded `RecordBatch`, plus its row count.
pub struct ReadBatchTask {
    /// Future that produces the batch (or a decode error).
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// Number of rows the batch is expected to contain.
    pub num_rows: u32,
}
1307
/// Legacy-path stream that receives scheduled decoders and drains batches of at most
/// `rows_per_batch` rows from the root decoder.
pub struct BatchDecodeStream {
    /// Holds the receiver of scheduler messages (`context.source`).
    context: DecoderContext,
    /// Legacy root struct decoder that scheduled child decoders are attached to.
    root_decoder: SimpleStructDecoder,
    /// Rows not yet assigned to a batch.
    rows_remaining: u64,
    /// Maximum rows per emitted batch.
    rows_per_batch: u32,
    /// Rows the scheduler has reported as scheduled so far.
    rows_scheduled: u64,
    /// Rows already drained into batch tasks.
    rows_drained: u64,
    /// Set once the scheduler channel has closed.
    scheduler_exhausted: bool,
    /// Shared `Once` handed to each batch task so the batch-size warning is emitted at most once.
    emitted_batch_size_warning: Arc<Once>,
}
1319
1320impl BatchDecodeStream {
1321 pub fn new(
1332 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1333 rows_per_batch: u32,
1334 num_rows: u64,
1335 root_decoder: SimpleStructDecoder,
1336 ) -> Self {
1337 Self {
1338 context: DecoderContext::new(scheduled),
1339 root_decoder,
1340 rows_remaining: num_rows,
1341 rows_per_batch,
1342 rows_scheduled: 0,
1343 rows_drained: 0,
1344 scheduler_exhausted: false,
1345 emitted_batch_size_warning: Arc::new(Once::new()),
1346 }
1347 }
1348
1349 fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1350 if decoder.path.is_empty() {
1351 Ok(())
1353 } else {
1354 self.root_decoder.accept_child(decoder)
1355 }
1356 }
1357
1358 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1359 if self.scheduler_exhausted {
1360 return Ok(self.rows_scheduled);
1361 }
1362 while self.rows_scheduled < scheduled_need {
1363 let next_message = self.context.source.recv().await;
1364 match next_message {
1365 Some(scan_line) => {
1366 let scan_line = scan_line?;
1367 self.rows_scheduled = scan_line.scheduled_so_far;
1368 for message in scan_line.decoders {
1369 self.accept_decoder(message.into_legacy())?;
1370 }
1371 }
1372 None => {
1373 self.scheduler_exhausted = true;
1377 return Ok(self.rows_scheduled);
1378 }
1379 }
1380 }
1381 Ok(scheduled_need)
1382 }
1383
    /// Produces the decode task for the next batch, or `None` when the stream is finished.
    ///
    /// The steps are:
    ///
    /// 1. Take up to `rows_per_batch` of the remaining rows.
    /// 2. Wait until enough rows have been scheduled. If the scheduler runs out first, shrink
    ///    the batch to what was actually scheduled.
    /// 3. Wait for those rows to be loaded.
    /// 4. Drain them from the root decoder.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // How many more rows must be scheduled before this batch can be drained.
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // The scheduler channel closed early: shrink the batch to what was scheduled.
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        // Appears to be the (0-based) index of the batch's last row, hence the `- 1`.
        // NOTE(review): confirm against `wait_for_loaded`'s contract.
        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }
1430
1431 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1432 let stream = futures::stream::unfold(self, |mut slf| async move {
1433 let next_task = slf.next_batch_task().await;
1434 let next_task = next_task.transpose().map(|next_task| {
1435 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1436 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1437 let task = async move {
1438 let next_task = next_task?;
1439 next_task.into_batch(emitted_batch_size_warning)
1440 };
1441 (task, num_rows)
1442 });
1443 next_task.map(|(task, num_rows)| {
1444 debug_assert!(num_rows <= u32::MAX as u64);
1446 let next_task = ReadBatchTask {
1447 task: task.boxed(),
1448 num_rows: num_rows as u32,
1449 };
1450 (next_task, slf)
1451 })
1452 });
1453 stream.boxed()
1454 }
1455}
1456
/// A unit of scheduled work handed to a [`RootDecoderType`].
enum RootDecoderMessage {
    /// A page whose I/O has completed; consumed by the structural root decoder.
    LoadedPage(LoadedPageShard),
    /// A ready legacy decoder; consumed by the legacy root decoder.
    LegacyPage(crate::previous::decoder::DecoderReady),
}
/// Common interface over the structural and legacy root decoders, so that
/// [`BatchDecodeIterator`] can drive either one.
trait RootDecoderType {
    /// Hands one scheduled page or decoder to the root decoder.
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Drains the next `num_rows` rows into a decode task.
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Blocks on `runtime` until the rows needed for the next batch are loaded.
    ///
    /// `loaded_need` is the index of the last row needed (callers pass
    /// `rows_drained + to_take - 1`). Implementations with nothing to wait for may return
    /// immediately.
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1468impl RootDecoderType for StructuralStructDecoder {
1469 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1470 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1471 unreachable!()
1472 };
1473 self.accept_page(loaded_page)
1474 }
1475 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1476 self.drain_batch_task(num_rows)
1477 }
1478 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1479 Ok(())
1481 }
1482}
1483impl RootDecoderType for SimpleStructDecoder {
1484 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1485 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1486 unreachable!()
1487 };
1488 self.accept_child(legacy_page)
1489 }
1490 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1491 self.drain(num_rows)
1492 }
1493 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1494 runtime.block_on(self.wait_for_loaded(loaded_need))
1495 }
1496}
1497
/// A blocking [`RecordBatchReader`] that decodes batches from scheduler messages that were
/// all collected up front.
///
/// Waits for I/O by blocking on a private current-thread tokio runtime.
struct BatchDecodeIterator<T: RootDecoderType> {
    /// Remaining scheduler messages, consumed front to back.
    messages: VecDeque<Result<DecoderMessage>>,
    /// Root decoder (structural or legacy) that receives pages and drains batches.
    root_decoder: T,
    /// Rows not yet handed out in a batch.
    rows_remaining: u64,
    /// Maximum number of rows per emitted batch.
    rows_per_batch: u32,
    /// Rows covered by the messages consumed so far.
    rows_scheduled: u64,
    /// Rows already drained into emitted batches.
    rows_drained: u64,
    /// Ensures the large-batch message is logged at most once per iterator.
    emitted_batch_size_warning: Arc<Once>,
    /// Runtime used to block on page loads.
    wait_for_io_runtime: tokio::runtime::Runtime,
    /// Schema reported through `RecordBatchReader::schema`.
    schema: Arc<ArrowSchema>,
}
1513
1514impl<T: RootDecoderType> BatchDecodeIterator<T> {
1515 pub fn new(
1517 messages: VecDeque<Result<DecoderMessage>>,
1518 rows_per_batch: u32,
1519 num_rows: u64,
1520 root_decoder: T,
1521 schema: Arc<ArrowSchema>,
1522 ) -> Self {
1523 Self {
1524 messages,
1525 root_decoder,
1526 rows_remaining: num_rows,
1527 rows_per_batch,
1528 rows_scheduled: 0,
1529 rows_drained: 0,
1530 wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1531 .build()
1532 .unwrap(),
1533 emitted_batch_size_warning: Arc::new(Once::new()),
1534 schema,
1535 }
1536 }
1537
1538 fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
1543 match maybe_done(unloaded_page.0) {
1544 MaybeDone::Done(loaded_page) => loaded_page,
1546 MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1548 MaybeDone::Gone => unreachable!(),
1549 }
1550 }
1551
1552 #[instrument(skip_all)]
1557 fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
1558 while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1559 let message = self.messages.pop_front().unwrap()?;
1560 self.rows_scheduled = message.scheduled_so_far;
1561 for decoder_message in message.decoders {
1562 match decoder_message {
1563 MessageType::UnloadedPage(unloaded_page) => {
1564 let loaded_page = self.wait_for_page(unloaded_page)?;
1565 self.root_decoder
1566 .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1567 }
1568 MessageType::DecoderReady(decoder_ready) => {
1569 if !decoder_ready.path.is_empty() {
1571 self.root_decoder
1572 .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1573 }
1574 }
1575 }
1576 }
1577 }
1578
1579 let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1580
1581 self.root_decoder
1582 .wait(loaded_need, &self.wait_for_io_runtime)?;
1583 Ok(self.rows_scheduled)
1584 }
1585
1586 #[instrument(level = "debug", skip_all)]
1587 fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1588 trace!(
1589 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1590 self.rows_remaining,
1591 self.rows_drained,
1592 self.rows_scheduled,
1593 );
1594 if self.rows_remaining == 0 {
1595 return Ok(None);
1596 }
1597
1598 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1599 self.rows_remaining -= to_take;
1600
1601 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1602 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1603 if scheduled_need > 0 {
1604 let desired_scheduled = scheduled_need + self.rows_scheduled;
1605 trace!(
1606 "Draining from scheduler (desire at least {} scheduled rows)",
1607 desired_scheduled
1608 );
1609 let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
1610 if actually_scheduled < desired_scheduled {
1611 let under_scheduled = desired_scheduled - actually_scheduled;
1612 to_take -= under_scheduled;
1613 }
1614 }
1615
1616 if to_take == 0 {
1617 return Ok(None);
1618 }
1619
1620 let next_task = self.root_decoder.drain_batch(to_take)?;
1621
1622 self.rows_drained += to_take;
1623
1624 let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1625
1626 Ok(Some(batch))
1627 }
1628}
1629
1630impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1631 type Item = ArrowResult<RecordBatch>;
1632
1633 fn next(&mut self) -> Option<Self::Item> {
1634 self.next_batch_task()
1635 .transpose()
1636 .map(|r| r.map_err(ArrowError::from))
1637 }
1638}
1639
1640impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1641 fn schema(&self) -> Arc<ArrowSchema> {
1642 self.schema.clone()
1643 }
1644}
1645
/// Turns scheduler output for the structural (2.1+) path into a stream of
/// [`ReadBatchTask`]s.
pub struct StructuralBatchDecodeStream {
    /// Source of scheduler messages.
    context: DecoderContext,
    /// Structural root decoder; loaded pages are handed to it.
    root_decoder: StructuralStructDecoder,
    /// Rows not yet handed out in a batch.
    rows_remaining: u64,
    /// Maximum number of rows per emitted batch.
    rows_per_batch: u32,
    /// Rows covered by the scheduler messages received so far.
    rows_scheduled: u64,
    /// Rows already drained into batch tasks.
    rows_drained: u64,
    /// Set once the scheduler channel has closed.
    scheduler_exhausted: bool,
    /// Ensures the large-batch message is logged at most once per stream.
    emitted_batch_size_warning: Arc<Once>,
}
1657
impl StructuralBatchDecodeStream {
    /// Creates a new structural batch decode stream.
    ///
    /// # Arguments
    ///
    /// * `scheduled` - receiver for the messages produced by the scheduler
    /// * `rows_per_batch` - maximum number of rows in each emitted batch
    /// * `num_rows` - total number of rows this stream is expected to produce
    /// * `root_decoder` - structural struct decoder that receives the loaded pages
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Receives scheduler messages until at least `scheduled_need` rows have been scheduled.
    ///
    /// Each unloaded page in a message is awaited (in order) before being handed to the root
    /// decoder, so pages accepted here are already loaded. Returns `scheduled_need` once
    /// enough rows are scheduled. If the channel closes first, returns the (smaller) number
    /// actually scheduled and remembers that the scheduler is exhausted.
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        let unloaded_page = message.into_structural();
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    // The scheduler dropped its sender: no more rows will arrive.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Produces the decode task for the next batch, or `None` when no rows remain.
    ///
    /// No separate I/O wait is needed here because `wait_for_scheduled` only accepts pages
    /// after their load future has completed.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // The scheduler finished early; shrink this batch to what actually exists.
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this decoder into a stream of [`ReadBatchTask`]s.
    ///
    /// Producing each stream item performs the scheduling and page loading. The item's `task`
    /// future performs the decode itself. An error from `next_batch_task` is emitted as a task
    /// with `num_rows == 0` that resolves to that error.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    next_task.into_batch(emitted_batch_size_warning)
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                // Batches never exceed `rows_per_batch`, which is a u32.
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1779
/// The rows a read should produce, given either as ranges or as individual row indices.
#[derive(Debug)]
pub enum RequestedRows {
    /// Half-open ranges of rows; each contributes `end - start` rows.
    Ranges(Vec<Range<u64>>),
    /// Individual rows; each entry contributes one row.
    Indices(Vec<u64>),
}

impl RequestedRows {
    /// Returns the total number of rows requested.
    ///
    /// A range whose `end` is not after its `start` (see [`Range::is_empty`]) counts as zero
    /// rows instead of underflowing. This agrees with [`RequestedRows::trim_empty_ranges`],
    /// which removes exactly those ranges.
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges.iter().map(|r| r.end.saturating_sub(r.start)).sum(),
            Self::Indices(indices) => indices.len() as u64,
        }
    }

    /// Drops empty ranges (they contribute no rows). `Indices` is returned unchanged.
    pub fn trim_empty_ranges(mut self) -> Self {
        if let Self::Ranges(ranges) = &mut self {
            ranges.retain(|r| !r.is_empty());
        }
        self
    }
}
1801
/// Options that control decoding behavior.
#[derive(Debug, Clone, Default)]
pub struct DecoderConfig {
    /// Whether the repetition index should be cached.
    /// NOTE(review): consumed outside this chunk; confirm exact semantics.
    pub cache_repetition_index: bool,
    /// Forwarded as `should_validate` to the structural root decoder.
    pub validate_on_decode: bool,
}

/// Everything needed to build a scheduler and a decode stream for a read.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Decoder extension points passed to the scheduler.
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum number of rows per emitted batch.
    pub batch_size: u32,
    /// I/O source used to load page data.
    pub io: Arc<dyn EncodingsIo>,
    /// Cache passed to the scheduler.
    pub cache: Arc<LanceCache>,
    /// Decoder options.
    pub decoder_config: DecoderConfig,
}
1820
1821fn check_scheduler_on_drop(
1822 stream: BoxStream<'static, ReadBatchTask>,
1823 scheduler_handle: tokio::task::JoinHandle<()>,
1824) -> BoxStream<'static, ReadBatchTask> {
1825 let mut scheduler_handle = Some(scheduler_handle);
1829 let check_scheduler = stream::unfold((), move |_| {
1830 let handle = scheduler_handle.take();
1831 async move {
1832 if let Some(handle) = handle {
1833 handle.await.unwrap();
1834 }
1835 None
1836 }
1837 });
1838 stream.chain(check_scheduler).boxed()
1839}
1840
1841pub fn create_decode_stream(
1842 schema: &Schema,
1843 num_rows: u64,
1844 batch_size: u32,
1845 is_structural: bool,
1846 should_validate: bool,
1847 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1848) -> BoxStream<'static, ReadBatchTask> {
1849 if is_structural {
1850 let arrow_schema = ArrowSchema::from(schema);
1851 let structural_decoder = StructuralStructDecoder::new(
1852 arrow_schema.fields,
1853 should_validate,
1854 true,
1855 );
1856 StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1857 } else {
1858 let arrow_schema = ArrowSchema::from(schema);
1859 let root_fields = arrow_schema.fields;
1860
1861 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1862 BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1863 }
1864}
1865
1866pub fn create_decode_iterator(
1870 schema: &Schema,
1871 num_rows: u64,
1872 batch_size: u32,
1873 should_validate: bool,
1874 is_structural: bool,
1875 messages: VecDeque<Result<DecoderMessage>>,
1876) -> Box<dyn RecordBatchReader + Send + 'static> {
1877 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1878 let root_fields = arrow_schema.fields.clone();
1879 if is_structural {
1880 let simple_struct_decoder =
1881 StructuralStructDecoder::new(root_fields, should_validate, true);
1882 Box::new(BatchDecodeIterator::new(
1883 messages,
1884 batch_size,
1885 num_rows,
1886 simple_struct_decoder,
1887 arrow_schema,
1888 ))
1889 } else {
1890 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1891 Box::new(BatchDecodeIterator::new(
1892 messages,
1893 batch_size,
1894 num_rows,
1895 root_decoder,
1896 arrow_schema,
1897 ))
1898 }
1899}
1900
1901fn create_scheduler_decoder(
1902 column_infos: Vec<Arc<ColumnInfo>>,
1903 requested_rows: RequestedRows,
1904 filter: FilterExpression,
1905 column_indices: Vec<u32>,
1906 target_schema: Arc<Schema>,
1907 config: SchedulerDecoderConfig,
1908) -> Result<BoxStream<'static, ReadBatchTask>> {
1909 let num_rows = requested_rows.num_rows();
1910
1911 let is_structural = column_infos[0].is_structural();
1912
1913 let (tx, rx) = mpsc::unbounded_channel();
1914
1915 let decode_stream = create_decode_stream(
1916 &target_schema,
1917 num_rows,
1918 config.batch_size,
1919 is_structural,
1920 config.decoder_config.validate_on_decode,
1921 rx,
1922 );
1923
1924 let scheduler_handle = tokio::task::spawn(async move {
1925 let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1926 target_schema.as_ref(),
1927 &column_indices,
1928 &column_infos,
1929 &vec![],
1930 num_rows,
1931 config.decoder_plugins,
1932 config.io.clone(),
1933 config.cache,
1934 &filter,
1935 &config.decoder_config,
1936 )
1937 .await
1938 {
1939 Ok(scheduler) => scheduler,
1940 Err(e) => {
1941 let _ = tx.send(Err(e));
1942 return;
1943 }
1944 };
1945
1946 match requested_rows {
1947 RequestedRows::Ranges(ranges) => {
1948 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1949 }
1950 RequestedRows::Indices(indices) => {
1951 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1952 }
1953 }
1954 });
1955
1956 Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1957}
1958
1959pub fn schedule_and_decode(
1965 column_infos: Vec<Arc<ColumnInfo>>,
1966 requested_rows: RequestedRows,
1967 filter: FilterExpression,
1968 column_indices: Vec<u32>,
1969 target_schema: Arc<Schema>,
1970 config: SchedulerDecoderConfig,
1971) -> BoxStream<'static, ReadBatchTask> {
1972 if requested_rows.num_rows() == 0 {
1973 return stream::empty().boxed();
1974 }
1975
1976 let requested_rows = requested_rows.trim_empty_ranges();
1979
1980 match create_scheduler_decoder(
1984 column_infos,
1985 requested_rows,
1986 filter,
1987 column_indices,
1988 target_schema,
1989 config,
1990 ) {
1991 Ok(stream) => stream,
1993 Err(e) => stream::once(std::future::ready(ReadBatchTask {
1994 num_rows: 0,
1995 task: std::future::ready(Err(e)).boxed(),
1996 }))
1997 .boxed(),
1998 }
1999}
2000
/// Shared current-thread tokio runtime. In this module it is used by
/// `schedule_and_decode_blocking` to block on scheduler creation.
///
/// Built lazily on first use; panics at that point if the runtime cannot be created.
pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});
2006
2007pub fn schedule_and_decode_blocking(
2022 column_infos: Vec<Arc<ColumnInfo>>,
2023 requested_rows: RequestedRows,
2024 filter: FilterExpression,
2025 column_indices: Vec<u32>,
2026 target_schema: Arc<Schema>,
2027 config: SchedulerDecoderConfig,
2028) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2029 if requested_rows.num_rows() == 0 {
2030 let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2031 return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2032 }
2033
2034 let num_rows = requested_rows.num_rows();
2035 let is_structural = column_infos[0].is_structural();
2036
2037 let (tx, mut rx) = mpsc::unbounded_channel();
2038
2039 let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2042 target_schema.as_ref(),
2043 &column_indices,
2044 &column_infos,
2045 &vec![],
2046 num_rows,
2047 config.decoder_plugins,
2048 config.io.clone(),
2049 config.cache,
2050 &filter,
2051 &config.decoder_config,
2052 ))?;
2053
2054 match requested_rows {
2056 RequestedRows::Ranges(ranges) => {
2057 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2058 }
2059 RequestedRows::Indices(indices) => {
2060 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2061 }
2062 }
2063
2064 let mut messages = Vec::new();
2066 while rx
2067 .recv_many(&mut messages, usize::MAX)
2068 .now_or_never()
2069 .unwrap()
2070 != 0
2071 {}
2072
2073 let decode_iterator = create_decode_iterator(
2075 &target_schema,
2076 num_rows,
2077 config.batch_size,
2078 config.decoder_config.validate_on_decode,
2079 is_structural,
2080 messages.into(),
2081 );
2082
2083 Ok(decode_iterator)
2084}
2085
/// Decodes rows from a page whose data has already been loaded.
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes `num_rows` rows after skipping the first `rows_to_skip` rows.
    /// NOTE(review): offsets are presumably relative to this decoder's loaded range; confirm.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}

/// Schedules the I/O for a page and produces a decoder for the requested rows.
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules reads for `ranges` through `scheduler`.
    ///
    /// The returned future resolves to a [`PrimitivePageDecoder`] for those rows.
    /// NOTE(review): `top_level_row` presumably identifies the top-level row position, for
    /// example for I/O prioritization; confirm.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2159
/// A cursor that reports a priority value and can be advanced by a number of rows.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Moves the cursor forward by `num_rows` rows.
    fn advance(&mut self, num_rows: u64);
    /// Returns the priority at the current position.
    fn current_priority(&self) -> u64;
    /// Clones this cursor into a new box (trait objects cannot implement `Clone`).
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

/// A priority that is simply the current row number: advancing by `n` adds `n`.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    /// Starts the cursor at `priority`.
    fn new(priority: u64) -> Self {
        SimplePriorityRange { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(SimplePriorityRange::new(self.priority))
    }
}

/// Maps progress through list items onto progress through the parent (`base`) rows.
///
/// `offsets` are list offsets. Every time the item position reaches the next offset, one
/// parent row is complete and `base` is advanced by one. The priority is always the base's.
pub struct ListPriorityRange {
    base: Box<dyn PriorityRange>,
    offsets: Arc<[u64]>,
    cur_index_into_offsets: usize,
    cur_position: u64,
}

impl ListPriorityRange {
    /// Starts at item 0 and the first offset.
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        ListPriorityRange {
            cur_position: 0,
            cur_index_into_offsets: 0,
            offsets,
            base,
        }
    }
}

impl std::fmt::Debug for ListPriorityRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Print only the length of the offsets; the full array may be large.
        let mut out = f.debug_struct("ListPriorityRange");
        out.field("base", &self.base);
        out.field("offsets.len()", &self.offsets.len());
        out.field("cur_index_into_offsets", &self.cur_index_into_offsets);
        out.field("cur_position", &self.cur_position);
        out.finish()
    }
}

impl PriorityRange for ListPriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.cur_position += num_rows;
        let position = self.cur_position;
        // Count the offsets beyond the current one that the new position has reached; each one
        // marks a completed parent row. `skip` tolerates an empty `offsets` array.
        let completed = self
            .offsets
            .iter()
            .skip(self.cur_index_into_offsets + 1)
            .take_while(|&&offset| offset <= position)
            .count();
        self.cur_index_into_offsets += completed;
        self.base.advance(completed as u64);
    }

    fn current_priority(&self) -> u64 {
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(ListPriorityRange {
            base: self.base.box_clone(),
            offsets: Arc::clone(&self.offsets),
            cur_index_into_offsets: self.cur_index_into_offsets,
            cur_position: self.cur_position,
        })
    }
}
2266
/// State shared by field schedulers while they walk the schema.
pub struct SchedulerContext {
    /// When set, `path_name` reports a `TEMP(...)` context instead of `ROOT`.
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    /// I/O source used to load data.
    io: Arc<dyn EncodingsIo>,
    /// Cache available to schedulers.
    cache: Arc<LanceCache>,
    /// Name shown in `path_name` for temporary contexts.
    name: String,
    /// Child indices from the root to the field currently being scheduled.
    path: Vec<u32>,
    /// Field names parallel to `path`, used for diagnostics.
    path_names: Vec<String>,
}
2276
2277pub struct ScopedSchedulerContext<'a> {
2278 pub context: &'a mut SchedulerContext,
2279}
2280
2281impl<'a> ScopedSchedulerContext<'a> {
2282 pub fn pop(self) -> &'a mut SchedulerContext {
2283 self.context.pop();
2284 self.context
2285 }
2286}
2287
2288impl SchedulerContext {
2289 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2290 Self {
2291 io,
2292 cache,
2293 recv: None,
2294 name: "".to_string(),
2295 path: Vec::new(),
2296 path_names: Vec::new(),
2297 }
2298 }
2299
2300 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2301 &self.io
2302 }
2303
2304 pub fn cache(&self) -> &Arc<LanceCache> {
2305 &self.cache
2306 }
2307
2308 pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2309 self.path.push(index);
2310 self.path_names.push(name.to_string());
2311 ScopedSchedulerContext { context: self }
2312 }
2313
2314 pub fn pop(&mut self) {
2315 self.path.pop();
2316 self.path_names.pop();
2317 }
2318
2319 pub fn path_name(&self) -> String {
2320 let path = self.path_names.join("/");
2321 if self.recv.is_some() {
2322 format!("TEMP({}){}", self.name, path)
2323 } else {
2324 format!("ROOT{}", path)
2325 }
2326 }
2327
2328 pub fn current_path(&self) -> VecDeque<u32> {
2329 VecDeque::from_iter(self.path.iter().copied())
2330 }
2331
2332 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2333 pub fn locate_decoder(
2334 &mut self,
2335 decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2336 ) -> crate::previous::decoder::DecoderReady {
2337 trace!(
2338 "Scheduling decoder of type {:?} for {:?}",
2339 decoder.data_type(),
2340 self.path,
2341 );
2342 crate::previous::decoder::DecoderReady {
2343 decoder,
2344 path: self.current_path(),
2345 }
2346 }
2347}
2348
2349pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2350
2351impl std::fmt::Debug for UnloadedPageShard {
2352 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2353 f.debug_struct("UnloadedPage").finish()
2354 }
2355}
2356
/// One step of structural scheduling: a row count plus the messages produced for it.
/// NOTE(review): whether `rows_scheduled` is incremental or cumulative is not visible here.
#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}

/// An in-progress structural scheduling operation that is advanced step by step.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Schedules the next portion of work and returns the scan lines produced.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
}
2372
2373pub struct FilterExpression(pub Bytes);
2381
2382impl FilterExpression {
2383 pub fn no_filter() -> Self {
2388 Self(Bytes::new())
2389 }
2390
2391 pub fn is_noop(&self) -> bool {
2393 self.0.is_empty()
2394 }
2395}
2396
/// Schedules I/O for one field in the structural (2.1+) format.
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// Asynchronously prepares the scheduler before any ranges are scheduled.
    /// NOTE(review): presumably loads any metadata needed for scheduling; confirm.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Creates a job that schedules `ranges` under `filter` step by step.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
2409
/// A ready-to-run, CPU-bound decode that produces one Arrow array.
pub trait DecodeArrayTask: Send {
    /// Consumes the task and decodes its array.
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}
2415
2416impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2417 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2418 StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2419 }
2420}
2421
/// A drained batch waiting to be decoded.
pub struct NextDecodeTask {
    /// Decode work that produces the batch's struct array.
    pub task: Box<dyn DecodeArrayTask>,
    /// Number of rows in the batch.
    pub num_rows: u64,
}
2432
2433impl NextDecodeTask {
2434 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2439 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2440 let struct_arr = self.task.decode();
2441 match struct_arr {
2442 Ok(struct_arr) => {
2443 let batch = RecordBatch::from(struct_arr.as_struct());
2444 let size_bytes = batch.get_array_memory_size() as u64;
2445 if size_bytes > BATCH_SIZE_BYTES_WARNING {
2446 emitted_batch_size_warning.call_once(|| {
2447 let size_mb = size_bytes / 1024 / 1024;
2448 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2449 });
2450 }
2451 Ok(batch)
2452 }
2453 Err(e) => {
2454 let e = Error::Internal {
2455 message: format!("Error decoding batch: {}", e),
2456 location: location!(),
2457 };
2458 Err(e)
2459 }
2460 }
2461 }
2462}
2463
/// A single scheduled item sent from the scheduler to the decoder.
#[derive(Debug)]
pub enum MessageType {
    /// A ready legacy (2.0) decoder.
    DecoderReady(crate::previous::decoder::DecoderReady),
    /// A structural (2.1+) page whose I/O may still be in flight.
    UnloadedPage(UnloadedPageShard),
}
2478}
2479
2480impl MessageType {
2481 pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2482 match self {
2483 Self::DecoderReady(decoder) => decoder,
2484 Self::UnloadedPage(_) => {
2485 panic!("Expected DecoderReady but got UnloadedPage")
2486 }
2487 }
2488 }
2489
2490 pub fn into_structural(self) -> UnloadedPageShard {
2491 match self {
2492 Self::UnloadedPage(unloaded) => unloaded,
2493 Self::DecoderReady(_) => {
2494 panic!("Expected UnloadedPage but got DecoderReady")
2495 }
2496 }
2497 }
2498}
2499
2500pub struct DecoderMessage {
2501 pub scheduled_so_far: u64,
2502 pub decoders: Vec<MessageType>,
2503}
2504
2505pub struct DecoderContext {
2506 source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2507}
2508
2509impl DecoderContext {
2510 pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2511 Self { source }
2512 }
2513}
2514
/// The result of decoding part of a structural page.
pub struct DecodedPage {
    /// Decoded values.
    pub data: DataBlock,
    /// Repetition/definition information for `data`.
    pub repdef: RepDefUnraveler,
}

/// A ready-to-run decode of part of a structural page.
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Consumes the task and decodes its rows.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}

/// Decoder for a loaded structural page; rows are handed out by draining.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Takes the next `num_rows` rows as a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// Number of rows this decoder covers.
    fn num_rows(&self) -> u64;
}

/// A structural page whose I/O has completed, tagged with the field it belongs to.
#[derive(Debug)]
pub struct LoadedPageShard {
    /// Decoder over the loaded data.
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Child-index path from the root to the owning field (see `SchedulerContext::current_path`).
    pub path: VecDeque<u32>,
}
2554
/// A decoded Arrow array together with its composite rep/def information.
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}

/// A ready-to-run decode of one structural field's array.
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Consumes the task and decodes the array.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}

/// Per-field decoder for the structural format: accepts loaded pages and drains rows.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Hands a loaded page to this field (or one of its children).
    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
    /// Takes the next `num_rows` rows as a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// Arrow data type produced by this decoder.
    fn data_type(&self) -> &DataType;
}

/// Decoder extension point passed to schedulers. It currently has no fields.
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2578
2579pub async fn decode_batch(
2581 batch: &EncodedBatch,
2582 filter: &FilterExpression,
2583 decoder_plugins: Arc<DecoderPlugins>,
2584 should_validate: bool,
2585 version: LanceFileVersion,
2586 cache: Option<Arc<LanceCache>>,
2587) -> Result<RecordBatch> {
2588 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2593 let cache = if let Some(cache) = cache {
2594 cache
2595 } else {
2596 Arc::new(lance_core::cache::LanceCache::with_capacity(
2597 128 * 1024 * 1024,
2598 ))
2599 };
2600 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2601 batch.schema.as_ref(),
2602 &batch.top_level_columns,
2603 &batch.page_table,
2604 &vec![],
2605 batch.num_rows,
2606 decoder_plugins,
2607 io_scheduler.clone(),
2608 cache,
2609 filter,
2610 &DecoderConfig::default(),
2611 )
2612 .await?;
2613 let (tx, rx) = unbounded_channel();
2614 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2615 let is_structural = version >= LanceFileVersion::V2_1;
2616 let mut decode_stream = create_decode_stream(
2617 &batch.schema,
2618 batch.num_rows,
2619 batch.num_rows as u32,
2620 is_structural,
2621 should_validate,
2622 rx,
2623 );
2624 decode_stream.next().await.unwrap().task.await
2625}
2626
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let input = vec![1];
        let expected = vec![1..2];
        assert_eq!(DecodeBatchScheduler::indices_to_ranges(&input), expected);
    }

    #[test]
    fn test_coalesce_indices_to_ranges() {
        // A fully contiguous run collapses into one range.
        let input = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let expected = vec![1..10];
        assert_eq!(DecodeBatchScheduler::indices_to_ranges(&input), expected);
    }

    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        // Each gap starts a new range.
        let input = vec![1, 2, 3, 5, 6, 7, 9];
        let expected = vec![1..4, 5..8, 9..10];
        assert_eq!(DecodeBatchScheduler::indices_to_ranges(&input), expected);
    }
}