1use std::collections::VecDeque;
216use std::sync::{LazyLock, Once};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use lance_core::utils::futures::FinallyStreamExt;
230use log::{debug, trace, warn};
231use snafu::location;
232use tokio::sync::mpsc::error::SendError;
233use tokio::sync::mpsc::{self, unbounded_channel};
234
235use lance_core::error::LanceOptionExt;
236use lance_core::{ArrowResult, Error, Result};
237use tracing::instrument;
238
239use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
240use crate::data::DataBlock;
241use crate::encoder::EncodedBatch;
242use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
243use crate::encodings::logical::list::StructuralListScheduler;
244use crate::encodings::logical::map::StructuralMapScheduler;
245use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
246use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
247use crate::format::pb::{self, column_encoding};
248use crate::format::pb21;
249use crate::previous::decoder::LogicalPageDecoder;
250use crate::previous::encodings::logical::list::OffsetPageInfo;
251use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
252use crate::previous::encodings::logical::{
253 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
254 primitive::PrimitiveFieldScheduler,
255};
256use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
257use crate::version::LanceFileVersion;
258use crate::{BufferScheduler, EncodingsIo};
259
// Threshold (10 MiB) for warning about overly large batch sizes
// (presumably paired with `emitted_batch_size_warning` — the emitting code is
// not in this chunk; TODO confirm)
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
262
/// The encoding for a page of data, either a legacy array encoding or a
/// structural page layout
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb21::PageLayout),
}
274
275impl PageEncoding {
276 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
277 match self {
278 Self::Legacy(enc) => enc,
279 Self::Structural(_) => panic!("Expected a legacy encoding"),
280 }
281 }
282
283 pub fn as_structural(&self) -> &pb21::PageLayout {
284 match self {
285 Self::Structural(enc) => enc,
286 Self::Legacy(_) => panic!("Expected a structural encoding"),
287 }
288 }
289
290 pub fn is_structural(&self) -> bool {
291 matches!(self, Self::Structural(_))
292 }
293}
294
/// Metadata describing a single page of encoded data
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The scheduling priority of the page (set to 0 for synthetic pages in
    /// this chunk; exact semantics defined by the scheduler — TODO confirm)
    pub priority: u64,
    /// The encoding used to encode the page's data
    pub encoding: PageEncoding,
    /// The file offset and size (in bytes) of each of the page's buffers
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
311
/// Metadata describing a column of encoded data
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// The file offset and size (in bytes) of each column-level buffer
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// The column-level encoding
    pub encoding: pb::ColumnEncoding,
}
325
326impl ColumnInfo {
327 pub fn new(
329 index: u32,
330 page_infos: Arc<[PageInfo]>,
331 buffer_offsets_and_sizes: Vec<(u64, u64)>,
332 encoding: pb::ColumnEncoding,
333 ) -> Self {
334 Self {
335 index,
336 page_infos,
337 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
338 encoding,
339 }
340 }
341
342 pub fn is_structural(&self) -> bool {
343 self.page_infos
344 .first()
346 .map(|page| page.encoding.is_structural())
347 .unwrap_or(false)
348 }
349}
350
/// The root scheduler for a file, either structural or legacy depending on
/// the encodings used by the file's columns
enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}
355
356impl RootScheduler {
357 fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
358 match self {
359 Self::Structural(_) => panic!("Expected a legacy scheduler"),
360 Self::Legacy(s) => s,
361 }
362 }
363
364 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
365 match self {
366 Self::Structural(s) => s.as_ref(),
367 Self::Legacy(_) => panic!("Expected a structural scheduler"),
368 }
369 }
370}
371
/// Schedules I/O and decoding for reads of a file
///
/// Holds the root scheduler (structural or legacy), the Arrow fields of the
/// top-level columns, and a cache shared with the field schedulers.
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<LanceCache>,
}
398
/// An iterator over column infos driven by a set of requested top-level
/// column indices
pub struct ColumnInfoIter<'a> {
    // All column infos available to this read
    column_infos: Vec<Arc<ColumnInfo>>,
    // The requested top-level column indices
    column_indices: &'a [u32],
    // Current position within `column_infos`
    column_info_pos: usize,
    // Current position within `column_indices`
    column_indices_pos: usize,
}
405
406impl<'a> ColumnInfoIter<'a> {
407 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
408 let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
409 Self {
410 column_infos,
411 column_indices,
412 column_info_pos: initial_pos,
413 column_indices_pos: 0,
414 }
415 }
416
417 pub fn peek(&self) -> &Arc<ColumnInfo> {
418 &self.column_infos[self.column_info_pos]
419 }
420
421 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
422 let column_info = self.column_infos[self.column_info_pos].clone();
423 let transformed = transform(column_info);
424 self.column_infos[self.column_info_pos] = transformed;
425 }
426
427 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
428 self.next().ok_or_else(|| {
429 Error::invalid_input(
430 "there were more fields in the schema than provided column indices / infos",
431 location!(),
432 )
433 })
434 }
435
436 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
437 if self.column_info_pos < self.column_infos.len() {
438 let info = &self.column_infos[self.column_info_pos];
439 self.column_info_pos += 1;
440 Some(info)
441 } else {
442 None
443 }
444 }
445
446 pub(crate) fn next_top_level(&mut self) {
447 self.column_indices_pos += 1;
448 if self.column_indices_pos < self.column_indices.len() {
449 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
450 } else {
451 self.column_info_pos = self.column_infos.len();
452 }
453 }
454}
455
456#[derive(Clone, Copy, Debug)]
458pub struct FileBuffers<'a> {
459 pub positions_and_sizes: &'a [(u64, u64)],
460}
461
/// The column-level buffer positions and sizes, along with the file buffers
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    // (offset, size) in bytes of each column-level buffer
    pub positions_and_sizes: &'b [(u64, u64)],
}
468
/// The page-level buffer positions and sizes, along with the column buffers
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    // (offset, size) in bytes of each page-level buffer
    pub positions_and_sizes: &'c [(u64, u64)],
}
475
/// The core strategy for turning column metadata into field schedulers
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    /// If true, validate decoded data
    pub validate_data: bool,
    /// Strategy used to choose decompressors for encoded buffers
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    /// If true, cache repetition indices while decoding
    pub cache_repetition_index: bool,
}
483
484impl Default for CoreFieldDecoderStrategy {
485 fn default() -> Self {
486 Self {
487 validate_data: false,
488 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
489 cache_repetition_index: false,
490 }
491 }
492}
493
494impl CoreFieldDecoderStrategy {
495 pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
497 self.cache_repetition_index = cache_repetition_index;
498 self
499 }
500
501 pub fn from_decoder_config(config: &DecoderConfig) -> Self {
503 Self {
504 validate_data: config.validate_on_decode,
505 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
506 cache_repetition_index: config.cache_repetition_index,
507 }
508 }
509
510 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
513 let column_encoding = column_info
514 .encoding
515 .column_encoding
516 .as_ref()
517 .ok_or_else(|| {
518 Error::invalid_input(
519 format!(
520 "the column at index {} was missing a ColumnEncoding",
521 column_info.index
522 ),
523 location!(),
524 )
525 })?;
526 if matches!(
527 column_encoding,
528 pb::column_encoding::ColumnEncoding::Values(_)
529 ) {
530 Ok(())
531 } else {
532 Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
533 }
534 }
535
536 fn is_structural_primitive(data_type: &DataType) -> bool {
537 if data_type.is_primitive() {
538 true
539 } else {
540 match data_type {
541 DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
543 DataType::Boolean
544 | DataType::Null
545 | DataType::FixedSizeBinary(_)
546 | DataType::Binary
547 | DataType::LargeBinary
548 | DataType::Utf8
549 | DataType::LargeUtf8 => true,
550 DataType::FixedSizeList(inner, _) => {
551 Self::is_structural_primitive(inner.data_type())
552 }
553 _ => false,
554 }
555 }
556 }
557
558 fn is_primitive_legacy(data_type: &DataType) -> bool {
559 if data_type.is_primitive() {
560 true
561 } else {
562 match data_type {
563 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
565 DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
566 _ => false,
567 }
568 }
569 }
570
571 fn create_primitive_scheduler(
572 &self,
573 field: &Field,
574 column: &ColumnInfo,
575 buffers: FileBuffers,
576 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
577 Self::ensure_values_encoded(column, &field.name)?;
578 let column_buffers = ColumnBuffers {
580 file_buffers: buffers,
581 positions_and_sizes: &column.buffer_offsets_and_sizes,
582 };
583 Ok(Box::new(PrimitiveFieldScheduler::new(
584 column.index,
585 field.data_type(),
586 column.page_infos.clone(),
587 column_buffers,
588 self.validate_data,
589 )))
590 }
591
592 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
594 Self::ensure_values_encoded(column_info, field_name)?;
595 if column_info.page_infos.len() != 1 {
596 return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
597 }
598 let encoding = &column_info.page_infos[0].encoding;
599 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
600 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
601 _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
602 }
603 }
604
605 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
606 let encoding = &column_info.page_infos[0].encoding;
607 matches!(
608 encoding.as_legacy().array_encoding.as_ref().unwrap(),
609 pb::array_encoding::ArrayEncoding::PackedStruct(_)
610 )
611 }
612
613 fn create_list_scheduler(
614 &self,
615 list_field: &Field,
616 column_infos: &mut ColumnInfoIter,
617 buffers: FileBuffers,
618 offsets_column: &ColumnInfo,
619 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
620 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
621 let offsets_column_buffers = ColumnBuffers {
622 file_buffers: buffers,
623 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
624 };
625 let items_scheduler =
626 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
627
628 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
629 .page_infos
630 .iter()
631 .filter(|offsets_page| offsets_page.num_rows > 0)
632 .map(|offsets_page| {
633 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
634 &offsets_page.encoding.as_legacy().array_encoding
635 {
636 let inner = PageInfo {
637 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
638 encoding: PageEncoding::Legacy(
639 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
640 ),
641 num_rows: offsets_page.num_rows,
642 priority: 0,
643 };
644 (
645 inner,
646 OffsetPageInfo {
647 offsets_in_page: offsets_page.num_rows,
648 null_offset_adjustment: list_encoding.null_offset_adjustment,
649 num_items_referenced_by_page: list_encoding.num_items,
650 },
651 )
652 } else {
653 panic!("Expected a list column");
655 }
656 })
657 .unzip();
658 let inner = Arc::new(PrimitiveFieldScheduler::new(
659 offsets_column.index,
660 DataType::UInt64,
661 Arc::from(inner_infos.into_boxed_slice()),
662 offsets_column_buffers,
663 self.validate_data,
664 )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
665 let items_field = match list_field.data_type() {
666 DataType::List(inner) => inner,
667 DataType::LargeList(inner) => inner,
668 _ => unreachable!(),
669 };
670 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
671 DataType::Int32
672 } else {
673 DataType::Int64
674 };
675 Ok(Box::new(ListFieldScheduler::new(
676 inner,
677 items_scheduler.into(),
678 items_field,
679 offset_type,
680 null_offset_adjustments,
681 )))
682 }
683
684 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
685 if let column_encoding::ColumnEncoding::Blob(blob) =
686 column_info.encoding.column_encoding.as_ref().unwrap()
687 {
688 let mut column_info = column_info.clone();
689 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
690 Some(column_info)
691 } else {
692 None
693 }
694 }
695
696 fn create_structural_field_scheduler(
697 &self,
698 field: &Field,
699 column_infos: &mut ColumnInfoIter,
700 ) -> Result<Box<dyn StructuralFieldScheduler>> {
701 let data_type = field.data_type();
702 if Self::is_structural_primitive(&data_type) {
703 let column_info = column_infos.expect_next()?;
704 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
705 column_info.as_ref(),
706 self.decompressor_strategy.as_ref(),
707 self.cache_repetition_index,
708 field,
709 )?);
710
711 column_infos.next_top_level();
713
714 return Ok(scheduler);
715 }
716 match &data_type {
717 DataType::Struct(fields) => {
718 if field.is_packed_struct() {
719 let column_info = column_infos.expect_next()?;
721 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
722 column_info.as_ref(),
723 self.decompressor_strategy.as_ref(),
724 self.cache_repetition_index,
725 field,
726 )?);
727
728 column_infos.next_top_level();
730
731 return Ok(scheduler);
732 }
733 if field.is_blob() {
735 let column_info = column_infos.peek();
736 if column_info.page_infos.iter().any(|page| {
737 matches!(
738 page.encoding,
739 PageEncoding::Structural(pb21::PageLayout {
740 layout: Some(pb21::page_layout::Layout::BlobLayout(_))
741 })
742 )
743 }) {
744 let column_info = column_infos.expect_next()?;
745 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
746 column_info.as_ref(),
747 self.decompressor_strategy.as_ref(),
748 self.cache_repetition_index,
749 field,
750 )?);
751 column_infos.next_top_level();
752 return Ok(scheduler);
753 }
754 }
755
756 let mut child_schedulers = Vec::with_capacity(field.children.len());
757 for field in field.children.iter() {
758 let field_scheduler =
759 self.create_structural_field_scheduler(field, column_infos)?;
760 child_schedulers.push(field_scheduler);
761 }
762
763 let fields = fields.clone();
764 Ok(
765 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
766 as Box<dyn StructuralFieldScheduler>,
767 )
768 }
769 DataType::List(_) | DataType::LargeList(_) => {
770 let child = field.children.first().expect_ok()?;
771 let child_scheduler =
772 self.create_structural_field_scheduler(child, column_infos)?;
773 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
774 as Box<dyn StructuralFieldScheduler>)
775 }
776 DataType::FixedSizeList(inner, dimension)
777 if matches!(inner.data_type(), DataType::Struct(_)) =>
778 {
779 let child = field.children.first().expect_ok()?;
780 let child_scheduler =
781 self.create_structural_field_scheduler(child, column_infos)?;
782 Ok(Box::new(StructuralFixedSizeListScheduler::new(
783 child_scheduler,
784 *dimension,
785 )) as Box<dyn StructuralFieldScheduler>)
786 }
787 DataType::Map(_, keys_sorted) => {
788 if *keys_sorted {
792 return Err(Error::NotSupported {
793 source: format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into(),
794 location: location!(),
795 });
796 }
797 let entries_child = field.children.first().expect_ok()?;
798 let child_scheduler =
799 self.create_structural_field_scheduler(entries_child, column_infos)?;
800 Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
801 as Box<dyn StructuralFieldScheduler>)
802 }
803 _ => todo!("create_structural_field_scheduler for {}", data_type),
804 }
805 }
806
807 fn create_legacy_field_scheduler(
808 &self,
809 field: &Field,
810 column_infos: &mut ColumnInfoIter,
811 buffers: FileBuffers,
812 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
813 let data_type = field.data_type();
814 if Self::is_primitive_legacy(&data_type) {
815 let column_info = column_infos.expect_next()?;
816 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
817 return Ok(scheduler);
818 } else if data_type.is_binary_like() {
819 let column_info = column_infos.expect_next()?.clone();
820 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
822 let desc_scheduler =
823 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
824 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
825 return Ok(blob_scheduler);
826 }
827 if let Some(page_info) = column_info.page_infos.first() {
828 if matches!(
829 page_info.encoding.as_legacy(),
830 pb::ArrayEncoding {
831 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
832 }
833 ) {
834 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
835 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
836 } else {
837 DataType::LargeList(Arc::new(ArrowField::new(
838 "item",
839 DataType::UInt8,
840 false,
841 )))
842 };
843 let list_field = Field::try_from(ArrowField::new(
844 field.name.clone(),
845 list_type,
846 field.nullable,
847 ))
848 .unwrap();
849 let list_scheduler = self.create_list_scheduler(
850 &list_field,
851 column_infos,
852 buffers,
853 &column_info,
854 )?;
855 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
856 list_scheduler.into(),
857 field.data_type(),
858 ));
859 return Ok(binary_scheduler);
860 } else {
861 let scheduler =
862 self.create_primitive_scheduler(field, &column_info, buffers)?;
863 return Ok(scheduler);
864 }
865 } else {
866 return self.create_primitive_scheduler(field, &column_info, buffers);
867 }
868 }
869 match &data_type {
870 DataType::FixedSizeList(inner, _dimension) => {
871 if Self::is_primitive_legacy(inner.data_type()) {
874 let primitive_col = column_infos.expect_next()?;
875 let scheduler =
876 self.create_primitive_scheduler(field, primitive_col, buffers)?;
877 Ok(scheduler)
878 } else {
879 todo!()
880 }
881 }
882 DataType::Dictionary(_key_type, value_type) => {
883 if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
884 let primitive_col = column_infos.expect_next()?;
885 let scheduler =
886 self.create_primitive_scheduler(field, primitive_col, buffers)?;
887 Ok(scheduler)
888 } else {
889 Err(Error::NotSupported {
890 source: format!(
891 "No way to decode into a dictionary field of type {}",
892 value_type
893 )
894 .into(),
895 location: location!(),
896 })
897 }
898 }
899 DataType::List(_) | DataType::LargeList(_) => {
900 let offsets_column = column_infos.expect_next()?.clone();
901 column_infos.next_top_level();
902 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
903 }
904 DataType::Struct(fields) => {
905 let column_info = column_infos.expect_next()?;
906
907 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
909 return self.create_primitive_scheduler(field, &blob_col, buffers);
911 }
912
913 if Self::check_packed_struct(column_info) {
914 self.create_primitive_scheduler(field, column_info, buffers)
916 } else {
917 Self::check_simple_struct(column_info, &field.name).unwrap();
919 let num_rows = column_info
920 .page_infos
921 .iter()
922 .map(|page| page.num_rows)
923 .sum();
924 let mut child_schedulers = Vec::with_capacity(field.children.len());
925 for field in &field.children {
926 column_infos.next_top_level();
927 let field_scheduler =
928 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
929 child_schedulers.push(Arc::from(field_scheduler));
930 }
931
932 let fields = fields.clone();
933 Ok(Box::new(SimpleStructScheduler::new(
934 child_schedulers,
935 fields,
936 num_rows,
937 )))
938 }
939 }
940 _ => todo!(),
942 }
943 }
944}
945
946fn root_column(num_rows: u64) -> ColumnInfo {
948 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
949 let final_page_num_rows = num_rows % (u32::MAX as u64);
950 let root_pages = (0..num_root_pages)
951 .map(|i| PageInfo {
952 num_rows: if i == num_root_pages - 1 {
953 final_page_num_rows
954 } else {
955 u64::MAX
956 },
957 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
958 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
959 pb::SimpleStruct {},
960 )),
961 }),
962 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
964 })
965 .collect::<Vec<_>>();
966 ColumnInfo {
967 buffer_offsets_and_sizes: Arc::new([]),
968 encoding: pb::ColumnEncoding {
969 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
970 },
971 index: u32::MAX,
972 page_infos: Arc::from(root_pages),
973 }
974}
975
/// The root decoder for a file, either structural or legacy (matching the
/// root scheduler that produced it)
pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}
980
981impl RootDecoder {
982 pub fn into_structural(self) -> StructuralStructDecoder {
983 match self {
984 Self::Structural(decoder) => decoder,
985 Self::Legacy(_) => panic!("Expected a structural decoder"),
986 }
987 }
988
989 pub fn into_legacy(self) -> SimpleStructDecoder {
990 match self {
991 Self::Legacy(decoder) => decoder,
992 Self::Structural(_) => panic!("Expected a legacy decoder"),
993 }
994 }
995}
996
impl DecodeBatchScheduler {
    /// Creates a new decode scheduler for a file.
    ///
    /// Chooses the structural path when the file's columns use structural
    /// encodings (or when there are no columns) and the legacy path otherwise,
    /// then initializes the root scheduler (which may perform I/O — hence async).
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        filter: &FilterExpression,
        decoder_config: &DecoderConfig,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        // Wrap the top-level fields in a synthetic "root" struct field and
        // mark it so downstream code can recognize it
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos.is_empty() || column_infos[0].is_structural() {
            // Structural path
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let mut root_scheduler =
                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // Legacy path: prepend a synthetic root struct column and shift
            // the requested column indices by one to account for it
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let root_scheduler =
                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    /// Creates a scheduler directly from an existing legacy root scheduler.
    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn from_scheduler(
        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
        root_fields: Fields,
        cache: Arc<LanceCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    /// Schedules the given ranges on the structural path, feeding each scan
    /// line to `schedule_action`.  Errors are delivered through the action;
    /// scheduling stops when the action returns false or when the root job
    /// produces no more scan lines.
    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_lines = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_lines {
                schedule_action(Err(err));
                return;
            }
            let next_scan_lines = maybe_next_scan_lines.unwrap();
            // An empty batch of scan lines signals the job is done
            if next_scan_lines.is_empty() {
                return;
            }
            for next_scan_line in next_scan_lines {
                trace!(
                    "Scheduled scan line of {} rows and {} decoders",
                    next_scan_line.rows_scheduled,
                    next_scan_line.decoders.len()
                );
                num_rows_scheduled += next_scan_line.rows_scheduled;
                if !schedule_action(Ok(DecoderMessage {
                    scheduled_so_far: num_rows_scheduled,
                    decoders: next_scan_line.decoders,
                })) {
                    // Receiver stopped listening; abort scheduling
                    return;
                }
            }
        }
    }

    /// Schedules the given ranges on the legacy path, feeding each scan line
    /// to `schedule_action` and advancing `priority` as rows are scheduled.
    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        // NOTE(review): `ranges.first().unwrap()` assumes `ranges` is
        // non-empty — confirm callers guarantee this
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // Receiver stopped listening; abort scheduling
                return;
            }

            // NOTE(review): this trace fires once per scan line (it is inside
            // the loop), not once after all scheduling — confirm intended
            trace!("Finished scheduling {} ranges", ranges.len());
        }
    }

    /// Dispatches to the structural or legacy scheduling loop depending on
    /// the root scheduler kind.  `priority` is only used by the legacy path.
    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    /// Schedules the given ranges, collecting all decoder messages into a
    /// `Vec` (fails on the first error).
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }

    /// Schedules the given ranges, sending decoder messages to `sink`.
    /// Stops early (without error) if the receiving end has been dropped.
    #[instrument(skip_all)]
    pub fn schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.do_schedule_ranges(
            ranges,
            filter,
            scheduler,
            |msg| {
                match sink.send(msg) {
                    Ok(_) => true,
                    Err(SendError { .. }) => {
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
                        false
                    }
                }
            },
            None,
        )
    }

    /// Schedules a single contiguous range of rows.
    #[instrument(skip_all)]
    pub fn schedule_range(
        &mut self,
        range: Range<u64>,
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.schedule_ranges(&[range], filter, sink, scheduler)
    }

    /// Schedules a take of the given row indices.
    ///
    /// `indices` must be sorted in ascending order (checked in debug builds).
    /// An empty take is a no-op.
    pub fn schedule_take(
        &mut self,
        indices: &[u64],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
        if indices.is_empty() {
            return;
        }
        trace!("Scheduling take of {} rows", indices.len());
        let ranges = Self::indices_to_ranges(indices);
        self.schedule_ranges(&ranges, filter, sink, scheduler)
    }

    /// Coalesces sorted row indices into contiguous half-open ranges.
    /// `indices` must be non-empty (the caller, `schedule_take`, guarantees
    /// this).
    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
        let mut ranges = Vec::new();
        let mut start = indices[0];

        // Close a range whenever the next index is not consecutive
        for window in indices.windows(2) {
            if window[1] != window[0] + 1 {
                ranges.push(start..window[0] + 1);
                start = window[1];
            }
        }

        ranges.push(start..*indices.last().unwrap() + 1);
        ranges
    }
}
1330
/// A task that, when awaited, yields one decoded batch of rows
pub struct ReadBatchTask {
    /// Future resolving to the decoded record batch
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// The number of rows the batch will contain
    pub num_rows: u32,
}
1335
/// Drains scheduled decoders from a channel and drives the legacy root
/// decoder, producing batches of `rows_per_batch` rows
pub struct BatchDecodeStream {
    // Holds the channel of scheduled decoder messages
    context: DecoderContext,
    // The legacy root struct decoder that children are fed into
    root_decoder: SimpleStructDecoder,
    // Rows not yet emitted as batch tasks
    rows_remaining: u64,
    // Target number of rows per emitted batch
    rows_per_batch: u32,
    // Total rows the scheduler has reported scheduled so far
    rows_scheduled: u64,
    // Total rows already drained into batch tasks
    rows_drained: u64,
    // True once the scheduling channel has been closed/exhausted
    scheduler_exhausted: bool,
    // Ensures the large-batch warning is only emitted once
    emitted_batch_size_warning: Arc<Once>,
}
1347
impl BatchDecodeStream {
    /// Create a new batch decode stream.
    ///
    /// * `scheduled` - incoming channel of scheduled decode messages
    /// * `rows_per_batch` - maximum rows per emitted batch (the final batch
    ///   may be smaller)
    /// * `num_rows` - total number of rows that were scheduled
    /// * `root_decoder` - root decoder that child decoders attach to
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Attach a newly scheduled decoder to the decoder tree.
    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
        if decoder.path.is_empty() {
            // An empty path targets the root; the root decoder was supplied
            // at construction, so this message is simply dropped.
            Ok(())
        } else {
            self.root_decoder.accept_child(decoder)
        }
    }

    /// Wait until at least `scheduled_need` rows have been scheduled,
    /// attaching decoders as their messages arrive.
    ///
    /// Returns the number of rows actually scheduled, which may be less than
    /// `scheduled_need` if the scheduler side finishes (closes the channel)
    /// early.
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    // Channel closed: no more rows will ever be scheduled.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Assemble the next batch-sized decode task, waiting on scheduling and
    /// I/O as needed.  Returns `None` once all rows have been drained.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // How many more rows must be scheduled before we can drain `to_take`?
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // If the scheduler ended early we can only take what it scheduled.
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        // `loaded_need` is the (inclusive) index of the last row we are
        // about to drain; the data for it must be fully loaded first.
        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Convert this into a stream of batch tasks.  The CPU-side decode for
    /// each batch is spawned onto tokio so it can proceed in parallel.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                // On error we emit a zero-row task that carries the error
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    // A JoinError (panic / cancellation) is wrapped into a
                    // lance error rather than propagating the panic here.
                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
                        .await
                        .map_err(|err| Error::Wrapped {
                            error: err.into(),
                            location: location!(),
                        })?
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                // num_rows is bounded by rows_per_batch (a u32) so the cast
                // below cannot truncate
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1491
// A message destined for the root decoder: either a fully loaded page
// (structural / 2.1 path) or a legacy decoder (2.0 path).
enum RootDecoderMessage {
    LoadedPage(LoadedPageShard),
    LegacyPage(crate::previous::decoder::DecoderReady),
}
// Abstracts over the structural (2.1) and legacy (2.0) root struct decoders
// so that `BatchDecodeIterator` can be generic over both.
trait RootDecoderType {
    // Accept a page / decoder message (the variant must match the decoder kind)
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    // Drain the next `num_rows` rows into a decode task
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    // Block (on the given runtime) until rows up to `loaded_need` are loaded
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1503impl RootDecoderType for StructuralStructDecoder {
1504 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1505 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1506 unreachable!()
1507 };
1508 self.accept_page(loaded_page)
1509 }
1510 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1511 self.drain_batch_task(num_rows)
1512 }
1513 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1514 Ok(())
1516 }
1517}
1518impl RootDecoderType for SimpleStructDecoder {
1519 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1520 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1521 unreachable!()
1522 };
1523 self.accept_child(legacy_page)
1524 }
1525 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1526 self.drain(num_rows)
1527 }
1528 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1529 runtime.block_on(self.wait_for_loaded(loaded_need))
1530 }
1531}
1532
/// A blocking (synchronous) batch decoder that decodes entirely on the
/// calling thread, unlike the stream-based decoders above.
struct BatchDecodeIterator<T: RootDecoderType> {
    // All scheduler messages, collected up front before iteration begins
    messages: VecDeque<Result<DecoderMessage>>,
    root_decoder: T,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    emitted_batch_size_warning: Arc<Once>,
    // Private single-threaded runtime used to block on page I/O futures
    wait_for_io_runtime: tokio::runtime::Runtime,
    schema: Arc<ArrowSchema>,
}
1548
impl<T: RootDecoderType> BatchDecodeIterator<T> {
    /// Create a new blocking batch decode iterator.
    ///
    /// * `messages` - all scheduler messages, already collected
    /// * `rows_per_batch` - maximum rows per emitted batch
    /// * `num_rows` - total number of rows scheduled
    /// * `root_decoder` - root decoder (structural or legacy)
    /// * `schema` - Arrow schema of the emitted batches
    pub fn new(
        messages: VecDeque<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: T,
        schema: Arc<ArrowSchema>,
    ) -> Self {
        Self {
            messages,
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            // Current-thread runtime: page futures are awaited inline on the
            // calling thread
            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap(),
            emitted_batch_size_warning: Arc::new(Once::new()),
            schema,
        }
    }

    /// Resolve an unloaded page into a loaded page, blocking on I/O if it
    /// has not completed yet.
    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
        match maybe_done(unloaded_page.0) {
            // Already resolved; take the result without entering the runtime
            MaybeDone::Done(loaded_page) => loaded_page,
            // Still pending; block the private runtime until it completes
            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
            MaybeDone::Gone => unreachable!(),
        }
    }

    /// Process buffered scheduler messages (blocking on page I/O) until at
    /// least `scheduled_need` rows are scheduled or messages run out, then
    /// wait for the rows of the next batch to be fully loaded.
    ///
    /// Returns the number of rows scheduled so far.
    #[instrument(skip_all)]
    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
            let message = self.messages.pop_front().unwrap()?;
            self.rows_scheduled = message.scheduled_so_far;
            for decoder_message in message.decoders {
                match decoder_message {
                    MessageType::UnloadedPage(unloaded_page) => {
                        let loaded_page = self.wait_for_page(unloaded_page)?;
                        self.root_decoder
                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
                    }
                    MessageType::DecoderReady(decoder_ready) => {
                        // An empty path targets the root decoder, which we
                        // already have; such messages are dropped.
                        if !decoder_ready.path.is_empty() {
                            self.root_decoder
                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
                        }
                    }
                }
            }
        }

        // Inclusive index of the last row the next drain will touch
        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;

        self.root_decoder
            .wait(loaded_need, &self.wait_for_io_runtime)?;
        Ok(self.rows_scheduled)
    }

    /// Decode the next batch synchronously.  Returns `None` once all rows
    /// have been drained.
    #[instrument(level = "debug", skip_all)]
    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // How many more rows must be scheduled before we can drain `to_take`?
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
            // If scheduling fell short we can only take what was scheduled
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch(to_take)?;

        self.rows_drained += to_take;

        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;

        Ok(Some(batch))
    }
}
1664
1665impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1666 type Item = ArrowResult<RecordBatch>;
1667
1668 fn next(&mut self) -> Option<Self::Item> {
1669 self.next_batch_task()
1670 .transpose()
1671 .map(|r| r.map_err(ArrowError::from))
1672 }
1673}
1674
1675impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1676 fn schema(&self) -> Arc<ArrowSchema> {
1677 self.schema.clone()
1678 }
1679}
1680
/// A stream that drives scheduled decode work (2.1 structural path) into
/// fixed-maximum-size record batch tasks.
pub struct StructuralBatchDecodeStream {
    // Receives scheduled pages / progress from the scheduling side
    context: DecoderContext,
    // Root of the structural decoder tree
    root_decoder: StructuralStructDecoder,
    // Rows not yet handed out as batch tasks
    rows_remaining: u64,
    // Maximum number of rows per emitted batch
    rows_per_batch: u32,
    // Rows the scheduler has reported as scheduled so far
    rows_scheduled: u64,
    // Rows already drained from the root decoder
    rows_drained: u64,
    // Set once the scheduler channel closes (no more rows will arrive)
    scheduler_exhausted: bool,
    // Ensures the "batch too large" warning is logged at most once
    emitted_batch_size_warning: Arc<Once>,
}
1692
impl StructuralBatchDecodeStream {
    /// Create a new structural batch decode stream.
    ///
    /// * `scheduled` - incoming channel of scheduled decode messages
    /// * `rows_per_batch` - maximum rows per emitted batch (the final batch
    ///   may be smaller)
    /// * `num_rows` - total number of rows that were scheduled
    /// * `root_decoder` - root structural decoder pages are delivered to
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Wait until at least `scheduled_need` rows have been scheduled,
    /// awaiting page I/O and delivering loaded pages to the root decoder.
    ///
    /// Returns the number of rows actually scheduled, which may be less than
    /// `scheduled_need` if the scheduler side finishes (closes the channel)
    /// early.
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        // Await the page's I/O before handing it to the decoder
                        let unloaded_page = message.into_structural();
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    // Channel closed: no more rows will ever be scheduled.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Assemble the next batch-sized decode task, waiting on scheduling and
    /// I/O as needed.  Returns `None` once all rows have been drained.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // How many more rows must be scheduled before we can drain `to_take`?
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // If the scheduler ended early we can only take what it scheduled.
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Convert this into a stream of batch tasks.  The CPU-side decode for
    /// each batch is spawned onto tokio so it can proceed in parallel.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                // On error we emit a zero-row task that carries the error
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    // A JoinError (panic / cancellation) is wrapped into a
                    // lance error rather than propagating the panic here.
                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
                        .await
                        .map_err(|err| Error::Wrapped {
                            error: err.into(),
                            location: location!(),
                        })?
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                // num_rows is bounded by rows_per_batch (a u32) so the cast
                // below cannot truncate
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1821
/// The rows requested from a file: either a set of row ranges or a set of
/// individual row indices.
#[derive(Debug)]
pub enum RequestedRows {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}

impl RequestedRows {
    /// Total number of rows covered by this request.
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges
                .iter()
                .fold(0u64, |acc, range| acc + (range.end - range.start)),
            Self::Indices(indices) => indices.len() as u64,
        }
    }

    /// Drops any zero-length ranges; index requests are returned unchanged.
    pub fn trim_empty_ranges(mut self) -> Self {
        match &mut self {
            Self::Ranges(ranges) => ranges.retain(|range| !range.is_empty()),
            Self::Indices(_) => {}
        }
        self
    }
}
1843
/// Behavior flags for the decoder.
#[derive(Debug, Clone, Default)]
pub struct DecoderConfig {
    // Presumably enables caching of computed repetition indexes for reuse
    // across reads — TODO confirm against the scheduler implementation
    pub cache_repetition_index: bool,
    // If true, decoded arrays are validated (see `should_validate` usages)
    pub validate_on_decode: bool,
}
1852
/// Everything needed to schedule and decode a read request.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Extension decoders
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum number of rows per emitted batch
    pub batch_size: u32,
    /// I/O service used to issue reads
    pub io: Arc<dyn EncodingsIo>,
    /// Cache shared across reads
    pub cache: Arc<LanceCache>,
    /// Additional decoder behavior flags
    pub decoder_config: DecoderConfig,
}
1862
/// Chains an (item-less) tail onto `stream` that awaits the scheduler task
/// once the data stream is exhausted, so a scheduler panic is surfaced
/// (`unwrap` on the JoinHandle re-raises it) instead of being silently lost.
fn check_scheduler_on_drop(
    stream: BoxStream<'static, ReadBatchTask>,
    scheduler_handle: tokio::task::JoinHandle<()>,
) -> BoxStream<'static, ReadBatchTask> {
    // The unfold yields no items: on its first (only) poll it takes the
    // handle, awaits it, and then ends the stream by returning None.
    let mut scheduler_handle = Some(scheduler_handle);
    let check_scheduler = stream::unfold((), move |_| {
        let handle = scheduler_handle.take();
        async move {
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
            None
        }
    });
    stream.chain(check_scheduler).boxed()
}
1882
1883pub fn create_decode_stream(
1884 schema: &Schema,
1885 num_rows: u64,
1886 batch_size: u32,
1887 is_structural: bool,
1888 should_validate: bool,
1889 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1890) -> Result<BoxStream<'static, ReadBatchTask>> {
1891 if is_structural {
1892 let arrow_schema = ArrowSchema::from(schema);
1893 let structural_decoder = StructuralStructDecoder::new(
1894 arrow_schema.fields,
1895 should_validate,
1896 true,
1897 )?;
1898 Ok(
1899 StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder)
1900 .into_stream(),
1901 )
1902 } else {
1903 let arrow_schema = ArrowSchema::from(schema);
1904 let root_fields = arrow_schema.fields;
1905
1906 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1907 Ok(BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream())
1908 }
1909}
1910
1911pub fn create_decode_iterator(
1915 schema: &Schema,
1916 num_rows: u64,
1917 batch_size: u32,
1918 should_validate: bool,
1919 is_structural: bool,
1920 messages: VecDeque<Result<DecoderMessage>>,
1921) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1922 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1923 let root_fields = arrow_schema.fields.clone();
1924 if is_structural {
1925 let simple_struct_decoder =
1926 StructuralStructDecoder::new(root_fields, should_validate, true)?;
1927 Ok(Box::new(BatchDecodeIterator::new(
1928 messages,
1929 batch_size,
1930 num_rows,
1931 simple_struct_decoder,
1932 arrow_schema,
1933 )))
1934 } else {
1935 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1936 Ok(Box::new(BatchDecodeIterator::new(
1937 messages,
1938 batch_size,
1939 num_rows,
1940 root_decoder,
1941 arrow_schema,
1942 )))
1943 }
1944}
1945
/// Wires a scheduler task to a decode stream.
///
/// Scheduling runs on a spawned tokio task and feeds decode messages through
/// an unbounded channel into the returned stream.  A failure to construct
/// the scheduler is forwarded through the channel so it surfaces as a stream
/// error.
fn create_scheduler_decoder(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    let num_rows = requested_rows.num_rows();

    // The first column determines which (2.0 vs 2.1) decode path is used
    let is_structural = column_infos[0].is_structural();

    let (tx, rx) = mpsc::unbounded_channel();

    let decode_stream = create_decode_stream(
        &target_schema,
        num_rows,
        config.batch_size,
        is_structural,
        config.decoder_config.validate_on_decode,
        rx,
    )?;

    let scheduler_handle = tokio::task::spawn(async move {
        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
            target_schema.as_ref(),
            &column_indices,
            &column_infos,
            &vec![],
            num_rows,
            config.decoder_plugins,
            config.io.clone(),
            config.cache,
            &filter,
            &config.decoder_config,
        )
        .await
        {
            Ok(scheduler) => scheduler,
            Err(e) => {
                // Deliver the construction error to the decode stream; the
                // send only fails if the receiver is already gone.
                let _ = tx.send(Err(e));
                return;
            }
        };

        match requested_rows {
            RequestedRows::Ranges(ranges) => {
                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
            }
            RequestedRows::Indices(indices) => {
                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
            }
        }
    });

    // Ensure scheduler panics are surfaced after the stream is consumed
    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
}
2003
/// Schedules and decodes the requested rows, returning a stream of batch
/// tasks.
///
/// An empty request yields an empty stream.  If setting up the scheduler
/// fails, the error is returned as a single zero-row task in the stream.
pub fn schedule_and_decode(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> BoxStream<'static, ReadBatchTask> {
    if requested_rows.num_rows() == 0 {
        return stream::empty().boxed();
    }

    // Zero-length ranges contribute nothing; drop them before scheduling
    let requested_rows = requested_rows.trim_empty_ranges();

    let io = config.io.clone();

    match create_scheduler_decoder(
        column_infos,
        requested_rows,
        filter,
        column_indices,
        target_schema,
        config,
    ) {
        // `finally` keeps the cloned I/O scheduler alive until the stream is
        // fully consumed, then drops it
        Ok(stream) => stream.finally(move || drop(io)).boxed(),
        // Surface setup errors as a single failed, zero-row task
        Err(e) => stream::once(std::future::ready(ReadBatchTask {
            num_rows: 0,
            task: std::future::ready(Err(e)).boxed(),
        }))
        .boxed(),
    }
}
2049
/// A lazily created, single-threaded runtime used to block on async work
/// from synchronous code paths (e.g. `schedule_and_decode_blocking`).
pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});
2055
/// Schedules and decodes the requested rows entirely on the calling thread,
/// returning a blocking `RecordBatchReader`.
///
/// Scheduling is driven to completion up front (all messages are buffered in
/// an unbounded channel) and then handed to a blocking decode iterator.
/// An empty request returns an empty reader.
pub fn schedule_and_decode_blocking(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
    if requested_rows.num_rows() == 0 {
        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
    }

    let num_rows = requested_rows.num_rows();
    // The first column determines which (2.0 vs 2.1) decode path is used
    let is_structural = column_infos[0].is_structural();

    let (tx, mut rx) = mpsc::unbounded_channel();

    // Scheduler construction is async; block on the shared waiter runtime
    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
        &config.decoder_config,
    ))?;

    // `tx` is moved into (and dropped by) the scheduling call, closing the
    // channel once scheduling completes
    match requested_rows {
        RequestedRows::Ranges(ranges) => {
            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
        }
        RequestedRows::Indices(indices) => {
            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
        }
    }

    // Drain everything already buffered in the channel.  Since scheduling
    // finished above and the sender is gone, `recv_many` resolves
    // immediately (hence `now_or_never().unwrap()`); 0 signals the channel
    // is closed and empty.
    let mut messages = Vec::new();
    while rx
        .recv_many(&mut messages, usize::MAX)
        .now_or_never()
        .unwrap()
        != 0
    {}

    let decode_iterator = create_decode_iterator(
        &target_schema,
        num_rows,
        config.batch_size,
        config.decoder_config.validate_on_decode,
        is_structural,
        messages.into(),
    )?;

    Ok(decode_iterator)
}
2134
/// A decoder for primitive page data that has already been loaded into
/// memory (legacy path).
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decode `num_rows` rows into a data block, skipping the first
    /// `rows_to_skip` rows of the page.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
2180
/// Schedules I/O for a page of data (legacy path).
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedule reads for the given row ranges within the page, returning a
    /// future that resolves to a decoder for the loaded data.
    ///
    /// `top_level_row` is presumably used to prioritize/order the issued
    /// I/O — confirm against implementations.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2208
/// Tracks a monotonically increasing "priority" (row position) as rows are
/// scheduled, so that I/O can be ordered accordingly.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Advance the priority past `num_rows` rows.
    fn advance(&mut self, num_rows: u64);
    /// The current priority value.
    fn current_priority(&self) -> u64;
    /// Clone into a boxed trait object.
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

/// The simplest priority: advances by exactly one per row.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(SimplePriorityRange::new(self.priority))
    }
}
2244
/// A `PriorityRange` for list data: positions are counted in list *items*
/// and translated back to list-level (base) rows via the offsets array.
pub struct ListPriorityRange {
    // Priority tracker for the enclosing (list-level) rows
    base: Box<dyn PriorityRange>,
    // Cumulative item offsets; offsets[i + 1] is the end of list i
    // (inferred from `advance` below — confirm against the list encoder)
    offsets: Arc<[u64]>,
    // Index into `offsets` of the list containing `cur_position`
    cur_index_into_offsets: usize,
    // Current position, measured in items
    cur_position: u64,
}
2263
impl ListPriorityRange {
    /// Wraps `base` so that advances measured in items are mapped back to
    /// list rows through `offsets`.
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}
2274
2275impl std::fmt::Debug for ListPriorityRange {
2276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2277 f.debug_struct("ListPriorityRange")
2278 .field("base", &self.base)
2279 .field("offsets.len()", &self.offsets.len())
2280 .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2281 .field("cur_position", &self.cur_position)
2282 .finish()
2283 }
2284}
2285
impl PriorityRange for ListPriorityRange {
    /// Advance by `num_rows` *items*, then translate the item progress into
    /// list-level progress for the base priority.
    fn advance(&mut self, num_rows: u64) {
        self.cur_position += num_rows;
        // Walk forward through the offsets: each offsets[i + 1] that is now
        // at or behind cur_position means list i has been fully consumed.
        let mut idx_into_offsets = self.cur_index_into_offsets;
        while idx_into_offsets + 1 < self.offsets.len()
            && self.offsets[idx_into_offsets + 1] <= self.cur_position
        {
            idx_into_offsets += 1;
        }
        // Number of whole lists passed since the last advance
        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
        self.cur_index_into_offsets = idx_into_offsets;
        self.base.advance(base_rows_advanced as u64);
    }

    fn current_priority(&self) -> u64 {
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            base: self.base.box_clone(),
            offsets: self.offsets.clone(),
            cur_index_into_offsets: self.cur_index_into_offsets,
            cur_position: self.cur_position,
        })
    }
}
2315
/// State threaded through the field tree while scheduling.
pub struct SchedulerContext {
    // Present only for temporary/detached contexts; within this module it is
    // only consulted by `path_name` — confirm wider use elsewhere
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    // I/O service used to submit reads
    io: Arc<dyn EncodingsIo>,
    // Cache shared across scheduling
    cache: Arc<LanceCache>,
    // Name printed for temporary contexts (see `path_name`)
    name: String,
    // Field indices from the root to the current position
    path: Vec<u32>,
    // Field names matching `path`, for human-readable tracing
    path_names: Vec<String>,
}
2325
/// Guard returned by `SchedulerContext::push`; calling `pop` restores the
/// parent path and hands back the underlying context.
pub struct ScopedSchedulerContext<'a> {
    pub context: &'a mut SchedulerContext,
}

impl<'a> ScopedSchedulerContext<'a> {
    /// Pops the pushed path segment and returns the underlying context.
    pub fn pop(self) -> &'a mut SchedulerContext {
        self.context.pop();
        self.context
    }
}
2336
2337impl SchedulerContext {
2338 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2339 Self {
2340 io,
2341 cache,
2342 recv: None,
2343 name: "".to_string(),
2344 path: Vec::new(),
2345 path_names: Vec::new(),
2346 }
2347 }
2348
2349 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2350 &self.io
2351 }
2352
2353 pub fn cache(&self) -> &Arc<LanceCache> {
2354 &self.cache
2355 }
2356
2357 pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2358 self.path.push(index);
2359 self.path_names.push(name.to_string());
2360 ScopedSchedulerContext { context: self }
2361 }
2362
2363 pub fn pop(&mut self) {
2364 self.path.pop();
2365 self.path_names.pop();
2366 }
2367
2368 pub fn path_name(&self) -> String {
2369 let path = self.path_names.join("/");
2370 if self.recv.is_some() {
2371 format!("TEMP({}){}", self.name, path)
2372 } else {
2373 format!("ROOT{}", path)
2374 }
2375 }
2376
2377 pub fn current_path(&self) -> VecDeque<u32> {
2378 VecDeque::from_iter(self.path.iter().copied())
2379 }
2380
2381 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2382 pub fn locate_decoder(
2383 &mut self,
2384 decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2385 ) -> crate::previous::decoder::DecoderReady {
2386 trace!(
2387 "Scheduling decoder of type {:?} for {:?}",
2388 decoder.data_type(),
2389 self.path,
2390 );
2391 crate::previous::decoder::DecoderReady {
2392 decoder,
2393 path: self.current_path(),
2394 }
2395 }
2396}
2397
/// A page that has been scheduled but whose I/O may still be in flight;
/// awaiting the inner future yields the loaded page.
pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2399
2400impl std::fmt::Debug for UnloadedPageShard {
2401 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2402 f.debug_struct("UnloadedPage").finish()
2403 }
2404}
2405
/// The result of one scheduling step: the number of rows scheduled by the
/// step and the decoder messages it produced.
#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}
2411
/// A job that incrementally schedules a previously requested set of rows.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Schedule the next chunk of work, producing zero or more scan lines.
    // NOTE(review): the termination convention (e.g. empty vec when done) is
    // not visible in this chunk — confirm against implementations.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
}
2421
/// An opaque, encoded filter expression; empty bytes mean "no filter".
pub struct FilterExpression(pub Bytes);
2430
2431impl FilterExpression {
2432 pub fn no_filter() -> Self {
2437 Self(Bytes::new())
2438 }
2439
2440 pub fn is_noop(&self) -> bool {
2442 self.0.is_empty()
2443 }
2444}
2445
/// Schedules I/O for a single field (2.1 structural path).
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// One-time asynchronous initialization run before any scheduling.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Create a job that schedules the given row ranges.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
2458
/// A task that decodes data into an Arrow array.
pub trait DecodeArrayTask: Send {
    /// Consume the task and produce the decoded array.
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}
2464
2465impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2466 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2467 StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2468 }
2469}
2470
/// A unit of decode work produced by draining a decoder.
pub struct NextDecodeTask {
    /// The decode task itself
    pub task: Box<dyn DecodeArrayTask>,
    /// The number of rows the task will produce
    pub num_rows: u64,
}
2481
2482impl NextDecodeTask {
2483 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2488 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2489 let struct_arr = self.task.decode();
2490 match struct_arr {
2491 Ok(struct_arr) => {
2492 let batch = RecordBatch::from(struct_arr.as_struct());
2493 let size_bytes = batch.get_array_memory_size() as u64;
2494 if size_bytes > BATCH_SIZE_BYTES_WARNING {
2495 emitted_batch_size_warning.call_once(|| {
2496 let size_mb = size_bytes / 1024 / 1024;
2497 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2498 });
2499 }
2500 Ok(batch)
2501 }
2502 Err(e) => {
2503 let e = Error::Internal {
2504 message: format!("Error decoding batch: {}", e),
2505 location: location!(),
2506 };
2507 Err(e)
2508 }
2509 }
2510 }
2511}
2512
/// A message sent from the scheduler to the decode side.
#[derive(Debug)]
pub enum MessageType {
    /// Legacy (2.0) path: a decoder ready to be attached to the tree
    DecoderReady(crate::previous::decoder::DecoderReady),
    /// Structural (2.1) path: a page whose I/O may still be in flight
    UnloadedPage(UnloadedPageShard),
}
2528
2529impl MessageType {
2530 pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2531 match self {
2532 Self::DecoderReady(decoder) => decoder,
2533 Self::UnloadedPage(_) => {
2534 panic!("Expected DecoderReady but got UnloadedPage")
2535 }
2536 }
2537 }
2538
2539 pub fn into_structural(self) -> UnloadedPageShard {
2540 match self {
2541 Self::UnloadedPage(unloaded) => unloaded,
2542 Self::DecoderReady(_) => {
2543 panic!("Expected UnloadedPage but got DecoderReady")
2544 }
2545 }
2546 }
2547}
2548
/// One "scan line" from the scheduler: the cumulative number of rows
/// scheduled so far plus any newly produced decoders/pages.
pub struct DecoderMessage {
    pub scheduled_so_far: u64,
    pub decoders: Vec<MessageType>,
}
2553
/// The receiving side of the scheduler -> decoder channel.
pub struct DecoderContext {
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}
2563
/// The output of decoding a page: the raw data plus the unraveler for its
/// repetition/definition levels.
pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}
2568
/// A task that decodes a page of data into a `DecodedPage`.
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Consume the task and produce the decoded page.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}
2573
/// A decoder for a page whose data has been loaded (2.1 structural path).
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Hand out a task that decodes the next `num_rows` rows.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// The number of rows this decoder covers.
    fn num_rows(&self) -> u64;
}
2578
/// A page whose I/O has completed, ready to be handed to the decoder tree.
#[derive(Debug)]
pub struct LoadedPageShard {
    /// The decoder for the loaded page data
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Field indices from the root to the field this page belongs to
    pub path: VecDeque<u32>,
}
2603
/// An array decoded by the structural path along with its repetition /
/// definition unraveler.
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}
2608
/// A task that decodes into an array plus its repetition/definition info.
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Consume the task and produce the decoded array.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}
2612
/// A decoder for a single field (2.1 structural path).
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Accept a loaded page belonging to this field (or a descendant).
    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
    /// Drain `num_rows` rows into a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The Arrow data type this decoder produces.
    fn data_type(&self) -> &DataType;
}
2624
/// Extension point for custom decoders (currently carries no state).
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2627
2628pub async fn decode_batch(
2630 batch: &EncodedBatch,
2631 filter: &FilterExpression,
2632 decoder_plugins: Arc<DecoderPlugins>,
2633 should_validate: bool,
2634 version: LanceFileVersion,
2635 cache: Option<Arc<LanceCache>>,
2636) -> Result<RecordBatch> {
2637 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2642 let cache = if let Some(cache) = cache {
2643 cache
2644 } else {
2645 Arc::new(lance_core::cache::LanceCache::with_capacity(
2646 128 * 1024 * 1024,
2647 ))
2648 };
2649 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2650 batch.schema.as_ref(),
2651 &batch.top_level_columns,
2652 &batch.page_table,
2653 &vec![],
2654 batch.num_rows,
2655 decoder_plugins,
2656 io_scheduler.clone(),
2657 cache,
2658 filter,
2659 &DecoderConfig::default(),
2660 )
2661 .await?;
2662 let (tx, rx) = unbounded_channel();
2663 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2664 let is_structural = version >= LanceFileVersion::V2_1;
2665 let mut decode_stream = create_decode_stream(
2666 &batch.schema,
2667 batch.num_rows,
2668 batch.num_rows as u32,
2669 is_structural,
2670 should_validate,
2671 rx,
2672 )?;
2673 decode_stream.next().await.unwrap().task.await
2674}
2675
#[cfg(test)]
mod tests {
    use super::*;

    // A lone index becomes a single unit-width half-open range.
    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }

    // Fully consecutive indices collapse into one range.
    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }

    // Gaps split the indices into multiple half-open ranges.
    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }
}