1use std::collections::VecDeque;
216use std::sync::{LazyLock, Once, OnceLock};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{BoxFuture, MaybeDone, maybe_done};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{BLOB_DESC_LANCE_FIELD, Field, Schema};
229use lance_core::utils::futures::FinallyStreamExt;
230use lance_core::utils::parse::parse_env_as_bool;
231use log::{debug, trace, warn};
232use tokio::sync::mpsc::error::SendError;
233use tokio::sync::mpsc::{self, unbounded_channel};
234
235use lance_core::error::LanceOptionExt;
236use lance_core::{ArrowResult, Error, Result};
237use tracing::instrument;
238
239use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
240use crate::data::DataBlock;
241use crate::encoder::EncodedBatch;
242use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
243use crate::encodings::logical::list::StructuralListScheduler;
244use crate::encodings::logical::map::StructuralMapScheduler;
245use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
246use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
247use crate::format::pb::{self, column_encoding};
248use crate::format::pb21;
249use crate::previous::decoder::LogicalPageDecoder;
250use crate::previous::encodings::logical::list::OffsetPageInfo;
251use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
252use crate::previous::encodings::logical::{
253 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
254 primitive::PrimitiveFieldScheduler,
255};
256use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
257use crate::version::LanceFileVersion;
258use crate::{BufferScheduler, EncodingsIo};
259
/// Size threshold (10 MiB) for a single decoded batch; presumably used to emit a
/// one-time warning when a batch exceeds it — TODO confirm against the emitter.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
/// Environment variable controlling how structural batch decode work is spawned.
const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
    "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE";
/// Environment variable toggling whether the repetition index is cached on read.
const ENV_LANCE_READ_CACHE_REPETITION_INDEX: &str = "LANCE_READ_CACHE_REPETITION_INDEX";
265
266fn default_cache_repetition_index() -> bool {
267 static DEFAULT_CACHE_REPETITION_INDEX: OnceLock<bool> = OnceLock::new();
268 *DEFAULT_CACHE_REPETITION_INDEX
269 .get_or_init(|| parse_env_as_bool(ENV_LANCE_READ_CACHE_REPETITION_INDEX, true))
270}
271
/// The encoding description for a page, which differs between the legacy (2.0)
/// and structural (2.1+) file formats.
#[derive(Debug)]
pub enum PageEncoding {
    /// Legacy protobuf array encoding (pre-structural format).
    Legacy(pb::ArrayEncoding),
    /// Structural page layout used by the 2.1+ format.
    Structural(pb21::PageLayout),
}
283
284impl PageEncoding {
285 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
286 match self {
287 Self::Legacy(enc) => enc,
288 Self::Structural(_) => panic!("Expected a legacy encoding"),
289 }
290 }
291
292 pub fn as_structural(&self) -> &pb21::PageLayout {
293 match self {
294 Self::Structural(enc) => enc,
295 Self::Legacy(_) => panic!("Expected a structural encoding"),
296 }
297 }
298
299 pub fn is_structural(&self) -> bool {
300 matches!(self, Self::Structural(_))
301 }
302}
303
/// Metadata describing a single page of encoded data within a column.
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows stored in the page.
    pub num_rows: u64,
    /// Scheduling priority; synthesized pages in this file use 0 — presumably the
    /// absolute row offset of the page's first row, TODO confirm with the writer.
    pub priority: u64,
    /// The encoding (legacy or structural) used to store the page's data.
    pub encoding: PageEncoding,
    /// (file offset, size) pairs locating the page's data buffers.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
320
/// Metadata describing a column of encoded data, consisting of one or more pages.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file.
    pub index: u32,
    /// The pages that make up this column.
    pub page_infos: Arc<[PageInfo]>,
    /// (file offset, size) pairs for column-level buffers.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// The column-level encoding descriptor.
    pub encoding: pb::ColumnEncoding,
}
334
335impl ColumnInfo {
336 pub fn new(
338 index: u32,
339 page_infos: Arc<[PageInfo]>,
340 buffer_offsets_and_sizes: Vec<(u64, u64)>,
341 encoding: pb::ColumnEncoding,
342 ) -> Self {
343 Self {
344 index,
345 page_infos,
346 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
347 encoding,
348 }
349 }
350
351 pub fn is_structural(&self) -> bool {
352 self.page_infos
353 .first()
355 .map(|page| page.encoding.is_structural())
356 .unwrap_or(false)
357 }
358}
359
/// The root scheduler for a read: structural (2.1+) or legacy (2.0).
enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}
364
365impl RootScheduler {
366 fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
367 match self {
368 Self::Structural(_) => panic!("Expected a legacy scheduler"),
369 Self::Legacy(s) => s,
370 }
371 }
372
373 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
374 match self {
375 Self::Structural(s) => s.as_ref(),
376 Self::Legacy(_) => panic!("Expected a structural scheduler"),
377 }
378 }
379}
380
/// Schedules I/O and decode work for a read, wrapping either a legacy (2.0)
/// or structural (2.1+) root scheduler over a synthetic root struct column.
pub struct DecodeBatchScheduler {
    /// The scheduler for the synthetic root struct that wraps all top-level fields.
    root_scheduler: RootScheduler,
    /// The top-level Arrow fields of the schema being read.
    pub root_fields: Fields,
    /// Cache shared with field schedulers (e.g. for repetition indices).
    cache: Arc<LanceCache>,
}
407
/// Iterator over column infos that also tracks the caller-requested top-level
/// column indices, so schedulers can jump to the next top-level column.
pub struct ColumnInfoIter<'a> {
    /// All column infos, indexed by physical column position.
    column_infos: Vec<Arc<ColumnInfo>>,
    /// The top-level column indices the caller asked for.
    column_indices: &'a [u32],
    /// Current position within `column_infos`.
    column_info_pos: usize,
    /// Current position within `column_indices`.
    column_indices_pos: usize,
}
414
415impl<'a> ColumnInfoIter<'a> {
416 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
417 let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
418 Self {
419 column_infos,
420 column_indices,
421 column_info_pos: initial_pos,
422 column_indices_pos: 0,
423 }
424 }
425
426 pub fn peek(&self) -> &Arc<ColumnInfo> {
427 &self.column_infos[self.column_info_pos]
428 }
429
430 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
431 let column_info = self.column_infos[self.column_info_pos].clone();
432 let transformed = transform(column_info);
433 self.column_infos[self.column_info_pos] = transformed;
434 }
435
436 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
437 self.next().ok_or_else(|| {
438 Error::invalid_input(
439 "there were more fields in the schema than provided column indices / infos",
440 )
441 })
442 }
443
444 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
445 if self.column_info_pos < self.column_infos.len() {
446 let info = &self.column_infos[self.column_info_pos];
447 self.column_info_pos += 1;
448 Some(info)
449 } else {
450 None
451 }
452 }
453
454 pub(crate) fn next_top_level(&mut self) {
455 self.column_indices_pos += 1;
456 if self.column_indices_pos < self.column_indices.len() {
457 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
458 } else {
459 self.column_info_pos = self.column_infos.len();
460 }
461 }
462}
463
/// (position, size) pairs for the file-level buffers.
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}
469
/// Column-level buffers plus a handle to the enclosing file's buffers.
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}
476
/// Page-level buffers plus a handle to the enclosing column's buffers.
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}
483
/// The core decoder strategy: maps fields + column infos to field schedulers
/// for both the legacy (2.0) and structural (2.1+) formats.
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    /// If true, decoded data is validated as it is read.
    pub validate_data: bool,
    /// Strategy used to pick decompressors for structural pages.
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    /// If true, repetition indices are cached between reads.
    pub cache_repetition_index: bool,
}
491
492impl Default for CoreFieldDecoderStrategy {
493 fn default() -> Self {
494 Self {
495 validate_data: false,
496 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
497 cache_repetition_index: false,
498 }
499 }
500}
501
502impl CoreFieldDecoderStrategy {
503 pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
505 self.cache_repetition_index = cache_repetition_index;
506 self
507 }
508
509 pub fn from_decoder_config(config: &DecoderConfig) -> Self {
511 Self {
512 validate_data: config.validate_on_decode,
513 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
514 cache_repetition_index: config.cache_repetition_index,
515 }
516 }
517
518 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
521 let column_encoding = column_info
522 .encoding
523 .column_encoding
524 .as_ref()
525 .ok_or_else(|| {
526 Error::invalid_input(format!(
527 "the column at index {} was missing a ColumnEncoding",
528 column_info.index
529 ))
530 })?;
531 if matches!(
532 column_encoding,
533 pb::column_encoding::ColumnEncoding::Values(_)
534 ) {
535 Ok(())
536 } else {
537 Err(Error::invalid_input(format!(
538 "the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it",
539 column_info.index, field_name, column_encoding
540 )))
541 }
542 }
543
544 fn is_structural_primitive(data_type: &DataType) -> bool {
545 if data_type.is_primitive() {
546 true
547 } else {
548 match data_type {
549 DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
551 DataType::Boolean
552 | DataType::Null
553 | DataType::FixedSizeBinary(_)
554 | DataType::Binary
555 | DataType::LargeBinary
556 | DataType::Utf8
557 | DataType::LargeUtf8 => true,
558 DataType::FixedSizeList(inner, _) => {
559 Self::is_structural_primitive(inner.data_type())
560 }
561 _ => false,
562 }
563 }
564 }
565
566 fn is_primitive_legacy(data_type: &DataType) -> bool {
567 if data_type.is_primitive() {
568 true
569 } else {
570 match data_type {
571 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
573 DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
574 _ => false,
575 }
576 }
577 }
578
579 fn create_primitive_scheduler(
580 &self,
581 field: &Field,
582 column: &ColumnInfo,
583 buffers: FileBuffers,
584 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
585 Self::ensure_values_encoded(column, &field.name)?;
586 let column_buffers = ColumnBuffers {
588 file_buffers: buffers,
589 positions_and_sizes: &column.buffer_offsets_and_sizes,
590 };
591 Ok(Box::new(PrimitiveFieldScheduler::new(
592 column.index,
593 field.data_type(),
594 column.page_infos.clone(),
595 column_buffers,
596 self.validate_data,
597 )))
598 }
599
600 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
602 Self::ensure_values_encoded(column_info, field_name)?;
603 if column_info.page_infos.len() != 1 {
604 return Err(Error::invalid_input_source(format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into()));
605 }
606 let encoding = &column_info.page_infos[0].encoding;
607 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
608 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
609 _ => Err(Error::invalid_input_source(format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into())),
610 }
611 }
612
613 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
614 let encoding = &column_info.page_infos[0].encoding;
615 matches!(
616 encoding.as_legacy().array_encoding.as_ref().unwrap(),
617 pb::array_encoding::ArrayEncoding::PackedStruct(_)
618 )
619 }
620
621 fn create_list_scheduler(
622 &self,
623 list_field: &Field,
624 column_infos: &mut ColumnInfoIter,
625 buffers: FileBuffers,
626 offsets_column: &ColumnInfo,
627 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
628 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
629 let offsets_column_buffers = ColumnBuffers {
630 file_buffers: buffers,
631 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
632 };
633 let items_scheduler =
634 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
635
636 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
637 .page_infos
638 .iter()
639 .filter(|offsets_page| offsets_page.num_rows > 0)
640 .map(|offsets_page| {
641 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
642 &offsets_page.encoding.as_legacy().array_encoding
643 {
644 let inner = PageInfo {
645 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
646 encoding: PageEncoding::Legacy(
647 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
648 ),
649 num_rows: offsets_page.num_rows,
650 priority: 0,
651 };
652 (
653 inner,
654 OffsetPageInfo {
655 offsets_in_page: offsets_page.num_rows,
656 null_offset_adjustment: list_encoding.null_offset_adjustment,
657 num_items_referenced_by_page: list_encoding.num_items,
658 },
659 )
660 } else {
661 panic!("Expected a list column");
663 }
664 })
665 .unzip();
666 let inner = Arc::new(PrimitiveFieldScheduler::new(
667 offsets_column.index,
668 DataType::UInt64,
669 Arc::from(inner_infos.into_boxed_slice()),
670 offsets_column_buffers,
671 self.validate_data,
672 )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
673 let items_field = match list_field.data_type() {
674 DataType::List(inner) => inner,
675 DataType::LargeList(inner) => inner,
676 _ => unreachable!(),
677 };
678 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
679 DataType::Int32
680 } else {
681 DataType::Int64
682 };
683 Ok(Box::new(ListFieldScheduler::new(
684 inner,
685 items_scheduler.into(),
686 items_field,
687 offset_type,
688 null_offset_adjustments,
689 )))
690 }
691
692 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
693 if let column_encoding::ColumnEncoding::Blob(blob) =
694 column_info.encoding.column_encoding.as_ref().unwrap()
695 {
696 let mut column_info = column_info.clone();
697 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
698 Some(column_info)
699 } else {
700 None
701 }
702 }
703
704 fn create_structural_field_scheduler(
705 &self,
706 field: &Field,
707 column_infos: &mut ColumnInfoIter,
708 ) -> Result<Box<dyn StructuralFieldScheduler>> {
709 let data_type = field.data_type();
710 if Self::is_structural_primitive(&data_type) {
711 let column_info = column_infos.expect_next()?;
712 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
713 column_info.as_ref(),
714 self.decompressor_strategy.as_ref(),
715 self.cache_repetition_index,
716 field,
717 )?);
718
719 column_infos.next_top_level();
721
722 return Ok(scheduler);
723 }
724 match &data_type {
725 DataType::Struct(fields) => {
726 if field.is_packed_struct() {
727 let column_info = column_infos.expect_next()?;
729 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
730 column_info.as_ref(),
731 self.decompressor_strategy.as_ref(),
732 self.cache_repetition_index,
733 field,
734 )?);
735
736 column_infos.next_top_level();
738
739 return Ok(scheduler);
740 }
741 if field.is_blob() {
743 let column_info = column_infos.peek();
744 if column_info.page_infos.iter().any(|page| {
745 matches!(
746 page.encoding,
747 PageEncoding::Structural(pb21::PageLayout {
748 layout: Some(pb21::page_layout::Layout::BlobLayout(_))
749 })
750 )
751 }) {
752 let column_info = column_infos.expect_next()?;
753 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
754 column_info.as_ref(),
755 self.decompressor_strategy.as_ref(),
756 self.cache_repetition_index,
757 field,
758 )?);
759 column_infos.next_top_level();
760 return Ok(scheduler);
761 }
762 }
763
764 let mut child_schedulers = Vec::with_capacity(field.children.len());
765 for field in field.children.iter() {
766 let field_scheduler =
767 self.create_structural_field_scheduler(field, column_infos)?;
768 child_schedulers.push(field_scheduler);
769 }
770
771 let fields = fields.clone();
772 Ok(
773 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
774 as Box<dyn StructuralFieldScheduler>,
775 )
776 }
777 DataType::List(_) | DataType::LargeList(_) => {
778 let child = field.children.first().expect_ok()?;
779 let child_scheduler =
780 self.create_structural_field_scheduler(child, column_infos)?;
781 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
782 as Box<dyn StructuralFieldScheduler>)
783 }
784 DataType::FixedSizeList(inner, dimension)
785 if matches!(inner.data_type(), DataType::Struct(_)) =>
786 {
787 let child = field.children.first().expect_ok()?;
788 let child_scheduler =
789 self.create_structural_field_scheduler(child, column_infos)?;
790 Ok(Box::new(StructuralFixedSizeListScheduler::new(
791 child_scheduler,
792 *dimension,
793 )) as Box<dyn StructuralFieldScheduler>)
794 }
795 DataType::Map(_, keys_sorted) => {
796 if *keys_sorted {
800 return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into()));
801 }
802 let entries_child = field.children.first().expect_ok()?;
803 let child_scheduler =
804 self.create_structural_field_scheduler(entries_child, column_infos)?;
805 Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
806 as Box<dyn StructuralFieldScheduler>)
807 }
808 _ => todo!("create_structural_field_scheduler for {}", data_type),
809 }
810 }
811
812 fn create_legacy_field_scheduler(
813 &self,
814 field: &Field,
815 column_infos: &mut ColumnInfoIter,
816 buffers: FileBuffers,
817 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
818 let data_type = field.data_type();
819 if Self::is_primitive_legacy(&data_type) {
820 let column_info = column_infos.expect_next()?;
821 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
822 return Ok(scheduler);
823 } else if data_type.is_binary_like() {
824 let column_info = column_infos.expect_next()?.clone();
825 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
827 let desc_scheduler =
828 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
829 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
830 return Ok(blob_scheduler);
831 }
832 if let Some(page_info) = column_info.page_infos.first() {
833 if matches!(
834 page_info.encoding.as_legacy(),
835 pb::ArrayEncoding {
836 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
837 }
838 ) {
839 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
840 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
841 } else {
842 DataType::LargeList(Arc::new(ArrowField::new(
843 "item",
844 DataType::UInt8,
845 false,
846 )))
847 };
848 let list_field = Field::try_from(ArrowField::new(
849 field.name.clone(),
850 list_type,
851 field.nullable,
852 ))
853 .unwrap();
854 let list_scheduler = self.create_list_scheduler(
855 &list_field,
856 column_infos,
857 buffers,
858 &column_info,
859 )?;
860 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
861 list_scheduler.into(),
862 field.data_type(),
863 ));
864 return Ok(binary_scheduler);
865 } else {
866 let scheduler =
867 self.create_primitive_scheduler(field, &column_info, buffers)?;
868 return Ok(scheduler);
869 }
870 } else {
871 return self.create_primitive_scheduler(field, &column_info, buffers);
872 }
873 }
874 match &data_type {
875 DataType::FixedSizeList(inner, _dimension) => {
876 if Self::is_primitive_legacy(inner.data_type()) {
879 let primitive_col = column_infos.expect_next()?;
880 let scheduler =
881 self.create_primitive_scheduler(field, primitive_col, buffers)?;
882 Ok(scheduler)
883 } else {
884 todo!()
885 }
886 }
887 DataType::Dictionary(_key_type, value_type) => {
888 if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
889 let primitive_col = column_infos.expect_next()?;
890 let scheduler =
891 self.create_primitive_scheduler(field, primitive_col, buffers)?;
892 Ok(scheduler)
893 } else {
894 Err(Error::not_supported_source(
895 format!(
896 "No way to decode into a dictionary field of type {}",
897 value_type
898 )
899 .into(),
900 ))
901 }
902 }
903 DataType::List(_) | DataType::LargeList(_) => {
904 let offsets_column = column_infos.expect_next()?.clone();
905 column_infos.next_top_level();
906 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
907 }
908 DataType::Struct(fields) => {
909 let column_info = column_infos.expect_next()?;
910
911 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
913 return self.create_primitive_scheduler(field, &blob_col, buffers);
915 }
916
917 if Self::check_packed_struct(column_info) {
918 self.create_primitive_scheduler(field, column_info, buffers)
920 } else {
921 Self::check_simple_struct(column_info, &field.name).unwrap();
923 let num_rows = column_info
924 .page_infos
925 .iter()
926 .map(|page| page.num_rows)
927 .sum();
928 let mut child_schedulers = Vec::with_capacity(field.children.len());
929 for field in &field.children {
930 column_infos.next_top_level();
931 let field_scheduler =
932 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
933 child_schedulers.push(Arc::from(field_scheduler));
934 }
935
936 let fields = fields.clone();
937 Ok(Box::new(SimpleStructScheduler::new(
938 child_schedulers,
939 fields,
940 num_rows,
941 )))
942 }
943 }
944 _ => todo!(),
946 }
947 }
948}
949
950fn root_column(num_rows: u64) -> ColumnInfo {
952 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
953 let final_page_num_rows = num_rows % (u32::MAX as u64);
954 let root_pages = (0..num_root_pages)
955 .map(|i| PageInfo {
956 num_rows: if i == num_root_pages - 1 {
957 final_page_num_rows
958 } else {
959 u64::MAX
960 },
961 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
962 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
963 pb::SimpleStruct {},
964 )),
965 }),
966 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
968 })
969 .collect::<Vec<_>>();
970 ColumnInfo {
971 buffer_offsets_and_sizes: Arc::new([]),
972 encoding: pb::ColumnEncoding {
973 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
974 },
975 index: u32::MAX,
976 page_infos: Arc::from(root_pages),
977 }
978}
979
/// The root decoder produced for a read: structural (2.1+) or legacy (2.0).
pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}
984
985impl RootDecoder {
986 pub fn into_structural(self) -> StructuralStructDecoder {
987 match self {
988 Self::Structural(decoder) => decoder,
989 Self::Legacy(_) => panic!("Expected a structural decoder"),
990 }
991 }
992
993 pub fn into_legacy(self) -> SimpleStructDecoder {
994 match self {
995 Self::Legacy(decoder) => decoder,
996 Self::Structural(_) => panic!("Expected a legacy decoder"),
997 }
998 }
999}
1000
1001impl DecodeBatchScheduler {
1002 #[allow(clippy::too_many_arguments)]
1005 pub async fn try_new<'a>(
1006 schema: &'a Schema,
1007 column_indices: &[u32],
1008 column_infos: &[Arc<ColumnInfo>],
1009 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
1010 num_rows: u64,
1011 _decoder_plugins: Arc<DecoderPlugins>,
1012 io: Arc<dyn EncodingsIo>,
1013 cache: Arc<LanceCache>,
1014 filter: &FilterExpression,
1015 decoder_config: &DecoderConfig,
1016 ) -> Result<Self> {
1017 assert!(num_rows > 0);
1018 let buffers = FileBuffers {
1019 positions_and_sizes: file_buffer_positions_and_sizes,
1020 };
1021 let arrow_schema = ArrowSchema::from(schema);
1022 let root_fields = arrow_schema.fields().clone();
1023 let root_type = DataType::Struct(root_fields.clone());
1024 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
1025 root_field.children.clone_from(&schema.fields);
1029 root_field
1030 .metadata
1031 .insert("__lance_decoder_root".to_string(), "true".to_string());
1032
1033 if column_infos.is_empty() || column_infos[0].is_structural() {
1034 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1035
1036 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1037 let mut root_scheduler =
1038 strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
1039
1040 let context = SchedulerContext::new(io, cache.clone());
1041 root_scheduler.initialize(filter, &context).await?;
1042
1043 Ok(Self {
1044 root_scheduler: RootScheduler::Structural(root_scheduler),
1045 root_fields,
1046 cache,
1047 })
1048 } else {
1049 let mut columns = Vec::with_capacity(column_infos.len() + 1);
1052 columns.push(Arc::new(root_column(num_rows)));
1053 columns.extend(column_infos.iter().cloned());
1054
1055 let adjusted_column_indices = [0_u32]
1056 .into_iter()
1057 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1058 .collect::<Vec<_>>();
1059 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1060 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1061 let root_scheduler =
1062 strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1063
1064 let context = SchedulerContext::new(io, cache.clone());
1065 root_scheduler.initialize(filter, &context).await?;
1066
1067 Ok(Self {
1068 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1069 root_fields,
1070 cache,
1071 })
1072 }
1073 }
1074
1075 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1076 pub fn from_scheduler(
1077 root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1078 root_fields: Fields,
1079 cache: Arc<LanceCache>,
1080 ) -> Self {
1081 Self {
1082 root_scheduler: RootScheduler::Legacy(root_scheduler),
1083 root_fields,
1084 cache,
1085 }
1086 }
1087
1088 fn do_schedule_ranges_structural(
1089 &mut self,
1090 ranges: &[Range<u64>],
1091 filter: &FilterExpression,
1092 io: Arc<dyn EncodingsIo>,
1093 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1094 ) {
1095 let root_scheduler = self.root_scheduler.as_structural();
1096 let mut context = SchedulerContext::new(io, self.cache.clone());
1097 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1098 if let Err(schedule_ranges_err) = maybe_root_job {
1099 schedule_action(Err(schedule_ranges_err));
1100 return;
1101 }
1102 let mut root_job = maybe_root_job.unwrap();
1103 let mut num_rows_scheduled = 0;
1104 loop {
1105 let maybe_next_scan_lines = root_job.schedule_next(&mut context);
1106 if let Err(err) = maybe_next_scan_lines {
1107 schedule_action(Err(err));
1108 return;
1109 }
1110 let next_scan_lines = maybe_next_scan_lines.unwrap();
1111 if next_scan_lines.is_empty() {
1112 return;
1113 }
1114 for next_scan_line in next_scan_lines {
1115 trace!(
1116 "Scheduled scan line of {} rows and {} decoders",
1117 next_scan_line.rows_scheduled,
1118 next_scan_line.decoders.len()
1119 );
1120 num_rows_scheduled += next_scan_line.rows_scheduled;
1121 if !schedule_action(Ok(DecoderMessage {
1122 scheduled_so_far: num_rows_scheduled,
1123 decoders: next_scan_line.decoders,
1124 })) {
1125 return;
1127 }
1128 }
1129 }
1130 }
1131
1132 fn do_schedule_ranges_legacy(
1133 &mut self,
1134 ranges: &[Range<u64>],
1135 filter: &FilterExpression,
1136 io: Arc<dyn EncodingsIo>,
1137 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1138 priority: Option<Box<dyn PriorityRange>>,
1142 ) {
1143 let root_scheduler = self.root_scheduler.as_legacy();
1144 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1145 trace!(
1146 "Scheduling {} ranges across {}..{} ({} rows){}",
1147 ranges.len(),
1148 ranges.first().unwrap().start,
1149 ranges.last().unwrap().end,
1150 rows_requested,
1151 priority
1152 .as_ref()
1153 .map(|p| format!(" (priority={:?})", p))
1154 .unwrap_or_default()
1155 );
1156
1157 let mut context = SchedulerContext::new(io, self.cache.clone());
1158 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1159 if let Err(schedule_ranges_err) = maybe_root_job {
1160 schedule_action(Err(schedule_ranges_err));
1161 return;
1162 }
1163 let mut root_job = maybe_root_job.unwrap();
1164 let mut num_rows_scheduled = 0;
1165 let mut rows_to_schedule = root_job.num_rows();
1166 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1167 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1168 while rows_to_schedule > 0 {
1169 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1170 if let Err(schedule_next_err) = maybe_next_scan_line {
1171 schedule_action(Err(schedule_next_err));
1172 return;
1173 }
1174 let next_scan_line = maybe_next_scan_line.unwrap();
1175 priority.advance(next_scan_line.rows_scheduled);
1176 num_rows_scheduled += next_scan_line.rows_scheduled;
1177 rows_to_schedule -= next_scan_line.rows_scheduled;
1178 trace!(
1179 "Scheduled scan line of {} rows and {} decoders",
1180 next_scan_line.rows_scheduled,
1181 next_scan_line.decoders.len()
1182 );
1183 if !schedule_action(Ok(DecoderMessage {
1184 scheduled_so_far: num_rows_scheduled,
1185 decoders: next_scan_line.decoders,
1186 })) {
1187 return;
1189 }
1190
1191 trace!("Finished scheduling {} ranges", ranges.len());
1192 }
1193 }
1194
1195 fn do_schedule_ranges(
1196 &mut self,
1197 ranges: &[Range<u64>],
1198 filter: &FilterExpression,
1199 io: Arc<dyn EncodingsIo>,
1200 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1201 priority: Option<Box<dyn PriorityRange>>,
1205 ) {
1206 match &self.root_scheduler {
1207 RootScheduler::Legacy(_) => {
1208 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1209 }
1210 RootScheduler::Structural(_) => {
1211 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1212 }
1213 }
1214 }
1215
1216 pub fn schedule_ranges_to_vec(
1219 &mut self,
1220 ranges: &[Range<u64>],
1221 filter: &FilterExpression,
1222 io: Arc<dyn EncodingsIo>,
1223 priority: Option<Box<dyn PriorityRange>>,
1224 ) -> Result<Vec<DecoderMessage>> {
1225 let mut decode_messages = Vec::new();
1226 self.do_schedule_ranges(
1227 ranges,
1228 filter,
1229 io,
1230 |msg| {
1231 decode_messages.push(msg);
1232 true
1233 },
1234 priority,
1235 );
1236 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1237 }
1238
1239 #[instrument(skip_all)]
1249 pub fn schedule_ranges(
1250 &mut self,
1251 ranges: &[Range<u64>],
1252 filter: &FilterExpression,
1253 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1254 scheduler: Arc<dyn EncodingsIo>,
1255 ) {
1256 self.do_schedule_ranges(
1257 ranges,
1258 filter,
1259 scheduler,
1260 |msg| {
1261 match sink.send(msg) {
1262 Ok(_) => true,
1263 Err(SendError { .. }) => {
1264 debug!(
1267 "schedule_ranges aborting early since decoder appears to have been dropped"
1268 );
1269 false
1270 }
1271 }
1272 },
1273 None,
1274 )
1275 }
1276
1277 #[instrument(skip_all)]
1285 pub fn schedule_range(
1286 &mut self,
1287 range: Range<u64>,
1288 filter: &FilterExpression,
1289 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1290 scheduler: Arc<dyn EncodingsIo>,
1291 ) {
1292 self.schedule_ranges(&[range], filter, sink, scheduler)
1293 }
1294
1295 pub fn schedule_take(
1303 &mut self,
1304 indices: &[u64],
1305 filter: &FilterExpression,
1306 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1307 scheduler: Arc<dyn EncodingsIo>,
1308 ) {
1309 debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
1310 if indices.is_empty() {
1311 return;
1312 }
1313 trace!("Scheduling take of {} rows", indices.len());
1314 let ranges = Self::indices_to_ranges(indices);
1315 self.schedule_ranges(&ranges, filter, sink, scheduler)
1316 }
1317
1318 fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1320 let mut ranges = Vec::new();
1321 let mut start = indices[0];
1322
1323 for window in indices.windows(2) {
1324 if window[1] != window[0] + 1 {
1325 ranges.push(start..window[0] + 1);
1326 start = window[1];
1327 }
1328 }
1329
1330 ranges.push(start..*indices.last().unwrap() + 1);
1331 ranges
1332 }
1333}
1334
/// A task that materializes one record batch, along with the number of rows it
/// will produce.
pub struct ReadBatchTask {
    /// Future resolving to the decoded batch.
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// Number of rows the batch will contain.
    pub num_rows: u32,
}
1339
/// A stream that drains scheduled decoders into fixed-size record batches
/// (legacy 2.0 decode path).
pub struct BatchDecodeStream {
    /// Receiver side of the scheduler -> decoder channel.
    context: DecoderContext,
    /// The root struct decoder that children are attached to.
    root_decoder: SimpleStructDecoder,
    /// Rows not yet emitted as batches.
    rows_remaining: u64,
    /// Target number of rows per emitted batch.
    rows_per_batch: u32,
    /// Rows the scheduler has reported as scheduled so far.
    rows_scheduled: u64,
    /// Rows already drained into batch tasks.
    rows_drained: u64,
    /// True once the scheduler channel has closed.
    scheduler_exhausted: bool,
    /// Ensures the oversized-batch warning is emitted at most once.
    emitted_batch_size_warning: Arc<Once>,
}
1351
impl BatchDecodeStream {
    /// Creates a new stream.
    ///
    /// * `scheduled` - channel of messages from the scheduling task
    /// * `rows_per_batch` - target batch size (final batch may be smaller)
    /// * `num_rows` - total number of rows this stream will emit
    /// * `root_decoder` - root of the legacy decoder tree
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    // Attaches a scheduled page decoder to the decoder tree.  An empty path
    // addresses the root itself, which already exists, so it is a no-op.
    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
        if decoder.path.is_empty() {
            Ok(())
        } else {
            self.root_decoder.accept_child(decoder)
        }
    }

    // Pulls messages from the scheduler until at least `scheduled_need` rows
    // have been scheduled (or the channel closes).  Returns the number of rows
    // actually scheduled, which may be less than requested if the scheduler
    // side hung up early.
    #[instrument(level = "debug", skip_all)]
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    // scheduled_so_far is cumulative, so overwrite (not add).
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    // Channel closed: no more rows are coming.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    // Produces the next batch-decode task, or None when the stream is done.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // If the scheduler hung up early we emit a short final batch.
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        // `loaded_need` is the index of the last row the batch requires.
        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this into a stream of [`ReadBatchTask`]s.  The CPU-bound
    /// decode work of each task runs on a spawned tokio task.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    // JoinError (panic/cancel) is wrapped into a lance Error.
                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
                        .await
                        .map_err(|err| Error::wrapped(err.into()))?
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1494
/// A page handed to the root decoder: either a 2.1+ structural loaded page or
/// a legacy (pre-2.1) page decoder.
enum RootDecoderMessage {
    LoadedPage(LoadedPageShard),
    LegacyPage(crate::previous::decoder::DecoderReady),
}
/// Abstraction over the two root decoder implementations (structural and
/// legacy) so [`BatchDecodeIterator`] can drive either one.
trait RootDecoderType {
    /// Accepts a page; each implementation handles only its own variant.
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Drains the next `num_rows` rows into a decode task.
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Blocks on `runtime` until the row at index `loaded_need` is loaded
    /// (a no-op for the structural decoder, which receives loaded pages).
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1506impl RootDecoderType for StructuralStructDecoder {
1507 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1508 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1509 unreachable!()
1510 };
1511 self.accept_page(loaded_page)
1512 }
1513 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1514 self.drain_batch_task(num_rows)
1515 }
1516 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1517 Ok(())
1519 }
1520}
1521impl RootDecoderType for SimpleStructDecoder {
1522 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1523 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1524 unreachable!()
1525 };
1526 self.accept_child(legacy_page)
1527 }
1528 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1529 self.drain(num_rows)
1530 }
1531 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1532 runtime.block_on(self.wait_for_loaded(loaded_need))
1533 }
1534}
1535
/// Synchronous counterpart of the decode streams: iterates batches from a
/// pre-collected queue of scheduler messages.
struct BatchDecodeIterator<T: RootDecoderType> {
    // All messages the scheduler produced, in order.
    messages: VecDeque<Result<DecoderMessage>>,
    // Root of the decoder tree (structural or legacy).
    root_decoder: T,
    // Rows not yet handed out as batches.
    rows_remaining: u64,
    // Target rows per batch (the final batch may be smaller).
    rows_per_batch: u32,
    // High-water mark of rows reported scheduled by consumed messages.
    rows_scheduled: u64,
    // Rows already drained from the root decoder.
    rows_drained: u64,
    // Ensures the "large batch" warning is logged at most once.
    emitted_batch_size_warning: Arc<Once>,
    // Private current-thread runtime used to block on any I/O futures that
    // are not already complete.
    wait_for_io_runtime: tokio::runtime::Runtime,
    // Arrow schema of the emitted batches (for RecordBatchReader::schema).
    schema: Arc<ArrowSchema>,
}
1551
impl<T: RootDecoderType> BatchDecodeIterator<T> {
    /// Creates a new iterator.
    ///
    /// `messages` must already contain every message the scheduler produced;
    /// no further messages are awaited.
    pub fn new(
        messages: VecDeque<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: T,
        schema: Arc<ArrowSchema>,
    ) -> Self {
        Self {
            messages,
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap(),
            emitted_batch_size_warning: Arc::new(Once::new()),
            schema,
        }
    }

    // Synchronously resolves a page's I/O future, blocking on the private
    // runtime only when the future is not already complete.
    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
        match maybe_done(unloaded_page.0) {
            // Already finished: grab the result without entering the runtime.
            MaybeDone::Done(loaded_page) => loaded_page,
            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
            MaybeDone::Gone => unreachable!(),
        }
    }

    // Consumes queued messages until at least `scheduled_need` rows have been
    // scheduled (or the queue empties), feeding every page into the root
    // decoder, then waits until the rows for the next batch are loaded.
    // Returns the number of rows scheduled so far.
    #[instrument(skip_all)]
    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
            let message = self.messages.pop_front().unwrap()?;
            self.rows_scheduled = message.scheduled_so_far;
            for decoder_message in message.decoders {
                match decoder_message {
                    MessageType::UnloadedPage(unloaded_page) => {
                        let loaded_page = self.wait_for_page(unloaded_page)?;
                        self.root_decoder
                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
                    }
                    MessageType::DecoderReady(decoder_ready) => {
                        // An empty path addresses the root, which exists already.
                        if !decoder_ready.path.is_empty() {
                            self.root_decoder
                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
                        }
                    }
                }
            }
        }

        // Index of the last row the upcoming batch needs to have loaded.
        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;

        self.root_decoder
            .wait(loaded_need, &self.wait_for_io_runtime)?;
        Ok(self.rows_scheduled)
    }

    // Produces the next decoded batch, or None once all rows are drained.
    // Mirrors BatchDecodeStream::next_batch_task but fully synchronous.
    #[instrument(level = "debug", skip_all)]
    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
            // Fewer rows scheduled than asked: emit a short final batch.
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch(to_take)?;

        self.rows_drained += to_take;

        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;

        Ok(Some(batch))
    }
}
1668
1669impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1670 type Item = ArrowResult<RecordBatch>;
1671
1672 fn next(&mut self) -> Option<Self::Item> {
1673 self.next_batch_task()
1674 .transpose()
1675 .map(|r| r.map_err(ArrowError::from))
1676 }
1677}
1678
1679impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1680 fn schema(&self) -> Arc<ArrowSchema> {
1681 self.schema.clone()
1682 }
1683}
1684
/// Stream for the 2.1+ structural encoding that turns scheduled decoder
/// messages into a stream of [`ReadBatchTask`]s.
pub struct StructuralBatchDecodeStream {
    // Receives DecoderMessages from the scheduling side.
    context: DecoderContext,
    // Root of the structural decoder tree.
    root_decoder: StructuralStructDecoder,
    // Rows not yet handed out as batch tasks.
    rows_remaining: u64,
    // Target rows per batch (the final batch may be smaller).
    rows_per_batch: u32,
    // High-water mark of rows reported scheduled.
    rows_scheduled: u64,
    // Rows already drained from the root decoder.
    rows_drained: u64,
    // Set once the scheduler channel closes.
    scheduler_exhausted: bool,
    // Ensures the "large batch" warning is logged at most once per stream.
    emitted_batch_size_warning: Arc<Once>,
    // If true, run each batch's CPU-bound decode on a spawned tokio task
    // instead of inline in the stream poll.
    spawn_batch_decode_tasks: bool,
}
1704
impl StructuralBatchDecodeStream {
    /// Creates a new stream.
    ///
    /// * `scheduled` - channel of messages from the scheduling task
    /// * `rows_per_batch` - target batch size (final batch may be smaller)
    /// * `num_rows` - total number of rows this stream will emit
    /// * `root_decoder` - root of the structural decoder tree
    /// * `spawn_batch_decode_tasks` - decode on a spawned task when true
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
        spawn_batch_decode_tasks: bool,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
            spawn_batch_decode_tasks,
        }
    }

    // Pulls messages from the scheduler until at least `scheduled_need` rows
    // have been scheduled (or the channel closes), awaiting each page's I/O
    // and handing the loaded page to the root decoder.
    #[instrument(level = "debug", skip_all)]
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    // scheduled_so_far is cumulative, so overwrite (not add).
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        let unloaded_page = message.into_structural();
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    // Channel closed: no more rows are coming.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    // Produces the next batch-decode task, or None when the stream is done.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // If the scheduler hung up early we emit a short final batch.
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this into a stream of [`ReadBatchTask`]s.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks;
                let task = async move {
                    let next_task = next_task?;
                    if spawn_batch_decode_tasks {
                        // JoinError (panic/cancel) is wrapped into a lance Error.
                        tokio::spawn(
                            async move { next_task.into_batch(emitted_batch_size_warning) },
                        )
                        .await
                        .map_err(|err| Error::wrapped(err.into()))?
                    } else {
                        next_task.into_batch(emitted_batch_size_warning)
                    }
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1840
/// The rows a caller wants to read: either half-open ranges or sorted indices.
#[derive(Debug)]
pub enum RequestedRows {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}
1846
1847impl RequestedRows {
1848 pub fn num_rows(&self) -> u64 {
1849 match self {
1850 Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1851 Self::Indices(indices) => indices.len() as u64,
1852 }
1853 }
1854
1855 pub fn trim_empty_ranges(mut self) -> Self {
1856 if let Self::Ranges(ranges) = &mut self {
1857 ranges.retain(|r| !r.is_empty());
1858 }
1859 self
1860 }
1861}
1862
/// Tunable options controlling decoder behavior.
#[derive(Debug, Clone)]
pub struct DecoderConfig {
    /// Whether repetition indexes should be cached.  The default comes from
    /// `default_cache_repetition_index()` (defined elsewhere in this file).
    pub cache_repetition_index: bool,
    /// Passed through to decoders as `should_validate`; when true, decoded
    /// data is validated during decode.
    pub validate_on_decode: bool,
}
1876
impl Default for DecoderConfig {
    fn default() -> Self {
        Self {
            // Default is computed by a helper defined elsewhere in this file.
            cache_repetition_index: default_cache_repetition_index(),
            // Validation is opt-in; off by default.
            validate_on_decode: false,
        }
    }
}
1885
/// Everything needed to run the combined schedule-then-decode pipeline.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Custom decoder plugins (currently an empty placeholder type).
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Target number of rows per output batch.
    pub batch_size: u32,
    /// The I/O scheduler used to read encoded data.
    pub io: Arc<dyn EncodingsIo>,
    /// Metadata cache shared across schedulers.
    pub cache: Arc<LanceCache>,
    /// Decoder tuning options.
    pub decoder_config: DecoderConfig,
}
1895
1896fn check_scheduler_on_drop(
1897 stream: BoxStream<'static, ReadBatchTask>,
1898 scheduler_handle: tokio::task::JoinHandle<()>,
1899) -> BoxStream<'static, ReadBatchTask> {
1900 let mut scheduler_handle = Some(scheduler_handle);
1904 let check_scheduler = stream::unfold((), move |_| {
1905 let handle = scheduler_handle.take();
1906 async move {
1907 if let Some(handle) = handle {
1908 handle.await.unwrap();
1909 }
1910 None
1911 }
1912 });
1913 stream.chain(check_scheduler).boxed()
1914}
1915
1916pub fn create_decode_stream(
1917 schema: &Schema,
1918 num_rows: u64,
1919 batch_size: u32,
1920 is_structural: bool,
1921 should_validate: bool,
1922 spawn_structural_batch_decode_tasks: bool,
1923 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1924) -> Result<BoxStream<'static, ReadBatchTask>> {
1925 if is_structural {
1926 let arrow_schema = ArrowSchema::from(schema);
1927 let structural_decoder = StructuralStructDecoder::new(
1928 arrow_schema.fields,
1929 should_validate,
1930 true,
1931 )?;
1932 Ok(StructuralBatchDecodeStream::new(
1933 rx,
1934 batch_size,
1935 num_rows,
1936 structural_decoder,
1937 spawn_structural_batch_decode_tasks,
1938 )
1939 .into_stream())
1940 } else {
1941 let arrow_schema = ArrowSchema::from(schema);
1942 let root_fields = arrow_schema.fields;
1943
1944 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1945 Ok(BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream())
1946 }
1947}
1948
1949pub fn create_decode_iterator(
1953 schema: &Schema,
1954 num_rows: u64,
1955 batch_size: u32,
1956 should_validate: bool,
1957 is_structural: bool,
1958 messages: VecDeque<Result<DecoderMessage>>,
1959) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1960 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1961 let root_fields = arrow_schema.fields.clone();
1962 if is_structural {
1963 let simple_struct_decoder =
1964 StructuralStructDecoder::new(root_fields, should_validate, true)?;
1965 Ok(Box::new(BatchDecodeIterator::new(
1966 messages,
1967 batch_size,
1968 num_rows,
1969 simple_struct_decoder,
1970 arrow_schema,
1971 )))
1972 } else {
1973 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1974 Ok(Box::new(BatchDecodeIterator::new(
1975 messages,
1976 batch_size,
1977 num_rows,
1978 root_decoder,
1979 arrow_schema,
1980 )))
1981 }
1982}
1983
// Builds the decode stream and spawns the scheduling task that feeds it.
//
// Failures while constructing the scheduler are forwarded through the
// channel so they surface as errors on the returned stream rather than
// panics in the spawned task.
fn create_scheduler_decoder(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    let num_rows = requested_rows.num_rows();

    let is_structural = column_infos[0].is_structural();
    // Env var override for whether batch decodes run on spawned tasks;
    // default: spawn for range scans, decode inline for takes.
    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
    let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() {
        Some("always") => true,
        Some("never") => false,
        _ => matches!(requested_rows, RequestedRows::Ranges(_)),
    };

    let (tx, rx) = mpsc::unbounded_channel();

    let decode_stream = create_decode_stream(
        &target_schema,
        num_rows,
        config.batch_size,
        is_structural,
        config.decoder_config.validate_on_decode,
        spawn_structural_batch_decode_tasks,
        rx,
    )?;

    // Scheduling runs concurrently with decoding on its own task.
    let scheduler_handle = tokio::task::spawn(async move {
        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
            target_schema.as_ref(),
            &column_indices,
            &column_infos,
            &vec![],
            num_rows,
            config.decoder_plugins,
            config.io.clone(),
            config.cache,
            &filter,
            &config.decoder_config,
        )
        .await
        {
            Ok(scheduler) => scheduler,
            Err(e) => {
                // Receiver may already be gone; ignore send failure.
                let _ = tx.send(Err(e));
                return;
            }
        };

        match requested_rows {
            RequestedRows::Ranges(ranges) => {
                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
            }
            RequestedRows::Indices(indices) => {
                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
            }
        }
    });

    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
}
2048
2049pub fn schedule_and_decode(
2055 column_infos: Vec<Arc<ColumnInfo>>,
2056 requested_rows: RequestedRows,
2057 filter: FilterExpression,
2058 column_indices: Vec<u32>,
2059 target_schema: Arc<Schema>,
2060 config: SchedulerDecoderConfig,
2061) -> BoxStream<'static, ReadBatchTask> {
2062 if requested_rows.num_rows() == 0 {
2063 return stream::empty().boxed();
2064 }
2065
2066 let requested_rows = requested_rows.trim_empty_ranges();
2069
2070 let io = config.io.clone();
2071
2072 match create_scheduler_decoder(
2076 column_infos,
2077 requested_rows,
2078 filter,
2079 column_indices,
2080 target_schema,
2081 config,
2082 ) {
2083 Ok(stream) => stream.finally(move || drop(io)).boxed(),
2086 Err(e) => stream::once(std::future::ready(ReadBatchTask {
2088 num_rows: 0,
2089 task: std::future::ready(Err(e)).boxed(),
2090 }))
2091 .boxed(),
2092 }
2093}
2094
/// Minimal single-threaded runtime used to synchronously block on futures
/// from non-async code paths (see [`schedule_and_decode_blocking`]).
pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});
2100
/// Fully synchronous variant of [`schedule_and_decode`].
///
/// Scheduling runs to completion on [`WAITER_RT`] before any decoding
/// starts, so every scheduler message must be producible without waiting on
/// the consumer.  Returns a blocking `RecordBatchReader` over the decoded
/// batches.
pub fn schedule_and_decode_blocking(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
    // Zero rows: return an empty reader with the right schema.
    if requested_rows.num_rows() == 0 {
        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
    }

    let num_rows = requested_rows.num_rows();
    let is_structural = column_infos[0].is_structural();

    let (tx, mut rx) = mpsc::unbounded_channel();

    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
        &config.decoder_config,
    ))?;

    // Scheduling consumes `tx`; when this match returns the sender has been
    // dropped and every message is already buffered in the channel.
    match requested_rows {
        RequestedRows::Ranges(ranges) => {
            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
        }
        RequestedRows::Indices(indices) => {
            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
        }
    }

    // Drain the channel without awaiting: the sender is closed, so
    // `recv_many` resolves immediately (`now_or_never` never sees a pending
    // future) and returns 0 once the buffer is empty.
    let mut messages = Vec::new();
    while rx
        .recv_many(&mut messages, usize::MAX)
        .now_or_never()
        .unwrap()
        != 0
    {}

    let decode_iterator = create_decode_iterator(
        &target_schema,
        num_rows,
        config.batch_size,
        config.decoder_config.validate_on_decode,
        is_structural,
        messages.into(),
    )?;

    Ok(decode_iterator)
}
2179
/// Decodes a page of primitive values into a [`DataBlock`].
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes `num_rows` rows after skipping the first `rows_to_skip` rows
    /// of the page.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
2225
/// Schedules the I/O needed to decode ranges of rows from a single page.
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Issues reads for `ranges` via `scheduler` and returns a future that
    /// resolves to the page's decoder once the data is available.
    /// `top_level_row` is used for I/O prioritization.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2253
/// Tracks the current top-level row position used to prioritize I/O.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Moves the position forward by `num_rows`.
    fn advance(&mut self, num_rows: u64);
    /// The current priority (row position).
    fn current_priority(&self) -> u64;
    /// Clones this range behind a new box (trait objects can't use `Clone`).
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}
2260
/// Flat priority: one row advanced equals one unit of priority.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}
2267
impl SimplePriorityRange {
    /// Creates a range starting at the given priority (row position).
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}
2273
2274impl PriorityRange for SimplePriorityRange {
2275 fn advance(&mut self, num_rows: u64) {
2276 self.priority += num_rows;
2277 }
2278
2279 fn current_priority(&self) -> u64 {
2280 self.priority
2281 }
2282
2283 fn box_clone(&self) -> Box<dyn PriorityRange> {
2284 Box::new(Self {
2285 priority: self.priority,
2286 })
2287 }
2288}
2289
/// Priority tracker for list children: positions in the child items are
/// translated back to parent-list rows via `offsets`.
pub struct ListPriorityRange {
    // Priority tracker of the parent level.
    base: Box<dyn PriorityRange>,
    // Cumulative item offsets; offsets[i+1] is the end of list row i's items.
    offsets: Arc<[u64]>,
    // Index into `offsets` for the current list row.
    cur_index_into_offsets: usize,
    // Current position in child-item space.
    cur_position: u64,
}
2308
impl ListPriorityRange {
    /// Creates a new tracker starting at item position 0.
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}
2319
// Manual Debug so only the length of `offsets` is printed rather than the
// (potentially very large) offsets array itself.
impl std::fmt::Debug for ListPriorityRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ListPriorityRange")
            .field("base", &self.base)
            .field("offsets.len()", &self.offsets.len())
            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
            .field("cur_position", &self.cur_position)
            .finish()
    }
}
2330
2331impl PriorityRange for ListPriorityRange {
2332 fn advance(&mut self, num_rows: u64) {
2333 self.cur_position += num_rows;
2336 let mut idx_into_offsets = self.cur_index_into_offsets;
2337 while idx_into_offsets + 1 < self.offsets.len()
2338 && self.offsets[idx_into_offsets + 1] <= self.cur_position
2339 {
2340 idx_into_offsets += 1;
2341 }
2342 let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2343 self.cur_index_into_offsets = idx_into_offsets;
2344 self.base.advance(base_rows_advanced as u64);
2345 }
2346
2347 fn current_priority(&self) -> u64 {
2348 self.base.current_priority()
2349 }
2350
2351 fn box_clone(&self) -> Box<dyn PriorityRange> {
2352 Box::new(Self {
2353 base: self.base.box_clone(),
2354 offsets: self.offsets.clone(),
2355 cur_index_into_offsets: self.cur_index_into_offsets,
2356 cur_position: self.cur_position,
2357 })
2358 }
2359}
2360
/// Shared state passed to field schedulers while walking the schema tree.
pub struct SchedulerContext {
    // NOTE(review): only read in `path_name` to distinguish TEMP from ROOT
    // contexts; how it becomes Some is not visible in this chunk.
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    // The I/O scheduler used to submit reads.
    io: Arc<dyn EncodingsIo>,
    // Metadata cache shared across schedulers.
    cache: Arc<LanceCache>,
    // Name used when formatting TEMP context paths.
    name: String,
    // Current path as column indices, pushed/popped while descending fields.
    path: Vec<u32>,
    // Human-readable names parallel to `path`.
    path_names: Vec<String>,
}
2370
/// Guard returned by [`SchedulerContext::push`]; call [`Self::pop`] to leave
/// the pushed scope and regain the underlying context.
pub struct ScopedSchedulerContext<'a> {
    pub context: &'a mut SchedulerContext,
}
2374
impl<'a> ScopedSchedulerContext<'a> {
    /// Ends the scope, popping the pushed path segment and returning the
    /// underlying context.
    pub fn pop(self) -> &'a mut SchedulerContext {
        self.context.pop();
        self.context
    }
}
2381
2382impl SchedulerContext {
2383 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2384 Self {
2385 io,
2386 cache,
2387 recv: None,
2388 name: "".to_string(),
2389 path: Vec::new(),
2390 path_names: Vec::new(),
2391 }
2392 }
2393
2394 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2395 &self.io
2396 }
2397
2398 pub fn cache(&self) -> &Arc<LanceCache> {
2399 &self.cache
2400 }
2401
2402 pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2403 self.path.push(index);
2404 self.path_names.push(name.to_string());
2405 ScopedSchedulerContext { context: self }
2406 }
2407
2408 pub fn pop(&mut self) {
2409 self.path.pop();
2410 self.path_names.pop();
2411 }
2412
2413 pub fn path_name(&self) -> String {
2414 let path = self.path_names.join("/");
2415 if self.recv.is_some() {
2416 format!("TEMP({}){}", self.name, path)
2417 } else {
2418 format!("ROOT{}", path)
2419 }
2420 }
2421
2422 pub fn current_path(&self) -> VecDeque<u32> {
2423 VecDeque::from_iter(self.path.iter().copied())
2424 }
2425
2426 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2427 pub fn locate_decoder(
2428 &mut self,
2429 decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2430 ) -> crate::previous::decoder::DecoderReady {
2431 trace!(
2432 "Scheduling decoder of type {:?} for {:?}",
2433 decoder.data_type(),
2434 self.path,
2435 );
2436 crate::previous::decoder::DecoderReady {
2437 decoder,
2438 path: self.current_path(),
2439 }
2440 }
2441}
2442
2443pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2444
2445impl std::fmt::Debug for UnloadedPageShard {
2446 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2447 f.debug_struct("UnloadedPage").finish()
2448 }
2449}
2450
/// One step of scheduling output: how many rows are scheduled after this
/// step, plus the decoder messages produced by it.
#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}
2456
/// An in-progress scheduling pass over a field (2.1+ structural path).
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Produces the next scan lines, or an empty result when scheduling for
    /// this job is complete.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
}
2466
2467pub struct FilterExpression(pub Bytes);
2475
2476impl FilterExpression {
2477 pub fn no_filter() -> Self {
2482 Self(Bytes::new())
2483 }
2484
2485 pub fn is_noop(&self) -> bool {
2487 self.0.is_empty()
2488 }
2489}
2490
/// Schedules I/O for a single field (2.1+ structural path).
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// One-time async setup before any scheduling (e.g. loading metadata).
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Starts a scheduling job covering the given row ranges.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
2503
/// A deferred, CPU-bound task that decodes loaded data into an Arrow array.
pub trait DecodeArrayTask: Send {
    /// Consumes the task and performs the decode.
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}
2509
2510impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2511 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2512 StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2513 }
2514}
2515
/// The decode work for one batch, plus the number of rows it will produce.
pub struct NextDecodeTask {
    /// The CPU-bound decode task.
    pub task: Box<dyn DecodeArrayTask>,
    /// Number of rows the task will decode.
    pub num_rows: u64,
}
2526
2527impl NextDecodeTask {
2528 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2533 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2534 let struct_arr = self.task.decode();
2535 match struct_arr {
2536 Ok(struct_arr) => {
2537 let batch = RecordBatch::from(struct_arr.as_struct());
2538 let size_bytes = batch.get_array_memory_size() as u64;
2539 if size_bytes > BATCH_SIZE_BYTES_WARNING {
2540 emitted_batch_size_warning.call_once(|| {
2541 let size_mb = size_bytes / 1024 / 1024;
2542 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2543 });
2544 }
2545 Ok(batch)
2546 }
2547 Err(e) => {
2548 let e = Error::internal(format!("Error decoding batch: {}", e));
2549 Err(e)
2550 }
2551 }
2552 }
2553}
2554
/// A decoder message payload: either a legacy (pre-2.1) page decoder or a
/// 2.1+ structural unloaded page.
#[derive(Debug)]
pub enum MessageType {
    DecoderReady(crate::previous::decoder::DecoderReady),
    UnloadedPage(UnloadedPageShard),
}
2570
2571impl MessageType {
2572 pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2573 match self {
2574 Self::DecoderReady(decoder) => decoder,
2575 Self::UnloadedPage(_) => {
2576 panic!("Expected DecoderReady but got UnloadedPage")
2577 }
2578 }
2579 }
2580
2581 pub fn into_structural(self) -> UnloadedPageShard {
2582 match self {
2583 Self::UnloadedPage(unloaded) => unloaded,
2584 Self::DecoderReady(_) => {
2585 panic!("Expected UnloadedPage but got DecoderReady")
2586 }
2587 }
2588 }
2589}
2590
/// A message from the scheduler to the decoder side.
pub struct DecoderMessage {
    /// Cumulative number of rows scheduled so far (a high-water mark, not a
    /// per-message delta).
    pub scheduled_so_far: u64,
    /// Decoders / pages produced by this scheduling step.
    pub decoders: Vec<MessageType>,
}
2595
/// Decoder-side state: the receiving end of the scheduler channel.
pub struct DecoderContext {
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}
2599
impl DecoderContext {
    /// Wraps the receiving end of the scheduler channel.
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}
2605
/// Output of decoding one page: the raw data plus its repetition/definition
/// unraveler.
pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}
2610
/// A deferred, CPU-bound task that decodes a single page.
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Consumes the task and performs the decode.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}
2615
/// A loaded page that can be drained incrementally into decode tasks.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Takes the next `num_rows` rows as a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// Total number of rows in this page.
    fn num_rows(&self) -> u64;
}
2620
/// A page whose I/O has completed and is ready to be decoded.
#[derive(Debug)]
pub struct LoadedPageShard {
    /// Decoder over the loaded page data.
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Path of column indices from the root to the field this page belongs
    /// to (empty path = the root itself).
    pub path: VecDeque<u32>,
}
2645
/// Output of a structural decode: the array plus its repetition/definition
/// unraveler.
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}
2650
/// A deferred, CPU-bound task that decodes into an array plus repdef info.
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Consumes the task and performs the decode.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}
2654
/// Decodes one field in the 2.1+ structural path.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Accepts a loaded page destined for this field (or a descendant).
    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
    /// Takes the next `num_rows` rows as a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The Arrow data type this field decodes into.
    fn data_type(&self) -> &DataType;
}
2666
/// Placeholder for user-supplied decoder extensions (currently empty).
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2669
/// Decodes a fully in-memory [`EncodedBatch`] into a single `RecordBatch`.
///
/// All data is served from a buffer scheduler, so no external I/O occurs.
/// When `cache` is `None` a private 128 MiB cache is created for the call.
pub async fn decode_batch(
    batch: &EncodedBatch,
    filter: &FilterExpression,
    decoder_plugins: Arc<DecoderPlugins>,
    should_validate: bool,
    version: LanceFileVersion,
    cache: Option<Arc<LanceCache>>,
) -> Result<RecordBatch> {
    // Everything is already in memory; serve "reads" straight from the bytes.
    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
    let cache = if let Some(cache) = cache {
        cache
    } else {
        Arc::new(lance_core::cache::LanceCache::with_capacity(
            128 * 1024 * 1024,
        ))
    };
    let mut decode_scheduler = DecodeBatchScheduler::try_new(
        batch.schema.as_ref(),
        &batch.top_level_columns,
        &batch.page_table,
        &vec![],
        batch.num_rows,
        decoder_plugins,
        io_scheduler.clone(),
        cache,
        filter,
        &DecoderConfig::default(),
    )
    .await?;
    let (tx, rx) = unbounded_channel();
    // Schedule everything up front, then decode the entire batch as a single
    // output batch (batch_size == num_rows).
    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
    let is_structural = version >= LanceFileVersion::V2_1;
    // Spawning is on unless explicitly disabled via the env var.
    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
    let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never"));
    let mut decode_stream = create_decode_stream(
        &batch.schema,
        batch.num_rows,
        batch.num_rows as u32,
        is_structural,
        should_validate,
        spawn_structural_batch_decode_tasks,
        rx,
    )?;
    // Exactly one batch task is produced because batch_size == num_rows.
    decode_stream.next().await.unwrap().task.await
}
2720
#[cfg(test)]
mod tests {
    use super::*;

    // Index coalescing: a single index becomes a one-row range.
    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }

    // Fully consecutive indices collapse into a single range.
    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }

    // Gaps split the output into multiple ranges.
    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }
}