1use std::collections::VecDeque;
216use std::sync::atomic::{AtomicU64, Ordering};
217use std::sync::{LazyLock, Once, OnceLock};
218use std::{ops::Range, sync::Arc};
219
220use arrow_array::cast::AsArray;
221use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
222use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
223use bytes::Bytes;
224use futures::future::{BoxFuture, MaybeDone, maybe_done};
225use futures::stream::{self, BoxStream};
226use futures::{FutureExt, StreamExt};
227use lance_arrow::DataTypeExt;
228use lance_core::cache::LanceCache;
229use lance_core::datatypes::{
230 BLOB_DESC_LANCE_FIELD, Field, Schema, validate_fixed_size_list_dimensions,
231};
232use lance_core::utils::futures::{FinallyStreamExt, StreamOnDropExt};
233use lance_core::utils::parse::parse_env_as_bool;
234use log::{debug, trace, warn};
235use tokio::sync::mpsc::error::SendError;
236use tokio::sync::mpsc::{self, unbounded_channel};
237
238use lance_core::error::LanceOptionExt;
239use lance_core::{ArrowResult, Error, Result};
240use tracing::instrument;
241
242use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
243use crate::data::DataBlock;
244use crate::encoder::EncodedBatch;
245use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
246use crate::encodings::logical::list::StructuralListScheduler;
247use crate::encodings::logical::map::StructuralMapScheduler;
248use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
249use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
250use crate::format::pb::{self, column_encoding};
251use crate::format::pb21;
252use crate::previous::decoder::LogicalPageDecoder;
253use crate::previous::encodings::logical::list::OffsetPageInfo;
254use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
255use crate::previous::encodings::logical::{
256 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
257 primitive::PrimitiveFieldScheduler,
258};
259use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
260use crate::version::LanceFileVersion;
261use crate::{BufferScheduler, EncodingsIo};
262
// Batch size in bytes (10 MiB). The check that uses it is not in this section; the name
// suggests that larger decoded batches trigger a warning — confirm at the use site.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
// Environment variable names read by this module. In the visible code,
// `ENV_LANCE_READ_CACHE_REPETITION_INDEX` is read by `default_cache_repetition_index` and
// `ENV_LANCE_INLINE_SCHEDULING_THRESHOLD` by `inline_scheduling_threshold`.
const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
    "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE";
const ENV_LANCE_READ_CACHE_REPETITION_INDEX: &str = "LANCE_READ_CACHE_REPETITION_INDEX";
const ENV_LANCE_INLINE_SCHEDULING_THRESHOLD: &str = "LANCE_INLINE_SCHEDULING_THRESHOLD";

// Fallback (16 * 1024) used by `inline_scheduling_threshold` when the environment variable
// is unset or does not parse as a `u64`.
const DEFAULT_INLINE_SCHEDULING_THRESHOLD: u64 = 16 * 1024;
273
274fn default_cache_repetition_index() -> bool {
275 static DEFAULT_CACHE_REPETITION_INDEX: OnceLock<bool> = OnceLock::new();
276 *DEFAULT_CACHE_REPETITION_INDEX
277 .get_or_init(|| parse_env_as_bool(ENV_LANCE_READ_CACHE_REPETITION_INDEX, true))
278}
279
280fn inline_scheduling_threshold() -> u64 {
281 static THRESHOLD: OnceLock<u64> = OnceLock::new();
282 *THRESHOLD.get_or_init(|| {
283 std::env::var(ENV_LANCE_INLINE_SCHEDULING_THRESHOLD)
284 .ok()
285 .and_then(|v| v.trim().parse::<u64>().ok())
286 .unwrap_or(DEFAULT_INLINE_SCHEDULING_THRESHOLD)
287 })
288}
289
/// How a single page is encoded.
///
/// `Legacy` carries a `pb::ArrayEncoding` (used by the legacy decode path in this module),
/// while `Structural` carries a `pb21::PageLayout` (used by the structural path).
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb21::PageLayout),
}
301
302impl PageEncoding {
303 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
304 match self {
305 Self::Legacy(enc) => enc,
306 Self::Structural(_) => panic!("Expected a legacy encoding"),
307 }
308 }
309
310 pub fn as_structural(&self) -> &pb21::PageLayout {
311 match self {
312 Self::Structural(enc) => enc,
313 Self::Legacy(_) => panic!("Expected a structural encoding"),
314 }
315 }
316
317 pub fn is_structural(&self) -> bool {
318 matches!(self, Self::Structural(_))
319 }
320}
321
/// Metadata for one page of a column.
#[derive(Debug)]
pub struct PageInfo {
    /// Number of rows stored in the page.
    pub num_rows: u64,
    /// Priority value of the page. The synthetic pages built in this module set it to 0.
    // NOTE(review): presumably used to order page scheduling — confirm with the scheduler.
    pub priority: u64,
    /// How the page is encoded (legacy or structural).
    pub encoding: PageEncoding,
    /// `(offset, size)` pairs describing the page's buffers.
    // NOTE(review): presumably absolute file positions in bytes — confirm against the reader.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
338
/// Metadata for one column of a file.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// Index of the column. The synthetic root built by `root_column` uses `u32::MAX`.
    pub index: u32,
    /// Metadata for each page of the column.
    pub page_infos: Arc<[PageInfo]>,
    /// `(offset, size)` pairs for column-level buffers. These are exposed to page
    /// schedulers through `ColumnBuffers`.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// Column-level encoding (for example `Values` or `Blob`).
    pub encoding: pb::ColumnEncoding,
}
352
353impl ColumnInfo {
354 pub fn new(
356 index: u32,
357 page_infos: Arc<[PageInfo]>,
358 buffer_offsets_and_sizes: Vec<(u64, u64)>,
359 encoding: pb::ColumnEncoding,
360 ) -> Self {
361 Self {
362 index,
363 page_infos,
364 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
365 encoding,
366 }
367 }
368
369 pub fn is_structural(&self) -> bool {
370 self.page_infos
371 .first()
373 .map(|page| page.encoding.is_structural())
374 .unwrap_or(false)
375 }
376}
377
/// The top-level scheduler held by `DecodeBatchScheduler`: a boxed structural scheduler or a
/// shared legacy scheduler.
enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}
382
383impl RootScheduler {
384 fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
385 match self {
386 Self::Structural(_) => panic!("Expected a legacy scheduler"),
387 Self::Legacy(s) => s,
388 }
389 }
390
391 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
392 match self {
393 Self::Structural(s) => s.as_ref(),
394 Self::Legacy(_) => panic!("Expected a structural scheduler"),
395 }
396 }
397}
398
/// Turns requested row ranges into a sequence of `DecoderMessage`s.
///
/// It holds either a structural or a legacy root scheduler (chosen in `try_new`), the
/// top-level Arrow fields being read, and the cache passed to every `SchedulerContext` it
/// creates.
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<LanceCache>,
}
425
/// Cursor over a file's `ColumnInfo`s, used while schedulers are being built.
///
/// `column_indices` holds, for each top-level field, the position in `column_infos` where
/// that field's columns start. `next_top_level` jumps to the next such position.
pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    // Position in `column_infos` of the next column to hand out.
    column_info_pos: usize,
    // Position in `column_indices` of the current top-level field.
    column_indices_pos: usize,
}
432
433impl<'a> ColumnInfoIter<'a> {
434 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
435 let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
436 Self {
437 column_infos,
438 column_indices,
439 column_info_pos: initial_pos,
440 column_indices_pos: 0,
441 }
442 }
443
444 pub fn peek(&self) -> &Arc<ColumnInfo> {
445 &self.column_infos[self.column_info_pos]
446 }
447
448 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
449 let column_info = self.column_infos[self.column_info_pos].clone();
450 let transformed = transform(column_info);
451 self.column_infos[self.column_info_pos] = transformed;
452 }
453
454 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
455 self.next().ok_or_else(|| {
456 Error::invalid_input(
457 "there were more fields in the schema than provided column indices / infos",
458 )
459 })
460 }
461
462 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
463 if self.column_info_pos < self.column_infos.len() {
464 let info = &self.column_infos[self.column_info_pos];
465 self.column_info_pos += 1;
466 Some(info)
467 } else {
468 None
469 }
470 }
471
472 pub(crate) fn next_top_level(&mut self) {
473 self.column_indices_pos += 1;
474 if self.column_indices_pos < self.column_indices.len() {
475 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
476 } else {
477 self.column_info_pos = self.column_infos.len();
478 }
479 }
480}
481
/// File-level buffers, as `(position, size)` pairs.
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}

/// A column's buffers (`(position, size)` pairs), together with the file-level buffers.
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}

/// A page's buffers (`(position, size)` pairs), together with its column's and the file's
/// buffers.
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}
501
/// The built-in strategy for turning schema fields plus column metadata into field
/// schedulers (structural or legacy).
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    /// Forwarded to `PrimitiveFieldScheduler::new` on the legacy path.
    // NOTE(review): presumably enables extra validation of decoded data — confirm.
    pub validate_data: bool,
    /// Decompression strategy handed to structural primitive schedulers.
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    /// Forwarded to `StructuralPrimitiveFieldScheduler::try_new`.
    pub cache_repetition_index: bool,
}
509
510impl Default for CoreFieldDecoderStrategy {
511 fn default() -> Self {
512 Self {
513 validate_data: false,
514 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
515 cache_repetition_index: false,
516 }
517 }
518}
519
520impl CoreFieldDecoderStrategy {
521 pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
523 self.cache_repetition_index = cache_repetition_index;
524 self
525 }
526
527 pub fn from_decoder_config(config: &DecoderConfig) -> Self {
529 Self {
530 validate_data: config.validate_on_decode,
531 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
532 cache_repetition_index: config.cache_repetition_index,
533 }
534 }
535
536 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
539 let column_encoding = column_info
540 .encoding
541 .column_encoding
542 .as_ref()
543 .ok_or_else(|| {
544 Error::invalid_input(format!(
545 "the column at index {} was missing a ColumnEncoding",
546 column_info.index
547 ))
548 })?;
549 if matches!(
550 column_encoding,
551 pb::column_encoding::ColumnEncoding::Values(_)
552 ) {
553 Ok(())
554 } else {
555 Err(Error::invalid_input(format!(
556 "the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it",
557 column_info.index, field_name, column_encoding
558 )))
559 }
560 }
561
562 fn is_structural_primitive(data_type: &DataType) -> bool {
563 if data_type.is_primitive() {
564 true
565 } else {
566 match data_type {
567 DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
569 DataType::Boolean
570 | DataType::Null
571 | DataType::FixedSizeBinary(_)
572 | DataType::Binary
573 | DataType::LargeBinary
574 | DataType::Utf8
575 | DataType::LargeUtf8 => true,
576 DataType::FixedSizeList(inner, _) => {
577 Self::is_structural_primitive(inner.data_type())
578 }
579 _ => false,
580 }
581 }
582 }
583
584 fn is_primitive_legacy(data_type: &DataType) -> bool {
585 if data_type.is_primitive() {
586 true
587 } else {
588 match data_type {
589 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
591 DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
592 _ => false,
593 }
594 }
595 }
596
597 fn create_primitive_scheduler(
598 &self,
599 field: &Field,
600 column: &ColumnInfo,
601 buffers: FileBuffers,
602 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
603 Self::ensure_values_encoded(column, &field.name)?;
604 let column_buffers = ColumnBuffers {
606 file_buffers: buffers,
607 positions_and_sizes: &column.buffer_offsets_and_sizes,
608 };
609 Ok(Box::new(PrimitiveFieldScheduler::new(
610 column.index,
611 field.data_type(),
612 column.page_infos.clone(),
613 column_buffers,
614 self.validate_data,
615 )))
616 }
617
618 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
620 Self::ensure_values_encoded(column_info, field_name)?;
621 if column_info.page_infos.len() != 1 {
622 return Err(Error::invalid_input_source(format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into()));
623 }
624 let encoding = &column_info.page_infos[0].encoding;
625 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
626 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
627 _ => Err(Error::invalid_input_source(format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into())),
628 }
629 }
630
631 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
632 let encoding = &column_info.page_infos[0].encoding;
633 matches!(
634 encoding.as_legacy().array_encoding.as_ref().unwrap(),
635 pb::array_encoding::ArrayEncoding::PackedStruct(_)
636 )
637 }
638
639 fn create_list_scheduler(
640 &self,
641 list_field: &Field,
642 column_infos: &mut ColumnInfoIter,
643 buffers: FileBuffers,
644 offsets_column: &ColumnInfo,
645 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
646 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
647 let offsets_column_buffers = ColumnBuffers {
648 file_buffers: buffers,
649 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
650 };
651 let items_scheduler =
652 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
653
654 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
655 .page_infos
656 .iter()
657 .filter(|offsets_page| offsets_page.num_rows > 0)
658 .map(|offsets_page| {
659 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
660 &offsets_page.encoding.as_legacy().array_encoding
661 {
662 let inner = PageInfo {
663 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
664 encoding: PageEncoding::Legacy(
665 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
666 ),
667 num_rows: offsets_page.num_rows,
668 priority: 0,
669 };
670 (
671 inner,
672 OffsetPageInfo {
673 offsets_in_page: offsets_page.num_rows,
674 null_offset_adjustment: list_encoding.null_offset_adjustment,
675 num_items_referenced_by_page: list_encoding.num_items,
676 },
677 )
678 } else {
679 panic!("Expected a list column");
681 }
682 })
683 .unzip();
684 let inner = Arc::new(PrimitiveFieldScheduler::new(
685 offsets_column.index,
686 DataType::UInt64,
687 Arc::from(inner_infos.into_boxed_slice()),
688 offsets_column_buffers,
689 self.validate_data,
690 )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
691 let items_field = match list_field.data_type() {
692 DataType::List(inner) => inner,
693 DataType::LargeList(inner) => inner,
694 _ => unreachable!(),
695 };
696 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
697 DataType::Int32
698 } else {
699 DataType::Int64
700 };
701 Ok(Box::new(ListFieldScheduler::new(
702 inner,
703 items_scheduler.into(),
704 items_field,
705 offset_type,
706 null_offset_adjustments,
707 )))
708 }
709
710 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
711 if let column_encoding::ColumnEncoding::Blob(blob) =
712 column_info.encoding.column_encoding.as_ref().unwrap()
713 {
714 let mut column_info = column_info.clone();
715 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
716 Some(column_info)
717 } else {
718 None
719 }
720 }
721
722 fn create_structural_field_scheduler(
723 &self,
724 field: &Field,
725 column_infos: &mut ColumnInfoIter,
726 ) -> Result<Box<dyn StructuralFieldScheduler>> {
727 let data_type = field.data_type();
728 validate_fixed_size_list_dimensions(&field.name, &data_type)?;
729 if Self::is_structural_primitive(&data_type) {
730 let column_info = column_infos.expect_next()?;
731 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
732 column_info.as_ref(),
733 self.decompressor_strategy.as_ref(),
734 self.cache_repetition_index,
735 field,
736 )?);
737
738 column_infos.next_top_level();
740
741 return Ok(scheduler);
742 }
743 match &data_type {
744 DataType::Struct(fields) => {
745 if field.is_packed_struct() {
746 let column_info = column_infos.expect_next()?;
748 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
749 column_info.as_ref(),
750 self.decompressor_strategy.as_ref(),
751 self.cache_repetition_index,
752 field,
753 )?);
754
755 column_infos.next_top_level();
757
758 return Ok(scheduler);
759 }
760 if field.is_blob() {
762 let column_info = column_infos.peek();
763 if column_info.page_infos.iter().any(|page| {
764 matches!(
765 page.encoding,
766 PageEncoding::Structural(pb21::PageLayout {
767 layout: Some(pb21::page_layout::Layout::BlobLayout(_))
768 })
769 )
770 }) {
771 let column_info = column_infos.expect_next()?;
772 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
773 column_info.as_ref(),
774 self.decompressor_strategy.as_ref(),
775 self.cache_repetition_index,
776 field,
777 )?);
778 column_infos.next_top_level();
779 return Ok(scheduler);
780 }
781 }
782
783 let mut child_schedulers = Vec::with_capacity(field.children.len());
784 for field in field.children.iter() {
785 let field_scheduler =
786 self.create_structural_field_scheduler(field, column_infos)?;
787 child_schedulers.push(field_scheduler);
788 }
789
790 let fields = fields.clone();
791 Ok(
792 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
793 as Box<dyn StructuralFieldScheduler>,
794 )
795 }
796 DataType::List(_) | DataType::LargeList(_) => {
797 let child = field.children.first().expect_ok()?;
798 let child_scheduler =
799 self.create_structural_field_scheduler(child, column_infos)?;
800 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
801 as Box<dyn StructuralFieldScheduler>)
802 }
803 DataType::FixedSizeList(inner, dimension)
804 if matches!(inner.data_type(), DataType::Struct(_)) =>
805 {
806 let child = field.children.first().expect_ok()?;
807 let child_scheduler =
808 self.create_structural_field_scheduler(child, column_infos)?;
809 Ok(Box::new(StructuralFixedSizeListScheduler::new(
810 child_scheduler,
811 *dimension,
812 )) as Box<dyn StructuralFieldScheduler>)
813 }
814 DataType::Map(_, keys_sorted) => {
815 if *keys_sorted {
819 return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into()));
820 }
821 let entries_child = field.children.first().expect_ok()?;
822 let child_scheduler =
823 self.create_structural_field_scheduler(entries_child, column_infos)?;
824 Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
825 as Box<dyn StructuralFieldScheduler>)
826 }
827 _ => todo!("create_structural_field_scheduler for {}", data_type),
828 }
829 }
830
831 fn create_legacy_field_scheduler(
832 &self,
833 field: &Field,
834 column_infos: &mut ColumnInfoIter,
835 buffers: FileBuffers,
836 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
837 let data_type = field.data_type();
838 validate_fixed_size_list_dimensions(&field.name, &data_type)?;
839 if Self::is_primitive_legacy(&data_type) {
840 let column_info = column_infos.expect_next()?;
841 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
842 return Ok(scheduler);
843 } else if data_type.is_binary_like() {
844 let column_info = column_infos.expect_next()?.clone();
845 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
847 let desc_scheduler =
848 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
849 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
850 return Ok(blob_scheduler);
851 }
852 if let Some(page_info) = column_info.page_infos.first() {
853 if matches!(
854 page_info.encoding.as_legacy(),
855 pb::ArrayEncoding {
856 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
857 }
858 ) {
859 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
860 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
861 } else {
862 DataType::LargeList(Arc::new(ArrowField::new(
863 "item",
864 DataType::UInt8,
865 false,
866 )))
867 };
868 let list_field = Field::try_from(ArrowField::new(
869 field.name.clone(),
870 list_type,
871 field.nullable,
872 ))
873 .unwrap();
874 let list_scheduler = self.create_list_scheduler(
875 &list_field,
876 column_infos,
877 buffers,
878 &column_info,
879 )?;
880 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
881 list_scheduler.into(),
882 field.data_type(),
883 ));
884 return Ok(binary_scheduler);
885 } else {
886 let scheduler =
887 self.create_primitive_scheduler(field, &column_info, buffers)?;
888 return Ok(scheduler);
889 }
890 } else {
891 return self.create_primitive_scheduler(field, &column_info, buffers);
892 }
893 }
894 match &data_type {
895 DataType::FixedSizeList(inner, _dimension) => {
896 if Self::is_primitive_legacy(inner.data_type()) {
899 let primitive_col = column_infos.expect_next()?;
900 let scheduler =
901 self.create_primitive_scheduler(field, primitive_col, buffers)?;
902 Ok(scheduler)
903 } else {
904 todo!()
905 }
906 }
907 DataType::Dictionary(_key_type, value_type) => {
908 if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
909 let primitive_col = column_infos.expect_next()?;
910 let scheduler =
911 self.create_primitive_scheduler(field, primitive_col, buffers)?;
912 Ok(scheduler)
913 } else {
914 Err(Error::not_supported_source(
915 format!(
916 "No way to decode into a dictionary field of type {}",
917 value_type
918 )
919 .into(),
920 ))
921 }
922 }
923 DataType::List(_) | DataType::LargeList(_) => {
924 let offsets_column = column_infos.expect_next()?.clone();
925 column_infos.next_top_level();
926 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
927 }
928 DataType::Struct(fields) => {
929 let column_info = column_infos.expect_next()?;
930
931 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
933 return self.create_primitive_scheduler(field, &blob_col, buffers);
935 }
936
937 if Self::check_packed_struct(column_info) {
938 self.create_primitive_scheduler(field, column_info, buffers)
940 } else {
941 Self::check_simple_struct(column_info, &field.name).unwrap();
943 let num_rows = column_info
944 .page_infos
945 .iter()
946 .map(|page| page.num_rows)
947 .sum();
948 let mut child_schedulers = Vec::with_capacity(field.children.len());
949 for field in &field.children {
950 column_infos.next_top_level();
951 let field_scheduler =
952 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
953 child_schedulers.push(Arc::from(field_scheduler));
954 }
955
956 let fields = fields.clone();
957 Ok(Box::new(SimpleStructScheduler::new(
958 child_schedulers,
959 fields,
960 num_rows,
961 )))
962 }
963 }
964 _ => todo!(),
966 }
967 }
968}
969
970fn root_column(num_rows: u64) -> ColumnInfo {
972 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
973 let final_page_num_rows = num_rows % (u32::MAX as u64);
974 let root_pages = (0..num_root_pages)
975 .map(|i| PageInfo {
976 num_rows: if i == num_root_pages - 1 {
977 final_page_num_rows
978 } else {
979 u64::MAX
980 },
981 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
982 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
983 pb::SimpleStruct {},
984 )),
985 }),
986 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
988 })
989 .collect::<Vec<_>>();
990 ColumnInfo {
991 buffer_offsets_and_sizes: Arc::new([]),
992 encoding: pb::ColumnEncoding {
993 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
994 },
995 index: u32::MAX,
996 page_infos: Arc::from(root_pages),
997 }
998}
999
/// The top-level decoder of a read: a `StructuralStructDecoder` on the structural path or a
/// `SimpleStructDecoder` on the legacy path.
pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}
1004
1005impl RootDecoder {
1006 pub fn into_structural(self) -> StructuralStructDecoder {
1007 match self {
1008 Self::Structural(decoder) => decoder,
1009 Self::Legacy(_) => panic!("Expected a structural decoder"),
1010 }
1011 }
1012
1013 pub fn into_legacy(self) -> SimpleStructDecoder {
1014 match self {
1015 Self::Legacy(decoder) => decoder,
1016 Self::Structural(_) => panic!("Expected a legacy decoder"),
1017 }
1018 }
1019}
1020
1021impl DecodeBatchScheduler {
1022 #[allow(clippy::too_many_arguments)]
1025 pub async fn try_new<'a>(
1026 schema: &'a Schema,
1027 column_indices: &[u32],
1028 column_infos: &[Arc<ColumnInfo>],
1029 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
1030 num_rows: u64,
1031 _decoder_plugins: Arc<DecoderPlugins>,
1032 io: Arc<dyn EncodingsIo>,
1033 cache: Arc<LanceCache>,
1034 filter: &FilterExpression,
1035 decoder_config: &DecoderConfig,
1036 ) -> Result<Self> {
1037 assert!(num_rows > 0);
1038 let buffers = FileBuffers {
1039 positions_and_sizes: file_buffer_positions_and_sizes,
1040 };
1041 let arrow_schema = ArrowSchema::from(schema);
1042 let root_fields = arrow_schema.fields().clone();
1043 let root_type = DataType::Struct(root_fields.clone());
1044 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
1045 root_field.children.clone_from(&schema.fields);
1049 root_field
1050 .metadata
1051 .insert("__lance_decoder_root".to_string(), "true".to_string());
1052
1053 if column_infos.is_empty() || column_infos[0].is_structural() {
1054 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1055
1056 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1057 let mut root_scheduler =
1058 strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
1059
1060 let context = SchedulerContext::new(io, cache.clone());
1061 root_scheduler.initialize(filter, &context).await?;
1062
1063 Ok(Self {
1064 root_scheduler: RootScheduler::Structural(root_scheduler),
1065 root_fields,
1066 cache,
1067 })
1068 } else {
1069 let mut columns = Vec::with_capacity(column_infos.len() + 1);
1072 columns.push(Arc::new(root_column(num_rows)));
1073 columns.extend(column_infos.iter().cloned());
1074
1075 let adjusted_column_indices = [0_u32]
1076 .into_iter()
1077 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1078 .collect::<Vec<_>>();
1079 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1080 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
1081 let root_scheduler =
1082 strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1083
1084 let context = SchedulerContext::new(io, cache.clone());
1085 root_scheduler.initialize(filter, &context).await?;
1086
1087 Ok(Self {
1088 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1089 root_fields,
1090 cache,
1091 })
1092 }
1093 }
1094
1095 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1096 pub fn from_scheduler(
1097 root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1098 root_fields: Fields,
1099 cache: Arc<LanceCache>,
1100 ) -> Self {
1101 Self {
1102 root_scheduler: RootScheduler::Legacy(root_scheduler),
1103 root_fields,
1104 cache,
1105 }
1106 }
1107
1108 fn do_schedule_ranges_structural(
1109 &mut self,
1110 ranges: &[Range<u64>],
1111 filter: &FilterExpression,
1112 io: Arc<dyn EncodingsIo>,
1113 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1114 ) {
1115 let root_scheduler = self.root_scheduler.as_structural();
1116 let mut context = SchedulerContext::new(io, self.cache.clone());
1117 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1118 if let Err(schedule_ranges_err) = maybe_root_job {
1119 schedule_action(Err(schedule_ranges_err));
1120 return;
1121 }
1122 let mut root_job = maybe_root_job.unwrap();
1123 let mut num_rows_scheduled = 0;
1124 loop {
1125 let maybe_next_scan_lines = root_job.schedule_next(&mut context);
1126 if let Err(err) = maybe_next_scan_lines {
1127 schedule_action(Err(err));
1128 return;
1129 }
1130 let next_scan_lines = maybe_next_scan_lines.unwrap();
1131 if next_scan_lines.is_empty() {
1132 return;
1133 }
1134 for next_scan_line in next_scan_lines {
1135 trace!(
1136 "Scheduled scan line of {} rows and {} decoders",
1137 next_scan_line.rows_scheduled,
1138 next_scan_line.decoders.len()
1139 );
1140 num_rows_scheduled += next_scan_line.rows_scheduled;
1141 if !schedule_action(Ok(DecoderMessage {
1142 scheduled_so_far: num_rows_scheduled,
1143 decoders: next_scan_line.decoders,
1144 })) {
1145 return;
1147 }
1148 }
1149 }
1150 }
1151
1152 fn do_schedule_ranges_legacy(
1153 &mut self,
1154 ranges: &[Range<u64>],
1155 filter: &FilterExpression,
1156 io: Arc<dyn EncodingsIo>,
1157 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1158 priority: Option<Box<dyn PriorityRange>>,
1162 ) {
1163 let root_scheduler = self.root_scheduler.as_legacy();
1164 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1165 trace!(
1166 "Scheduling {} ranges across {}..{} ({} rows){}",
1167 ranges.len(),
1168 ranges.first().unwrap().start,
1169 ranges.last().unwrap().end,
1170 rows_requested,
1171 priority
1172 .as_ref()
1173 .map(|p| format!(" (priority={:?})", p))
1174 .unwrap_or_default()
1175 );
1176
1177 let mut context = SchedulerContext::new(io, self.cache.clone());
1178 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1179 if let Err(schedule_ranges_err) = maybe_root_job {
1180 schedule_action(Err(schedule_ranges_err));
1181 return;
1182 }
1183 let mut root_job = maybe_root_job.unwrap();
1184 let mut num_rows_scheduled = 0;
1185 let mut rows_to_schedule = root_job.num_rows();
1186 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1187 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1188 while rows_to_schedule > 0 {
1189 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1190 if let Err(schedule_next_err) = maybe_next_scan_line {
1191 schedule_action(Err(schedule_next_err));
1192 return;
1193 }
1194 let next_scan_line = maybe_next_scan_line.unwrap();
1195 priority.advance(next_scan_line.rows_scheduled);
1196 num_rows_scheduled += next_scan_line.rows_scheduled;
1197 rows_to_schedule -= next_scan_line.rows_scheduled;
1198 trace!(
1199 "Scheduled scan line of {} rows and {} decoders",
1200 next_scan_line.rows_scheduled,
1201 next_scan_line.decoders.len()
1202 );
1203 if !schedule_action(Ok(DecoderMessage {
1204 scheduled_so_far: num_rows_scheduled,
1205 decoders: next_scan_line.decoders,
1206 })) {
1207 return;
1209 }
1210
1211 trace!("Finished scheduling {} ranges", ranges.len());
1212 }
1213 }
1214
1215 fn do_schedule_ranges(
1216 &mut self,
1217 ranges: &[Range<u64>],
1218 filter: &FilterExpression,
1219 io: Arc<dyn EncodingsIo>,
1220 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1221 priority: Option<Box<dyn PriorityRange>>,
1225 ) {
1226 match &self.root_scheduler {
1227 RootScheduler::Legacy(_) => {
1228 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1229 }
1230 RootScheduler::Structural(_) => {
1231 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1232 }
1233 }
1234 }
1235
1236 pub fn schedule_ranges_to_vec(
1239 &mut self,
1240 ranges: &[Range<u64>],
1241 filter: &FilterExpression,
1242 io: Arc<dyn EncodingsIo>,
1243 priority: Option<Box<dyn PriorityRange>>,
1244 ) -> Result<Vec<DecoderMessage>> {
1245 let mut decode_messages = Vec::new();
1246 self.do_schedule_ranges(
1247 ranges,
1248 filter,
1249 io,
1250 |msg| {
1251 decode_messages.push(msg);
1252 true
1253 },
1254 priority,
1255 );
1256 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1257 }
1258
1259 #[instrument(skip_all)]
1269 pub fn schedule_ranges(
1270 &mut self,
1271 ranges: &[Range<u64>],
1272 filter: &FilterExpression,
1273 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1274 scheduler: Arc<dyn EncodingsIo>,
1275 ) {
1276 self.do_schedule_ranges(
1277 ranges,
1278 filter,
1279 scheduler,
1280 |msg| {
1281 match sink.send(msg) {
1282 Ok(_) => true,
1283 Err(SendError { .. }) => {
1284 debug!(
1287 "schedule_ranges aborting early since decoder appears to have been dropped"
1288 );
1289 false
1290 }
1291 }
1292 },
1293 None,
1294 )
1295 }
1296
1297 #[instrument(skip_all)]
1305 pub fn schedule_range(
1306 &mut self,
1307 range: Range<u64>,
1308 filter: &FilterExpression,
1309 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1310 scheduler: Arc<dyn EncodingsIo>,
1311 ) {
1312 self.schedule_ranges(&[range], filter, sink, scheduler)
1313 }
1314
1315 pub fn schedule_take(
1323 &mut self,
1324 indices: &[u64],
1325 filter: &FilterExpression,
1326 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1327 scheduler: Arc<dyn EncodingsIo>,
1328 ) {
1329 debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
1330 if indices.is_empty() {
1331 return;
1332 }
1333 trace!("Scheduling take of {} rows", indices.len());
1334 let ranges = Self::indices_to_ranges(indices);
1335 self.schedule_ranges(&ranges, filter, sink, scheduler)
1336 }
1337
1338 fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1340 let mut ranges = Vec::new();
1341 let mut start = indices[0];
1342
1343 for window in indices.windows(2) {
1344 if window[1] != window[0] + 1 {
1345 ranges.push(start..window[0] + 1);
1346 start = window[1];
1347 }
1348 }
1349
1350 ranges.push(start..*indices.last().unwrap() + 1);
1351 ranges
1352 }
1353}
1354
/// A batch whose decoding may still be in progress.
///
/// `num_rows` is known up front, so consumers can account for rows before
/// awaiting `task`.
pub struct ReadBatchTask {
    /// Future resolving to the decoded batch, or the decode error.
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// Rows planned for this batch when the task was created (0 if planning failed).
    pub num_rows: u32,
}
1359
/// Async batch stream for legacy (v2.0, non-structural) files.
///
/// Receives scheduler messages over a channel, attaches the resulting child
/// decoders to a [`SimpleStructDecoder`], and drains at most `rows_per_batch`
/// rows per batch.
pub struct BatchDecodeStream {
    /// Source of scheduler messages.
    context: DecoderContext,
    /// Root struct decoder that child decoders are attached to.
    root_decoder: SimpleStructDecoder,
    /// Rows not yet assigned to an emitted batch.
    rows_remaining: u64,
    /// Maximum number of rows per batch.
    rows_per_batch: u32,
    /// Latest `scheduled_so_far` reported by the scheduler.
    rows_scheduled: u64,
    /// Rows already drained into batch tasks.
    rows_drained: u64,
    /// Set once the scheduler channel is closed; no further messages will arrive.
    scheduler_exhausted: bool,
    /// Shared `Once` so the large-batch message is logged at most once per stream.
    emitted_batch_size_warning: Arc<Once>,
}
1371
impl BatchDecodeStream {
    /// Creates a stream that will emit `num_rows` rows in batches of at most
    /// `rows_per_batch`, fed by scheduler messages arriving on `scheduled`.
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Attaches a scheduled legacy decoder to the root decoder.
    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
        if decoder.path.is_empty() {
            // An empty path addresses the root struct itself, which is
            // `self.root_decoder`; there is nothing to attach.
            Ok(())
        } else {
            self.root_decoder.accept_child(decoder)
        }
    }

    /// Receives scheduler messages until at least `scheduled_need` rows are
    /// scheduled or the channel closes.
    ///
    /// Returns `scheduled_need` when the target was reached. Otherwise it
    /// returns the number of rows actually scheduled, which may be less once
    /// the scheduler is exhausted.
    #[instrument(level = "debug", skip_all)]
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    // All senders dropped: remember that so later calls return
                    // immediately instead of awaiting a closed channel.
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Plans the next batch: ensures its rows are scheduled and loaded, then
    /// drains them into a decode task.
    ///
    /// Returns `Ok(None)` once no rows remain or the scheduler cannot supply any more.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // How many more rows must be scheduled before this batch can be drained.
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                // The scheduler finished early; shrink the batch to what exists.
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        // Index (inclusive) of the last row this batch needs loaded.
        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this decoder into a stream of [`ReadBatchTask`]s.
    ///
    /// Planning (scheduling + I/O waits) happens as the stream is polled. Each
    /// batch's CPU decode is run in a `tokio::spawn`ed task when the returned
    /// `ReadBatchTask::task` is awaited.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    // Decode on a spawned task; a JoinError (panic/cancel) is
                    // wrapped, then the decode's own Result is unwrapped.
                    let (batch, _data_size) =
                        tokio::spawn(
                            async move { next_task.into_batch(emitted_batch_size_warning) },
                        )
                        .await
                        .map_err(|err| Error::wrapped(err.into()))??;
                    Ok(batch)
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                // Batches are capped at `rows_per_batch: u32`, so this fits.
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1518
/// A unit of work that [`BatchDecodeIterator`] hands to its root decoder.
enum RootDecoderMessage {
    /// A structural page whose I/O has already completed.
    LoadedPage(LoadedPageShard),
    /// A legacy (2.0) decoder to attach to the root struct decoder.
    LegacyPage(crate::previous::decoder::DecoderReady),
}
/// Common interface over the structural and legacy root decoders, so that
/// [`BatchDecodeIterator`] can drive either one synchronously.
trait RootDecoderType {
    /// Accepts a scheduled page/decoder; each implementation expects only its own variant.
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Creates a decode task for the next `num_rows` rows.
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Blocks on `runtime`, if needed, until rows up to index `loaded_need` (inclusive) are loaded.
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1530impl RootDecoderType for StructuralStructDecoder {
1531 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1532 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1533 unreachable!()
1534 };
1535 self.accept_page(loaded_page)
1536 }
1537 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1538 self.drain_batch_task(num_rows)
1539 }
1540 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1541 Ok(())
1543 }
1544}
1545impl RootDecoderType for SimpleStructDecoder {
1546 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1547 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1548 unreachable!()
1549 };
1550 self.accept_child(legacy_page)
1551 }
1552 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1553 self.drain(num_rows)
1554 }
1555 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1556 runtime.block_on(self.wait_for_loaded(loaded_need))
1557 }
1558}
1559
/// Blocking batch iterator over a pre-collected queue of scheduler messages.
///
/// Generic over the root decoder so it serves both structural and legacy files.
struct BatchDecodeIterator<T: RootDecoderType> {
    /// Scheduler messages not yet fed to `root_decoder`.
    messages: VecDeque<Result<DecoderMessage>>,
    root_decoder: T,
    /// Rows not yet assigned to an emitted batch.
    rows_remaining: u64,
    /// Maximum number of rows per batch.
    rows_per_batch: u32,
    /// Latest `scheduled_so_far` seen in a consumed message.
    rows_scheduled: u64,
    /// Rows already drained into batches.
    rows_drained: u64,
    /// Shared `Once` so the large-batch message is logged at most once.
    emitted_batch_size_warning: Arc<Once>,
    /// Private current-thread runtime used to block on outstanding I/O.
    wait_for_io_runtime: tokio::runtime::Runtime,
    /// Arrow schema reported via `RecordBatchReader::schema`.
    schema: Arc<ArrowSchema>,
}
1575
1576impl<T: RootDecoderType> BatchDecodeIterator<T> {
1577 pub fn new(
1579 messages: VecDeque<Result<DecoderMessage>>,
1580 rows_per_batch: u32,
1581 num_rows: u64,
1582 root_decoder: T,
1583 schema: Arc<ArrowSchema>,
1584 ) -> Self {
1585 Self {
1586 messages,
1587 root_decoder,
1588 rows_remaining: num_rows,
1589 rows_per_batch,
1590 rows_scheduled: 0,
1591 rows_drained: 0,
1592 wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1593 .build()
1594 .unwrap(),
1595 emitted_batch_size_warning: Arc::new(Once::new()),
1596 schema,
1597 }
1598 }
1599
1600 fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
1605 match maybe_done(unloaded_page.0) {
1606 MaybeDone::Done(loaded_page) => loaded_page,
1608 MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1610 MaybeDone::Gone => unreachable!(),
1611 }
1612 }
1613
1614 #[instrument(skip_all)]
1619 fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
1620 while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1621 let message = self.messages.pop_front().unwrap()?;
1622 self.rows_scheduled = message.scheduled_so_far;
1623 for decoder_message in message.decoders {
1624 match decoder_message {
1625 MessageType::UnloadedPage(unloaded_page) => {
1626 let loaded_page = self.wait_for_page(unloaded_page)?;
1627 self.root_decoder
1628 .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1629 }
1630 MessageType::DecoderReady(decoder_ready) => {
1631 if !decoder_ready.path.is_empty() {
1633 self.root_decoder
1634 .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1635 }
1636 }
1637 }
1638 }
1639 }
1640
1641 let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1642
1643 self.root_decoder
1644 .wait(loaded_need, &self.wait_for_io_runtime)?;
1645 Ok(self.rows_scheduled)
1646 }
1647
1648 #[instrument(level = "debug", skip_all)]
1649 fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1650 trace!(
1651 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1652 self.rows_remaining, self.rows_drained, self.rows_scheduled,
1653 );
1654 if self.rows_remaining == 0 {
1655 return Ok(None);
1656 }
1657
1658 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1659 self.rows_remaining -= to_take;
1660
1661 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1662 trace!(
1663 "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1664 scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1665 );
1666 if scheduled_need > 0 {
1667 let desired_scheduled = scheduled_need + self.rows_scheduled;
1668 trace!(
1669 "Draining from scheduler (desire at least {} scheduled rows)",
1670 desired_scheduled
1671 );
1672 let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
1673 if actually_scheduled < desired_scheduled {
1674 let under_scheduled = desired_scheduled - actually_scheduled;
1675 to_take -= under_scheduled;
1676 }
1677 }
1678
1679 if to_take == 0 {
1680 return Ok(None);
1681 }
1682
1683 let next_task = self.root_decoder.drain_batch(to_take)?;
1684
1685 self.rows_drained += to_take;
1686
1687 let (batch, _data_size) = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1688
1689 Ok(Some(batch))
1690 }
1691}
1692
1693impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1694 type Item = ArrowResult<RecordBatch>;
1695
1696 fn next(&mut self) -> Option<Self::Item> {
1697 self.next_batch_task()
1698 .transpose()
1699 .map(|r| r.map_err(ArrowError::from))
1700 }
1701}
1702
1703impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1704 fn schema(&self) -> Arc<ArrowSchema> {
1705 self.schema.clone()
1706 }
1707}
1708
1709fn estimate_bytes_per_row(data_type: &DataType) -> f64 {
1720 if let Some(w) = data_type.byte_width_opt() {
1721 return w as f64;
1722 }
1723 match data_type {
1724 DataType::Boolean => 1.0 / 8.0,
1725 DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => 64.0,
1726 DataType::Struct(fields) => fields
1727 .iter()
1728 .map(|f| estimate_bytes_per_row(f.data_type()))
1729 .sum(),
1730 DataType::List(child) | DataType::LargeList(child) => {
1731 5.0 * estimate_bytes_per_row(child.data_type())
1732 }
1733 DataType::FixedSizeList(child, dim) => {
1734 *dim as f64 * estimate_bytes_per_row(child.data_type())
1735 }
1736 DataType::Dictionary(_, value_type) => estimate_bytes_per_row(value_type),
1737 DataType::Map(entries, _) => 5.0 * estimate_bytes_per_row(entries.data_type()),
1738 _ => 64.0,
1739 }
1740}
1741
/// Async batch stream for structural (non-legacy) files.
///
/// Batches are sized either by `rows_per_batch` or, when `batch_size_bytes`
/// is set, by a bytes-per-row estimate.
pub struct StructuralBatchDecodeStream {
    /// Source of scheduler messages.
    context: DecoderContext,
    /// Root struct decoder that loaded pages are handed to.
    root_decoder: StructuralStructDecoder,
    /// Rows not yet assigned to an emitted batch.
    rows_remaining: u64,
    /// Rows per batch, used when `batch_size_bytes` is `None`.
    rows_per_batch: u32,
    /// Latest `scheduled_so_far` reported by the scheduler.
    rows_scheduled: u64,
    /// Rows already drained into batch tasks.
    rows_drained: u64,
    /// Set once the scheduler channel is closed.
    scheduler_exhausted: bool,
    /// Shared `Once` so the large-batch message is logged at most once per stream.
    emitted_batch_size_warning: Arc<Once>,
    /// Whether each batch is decoded in its own `tokio::spawn`ed task.
    spawn_batch_decode_tasks: bool,
    /// Optional target batch size in bytes; overrides `rows_per_batch` when set.
    batch_size_bytes: Option<u64>,
    /// Schema-based bytes-per-row estimate (>= 1.0), used until decode feedback
    /// exists; 0.0 when `batch_size_bytes` is `None`.
    schema_bytes_per_row: f64,
    /// Bytes-per-row observed from decoded batches, shared with the decode
    /// tasks that update it; 0 means no observation yet.
    bytes_per_row_feedback: Arc<AtomicU64>,
}
1769
1770impl StructuralBatchDecodeStream {
1771 pub fn new(
1781 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1782 rows_per_batch: u32,
1783 num_rows: u64,
1784 root_decoder: StructuralStructDecoder,
1785 spawn_batch_decode_tasks: bool,
1786 batch_size_bytes: Option<u64>,
1787 ) -> Self {
1788 let schema_bytes_per_row = if batch_size_bytes.is_some() {
1789 estimate_bytes_per_row(root_decoder.data_type()).max(1.0)
1790 } else {
1791 0.0
1792 };
1793 Self {
1794 context: DecoderContext::new(scheduled),
1795 root_decoder,
1796 rows_remaining: num_rows,
1797 rows_per_batch,
1798 rows_scheduled: 0,
1799 rows_drained: 0,
1800 scheduler_exhausted: false,
1801 emitted_batch_size_warning: Arc::new(Once::new()),
1802 spawn_batch_decode_tasks,
1803 batch_size_bytes,
1804 schema_bytes_per_row,
1805 bytes_per_row_feedback: Arc::new(AtomicU64::new(0)),
1806 }
1807 }
1808
1809 #[instrument(level = "debug", skip_all)]
1810 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1811 if self.scheduler_exhausted {
1812 return Ok(self.rows_scheduled);
1813 }
1814 while self.rows_scheduled < scheduled_need {
1815 let next_message = self.context.source.recv().await;
1816 match next_message {
1817 Some(scan_line) => {
1818 let scan_line = scan_line?;
1819 self.rows_scheduled = scan_line.scheduled_so_far;
1820 for message in scan_line.decoders {
1821 let unloaded_page = message.into_structural();
1822 let loaded_page = unloaded_page.0.await?;
1823 self.root_decoder.accept_page(loaded_page)?;
1824 }
1825 }
1826 None => {
1827 self.scheduler_exhausted = true;
1831 return Ok(self.rows_scheduled);
1832 }
1833 }
1834 }
1835 Ok(scheduled_need)
1836 }
1837
1838 #[instrument(level = "debug", skip_all)]
1839 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1840 trace!(
1841 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1842 self.rows_remaining, self.rows_drained, self.rows_scheduled,
1843 );
1844 if self.rows_remaining == 0 {
1845 return Ok(None);
1846 }
1847
1848 let mut to_take = if let Some(batch_size_bytes) = self.batch_size_bytes {
1849 let feedback = self.bytes_per_row_feedback.load(Ordering::Relaxed);
1850 let bpr = if feedback > 0 {
1851 feedback as f64
1852 } else {
1853 self.schema_bytes_per_row
1854 };
1855 let rows = (batch_size_bytes as f64 / bpr) as u64;
1856 self.rows_remaining.min(rows.max(1))
1857 } else {
1858 self.rows_remaining.min(self.rows_per_batch as u64)
1859 };
1860 self.rows_remaining -= to_take;
1861
1862 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1863 trace!(
1864 "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
1865 scheduled_need, self.rows_drained, to_take, self.rows_scheduled
1866 );
1867 if scheduled_need > 0 {
1868 let desired_scheduled = scheduled_need + self.rows_scheduled;
1869 trace!(
1870 "Draining from scheduler (desire at least {} scheduled rows)",
1871 desired_scheduled
1872 );
1873 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1874 if actually_scheduled < desired_scheduled {
1875 let under_scheduled = desired_scheduled - actually_scheduled;
1876 to_take -= under_scheduled;
1877 }
1878 }
1879
1880 if to_take == 0 {
1881 return Ok(None);
1882 }
1883
1884 let next_task = self.root_decoder.drain_batch_task(to_take)?;
1885 self.rows_drained += to_take;
1886 Ok(Some(next_task))
1887 }
1888
1889 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1890 let stream = futures::stream::unfold(self, |mut slf| async move {
1891 let next_task = slf.next_batch_task().await;
1892 let next_task = next_task.transpose().map(|next_task| {
1893 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1894 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1895 let bytes_per_row_feedback = slf.bytes_per_row_feedback.clone();
1896 let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks;
1899 let task = async move {
1900 let next_task = next_task?;
1901 let (batch, data_size) = if spawn_batch_decode_tasks {
1902 tokio::spawn(
1903 async move { next_task.into_batch(emitted_batch_size_warning) },
1904 )
1905 .await
1906 .map_err(|err| Error::wrapped(err.into()))??
1907 } else {
1908 next_task.into_batch(emitted_batch_size_warning)?
1909 };
1910 let num_rows = batch.num_rows() as u64;
1911 if num_rows > 0 {
1912 let bpr = data_size / num_rows;
1913 let prev = bytes_per_row_feedback.load(Ordering::Relaxed);
1914 let next = if prev == 0 || bpr >= prev {
1915 bpr
1918 } else {
1919 (prev + bpr) / 2
1923 };
1924 bytes_per_row_feedback.store(next.max(1), Ordering::Relaxed);
1925 }
1926 Ok(batch)
1927 };
1928 (task, num_rows)
1929 });
1930 next_task.map(|(task, num_rows)| {
1931 debug_assert!(num_rows <= u32::MAX as u64);
1933 let next_task = ReadBatchTask {
1934 task: task.boxed(),
1935 num_rows: num_rows as u32,
1936 };
1937 (next_task, slf)
1938 })
1939 });
1940 stream.boxed()
1941 }
1942}
1943
/// The rows a read should produce: either row ranges or individual row indices.
#[derive(Debug)]
pub enum RequestedRows {
    /// Half-open ranges of row offsets.
    Ranges(Vec<Range<u64>>),
    /// Individual row offsets.
    Indices(Vec<u64>),
}

impl RequestedRows {
    /// Total number of rows requested.
    ///
    /// A reversed range (`end < start`) is empty per `Range::is_empty` (and is
    /// removed by [`Self::trim_empty_ranges`]), so it contributes zero rows
    /// instead of underflowing.
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges.iter().map(|r| r.end.saturating_sub(r.start)).sum(),
            Self::Indices(indices) => indices.len() as u64,
        }
    }

    /// Removes empty ranges; index requests are returned unchanged.
    pub fn trim_empty_ranges(mut self) -> Self {
        if let Self::Ranges(ranges) = &mut self {
            ranges.retain(|r| !r.is_empty());
        }
        self
    }
}
1965
/// Decoder-level options.
#[derive(Debug, Clone)]
pub struct DecoderConfig {
    /// Whether to cache the repetition index (default from
    /// `default_cache_repetition_index()`). NOTE(review): exact effect is not
    /// visible in this chunk — confirm at use sites.
    pub cache_repetition_index: bool,
    /// Passed to the root decoders as `should_validate` when building decode streams.
    pub validate_on_decode: bool,
    /// Forces scheduling inline (`Some(true)`) or on a spawned task
    /// (`Some(false)`); `None` decides by comparing the row count to
    /// `inline_scheduling_threshold()`.
    pub inline_scheduling: Option<bool>,
}
1997
1998impl Default for DecoderConfig {
1999 fn default() -> Self {
2000 Self {
2001 cache_repetition_index: default_cache_repetition_index(),
2002 validate_on_decode: false,
2003 inline_scheduling: None,
2004 }
2005 }
2006}
2007
/// Everything needed to schedule and decode a read.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Plugins consulted when building the scheduler.
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum rows per emitted batch (unless `batch_size_bytes` applies).
    pub batch_size: u32,
    /// I/O used by the scheduler.
    pub io: Arc<dyn EncodingsIo>,
    /// Cache handed to the scheduler.
    pub cache: Arc<LanceCache>,
    /// Decoder-level options.
    pub decoder_config: DecoderConfig,
    /// Optional target batch size in bytes for structural files; ignored, with
    /// a warning, for v2.0 files.
    pub batch_size_bytes: Option<u64>,
}
2022
/// Ties a spawned scheduler task's lifetime to the decode `stream`.
///
/// After `stream` ends, an item-less tail stream awaits the scheduler task, and
/// `unwrap`s its result, so a scheduler panic or cancellation (`JoinError`)
/// surfaces as a panic in the consumer. When the combined stream is dropped, the
/// scheduler task is aborted; aborting an already-finished task has no effect.
fn check_scheduler_on_drop(
    stream: BoxStream<'static, ReadBatchTask>,
    scheduler_handle: tokio::task::JoinHandle<()>,
) -> BoxStream<'static, ReadBatchTask> {
    let abort_handle = scheduler_handle.abort_handle();
    // `unfold` needs an `FnMut`, so the handle is moved out of an `Option` on
    // the (only) call.
    let mut scheduler_handle = Some(scheduler_handle);
    let check_scheduler = stream::unfold((), move |_| {
        let handle = scheduler_handle.take();
        async move {
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
            // Never yields an item; exists only to await the scheduler.
            None
        }
    });
    stream
        .chain(check_scheduler)
        .on_drop(move || {
            // Stop scheduling if the consumer drops the stream.
            abort_handle.abort();
        })
        .boxed()
}
2055
2056#[allow(clippy::too_many_arguments)]
2057pub fn create_decode_stream(
2058 schema: &Schema,
2059 num_rows: u64,
2060 batch_size: u32,
2061 is_structural: bool,
2062 should_validate: bool,
2063 spawn_structural_batch_decode_tasks: bool,
2064 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2065 batch_size_bytes: Option<u64>,
2066) -> Result<BoxStream<'static, ReadBatchTask>> {
2067 if is_structural {
2068 let arrow_schema = ArrowSchema::from(schema);
2069 let structural_decoder = StructuralStructDecoder::new(
2070 arrow_schema.fields,
2071 should_validate,
2072 true,
2073 )?;
2074 Ok(StructuralBatchDecodeStream::new(
2075 rx,
2076 batch_size,
2077 num_rows,
2078 structural_decoder,
2079 spawn_structural_batch_decode_tasks,
2080 batch_size_bytes,
2081 )
2082 .into_stream())
2083 } else {
2084 if batch_size_bytes.is_some() {
2085 warn!("batch_size_bytes is not supported for v2.0 files and will be ignored");
2086 }
2087 let arrow_schema = ArrowSchema::from(schema);
2088 let root_fields = arrow_schema.fields;
2089
2090 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
2091 Ok(BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream())
2092 }
2093}
2094
2095pub fn create_decode_iterator(
2099 schema: &Schema,
2100 num_rows: u64,
2101 batch_size: u32,
2102 should_validate: bool,
2103 is_structural: bool,
2104 messages: VecDeque<Result<DecoderMessage>>,
2105) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2106 let arrow_schema = Arc::new(ArrowSchema::from(schema));
2107 let root_fields = arrow_schema.fields.clone();
2108 if is_structural {
2109 let simple_struct_decoder =
2110 StructuralStructDecoder::new(root_fields, should_validate, true)?;
2111 Ok(Box::new(BatchDecodeIterator::new(
2112 messages,
2113 batch_size,
2114 num_rows,
2115 simple_struct_decoder,
2116 arrow_schema,
2117 )))
2118 } else {
2119 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
2120 Ok(Box::new(BatchDecodeIterator::new(
2121 messages,
2122 batch_size,
2123 num_rows,
2124 root_decoder,
2125 arrow_schema,
2126 )))
2127 }
2128}
2129
2130async fn create_scheduler_decoder(
2131 column_infos: Vec<Arc<ColumnInfo>>,
2132 requested_rows: RequestedRows,
2133 filter: FilterExpression,
2134 column_indices: Vec<u32>,
2135 target_schema: Arc<Schema>,
2136 config: SchedulerDecoderConfig,
2137) -> Result<BoxStream<'static, ReadBatchTask>> {
2138 let num_rows = requested_rows.num_rows();
2139
2140 let is_structural = column_infos[0].is_structural();
2141 let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
2142 let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() {
2143 Some("always") => true,
2144 Some("never") => false,
2145 _ => matches!(requested_rows, RequestedRows::Ranges(_)),
2146 };
2147
2148 let (tx, rx) = mpsc::unbounded_channel();
2149
2150 let decode_stream = create_decode_stream(
2151 &target_schema,
2152 num_rows,
2153 config.batch_size,
2154 is_structural,
2155 config.decoder_config.validate_on_decode,
2156 spawn_structural_batch_decode_tasks,
2157 rx,
2158 config.batch_size_bytes,
2159 )?;
2160
2161 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2166 target_schema.as_ref(),
2167 &column_indices,
2168 &column_infos,
2169 &vec![],
2170 num_rows,
2171 config.decoder_plugins,
2172 config.io.clone(),
2173 config.cache,
2174 &filter,
2175 &config.decoder_config,
2176 )
2177 .await?;
2178
2179 let inline_scheduling = config
2185 .decoder_config
2186 .inline_scheduling
2187 .unwrap_or_else(|| num_rows <= inline_scheduling_threshold());
2188
2189 if inline_scheduling {
2190 match requested_rows {
2191 RequestedRows::Ranges(ranges) => {
2192 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2193 }
2194 RequestedRows::Indices(indices) => {
2195 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2196 }
2197 }
2198 Ok(decode_stream)
2199 } else {
2200 let scheduling = async move {
2204 match requested_rows {
2205 RequestedRows::Ranges(ranges) => {
2206 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2207 }
2208 RequestedRows::Indices(indices) => {
2209 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2210 }
2211 }
2212 };
2213 let scheduler_handle = tokio::task::spawn(scheduling);
2214 Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
2215 }
2216}
2217
2218pub async fn schedule_and_decode(
2233 column_infos: Vec<Arc<ColumnInfo>>,
2234 requested_rows: RequestedRows,
2235 filter: FilterExpression,
2236 column_indices: Vec<u32>,
2237 target_schema: Arc<Schema>,
2238 config: SchedulerDecoderConfig,
2239) -> Result<BoxStream<'static, ReadBatchTask>> {
2240 if requested_rows.num_rows() == 0 {
2241 return Ok(stream::empty().boxed());
2242 }
2243
2244 let requested_rows = requested_rows.trim_empty_ranges();
2247
2248 let io = config.io.clone();
2249
2250 let stream = create_scheduler_decoder(
2251 column_infos,
2252 requested_rows,
2253 filter,
2254 column_indices,
2255 target_schema,
2256 config,
2257 )
2258 .await?;
2259
2260 Ok(stream.finally(move || drop(io)).boxed())
2263}
2264
/// Shared current-thread runtime used to block on async setup, e.g.
/// `DecodeBatchScheduler::try_new` in `schedule_and_decode_blocking`.
///
/// Built lazily on first use; panics at that point if the runtime cannot be built.
pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});
2270
2271pub fn schedule_and_decode_blocking(
2286 column_infos: Vec<Arc<ColumnInfo>>,
2287 requested_rows: RequestedRows,
2288 filter: FilterExpression,
2289 column_indices: Vec<u32>,
2290 target_schema: Arc<Schema>,
2291 config: SchedulerDecoderConfig,
2292) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2293 if requested_rows.num_rows() == 0 {
2294 let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2295 return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2296 }
2297
2298 let num_rows = requested_rows.num_rows();
2299 let is_structural = column_infos[0].is_structural();
2300
2301 let (tx, mut rx) = mpsc::unbounded_channel();
2302
2303 let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2306 target_schema.as_ref(),
2307 &column_indices,
2308 &column_infos,
2309 &vec![],
2310 num_rows,
2311 config.decoder_plugins,
2312 config.io.clone(),
2313 config.cache,
2314 &filter,
2315 &config.decoder_config,
2316 ))?;
2317
2318 match requested_rows {
2320 RequestedRows::Ranges(ranges) => {
2321 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2322 }
2323 RequestedRows::Indices(indices) => {
2324 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2325 }
2326 }
2327
2328 let mut messages = Vec::new();
2330 while rx
2331 .recv_many(&mut messages, usize::MAX)
2332 .now_or_never()
2333 .unwrap()
2334 != 0
2335 {}
2336
2337 let decode_iterator = create_decode_iterator(
2339 &target_schema,
2340 num_rows,
2341 config.batch_size,
2342 config.decoder_config.validate_on_decode,
2343 is_structural,
2344 messages.into(),
2345 )?;
2346
2347 Ok(decode_iterator)
2348}
2349
/// Decodes rows out of a primitive page whose data has been loaded.
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes `num_rows` rows into a [`DataBlock`], presumably after skipping
    /// the first `rows_to_skip` rows of the page (inferred from the parameter
    /// names — confirm against implementations).
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
2395
/// Schedules the I/O for a single page and yields a decoder once it completes.
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules reads for the page-relative `ranges` through `scheduler`.
    ///
    /// The returned future resolves to a [`PrimitivePageDecoder`] for those
    /// rows. `top_level_row` is presumably the top-level row the request starts
    /// at (e.g. for I/O prioritisation) — NOTE(review): confirm.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2423
/// Tracks a scheduling priority that advances as rows are scheduled.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Advances the position by `num_rows` rows.
    fn advance(&mut self, num_rows: u64);
    /// The current priority value.
    fn current_priority(&self) -> u64;
    /// Clones this range into a new boxed trait object.
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

/// A priority that is simply a row counter: it starts at `priority` and grows
/// by exactly the number of rows advanced.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self::new(self.priority))
    }
}

/// Maps advances over list *items* onto advances of an outer `base` priority
/// measured in lists.
///
/// `offsets[i]` is the item offset at which list `i` starts. The base advances
/// by one each time the item position reaches the start of the next list.
pub struct ListPriorityRange {
    base: Box<dyn PriorityRange>,
    offsets: Arc<[u64]>,
    cur_index_into_offsets: usize,
    cur_position: u64,
}

impl ListPriorityRange {
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}

impl std::fmt::Debug for ListPriorityRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let offsets_len = self.offsets.len();
        f.debug_struct("ListPriorityRange")
            .field("base", &self.base)
            .field("offsets.len()", &offsets_len)
            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
            .field("cur_position", &self.cur_position)
            .finish()
    }
}

impl PriorityRange for ListPriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.cur_position += num_rows;
        let position = self.cur_position;
        // Count the upcoming list starts the new position has reached.
        let completed_lists = self
            .offsets
            .get(self.cur_index_into_offsets + 1..)
            .unwrap_or(&[])
            .iter()
            .take_while(|&&offset| offset <= position)
            .count();
        self.cur_index_into_offsets += completed_lists;
        self.base.advance(completed_lists as u64);
    }

    fn current_priority(&self) -> u64 {
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        let mut copy = Self::new(self.base.box_clone(), Arc::clone(&self.offsets));
        copy.cur_index_into_offsets = self.cur_index_into_offsets;
        copy.cur_position = self.cur_position;
        Box::new(copy)
    }
}
2530
/// State shared across field schedulers while scheduling a read.
///
/// It tracks the current position in the field tree as a stack of indices and names.
pub struct SchedulerContext {
    /// Optional receiver; when `Some`, `path_name` reports a `TEMP(...)` path.
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    /// I/O used for scheduling reads.
    io: Arc<dyn EncodingsIo>,
    /// Shared cache.
    cache: Arc<LanceCache>,
    /// Name used in `TEMP(...)` path names.
    name: String,
    /// Child indices from the root to the current field.
    path: Vec<u32>,
    /// Child names from the root to the current field.
    path_names: Vec<String>,
}
2540
2541pub struct ScopedSchedulerContext<'a> {
2542 pub context: &'a mut SchedulerContext,
2543}
2544
2545impl<'a> ScopedSchedulerContext<'a> {
2546 pub fn pop(self) -> &'a mut SchedulerContext {
2547 self.context.pop();
2548 self.context
2549 }
2550}
2551
2552impl SchedulerContext {
2553 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2554 Self {
2555 io,
2556 cache,
2557 recv: None,
2558 name: "".to_string(),
2559 path: Vec::new(),
2560 path_names: Vec::new(),
2561 }
2562 }
2563
2564 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2565 &self.io
2566 }
2567
2568 pub fn cache(&self) -> &Arc<LanceCache> {
2569 &self.cache
2570 }
2571
2572 pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2573 self.path.push(index);
2574 self.path_names.push(name.to_string());
2575 ScopedSchedulerContext { context: self }
2576 }
2577
2578 pub fn pop(&mut self) {
2579 self.path.pop();
2580 self.path_names.pop();
2581 }
2582
2583 pub fn path_name(&self) -> String {
2584 let path = self.path_names.join("/");
2585 if self.recv.is_some() {
2586 format!("TEMP({}){}", self.name, path)
2587 } else {
2588 format!("ROOT{}", path)
2589 }
2590 }
2591
2592 pub fn current_path(&self) -> VecDeque<u32> {
2593 VecDeque::from_iter(self.path.iter().copied())
2594 }
2595
2596 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2597 pub fn locate_decoder(
2598 &mut self,
2599 decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2600 ) -> crate::previous::decoder::DecoderReady {
2601 trace!(
2602 "Scheduling decoder of type {:?} for {:?}",
2603 decoder.data_type(),
2604 self.path,
2605 );
2606 crate::previous::decoder::DecoderReady {
2607 decoder,
2608 path: self.current_path(),
2609 }
2610 }
2611}
2612
2613pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2614
2615impl std::fmt::Debug for UnloadedPageShard {
2616 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2617 f.debug_struct("UnloadedPage").finish()
2618 }
2619}
2620
/// One step of output from a structural scheduling job.
#[derive(Debug)]
pub struct ScheduledScanLine {
    /// Rows covered by this scan line (presumably incremental, not cumulative —
    /// NOTE(review): confirm against `StructuralSchedulingJob` implementations).
    pub rows_scheduled: u64,
    /// Messages (pages/decoders) produced for this scan line.
    pub decoders: Vec<MessageType>,
}
2626
/// An in-progress structural scheduling operation, advanced step by step.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Schedules the next chunk of work, returning zero or more scan lines.
    /// NOTE(review): end-of-job signalling is not visible here — confirm with implementations.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
}
2636
2637pub struct FilterExpression(pub Bytes);
2645
2646impl FilterExpression {
2647 pub fn no_filter() -> Self {
2652 Self(Bytes::new())
2653 }
2654
2655 pub fn is_noop(&self) -> bool {
2657 self.0.is_empty()
2658 }
2659}
2660
/// Scheduler for one (structural) field.
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// Performs any async setup needed before scheduling, given `filter` and `context`.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Starts a scheduling job for `ranges`; drive it with
    /// [`StructuralSchedulingJob::schedule_next`].
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
2673
/// A unit of CPU decode work producing one Arrow array.
pub trait DecodeArrayTask: Send {
    /// Decodes the array, returning it together with its decoded data size in bytes.
    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)>;
}
2679
2680impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2681 fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
2682 StructuralDecodeArrayTask::decode(*self)
2683 .map(|decoded_array| (decoded_array.array, decoded_array.data_size))
2684 }
2685}
2686
/// A planned batch: the decode work plus the number of rows it will produce.
pub struct NextDecodeTask {
    /// Decode work yielding the batch's struct array and its data size.
    pub task: Box<dyn DecodeArrayTask>,
    /// Rows the decoded batch will contain.
    pub num_rows: u64,
}
2697
2698impl NextDecodeTask {
2699 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2704 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<(RecordBatch, u64)> {
2705 let (struct_arr, data_size) = self
2706 .task
2707 .decode()
2708 .map_err(|e| Error::internal(format!("Error decoding batch: {}", e)))?;
2709 let batch = RecordBatch::from(struct_arr.as_struct());
2710 if data_size > BATCH_SIZE_BYTES_WARNING {
2711 emitted_batch_size_warning.call_once(|| {
2712 let size_mb = data_size / 1024 / 1024;
2713 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2714 });
2715 }
2716 Ok((batch, data_size))
2717 }
2718}
2719
/// One item of scheduler output carried in a [`DecoderMessage`].
///
/// Legacy fields produce ready decoders (see `into_legacy`); structural fields
/// produce page shards that still need to be loaded (see `into_structural`).
#[derive(Debug)]
pub enum MessageType {
    /// A ready decoder for a legacy field.
    DecoderReady(crate::previous::decoder::DecoderReady),
    /// A not-yet-loaded page shard for a structural field.
    UnloadedPage(UnloadedPageShard),
}
2735
2736impl MessageType {
2737 pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2738 match self {
2739 Self::DecoderReady(decoder) => decoder,
2740 Self::UnloadedPage(_) => {
2741 panic!("Expected DecoderReady but got UnloadedPage")
2742 }
2743 }
2744 }
2745
2746 pub fn into_structural(self) -> UnloadedPageShard {
2747 match self {
2748 Self::UnloadedPage(unloaded) => unloaded,
2749 Self::DecoderReady(_) => {
2750 panic!("Expected UnloadedPage but got DecoderReady")
2751 }
2752 }
2753 }
2754}
2755
/// A unit of scheduler output delivered to the decoder over a channel.
pub struct DecoderMessage {
    /// Rows scheduled so far (by name, presumably a running total across
    /// messages — TODO confirm).
    pub scheduled_so_far: u64,
    /// The ready decoders / unloaded pages carried by this message.
    pub decoders: Vec<MessageType>,
}
2760
/// Decoder-side state: the receiving end of the channel that scheduler
/// messages arrive on.
pub struct DecoderContext {
    /// Unbounded receiver; each item is either a [`DecoderMessage`] or an error.
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    /// Wraps the receiver that scheduler messages will be read from.
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}
2770
/// The result of decoding a page.
pub struct DecodedPage {
    /// The decoded values.
    pub data: DataBlock,
    /// Repetition/definition information for `data` (inferred from the
    /// `RepDefUnraveler` type — confirm exact semantics there).
    pub repdef: RepDefUnraveler,
}
2775
/// A deferred unit of work that decodes (part of) a page.
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Consumes the task and performs the decode.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}
2780
/// Decoder over a loaded structural page that hands out decode tasks by row count.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Returns a task that decodes the next `num_rows` rows (presumably
    /// advancing an internal cursor, given `&mut self` — confirm).
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// Row count of the page (total vs. remaining is not shown here — confirm).
    fn num_rows(&self) -> u64;
}
2785
/// A page shard whose data has been loaded (the output of an
/// [`UnloadedPageShard`] future).
#[derive(Debug)]
pub struct LoadedPageShard {
    /// Decoder over the loaded page.
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Path locating the field this shard belongs to.
    /// NOTE(review): assumed to be child indices from the root, consumed from
    /// the front by nested decoders — confirm.
    pub path: VecDeque<u32>,
}
2810
/// An array produced by a structural decode task.
pub struct DecodedArray {
    /// The decoded Arrow array.
    pub array: ArrayRef,
    /// Repetition/definition information accompanying `array`.
    pub repdef: CompositeRepDefUnraveler,
    /// Size of the decoded data in bytes; compared against
    /// `BATCH_SIZE_BYTES_WARNING` when output batches are built.
    pub data_size: u64,
}
2817
/// A deferred unit of work that decodes an array for a structural field.
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Consumes the task and performs the decode.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}
2821
/// Decoder for a structural field: accepts loaded pages and hands out decode
/// tasks by row count.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Gives a loaded page shard to this decoder.
    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
    /// Returns a task that decodes the next `num_rows` rows (presumably from
    /// pages previously accepted — confirm).
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The Arrow data type of the arrays this decoder produces.
    fn data_type(&self) -> &DataType;
}
2833
/// Extension point for decoder plugins; currently carries no state.
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2836
2837pub async fn decode_batch(
2839 batch: &EncodedBatch,
2840 filter: &FilterExpression,
2841 decoder_plugins: Arc<DecoderPlugins>,
2842 should_validate: bool,
2843 version: LanceFileVersion,
2844 cache: Option<Arc<LanceCache>>,
2845) -> Result<RecordBatch> {
2846 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2851 let cache = if let Some(cache) = cache {
2852 cache
2853 } else {
2854 Arc::new(lance_core::cache::LanceCache::with_capacity(
2855 128 * 1024 * 1024,
2856 ))
2857 };
2858 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2859 batch.schema.as_ref(),
2860 &batch.top_level_columns,
2861 &batch.page_table,
2862 &vec![],
2863 batch.num_rows,
2864 decoder_plugins,
2865 io_scheduler.clone(),
2866 cache,
2867 filter,
2868 &DecoderConfig::default(),
2869 )
2870 .await?;
2871 let (tx, rx) = unbounded_channel();
2872 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2873 let is_structural = version >= LanceFileVersion::V2_1;
2874 let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
2875 let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never"));
2876 let mut decode_stream = create_decode_stream(
2877 &batch.schema,
2878 batch.num_rows,
2879 batch.num_rows as u32,
2880 is_structural,
2881 should_validate,
2882 spawn_structural_batch_decode_tasks,
2883 rx,
2884 None,
2885 )?;
2886 decode_stream.next().await.unwrap().task.await
2887}
2888
#[cfg(test)]
mod tests {
    use super::*;

    /// A zero-dimension `FixedSizeList` must be rejected with a descriptive error
    /// by both the structural and the legacy scheduler factories, not a panic.
    #[test]
    fn test_read_zero_dimension_fsl_errors_instead_of_panicking() {
        let zero_dim = DataType::FixedSizeList(
            Arc::new(ArrowField::new("item", DataType::Float32, true)),
            0,
        );
        let field = Field::try_from(&ArrowField::new("vec", zero_dim, true)).unwrap();
        let strategy = CoreFieldDecoderStrategy::default();

        let mut structural_columns = ColumnInfoIter::new(vec![], &[]);
        let err = strategy
            .create_structural_field_scheduler(&field, &mut structural_columns)
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("dimension must be a positive integer"),
            "unexpected error: {}",
            err
        );

        let mut legacy_columns = ColumnInfoIter::new(vec![], &[]);
        let err = strategy
            .create_legacy_field_scheduler(
                &field,
                &mut legacy_columns,
                FileBuffers {
                    positions_and_sizes: &[],
                },
            )
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("dimension must be a positive integer"),
            "unexpected error: {}",
            err
        );
    }

    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }

    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }

    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }

    #[test]
    fn test_estimate_bytes_per_row() {
        assert_eq!(estimate_bytes_per_row(&DataType::Int32), 4.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Int64), 8.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Float32), 4.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Boolean), 1.0 / 8.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Utf8), 64.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Binary), 64.0);
        let struct_type = DataType::Struct(Fields::from(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Int32, false),
            ArrowField::new("c", DataType::Int32, false),
            ArrowField::new("d", DataType::Int32, false),
        ]));
        assert_eq!(estimate_bytes_per_row(&struct_type), 16.0);
    }

    /// Encodes `batch` in memory with the 2.1 format, then decodes it back using
    /// the row-based `batch_size` and the optional byte-based `batch_size_bytes`
    /// limit, returning every batch the decode stream yields.
    async fn decode_batches_with_byte_limit(
        batch: &RecordBatch,
        batch_size: u32,
        batch_size_bytes: Option<u64>,
    ) -> Vec<RecordBatch> {
        use crate::encoder::{EncodingOptions, default_encoding_strategy, encode_batch};
        use crate::version::LanceFileVersion;

        let version = LanceFileVersion::V2_1;
        let options = EncodingOptions {
            version,
            ..Default::default()
        };
        let strategy = default_encoding_strategy(version);
        let schema = Schema::try_from(batch.schema().as_ref()).unwrap();
        let encoded = encode_batch(batch, Arc::new(schema.clone()), strategy.as_ref(), &options)
            .await
            .unwrap();

        let io_scheduler =
            Arc::new(BufferScheduler::new(encoded.data.clone())) as Arc<dyn EncodingsIo>;
        let cache = Arc::new(lance_core::cache::LanceCache::with_capacity(
            128 * 1024 * 1024,
        ));
        let decoder_plugins = Arc::new(DecoderPlugins::default());

        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            encoded.schema.as_ref(),
            &encoded.top_level_columns,
            &encoded.page_table,
            &vec![],
            encoded.num_rows,
            decoder_plugins,
            io_scheduler.clone(),
            cache,
            &FilterExpression::no_filter(),
            &DecoderConfig::default(),
        )
        .await
        .unwrap();

        let (tx, rx) = unbounded_channel();
        decode_scheduler.schedule_range(
            0..encoded.num_rows,
            &FilterExpression::no_filter(),
            tx,
            io_scheduler,
        );

        let mut decode_stream = create_decode_stream(
            &encoded.schema,
            encoded.num_rows,
            batch_size,
            true,
            true,
            true,
            rx,
            batch_size_bytes,
        )
        .unwrap();

        let mut batches = Vec::new();
        while let Some(task) = decode_stream.next().await {
            batches.push(task.task.await.unwrap());
        }
        batches
    }

    /// Four Int32 columns are 16 bytes per row, so a 1600-byte limit should split
    /// 1000 rows into 10 batches of 100 rows, and the data must round-trip.
    #[tokio::test]
    async fn test_byte_sized_batches_fixed_width() {
        use arrow_array::Int32Array;

        let num_rows: i32 = 1000;
        let arrays: Vec<Arc<dyn arrow_array::Array>> = (0..4)
            .map(|col| {
                Arc::new(Int32Array::from_iter_values(
                    (0..num_rows).map(move |row| row * 10 + col),
                )) as _
            })
            .collect();

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Int32, false),
            ArrowField::new("c", DataType::Int32, false),
            ArrowField::new("d", DataType::Int32, false),
        ]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        let batches = decode_batches_with_byte_limit(&input_batch, 1024, Some(1600)).await;

        assert_eq!(batches.len(), 10);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch.num_rows(),
                100,
                "batch {i} should have 100 rows, got {}",
                batch.num_rows()
            );
        }

        let concatenated =
            arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
        assert_eq!(concatenated.num_rows(), num_rows as usize);
        for col in 0..4 {
            assert_eq!(
                concatenated.column(col).as_ref(),
                input_batch.column(col).as_ref(),
                "column {col} roundtrip mismatch"
            );
        }
    }

    /// Without a byte limit, batching is driven purely by the row-based batch size.
    #[tokio::test]
    async fn test_byte_sized_batches_none_unchanged() {
        use arrow_array::Int32Array;

        let num_rows: i32 = 1000;
        let arrays: Vec<Arc<dyn arrow_array::Array>> = (0..2)
            .map(|col| {
                Arc::new(Int32Array::from_iter_values(
                    (0..num_rows).map(move |row| row * 10 + col),
                )) as _
            })
            .collect();

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("x", DataType::Int32, false),
            ArrowField::new("y", DataType::Int32, false),
        ]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        let batches = decode_batches_with_byte_limit(&input_batch, 250, None).await;
        assert_eq!(batches.len(), 4);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch.num_rows(),
                250,
                "batch {i} should have 250 rows, got {}",
                batch.num_rows()
            );
        }
    }

    /// Variable-width rows: every row holds the same 100-byte string, while the
    /// static Utf8 estimate is 64 bytes per row (see `test_estimate_bytes_per_row`).
    /// After the first batch, sizing is expected to converge to roughly
    /// `target_bytes / 100` = 50 rows per batch.
    #[tokio::test]
    async fn test_byte_sized_batches_feedback_convergence() {
        use arrow_array::StringArray;

        let num_rows = 500;
        let value: String = "x".repeat(100);
        let arrays: Vec<Arc<dyn arrow_array::Array>> = vec![Arc::new(StringArray::from(
            (0..num_rows).map(|_| value.as_str()).collect::<Vec<_>>(),
        ))];
        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Utf8,
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        let target_bytes: u64 = 5000;
        let batches = decode_batches_with_byte_limit(&input_batch, 1024, Some(target_bytes)).await;

        // Check the batch count before indexing `batches[0]`, so an empty result
        // fails with this message instead of an index-out-of-bounds panic.
        assert!(
            batches.len() >= 2,
            "need at least 2 batches to test convergence"
        );

        let concatenated =
            arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
        assert_eq!(concatenated.num_rows(), num_rows as usize);
        assert_eq!(
            concatenated.column(0).as_ref(),
            input_batch.column(0).as_ref()
        );

        if batches.len() >= 3 {
            let second_batch_rows = batches[1].num_rows();
            let third_batch_rows = batches[2].num_rows();
            assert!(
                (40..=60).contains(&second_batch_rows),
                "second batch should be near 50 rows, got {second_batch_rows}"
            );
            assert!(
                (40..=60).contains(&third_batch_rows),
                "third batch should be near 50 rows, got {third_batch_rows}"
            );
        }
    }
}