1use std::collections::VecDeque;
216use std::sync::atomic::{AtomicU64, Ordering};
217use std::sync::{LazyLock, Once, OnceLock};
218use std::{ops::Range, sync::Arc};
219
220use arrow_array::cast::AsArray;
221use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
222use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
223use bytes::Bytes;
224use futures::future::{BoxFuture, MaybeDone, maybe_done};
225use futures::stream::{self, BoxStream};
226use futures::{FutureExt, StreamExt};
227use lance_arrow::DataTypeExt;
228use lance_core::cache::LanceCache;
229use lance_core::datatypes::{BLOB_DESC_LANCE_FIELD, Field, Schema};
230use lance_core::utils::futures::{FinallyStreamExt, StreamOnDropExt};
231use lance_core::utils::parse::parse_env_as_bool;
232use log::{debug, trace, warn};
233use tokio::sync::mpsc::error::SendError;
234use tokio::sync::mpsc::{self, unbounded_channel};
235
236use lance_core::error::LanceOptionExt;
237use lance_core::{ArrowResult, Error, Result};
238use tracing::instrument;
239
240use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
241use crate::data::DataBlock;
242use crate::encoder::EncodedBatch;
243use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
244use crate::encodings::logical::list::StructuralListScheduler;
245use crate::encodings::logical::map::StructuralMapScheduler;
246use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
247use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
248use crate::format::pb::{self, column_encoding};
249use crate::format::pb21;
250use crate::previous::decoder::LogicalPageDecoder;
251use crate::previous::encodings::logical::list::OffsetPageInfo;
252use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
253use crate::previous::encodings::logical::{
254 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
255 primitive::PrimitiveFieldScheduler,
256};
257use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
258use crate::version::LanceFileVersion;
259use crate::{BufferScheduler, EncodingsIo};
260
/// Byte size above which a single decoded batch triggers a one-time warning
/// (10 MiB).  NOTE(review): the warning site is outside this chunk —
/// presumably guarded by `emitted_batch_size_warning`; confirm there.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

/// Env var selecting how structural batch decode work is spawned.
/// NOTE(review): the consumer of this variable is outside this chunk —
/// confirm accepted values at the read site.
const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
    "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE";
/// Env var (boolean) controlling whether the repetition index is cached on
/// read; read once in `default_cache_repetition_index` (defaults to true).
const ENV_LANCE_READ_CACHE_REPETITION_INDEX: &str = "LANCE_READ_CACHE_REPETITION_INDEX";
/// Env var overriding the inline scheduling threshold; parsed as `u64` in
/// `inline_scheduling_threshold`.
const ENV_LANCE_INLINE_SCHEDULING_THRESHOLD: &str = "LANCE_INLINE_SCHEDULING_THRESHOLD";

/// Default inline scheduling threshold (16 KiB) used when the env var is
/// unset or unparsable.
const DEFAULT_INLINE_SCHEDULING_THRESHOLD: u64 = 16 * 1024;
271
272fn default_cache_repetition_index() -> bool {
273 static DEFAULT_CACHE_REPETITION_INDEX: OnceLock<bool> = OnceLock::new();
274 *DEFAULT_CACHE_REPETITION_INDEX
275 .get_or_init(|| parse_env_as_bool(ENV_LANCE_READ_CACHE_REPETITION_INDEX, true))
276}
277
278fn inline_scheduling_threshold() -> u64 {
279 static THRESHOLD: OnceLock<u64> = OnceLock::new();
280 *THRESHOLD.get_or_init(|| {
281 std::env::var(ENV_LANCE_INLINE_SCHEDULING_THRESHOLD)
282 .ok()
283 .and_then(|v| v.trim().parse::<u64>().ok())
284 .unwrap_or(DEFAULT_INLINE_SCHEDULING_THRESHOLD)
285 })
286}
287
/// The encoding metadata of a page
///
/// Legacy files describe pages with `pb::ArrayEncoding`; newer (`pb21`)
/// files use the structural `PageLayout` description.
#[derive(Debug)]
pub enum PageEncoding {
    /// Legacy page encoding description
    Legacy(pb::ArrayEncoding),
    /// Structural (2.1-style) page layout description
    Structural(pb21::PageLayout),
}
299
300impl PageEncoding {
301 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
302 match self {
303 Self::Legacy(enc) => enc,
304 Self::Structural(_) => panic!("Expected a legacy encoding"),
305 }
306 }
307
308 pub fn as_structural(&self) -> &pb21::PageLayout {
309 match self {
310 Self::Structural(enc) => enc,
311 Self::Legacy(_) => panic!("Expected a structural encoding"),
312 }
313 }
314
315 pub fn is_structural(&self) -> bool {
316 matches!(self, Self::Structural(_))
317 }
318}
319
/// Metadata describing a single page of data within a column
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// The scheduling priority of the page (the synthetic root column and
    /// list offsets pages use 0 — see `root_column` / `create_list_scheduler`)
    pub priority: u64,
    /// The encoding describing how the page's buffers are laid out
    pub encoding: PageEncoding,
    /// The (file offset, size in bytes) of each buffer belonging to the page
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
336
/// Metadata describing a column of data within a file
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column within the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// The (file offset, size in bytes) of each column-level buffer
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// The column-level encoding (e.g. `Values` or `Blob`)
    pub encoding: pb::ColumnEncoding,
}
350
351impl ColumnInfo {
352 pub fn new(
354 index: u32,
355 page_infos: Arc<[PageInfo]>,
356 buffer_offsets_and_sizes: Vec<(u64, u64)>,
357 encoding: pb::ColumnEncoding,
358 ) -> Self {
359 Self {
360 index,
361 page_infos,
362 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
363 encoding,
364 }
365 }
366
367 pub fn is_structural(&self) -> bool {
368 self.page_infos
369 .first()
371 .map(|page| page.encoding.is_structural())
372 .unwrap_or(false)
373 }
374}
375
/// The root scheduler, which is either a structural (2.1) scheduler or a
/// legacy (2.0) scheduler
enum RootScheduler {
    /// Structural root scheduler
    Structural(Box<dyn StructuralFieldScheduler>),
    /// Legacy root scheduler
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}
380
381impl RootScheduler {
382 fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
383 match self {
384 Self::Structural(_) => panic!("Expected a legacy scheduler"),
385 Self::Legacy(s) => s,
386 }
387 }
388
389 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
390 match self {
391 Self::Structural(s) => s.as_ref(),
392 Self::Legacy(_) => panic!("Expected a structural scheduler"),
393 }
394 }
395}
396
/// The scheduler for decoding batches
///
/// Wraps either a legacy or structural root scheduler, along with the Arrow
/// fields of the top-level schema and the cache handed to each
/// `SchedulerContext` created while scheduling.
pub struct DecodeBatchScheduler {
    /// Scheduler for the synthetic root struct column that wraps all
    /// top-level fields
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    /// Cache shared with every scheduling context
    cache: Arc<LanceCache>,
}
423
/// An iterator over column infos that tracks both the current column position
/// and the position within the caller-provided top-level column indices
/// (`next_top_level` jumps to the next requested top-level column).
pub struct ColumnInfoIter<'a> {
    /// All column infos, indexable by file column position
    column_infos: Vec<Arc<ColumnInfo>>,
    /// The top-level column indices requested by the caller
    column_indices: &'a [u32],
    /// Current position within `column_infos`
    column_info_pos: usize,
    /// Current position within `column_indices`
    column_indices_pos: usize,
}
430
431impl<'a> ColumnInfoIter<'a> {
432 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
433 let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
434 Self {
435 column_infos,
436 column_indices,
437 column_info_pos: initial_pos,
438 column_indices_pos: 0,
439 }
440 }
441
442 pub fn peek(&self) -> &Arc<ColumnInfo> {
443 &self.column_infos[self.column_info_pos]
444 }
445
446 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
447 let column_info = self.column_infos[self.column_info_pos].clone();
448 let transformed = transform(column_info);
449 self.column_infos[self.column_info_pos] = transformed;
450 }
451
452 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
453 self.next().ok_or_else(|| {
454 Error::invalid_input(
455 "there were more fields in the schema than provided column indices / infos",
456 )
457 })
458 }
459
460 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
461 if self.column_info_pos < self.column_infos.len() {
462 let info = &self.column_infos[self.column_info_pos];
463 self.column_info_pos += 1;
464 Some(info)
465 } else {
466 None
467 }
468 }
469
470 pub(crate) fn next_top_level(&mut self) {
471 self.column_indices_pos += 1;
472 if self.column_indices_pos < self.column_indices.len() {
473 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
474 } else {
475 self.column_info_pos = self.column_infos.len();
476 }
477 }
478}
479
/// The (file offset, size in bytes) of each file-level buffer
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}
485
/// The column-level buffers, together with the file-level buffers they live in
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}
492
/// The page-level buffers, together with the column-level buffers they live in
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}
499
/// The core decoder strategy, which knows how to create field schedulers for
/// both the legacy (2.0) and structural (2.1) encodings
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    /// Whether decoded data should be validated
    pub validate_data: bool,
    /// The strategy used to pick decompressors for structural pages
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    /// Whether the repetition index should be cached when reading
    pub cache_repetition_index: bool,
}
507
508impl Default for CoreFieldDecoderStrategy {
509 fn default() -> Self {
510 Self {
511 validate_data: false,
512 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
513 cache_repetition_index: false,
514 }
515 }
516}
517
518impl CoreFieldDecoderStrategy {
519 pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
521 self.cache_repetition_index = cache_repetition_index;
522 self
523 }
524
525 pub fn from_decoder_config(config: &DecoderConfig) -> Self {
527 Self {
528 validate_data: config.validate_on_decode,
529 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
530 cache_repetition_index: config.cache_repetition_index,
531 }
532 }
533
534 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
537 let column_encoding = column_info
538 .encoding
539 .column_encoding
540 .as_ref()
541 .ok_or_else(|| {
542 Error::invalid_input(format!(
543 "the column at index {} was missing a ColumnEncoding",
544 column_info.index
545 ))
546 })?;
547 if matches!(
548 column_encoding,
549 pb::column_encoding::ColumnEncoding::Values(_)
550 ) {
551 Ok(())
552 } else {
553 Err(Error::invalid_input(format!(
554 "the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it",
555 column_info.index, field_name, column_encoding
556 )))
557 }
558 }
559
560 fn is_structural_primitive(data_type: &DataType) -> bool {
561 if data_type.is_primitive() {
562 true
563 } else {
564 match data_type {
565 DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
567 DataType::Boolean
568 | DataType::Null
569 | DataType::FixedSizeBinary(_)
570 | DataType::Binary
571 | DataType::LargeBinary
572 | DataType::Utf8
573 | DataType::LargeUtf8 => true,
574 DataType::FixedSizeList(inner, _) => {
575 Self::is_structural_primitive(inner.data_type())
576 }
577 _ => false,
578 }
579 }
580 }
581
582 fn is_primitive_legacy(data_type: &DataType) -> bool {
583 if data_type.is_primitive() {
584 true
585 } else {
586 match data_type {
587 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
589 DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
590 _ => false,
591 }
592 }
593 }
594
595 fn create_primitive_scheduler(
596 &self,
597 field: &Field,
598 column: &ColumnInfo,
599 buffers: FileBuffers,
600 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
601 Self::ensure_values_encoded(column, &field.name)?;
602 let column_buffers = ColumnBuffers {
604 file_buffers: buffers,
605 positions_and_sizes: &column.buffer_offsets_and_sizes,
606 };
607 Ok(Box::new(PrimitiveFieldScheduler::new(
608 column.index,
609 field.data_type(),
610 column.page_infos.clone(),
611 column_buffers,
612 self.validate_data,
613 )))
614 }
615
616 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
618 Self::ensure_values_encoded(column_info, field_name)?;
619 if column_info.page_infos.len() != 1 {
620 return Err(Error::invalid_input_source(format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into()));
621 }
622 let encoding = &column_info.page_infos[0].encoding;
623 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
624 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
625 _ => Err(Error::invalid_input_source(format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into())),
626 }
627 }
628
629 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
630 let encoding = &column_info.page_infos[0].encoding;
631 matches!(
632 encoding.as_legacy().array_encoding.as_ref().unwrap(),
633 pb::array_encoding::ArrayEncoding::PackedStruct(_)
634 )
635 }
636
    /// Build a legacy scheduler for a list (or large list) field.
    ///
    /// The offsets live in `offsets_column` (one entry per offsets page) and
    /// the items are decoded by a recursively-created child scheduler.
    ///
    /// # Errors
    /// Fails when the offsets column is not `Values`-encoded or a child
    /// scheduler cannot be created.
    ///
    /// # Panics
    /// Panics if an offsets page is not list-encoded.
    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        // The items scheduler consumes the following column(s).
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        // Re-interpret each non-empty offsets page as a plain primitive page
        // (the raw offsets) plus list-specific bookkeeping (null adjustment /
        // referenced item counts).
        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    // The schema says list but the page is not list-encoded.
                    panic!("Expected a list column");
                }
            })
            .unzip();
        // The offsets themselves are decoded as a u64 primitive column.
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        // List uses i32 offsets, LargeList uses i64 offsets.
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }
707
708 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
709 if let column_encoding::ColumnEncoding::Blob(blob) =
710 column_info.encoding.column_encoding.as_ref().unwrap()
711 {
712 let mut column_info = column_info.clone();
713 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
714 Some(column_info)
715 } else {
716 None
717 }
718 }
719
720 fn create_structural_field_scheduler(
721 &self,
722 field: &Field,
723 column_infos: &mut ColumnInfoIter,
724 ) -> Result<Box<dyn StructuralFieldScheduler>> {
725 let data_type = field.data_type();
726 if Self::is_structural_primitive(&data_type) {
727 let column_info = column_infos.expect_next()?;
728 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
729 column_info.as_ref(),
730 self.decompressor_strategy.as_ref(),
731 self.cache_repetition_index,
732 field,
733 )?);
734
735 column_infos.next_top_level();
737
738 return Ok(scheduler);
739 }
740 match &data_type {
741 DataType::Struct(fields) => {
742 if field.is_packed_struct() {
743 let column_info = column_infos.expect_next()?;
745 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
746 column_info.as_ref(),
747 self.decompressor_strategy.as_ref(),
748 self.cache_repetition_index,
749 field,
750 )?);
751
752 column_infos.next_top_level();
754
755 return Ok(scheduler);
756 }
757 if field.is_blob() {
759 let column_info = column_infos.peek();
760 if column_info.page_infos.iter().any(|page| {
761 matches!(
762 page.encoding,
763 PageEncoding::Structural(pb21::PageLayout {
764 layout: Some(pb21::page_layout::Layout::BlobLayout(_))
765 })
766 )
767 }) {
768 let column_info = column_infos.expect_next()?;
769 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
770 column_info.as_ref(),
771 self.decompressor_strategy.as_ref(),
772 self.cache_repetition_index,
773 field,
774 )?);
775 column_infos.next_top_level();
776 return Ok(scheduler);
777 }
778 }
779
780 let mut child_schedulers = Vec::with_capacity(field.children.len());
781 for field in field.children.iter() {
782 let field_scheduler =
783 self.create_structural_field_scheduler(field, column_infos)?;
784 child_schedulers.push(field_scheduler);
785 }
786
787 let fields = fields.clone();
788 Ok(
789 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
790 as Box<dyn StructuralFieldScheduler>,
791 )
792 }
793 DataType::List(_) | DataType::LargeList(_) => {
794 let child = field.children.first().expect_ok()?;
795 let child_scheduler =
796 self.create_structural_field_scheduler(child, column_infos)?;
797 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
798 as Box<dyn StructuralFieldScheduler>)
799 }
800 DataType::FixedSizeList(inner, dimension)
801 if matches!(inner.data_type(), DataType::Struct(_)) =>
802 {
803 let child = field.children.first().expect_ok()?;
804 let child_scheduler =
805 self.create_structural_field_scheduler(child, column_infos)?;
806 Ok(Box::new(StructuralFixedSizeListScheduler::new(
807 child_scheduler,
808 *dimension,
809 )) as Box<dyn StructuralFieldScheduler>)
810 }
811 DataType::Map(_, keys_sorted) => {
812 if *keys_sorted {
816 return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into()));
817 }
818 let entries_child = field.children.first().expect_ok()?;
819 let child_scheduler =
820 self.create_structural_field_scheduler(entries_child, column_infos)?;
821 Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
822 as Box<dyn StructuralFieldScheduler>)
823 }
824 _ => todo!("create_structural_field_scheduler for {}", data_type),
825 }
826 }
827
    /// Build a legacy (2.0) scheduler for `field`, consuming column infos as
    /// it recurses.
    ///
    /// Dispatch order: primitives (and FSLs of primitives) go to the
    /// primitive scheduler; binary-like fields may be blob-wrapped or encoded
    /// as a list of u8; lists and structs recurse into their children.
    ///
    /// # Errors
    /// Fails when columns are missing, mis-encoded, or of unsupported types.
    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive_legacy(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.expect_next()?.clone();
            // Blob-wrapped binary: schedule the blob descriptions and wrap
            // them in a blob scheduler.
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    // Binary stored as list-of-u8: schedule as a list and
                    // re-interpret as binary.  Utf8/Binary map to List (i32
                    // offsets); the large variants map to LargeList (i64).
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    // Pages are not list-encoded: treat as a plain primitive
                    // column.
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                // Column has no pages: fall back to the primitive scheduler.
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // FSLs of primitives decode as one primitive column.
                if Self::is_primitive_legacy(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    // FSLs of non-primitives are not supported on the legacy
                    // path.
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::not_supported_source(
                        format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                    ))
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                // The offsets occupy this column; the items follow in the
                // next top-level column(s).
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                // Blob-wrapped struct: decode the blob descriptions directly.
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    // Packed structs decode as a single primitive column.
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    // Each child occupies its own top-level column.
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            _ => todo!(),
        }
    }
964}
965
966fn root_column(num_rows: u64) -> ColumnInfo {
968 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
969 let final_page_num_rows = num_rows % (u32::MAX as u64);
970 let root_pages = (0..num_root_pages)
971 .map(|i| PageInfo {
972 num_rows: if i == num_root_pages - 1 {
973 final_page_num_rows
974 } else {
975 u64::MAX
976 },
977 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
978 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
979 pb::SimpleStruct {},
980 )),
981 }),
982 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
984 })
985 .collect::<Vec<_>>();
986 ColumnInfo {
987 buffer_offsets_and_sizes: Arc::new([]),
988 encoding: pb::ColumnEncoding {
989 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
990 },
991 index: u32::MAX,
992 page_infos: Arc::from(root_pages),
993 }
994}
995
/// The root decoder, which is either a structural (2.1) decoder or a legacy
/// (2.0) decoder
pub enum RootDecoder {
    /// Structural root struct decoder
    Structural(StructuralStructDecoder),
    /// Legacy root struct decoder
    Legacy(SimpleStructDecoder),
}
1000
1001impl RootDecoder {
1002 pub fn into_structural(self) -> StructuralStructDecoder {
1003 match self {
1004 Self::Structural(decoder) => decoder,
1005 Self::Legacy(_) => panic!("Expected a structural decoder"),
1006 }
1007 }
1008
1009 pub fn into_legacy(self) -> SimpleStructDecoder {
1010 match self {
1011 Self::Legacy(decoder) => decoder,
1012 Self::Structural(_) => panic!("Expected a legacy decoder"),
1013 }
1014 }
1015}
1016
1017impl DecodeBatchScheduler {
    /// Create a new decode scheduler for the given schema and column infos.
    ///
    /// Chooses the structural (2.1) path when the first column is structural
    /// (or there are no columns), otherwise the legacy (2.0) path — in which
    /// case a synthetic root column is prepended and the column indices are
    /// shifted by one.  Runs the root scheduler's async initialization before
    /// returning.
    ///
    /// # Panics
    /// Asserts `num_rows > 0`.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        filter: &FilterExpression,
        decoder_config: &DecoderConfig,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        // Wrap all top-level fields in a synthetic "root" struct field.
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        root_field.children.clone_from(&schema.fields);
        // Mark the synthetic root so downstream code can recognize it.
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos.is_empty() || column_infos[0].is_structural() {
            // Structural path: column indices map directly onto columns.
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let mut root_scheduler =
                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // Legacy path: prepend the synthetic root column and shift all
            // requested column indices by one to account for it.
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let root_scheduler =
                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }
1090
1091 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1092 pub fn from_scheduler(
1093 root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1094 root_fields: Fields,
1095 cache: Arc<LanceCache>,
1096 ) -> Self {
1097 Self {
1098 root_scheduler: RootScheduler::Legacy(root_scheduler),
1099 root_fields,
1100 cache,
1101 }
1102 }
1103
1104 fn do_schedule_ranges_structural(
1105 &mut self,
1106 ranges: &[Range<u64>],
1107 filter: &FilterExpression,
1108 io: Arc<dyn EncodingsIo>,
1109 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1110 ) {
1111 let root_scheduler = self.root_scheduler.as_structural();
1112 let mut context = SchedulerContext::new(io, self.cache.clone());
1113 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1114 if let Err(schedule_ranges_err) = maybe_root_job {
1115 schedule_action(Err(schedule_ranges_err));
1116 return;
1117 }
1118 let mut root_job = maybe_root_job.unwrap();
1119 let mut num_rows_scheduled = 0;
1120 loop {
1121 let maybe_next_scan_lines = root_job.schedule_next(&mut context);
1122 if let Err(err) = maybe_next_scan_lines {
1123 schedule_action(Err(err));
1124 return;
1125 }
1126 let next_scan_lines = maybe_next_scan_lines.unwrap();
1127 if next_scan_lines.is_empty() {
1128 return;
1129 }
1130 for next_scan_line in next_scan_lines {
1131 trace!(
1132 "Scheduled scan line of {} rows and {} decoders",
1133 next_scan_line.rows_scheduled,
1134 next_scan_line.decoders.len()
1135 );
1136 num_rows_scheduled += next_scan_line.rows_scheduled;
1137 if !schedule_action(Ok(DecoderMessage {
1138 scheduled_so_far: num_rows_scheduled,
1139 decoders: next_scan_line.decoders,
1140 })) {
1141 return;
1143 }
1144 }
1145 }
1146 }
1147
1148 fn do_schedule_ranges_legacy(
1149 &mut self,
1150 ranges: &[Range<u64>],
1151 filter: &FilterExpression,
1152 io: Arc<dyn EncodingsIo>,
1153 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1154 priority: Option<Box<dyn PriorityRange>>,
1158 ) {
1159 let root_scheduler = self.root_scheduler.as_legacy();
1160 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1161 trace!(
1162 "Scheduling {} ranges across {}..{} ({} rows){}",
1163 ranges.len(),
1164 ranges.first().unwrap().start,
1165 ranges.last().unwrap().end,
1166 rows_requested,
1167 priority
1168 .as_ref()
1169 .map(|p| format!(" (priority={:?})", p))
1170 .unwrap_or_default()
1171 );
1172
1173 let mut context = SchedulerContext::new(io, self.cache.clone());
1174 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1175 if let Err(schedule_ranges_err) = maybe_root_job {
1176 schedule_action(Err(schedule_ranges_err));
1177 return;
1178 }
1179 let mut root_job = maybe_root_job.unwrap();
1180 let mut num_rows_scheduled = 0;
1181 let mut rows_to_schedule = root_job.num_rows();
1182 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1183 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1184 while rows_to_schedule > 0 {
1185 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1186 if let Err(schedule_next_err) = maybe_next_scan_line {
1187 schedule_action(Err(schedule_next_err));
1188 return;
1189 }
1190 let next_scan_line = maybe_next_scan_line.unwrap();
1191 priority.advance(next_scan_line.rows_scheduled);
1192 num_rows_scheduled += next_scan_line.rows_scheduled;
1193 rows_to_schedule -= next_scan_line.rows_scheduled;
1194 trace!(
1195 "Scheduled scan line of {} rows and {} decoders",
1196 next_scan_line.rows_scheduled,
1197 next_scan_line.decoders.len()
1198 );
1199 if !schedule_action(Ok(DecoderMessage {
1200 scheduled_so_far: num_rows_scheduled,
1201 decoders: next_scan_line.decoders,
1202 })) {
1203 return;
1205 }
1206
1207 trace!("Finished scheduling {} ranges", ranges.len());
1208 }
1209 }
1210
1211 fn do_schedule_ranges(
1212 &mut self,
1213 ranges: &[Range<u64>],
1214 filter: &FilterExpression,
1215 io: Arc<dyn EncodingsIo>,
1216 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1217 priority: Option<Box<dyn PriorityRange>>,
1221 ) {
1222 match &self.root_scheduler {
1223 RootScheduler::Legacy(_) => {
1224 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1225 }
1226 RootScheduler::Structural(_) => {
1227 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1228 }
1229 }
1230 }
1231
1232 pub fn schedule_ranges_to_vec(
1235 &mut self,
1236 ranges: &[Range<u64>],
1237 filter: &FilterExpression,
1238 io: Arc<dyn EncodingsIo>,
1239 priority: Option<Box<dyn PriorityRange>>,
1240 ) -> Result<Vec<DecoderMessage>> {
1241 let mut decode_messages = Vec::new();
1242 self.do_schedule_ranges(
1243 ranges,
1244 filter,
1245 io,
1246 |msg| {
1247 decode_messages.push(msg);
1248 true
1249 },
1250 priority,
1251 );
1252 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1253 }
1254
1255 #[instrument(skip_all)]
1265 pub fn schedule_ranges(
1266 &mut self,
1267 ranges: &[Range<u64>],
1268 filter: &FilterExpression,
1269 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1270 scheduler: Arc<dyn EncodingsIo>,
1271 ) {
1272 self.do_schedule_ranges(
1273 ranges,
1274 filter,
1275 scheduler,
1276 |msg| {
1277 match sink.send(msg) {
1278 Ok(_) => true,
1279 Err(SendError { .. }) => {
1280 debug!(
1283 "schedule_ranges aborting early since decoder appears to have been dropped"
1284 );
1285 false
1286 }
1287 }
1288 },
1289 None,
1290 )
1291 }
1292
1293 #[instrument(skip_all)]
1301 pub fn schedule_range(
1302 &mut self,
1303 range: Range<u64>,
1304 filter: &FilterExpression,
1305 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1306 scheduler: Arc<dyn EncodingsIo>,
1307 ) {
1308 self.schedule_ranges(&[range], filter, sink, scheduler)
1309 }
1310
1311 pub fn schedule_take(
1319 &mut self,
1320 indices: &[u64],
1321 filter: &FilterExpression,
1322 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1323 scheduler: Arc<dyn EncodingsIo>,
1324 ) {
1325 debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
1326 if indices.is_empty() {
1327 return;
1328 }
1329 trace!("Scheduling take of {} rows", indices.len());
1330 let ranges = Self::indices_to_ranges(indices);
1331 self.schedule_ranges(&ranges, filter, sink, scheduler)
1332 }
1333
1334 fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1336 let mut ranges = Vec::new();
1337 let mut start = indices[0];
1338
1339 for window in indices.windows(2) {
1340 if window[1] != window[0] + 1 {
1341 ranges.push(start..window[0] + 1);
1342 start = window[1];
1343 }
1344 }
1345
1346 ranges.push(start..*indices.last().unwrap() + 1);
1347 ranges
1348 }
1349}
1350
/// A single unit of batch-decode work emitted by a decode stream.
pub struct ReadBatchTask {
    /// Future that resolves to the decoded record batch.
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// Number of rows the resolved batch will contain.
    pub num_rows: u32,
}
1355
/// Pulls scheduled decoders from a channel and drains decoded rows into
/// fixed-size batch tasks (used by the non-structural / legacy path, see
/// `create_decode_stream`).
pub struct BatchDecodeStream {
    // Receiving side of the scheduler channel
    context: DecoderContext,
    root_decoder: SimpleStructDecoder,
    // Rows not yet handed out as batch tasks
    rows_remaining: u64,
    rows_per_batch: u32,
    // High-water mark of rows the scheduler reported as scheduled
    rows_scheduled: u64,
    // Rows already drained from the root decoder
    rows_drained: u64,
    // Set once the scheduler channel closes; avoids re-polling a dead channel
    scheduler_exhausted: bool,
    // Ensures the oversized-batch warning is logged at most once per stream
    emitted_batch_size_warning: Arc<Once>,
}
1367
impl BatchDecodeStream {
    /// Creates a stream that will emit `num_rows` rows in batches of (up
    /// to) `rows_per_batch`, fed by messages arriving on `scheduled`.
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    /// Wires an arriving child decoder into the decoder tree.
    fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
        if decoder.path.is_empty() {
            // An empty path addresses the root itself; the root decoder is
            // already in place so there is nothing to attach.
            Ok(())
        } else {
            self.root_decoder.accept_child(decoder)
        }
    }

    /// Receives scheduler messages until at least `scheduled_need` rows
    /// have been scheduled, or the scheduler channel closes.
    ///
    /// Returns the number of rows scheduled so far (which may be less than
    /// `scheduled_need` if the scheduler finished early).
    #[instrument(level = "debug", skip_all)]
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    // Channel closed: the scheduler is done (or was dropped)
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Produces the next decode task, or `None` once all rows are drained.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // Rows that must still be scheduled before this batch can be drained
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // Scheduler came up short; shrink this batch accordingly
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        // Index of the last row this batch will touch
        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts the stream into a stream of [`ReadBatchTask`]s.  Each
    /// batch's CPU decode work is spawned on a tokio task.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    // Spawn the (CPU-bound) decode so it runs off this poll path
                    let (batch, _data_size) =
                        tokio::spawn(
                            async move { next_task.into_batch(emitted_batch_size_warning) },
                        )
                        .await
                        .map_err(|err| Error::wrapped(err.into()))??;
                    Ok(batch)
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1514
/// A message deliverable to a root decoder: either a fully loaded
/// structural page or a legacy decoder handoff.
enum RootDecoderMessage {
    LoadedPage(LoadedPageShard),
    LegacyPage(crate::previous::decoder::DecoderReady),
}
/// Abstraction over the structural and legacy root decoders so that
/// `BatchDecodeIterator` can drive either one through a single code path.
trait RootDecoderType {
    /// Feeds a page / decoder message into the root decoder.
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Drains `num_rows` decoded rows into a decode task.
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Blocks (on `runtime`) until the row at index `loaded_need` is
    /// loaded; a no-op for decoders whose pages are loaded on acceptance.
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1526impl RootDecoderType for StructuralStructDecoder {
1527 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1528 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1529 unreachable!()
1530 };
1531 self.accept_page(loaded_page)
1532 }
1533 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1534 self.drain_batch_task(num_rows)
1535 }
1536 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1537 Ok(())
1539 }
1540}
1541impl RootDecoderType for SimpleStructDecoder {
1542 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1543 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1544 unreachable!()
1545 };
1546 self.accept_child(legacy_page)
1547 }
1548 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1549 self.drain(num_rows)
1550 }
1551 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1552 runtime.block_on(self.wait_for_loaded(loaded_need))
1553 }
1554}
1555
/// Blocking counterpart of the decode streams: drains a pre-collected
/// queue of scheduler messages into record batches without requiring the
/// caller to be inside an async runtime.
struct BatchDecodeIterator<T: RootDecoderType> {
    // All scheduler output, collected before iteration begins
    messages: VecDeque<Result<DecoderMessage>>,
    root_decoder: T,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    emitted_batch_size_warning: Arc<Once>,
    // Small current-thread runtime used only to block on page I/O futures
    wait_for_io_runtime: tokio::runtime::Runtime,
    schema: Arc<ArrowSchema>,
}
1571
impl<T: RootDecoderType> BatchDecodeIterator<T> {
    /// Creates an iterator that yields `num_rows` rows in batches of (up
    /// to) `rows_per_batch`, consuming the pre-collected `messages`.
    pub fn new(
        messages: VecDeque<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: T,
        schema: Arc<ArrowSchema>,
    ) -> Self {
        Self {
            messages,
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            // Minimal runtime for blocking on I/O futures from sync code
            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap(),
            emitted_batch_size_warning: Arc::new(Once::new()),
            schema,
        }
    }

    /// Synchronously resolves an unloaded page, blocking on the runtime
    /// only when the page's I/O future is not already complete.
    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
        match maybe_done(unloaded_page.0) {
            MaybeDone::Done(loaded_page) => loaded_page,
            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
            MaybeDone::Gone => unreachable!(),
        }
    }

    /// Pops queued scheduler messages until at least `scheduled_need` rows
    /// are scheduled (or the queue empties), accepting each page/decoder,
    /// then blocks until enough rows are loaded for the upcoming drain.
    ///
    /// Returns the number of rows scheduled so far.
    #[instrument(skip_all)]
    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
            let message = self.messages.pop_front().unwrap()?;
            self.rows_scheduled = message.scheduled_so_far;
            for decoder_message in message.decoders {
                match decoder_message {
                    MessageType::UnloadedPage(unloaded_page) => {
                        let loaded_page = self.wait_for_page(unloaded_page)?;
                        self.root_decoder
                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
                    }
                    MessageType::DecoderReady(decoder_ready) => {
                        // An empty path addresses the root itself, which is
                        // already in place; nothing to attach in that case
                        if !decoder_ready.path.is_empty() {
                            self.root_decoder
                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
                        }
                    }
                }
            }
        }

        // Index of the last row the next drain will touch
        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;

        self.root_decoder
            .wait(loaded_need, &self.wait_for_io_runtime)?;
        Ok(self.rows_scheduled)
    }

    /// Produces the next decoded batch, or `None` once all rows drained.
    #[instrument(level = "debug", skip_all)]
    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        // Rows that must still be scheduled before this batch can be drained
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
            // Scheduler came up short; shrink this batch accordingly
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch(to_take)?;

        self.rows_drained += to_take;

        let (batch, _data_size) = next_task.into_batch(self.emitted_batch_size_warning.clone())?;

        Ok(Some(batch))
    }
}
1688
1689impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1690 type Item = ArrowResult<RecordBatch>;
1691
1692 fn next(&mut self) -> Option<Self::Item> {
1693 self.next_batch_task()
1694 .transpose()
1695 .map(|r| r.map_err(ArrowError::from))
1696 }
1697}
1698
1699impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1700 fn schema(&self) -> Arc<ArrowSchema> {
1701 self.schema.clone()
1702 }
1703}
1704
1705fn estimate_bytes_per_row(data_type: &DataType) -> f64 {
1716 if let Some(w) = data_type.byte_width_opt() {
1717 return w as f64;
1718 }
1719 match data_type {
1720 DataType::Boolean => 1.0 / 8.0,
1721 DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => 64.0,
1722 DataType::Struct(fields) => fields
1723 .iter()
1724 .map(|f| estimate_bytes_per_row(f.data_type()))
1725 .sum(),
1726 DataType::List(child) | DataType::LargeList(child) => {
1727 5.0 * estimate_bytes_per_row(child.data_type())
1728 }
1729 DataType::FixedSizeList(child, dim) => {
1730 *dim as f64 * estimate_bytes_per_row(child.data_type())
1731 }
1732 DataType::Dictionary(_, value_type) => estimate_bytes_per_row(value_type),
1733 DataType::Map(entries, _) => 5.0 * estimate_bytes_per_row(entries.data_type()),
1734 _ => 64.0,
1735 }
1736}
1737
/// Structural (2.1+) decode stream: pulls scheduled pages from a channel
/// and drains them into batch tasks, optionally sizing batches by a byte
/// budget instead of a fixed row count.
pub struct StructuralBatchDecodeStream {
    // Receiving side of the scheduler channel
    context: DecoderContext,
    root_decoder: StructuralStructDecoder,
    // Rows not yet handed out as batch tasks
    rows_remaining: u64,
    rows_per_batch: u32,
    // High-water mark of rows the scheduler reported as scheduled
    rows_scheduled: u64,
    // Rows already drained from the root decoder
    rows_drained: u64,
    // Set once the scheduler channel closes
    scheduler_exhausted: bool,
    // Ensures the oversized-batch warning is logged at most once per stream
    emitted_batch_size_warning: Arc<Once>,
    // Whether per-batch decode work is spawned on its own tokio task
    spawn_batch_decode_tasks: bool,
    // If set, batches target this byte budget instead of `rows_per_batch`
    batch_size_bytes: Option<u64>,
    // Static estimate derived from the schema (only computed when
    // `batch_size_bytes` is set)
    schema_bytes_per_row: f64,
    // Observed bytes/row fed back from decoded batches; 0 = no sample yet
    bytes_per_row_feedback: Arc<AtomicU64>,
}
1765
impl StructuralBatchDecodeStream {
    /// Creates a stream over `num_rows` rows.
    ///
    /// When `batch_size_bytes` is set, batch sizes are chosen dynamically
    /// to target that byte budget; otherwise `rows_per_batch` is used.
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
        spawn_batch_decode_tasks: bool,
        batch_size_bytes: Option<u64>,
    ) -> Self {
        // Only compute the schema-based estimate when byte-sized batching
        // is enabled; clamp to >= 1.0 to avoid division blowups later
        let schema_bytes_per_row = if batch_size_bytes.is_some() {
            estimate_bytes_per_row(root_decoder.data_type()).max(1.0)
        } else {
            0.0
        };
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
            spawn_batch_decode_tasks,
            batch_size_bytes,
            schema_bytes_per_row,
            bytes_per_row_feedback: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Receives scheduler messages (awaiting each page's I/O) until at
    /// least `scheduled_need` rows are scheduled or the channel closes.
    ///
    /// Returns the number of rows scheduled so far.
    #[instrument(level = "debug", skip_all)]
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        let unloaded_page = message.into_structural();
                        // Await the page's I/O before handing it to the decoder
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    // Channel closed: the scheduler is done (or was dropped)
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Produces the next decode task, or `None` once all rows are drained.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        // Pick the batch size: byte-budget mode derives a row count from
        // either measured feedback or the static schema estimate
        let mut to_take = if let Some(batch_size_bytes) = self.batch_size_bytes {
            let feedback = self.bytes_per_row_feedback.load(Ordering::Relaxed);
            let bpr = if feedback > 0 {
                feedback as f64
            } else {
                self.schema_bytes_per_row
            };
            let rows = (batch_size_bytes as f64 / bpr) as u64;
            self.rows_remaining.min(rows.max(1))
        } else {
            self.rows_remaining.min(self.rows_per_batch as u64)
        };
        self.rows_remaining -= to_take;

        // Rows that must still be scheduled before this batch can be drained
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // Scheduler came up short; shrink this batch accordingly
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts the stream into a stream of [`ReadBatchTask`]s, feeding
    /// observed bytes-per-row back into batch sizing.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let bytes_per_row_feedback = slf.bytes_per_row_feedback.clone();
                let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks;
                let task = async move {
                    let next_task = next_task?;
                    let (batch, data_size) = if spawn_batch_decode_tasks {
                        tokio::spawn(
                            async move { next_task.into_batch(emitted_batch_size_warning) },
                        )
                        .await
                        .map_err(|err| Error::wrapped(err.into()))??
                    } else {
                        next_task.into_batch(emitted_batch_size_warning)?
                    };
                    let num_rows = batch.num_rows() as u64;
                    if num_rows > 0 {
                        // Feedback policy: jump up to a larger observation
                        // immediately, but only average downwards gradually
                        let bpr = data_size / num_rows;
                        let prev = bytes_per_row_feedback.load(Ordering::Relaxed);
                        let next = if prev == 0 || bpr >= prev {
                            bpr
                        } else {
                            (prev + bpr) / 2
                        };
                        bytes_per_row_feedback.store(next.max(1), Ordering::Relaxed);
                    }
                    Ok(batch)
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
1939
/// The rows a caller wants to read: either contiguous ranges or individual
/// indices (sorted and unique, per the `schedule_take` debug assertion).
#[derive(Debug)]
pub enum RequestedRows {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}
1945
1946impl RequestedRows {
1947 pub fn num_rows(&self) -> u64 {
1948 match self {
1949 Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1950 Self::Indices(indices) => indices.len() as u64,
1951 }
1952 }
1953
1954 pub fn trim_empty_ranges(mut self) -> Self {
1955 if let Self::Ranges(ranges) = &mut self {
1956 ranges.retain(|r| !r.is_empty());
1957 }
1958 self
1959 }
1960}
1961
/// Knobs controlling decoder behavior.
#[derive(Debug, Clone)]
pub struct DecoderConfig {
    /// Whether repetition indices may be cached across reads
    pub cache_repetition_index: bool,
    /// Validate arrays as they are decoded (extra safety, extra cost)
    pub validate_on_decode: bool,
    /// Force scheduling inline on the calling task; `None` means the
    /// decision is made per request based on its size
    pub inline_scheduling: Option<bool>,
}
1993
impl Default for DecoderConfig {
    fn default() -> Self {
        Self {
            // Default comes from `default_cache_repetition_index()`
            // (defined elsewhere; presumably env-driven — confirm)
            cache_repetition_index: default_cache_repetition_index(),
            validate_on_decode: false,
            // None = decide per request at schedule time
            inline_scheduling: None,
        }
    }
}
2003
/// Everything needed to schedule and decode a read, bundled together.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Target rows per batch
    pub batch_size: u32,
    pub io: Arc<dyn EncodingsIo>,
    pub cache: Arc<LanceCache>,
    pub decoder_config: DecoderConfig,
    /// If set, batches target this many bytes instead of `batch_size` rows
    /// (structural files only; ignored with a warning for 2.0 files)
    pub batch_size_bytes: Option<u64>,
}
2018
/// Attaches a watchdog to `stream` for the background scheduler task:
/// after the stream is exhausted, the scheduler's join handle is awaited
/// (surfacing a scheduler panic via `unwrap`), and if the stream is
/// dropped early the scheduler task is aborted.
fn check_scheduler_on_drop(
    stream: BoxStream<'static, ReadBatchTask>,
    scheduler_handle: tokio::task::JoinHandle<()>,
) -> BoxStream<'static, ReadBatchTask> {
    let abort_handle = scheduler_handle.abort_handle();
    let mut scheduler_handle = Some(scheduler_handle);
    // A stream that yields no items; it exists only to await the scheduler
    // once the main stream finishes
    let check_scheduler = stream::unfold((), move |_| {
        let handle = scheduler_handle.take();
        async move {
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
            None
        }
    });
    stream
        .chain(check_scheduler)
        .on_drop(move || {
            abort_handle.abort();
        })
        .boxed()
}
2051
2052#[allow(clippy::too_many_arguments)]
2053pub fn create_decode_stream(
2054 schema: &Schema,
2055 num_rows: u64,
2056 batch_size: u32,
2057 is_structural: bool,
2058 should_validate: bool,
2059 spawn_structural_batch_decode_tasks: bool,
2060 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2061 batch_size_bytes: Option<u64>,
2062) -> Result<BoxStream<'static, ReadBatchTask>> {
2063 if is_structural {
2064 let arrow_schema = ArrowSchema::from(schema);
2065 let structural_decoder = StructuralStructDecoder::new(
2066 arrow_schema.fields,
2067 should_validate,
2068 true,
2069 )?;
2070 Ok(StructuralBatchDecodeStream::new(
2071 rx,
2072 batch_size,
2073 num_rows,
2074 structural_decoder,
2075 spawn_structural_batch_decode_tasks,
2076 batch_size_bytes,
2077 )
2078 .into_stream())
2079 } else {
2080 if batch_size_bytes.is_some() {
2081 warn!("batch_size_bytes is not supported for v2.0 files and will be ignored");
2082 }
2083 let arrow_schema = ArrowSchema::from(schema);
2084 let root_fields = arrow_schema.fields;
2085
2086 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
2087 Ok(BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream())
2088 }
2089}
2090
2091pub fn create_decode_iterator(
2095 schema: &Schema,
2096 num_rows: u64,
2097 batch_size: u32,
2098 should_validate: bool,
2099 is_structural: bool,
2100 messages: VecDeque<Result<DecoderMessage>>,
2101) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2102 let arrow_schema = Arc::new(ArrowSchema::from(schema));
2103 let root_fields = arrow_schema.fields.clone();
2104 if is_structural {
2105 let simple_struct_decoder =
2106 StructuralStructDecoder::new(root_fields, should_validate, true)?;
2107 Ok(Box::new(BatchDecodeIterator::new(
2108 messages,
2109 batch_size,
2110 num_rows,
2111 simple_struct_decoder,
2112 arrow_schema,
2113 )))
2114 } else {
2115 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
2116 Ok(Box::new(BatchDecodeIterator::new(
2117 messages,
2118 batch_size,
2119 num_rows,
2120 root_decoder,
2121 arrow_schema,
2122 )))
2123 }
2124}
2125
/// Builds the decode stream and kicks off scheduling for it.
///
/// For small requests (see `inline_scheduling`), scheduling runs inline
/// before this function returns; otherwise it is spawned on a background
/// task whose lifetime is tied to the returned stream.
async fn create_scheduler_decoder(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    let num_rows = requested_rows.num_rows();

    // Format generation is taken from the first column (presumably
    // uniform across columns — same pattern as the blocking path)
    let is_structural = column_infos[0].is_structural();
    // Env override for spawning per-batch decode tasks; by default spawn
    // only for range scans
    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
    let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() {
        Some("always") => true,
        Some("never") => false,
        _ => matches!(requested_rows, RequestedRows::Ranges(_)),
    };

    let (tx, rx) = mpsc::unbounded_channel();

    let decode_stream = create_decode_stream(
        &target_schema,
        num_rows,
        config.batch_size,
        is_structural,
        config.decoder_config.validate_on_decode,
        spawn_structural_batch_decode_tasks,
        rx,
        config.batch_size_bytes,
    )?;

    let mut decode_scheduler = DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
        &config.decoder_config,
    )
    .await?;

    // Small requests schedule inline to avoid task-spawn overhead;
    // explicit config wins over the size heuristic
    let inline_scheduling = config
        .decoder_config
        .inline_scheduling
        .unwrap_or_else(|| num_rows <= inline_scheduling_threshold());

    if inline_scheduling {
        match requested_rows {
            RequestedRows::Ranges(ranges) => {
                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
            }
            RequestedRows::Indices(indices) => {
                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
            }
        }
        Ok(decode_stream)
    } else {
        let scheduling = async move {
            match requested_rows {
                RequestedRows::Ranges(ranges) => {
                    decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
                }
                RequestedRows::Indices(indices) => {
                    decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
                }
            }
        };
        let scheduler_handle = tokio::task::spawn(scheduling);
        // Tie the background scheduler's lifetime to the stream
        Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
    }
}
2213
/// Schedules and decodes the requested rows, returning a stream of batch
/// tasks.  Empty requests short-circuit to an empty stream.
pub async fn schedule_and_decode(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    if requested_rows.num_rows() == 0 {
        return Ok(stream::empty().boxed());
    }

    // Zero-length ranges contribute nothing; drop them up front
    let requested_rows = requested_rows.trim_empty_ranges();

    let io = config.io.clone();

    let stream = create_scheduler_decoder(
        column_infos,
        requested_rows,
        filter,
        column_indices,
        target_schema,
        config,
    )
    .await?;

    // Keep the I/O handle alive until the stream is fully consumed or dropped
    Ok(stream.finally(move || drop(io)).boxed())
}
2260
/// Shared single-threaded runtime used to block on async work from
/// synchronous code paths (e.g. `schedule_and_decode_blocking`).
pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});
2266
/// Blocking variant of `schedule_and_decode`: schedules everything up
/// front and returns a synchronous `RecordBatchReader`.
///
/// All scheduling completes before this returns, so every decode message
/// is buffered in memory; intended for callers without an async runtime.
pub fn schedule_and_decode_blocking(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
    if requested_rows.num_rows() == 0 {
        // Empty request: return an empty reader with the right schema
        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
    }

    let num_rows = requested_rows.num_rows();
    let is_structural = column_infos[0].is_structural();

    let (tx, mut rx) = mpsc::unbounded_channel();

    // Scheduler construction is async; block on the shared waiter runtime
    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
        &config.decoder_config,
    ))?;

    // Scheduling runs synchronously here and consumes `tx`
    match requested_rows {
        RequestedRows::Ranges(ranges) => {
            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
        }
        RequestedRows::Indices(indices) => {
            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
        }
    }

    // The sender was consumed above so all messages are already buffered;
    // `now_or_never` is safe because `recv_many` resolves immediately on a
    // closed, fully-buffered channel
    let mut messages = Vec::new();
    while rx
        .recv_many(&mut messages, usize::MAX)
        .now_or_never()
        .unwrap()
        != 0
    {}

    let decode_iterator = create_decode_iterator(
        &target_schema,
        num_rows,
        config.batch_size,
        config.decoder_config.validate_on_decode,
        is_structural,
        messages.into(),
    )?;

    Ok(decode_iterator)
}
2345
/// Decodes previously scheduled primitive data into a [`DataBlock`].
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes `num_rows` rows after skipping the first `rows_to_skip`.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
2391
/// Schedules I/O for row ranges within a single page (legacy decode path).
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules reads for `ranges` and returns a future resolving to the
    /// page decoder once the I/O completes.  `top_level_row` is presumably
    /// used to prioritize the I/O (see [`PriorityRange`]) — confirm.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2419
/// Tracks the current top-level row position so schedulers can assign I/O
/// priorities as they advance through a read.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Advances the position by `num_rows`.
    fn advance(&mut self, num_rows: u64);
    /// Current priority value (a top-level row position).
    fn current_priority(&self) -> u64;
    /// Object-safe substitute for `Clone`.
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}
2426
/// A [`PriorityRange`] where the priority is simply the absolute row
/// number, advanced one-for-one with rows.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}
2433
impl SimplePriorityRange {
    /// Creates a range starting at the given absolute row `priority`.
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}
2439
2440impl PriorityRange for SimplePriorityRange {
2441 fn advance(&mut self, num_rows: u64) {
2442 self.priority += num_rows;
2443 }
2444
2445 fn current_priority(&self) -> u64 {
2446 self.priority
2447 }
2448
2449 fn box_clone(&self) -> Box<dyn PriorityRange> {
2450 Box::new(Self {
2451 priority: self.priority,
2452 })
2453 }
2454}
2455
/// Priority range for list children: maps positions in the child (item)
/// address space back to the base (row) priority via `offsets`.
pub struct ListPriorityRange {
    // Priority range of the parent / outer rows
    base: Box<dyn PriorityRange>,
    // Assumed cumulative item offsets, one entry per outer row boundary —
    // TODO confirm against where this is constructed
    offsets: Arc<[u64]>,
    // Index of the offsets entry covering `cur_position`
    cur_index_into_offsets: usize,
    // Current position, in items, within the child
    cur_position: u64,
}
2474
impl ListPriorityRange {
    /// Wraps `base` so that advances in item space are translated into
    /// row-space advances using `offsets`.
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}
2485
// Manual Debug: `offsets` may be large, so only its length is printed.
impl std::fmt::Debug for ListPriorityRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ListPriorityRange")
            .field("base", &self.base)
            .field("offsets.len()", &self.offsets.len())
            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
            .field("cur_position", &self.cur_position)
            .finish()
    }
}
2496
impl PriorityRange for ListPriorityRange {
    fn advance(&mut self, num_rows: u64) {
        // `num_rows` here is measured in child items, not outer rows
        self.cur_position += num_rows;
        // Walk forward through offsets to find the entry covering the new
        // position; each step crossed corresponds to one base row
        let mut idx_into_offsets = self.cur_index_into_offsets;
        while idx_into_offsets + 1 < self.offsets.len()
            && self.offsets[idx_into_offsets + 1] <= self.cur_position
        {
            idx_into_offsets += 1;
        }
        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
        self.cur_index_into_offsets = idx_into_offsets;
        self.base.advance(base_rows_advanced as u64);
    }

    fn current_priority(&self) -> u64 {
        // Priority is always expressed in the base (outer-row) space
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            base: self.base.box_clone(),
            offsets: self.offsets.clone(),
            cur_index_into_offsets: self.cur_index_into_offsets,
            cur_position: self.cur_position,
        })
    }
}
2526
/// State threaded through scheduling: the I/O handle, metadata cache, and
/// the current position (path) within the field tree.
pub struct SchedulerContext {
    // Set only for temporary/detached contexts; its presence switches
    // `path_name` to the TEMP(...) form
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    name: String,
    // Indices of the fields on the path from the root to the current field
    path: Vec<u32>,
    // Human-readable names parallel to `path`
    path_names: Vec<String>,
}
2536
/// Borrow of a [`SchedulerContext`] returned by `push`; calling `pop`
/// undoes the push and hands the context back.
pub struct ScopedSchedulerContext<'a> {
    pub context: &'a mut SchedulerContext,
}
2540
impl<'a> ScopedSchedulerContext<'a> {
    /// Pops the pushed path segment and returns the underlying context.
    pub fn pop(self) -> &'a mut SchedulerContext {
        self.context.pop();
        self.context
    }
}
2547
2548impl SchedulerContext {
2549 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2550 Self {
2551 io,
2552 cache,
2553 recv: None,
2554 name: "".to_string(),
2555 path: Vec::new(),
2556 path_names: Vec::new(),
2557 }
2558 }
2559
2560 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2561 &self.io
2562 }
2563
2564 pub fn cache(&self) -> &Arc<LanceCache> {
2565 &self.cache
2566 }
2567
2568 pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2569 self.path.push(index);
2570 self.path_names.push(name.to_string());
2571 ScopedSchedulerContext { context: self }
2572 }
2573
2574 pub fn pop(&mut self) {
2575 self.path.pop();
2576 self.path_names.pop();
2577 }
2578
2579 pub fn path_name(&self) -> String {
2580 let path = self.path_names.join("/");
2581 if self.recv.is_some() {
2582 format!("TEMP({}){}", self.name, path)
2583 } else {
2584 format!("ROOT{}", path)
2585 }
2586 }
2587
2588 pub fn current_path(&self) -> VecDeque<u32> {
2589 VecDeque::from_iter(self.path.iter().copied())
2590 }
2591
2592 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2593 pub fn locate_decoder(
2594 &mut self,
2595 decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2596 ) -> crate::previous::decoder::DecoderReady {
2597 trace!(
2598 "Scheduling decoder of type {:?} for {:?}",
2599 decoder.data_type(),
2600 self.path,
2601 );
2602 crate::previous::decoder::DecoderReady {
2603 decoder,
2604 path: self.current_path(),
2605 }
2606 }
2607}
2608
2609pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);
2610
2611impl std::fmt::Debug for UnloadedPageShard {
2612 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2613 f.debug_struct("UnloadedPage").finish()
2614 }
2615}
2616
/// The result of one scheduling step: the number of rows scheduled and the
/// decoder/page messages produced for them.
#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}
2622
/// An in-progress scheduling pass over a field (structural path).
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Schedules the next chunk of work, returning the scan lines produced.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
}
2632
2633pub struct FilterExpression(pub Bytes);
2641
impl FilterExpression {
    /// A filter that selects all rows (empty payload).
    pub fn no_filter() -> Self {
        Self(Bytes::new())
    }

    /// True when the filter does not restrict anything.
    pub fn is_noop(&self) -> bool {
        self.0.is_empty()
    }
}
2656
/// Per-field scheduler for the structural decode path.
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// One-time async setup (e.g. loading metadata) before scheduling.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Starts a scheduling job covering the given row `ranges`.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
2669
/// CPU-only work that converts already-loaded data into an Arrow array.
pub trait DecodeArrayTask: Send {
    /// Consumes the task, returning the array and its data size in bytes.
    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)>;
}
2675
/// Adapts a structural decode task to the common [`DecodeArrayTask`]
/// interface used by batch assembly.
impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
        StructuralDecodeArrayTask::decode(*self)
            .map(|decoded_array| (decoded_array.array, decoded_array.data_size))
    }
}
2682
/// A decode task drained from a decoder but not yet executed.
pub struct NextDecodeTask {
    /// The work that, when run, produces the decoded array
    pub task: Box<dyn DecodeArrayTask>,
    /// Number of rows the task will produce
    pub num_rows: u64,
}
2693
impl NextDecodeTask {
    // Runs the decode task and converts the resulting struct array into a
    // RecordBatch, logging (at most once per stream) if the batch is
    // suspiciously large.
    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<(RecordBatch, u64)> {
        let (struct_arr, data_size) = self
            .task
            .decode()
            .map_err(|e| Error::internal(format!("Error decoding batch: {}", e)))?;
        let batch = RecordBatch::from(struct_arr.as_struct());
        // `Once` ensures the warning fires only for the first large batch
        if data_size > BATCH_SIZE_BYTES_WARNING {
            emitted_batch_size_warning.call_once(|| {
                let size_mb = data_size / 1024 / 1024;
                debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
            });
        }
        Ok((batch, data_size))
    }
}
2715
/// A message from the scheduler to the decoder, carrying either a legacy
/// (pre-2.1) decoder or a structural (2.1+) unloaded page.
#[derive(Debug)]
pub enum MessageType {
    /// Legacy decode path
    DecoderReady(crate::previous::decoder::DecoderReady),
    /// Structural decode path
    UnloadedPage(UnloadedPageShard),
}
2731
2732impl MessageType {
2733 pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2734 match self {
2735 Self::DecoderReady(decoder) => decoder,
2736 Self::UnloadedPage(_) => {
2737 panic!("Expected DecoderReady but got UnloadedPage")
2738 }
2739 }
2740 }
2741
2742 pub fn into_structural(self) -> UnloadedPageShard {
2743 match self {
2744 Self::UnloadedPage(unloaded) => unloaded,
2745 Self::DecoderReady(_) => {
2746 panic!("Expected UnloadedPage but got DecoderReady")
2747 }
2748 }
2749 }
2750}
2751
/// A progress message sent from the scheduler to the decoder.
pub struct DecoderMessage {
    /// Running total of rows scheduled
    // NOTE(review): assumed cumulative (name suggests so) — confirm against
    // the scheduler that emits these messages.
    pub scheduled_so_far: u64,
    /// Decoders created for the newly scheduled rows
    pub decoders: Vec<MessageType>,
}
2756
/// The decoder-side (receiving) half of the scheduler → decoder channel.
pub struct DecoderContext {
    // Stream of scheduling results; an Err here aborts the decode
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}
2760
2761impl DecoderContext {
2762 pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2763 Self { source }
2764 }
2765}
2766
/// Data decoded from a single page, together with the unraveler for its
/// repetition/definition levels.
pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}
2771
/// A task (CPU work) that decodes a page's data.
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Runs the decode, consuming the task.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}
2776
/// A decoder for a single page of data in a structural (2.1+) file.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Creates a task that decodes the next `num_rows` rows from this page.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// The total number of rows in the page.
    fn num_rows(&self) -> u64;
}
2781
/// A page whose I/O has completed and is ready to be drained.
#[derive(Debug)]
pub struct LoadedPageShard {
    /// The decoder for this page's data
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Path to the owning field as indices into the nested schema
    // NOTE(review): exact path semantics aren't visible here — confirm
    // against the scheduler that populates it.
    pub path: VecDeque<u32>,
}
2806
/// A fully decoded Arrow array along with its unraveled rep/def levels and
/// the decoded data size in bytes.
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
    pub data_size: u64,
}
2813
/// A task that decodes accumulated page data into a finished Arrow array.
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Runs the decode, consuming the task.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}
2817
/// A decoder for a single field's worth of data in a structural (2.1+) file.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Accepts a loaded page of data for this field.
    // NOTE(review): the `_child` name suggests pages may be routed on to
    // child decoders — confirm against implementors.
    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
    /// Creates a task to decode `num_rows` rows of previously accepted data.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The Arrow data type of the decoded data.
    fn data_type(&self) -> &DataType;
}
2829
/// Extension point for registering custom decoders (currently empty).
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2832
2833pub async fn decode_batch(
2835 batch: &EncodedBatch,
2836 filter: &FilterExpression,
2837 decoder_plugins: Arc<DecoderPlugins>,
2838 should_validate: bool,
2839 version: LanceFileVersion,
2840 cache: Option<Arc<LanceCache>>,
2841) -> Result<RecordBatch> {
2842 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2847 let cache = if let Some(cache) = cache {
2848 cache
2849 } else {
2850 Arc::new(lance_core::cache::LanceCache::with_capacity(
2851 128 * 1024 * 1024,
2852 ))
2853 };
2854 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2855 batch.schema.as_ref(),
2856 &batch.top_level_columns,
2857 &batch.page_table,
2858 &vec![],
2859 batch.num_rows,
2860 decoder_plugins,
2861 io_scheduler.clone(),
2862 cache,
2863 filter,
2864 &DecoderConfig::default(),
2865 )
2866 .await?;
2867 let (tx, rx) = unbounded_channel();
2868 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2869 let is_structural = version >= LanceFileVersion::V2_1;
2870 let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
2871 let spawn_structural_batch_decode_tasks = !matches!(mode.ok().as_deref(), Some("never"));
2872 let mut decode_stream = create_decode_stream(
2873 &batch.schema,
2874 batch.num_rows,
2875 batch.num_rows as u32,
2876 is_structural,
2877 should_validate,
2878 spawn_structural_batch_decode_tasks,
2879 rx,
2880 None,
2881 )?;
2882 decode_stream.next().await.unwrap().task.await
2883}
2884
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }

    #[test]
    fn test_coalesce_indices_to_ranges() {
        // Consecutive indices collapse into a single range
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }

    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        // Each gap in the indices starts a new range
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }

    #[test]
    fn test_estimate_bytes_per_row() {
        assert_eq!(estimate_bytes_per_row(&DataType::Int32), 4.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Int64), 8.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Float32), 4.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Boolean), 1.0 / 8.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Utf8), 64.0);
        assert_eq!(estimate_bytes_per_row(&DataType::Binary), 64.0);
        // A struct's estimate is the sum of its children (4 x Int32 = 16)
        let struct_type = DataType::Struct(Fields::from(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Int32, false),
            ArrowField::new("c", DataType::Int32, false),
            ArrowField::new("d", DataType::Int32, false),
        ]));
        assert_eq!(estimate_bytes_per_row(&struct_type), 16.0);
    }

    // Helper: encodes `batch` with the 2.1 (structural) encoder, then
    // decodes it back using the given row-count batch size and optional
    // byte-size limit, returning all decoded batches.
    async fn decode_batches_with_byte_limit(
        batch: &RecordBatch,
        batch_size: u32,
        batch_size_bytes: Option<u64>,
    ) -> Vec<RecordBatch> {
        use crate::encoder::{EncodingOptions, default_encoding_strategy, encode_batch};
        use crate::version::LanceFileVersion;

        let version = LanceFileVersion::V2_1;
        let options = EncodingOptions {
            version,
            ..Default::default()
        };
        let strategy = default_encoding_strategy(version);
        let schema = Schema::try_from(batch.schema().as_ref()).unwrap();
        let encoded = encode_batch(batch, Arc::new(schema.clone()), strategy.as_ref(), &options)
            .await
            .unwrap();

        // Decode straight from the in-memory encoded buffer
        let io_scheduler =
            Arc::new(BufferScheduler::new(encoded.data.clone())) as Arc<dyn EncodingsIo>;
        let cache = Arc::new(lance_core::cache::LanceCache::with_capacity(
            128 * 1024 * 1024,
        ));
        let decoder_plugins = Arc::new(DecoderPlugins::default());

        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            encoded.schema.as_ref(),
            &encoded.top_level_columns,
            &encoded.page_table,
            &vec![],
            encoded.num_rows,
            decoder_plugins,
            io_scheduler.clone(),
            cache,
            &FilterExpression::no_filter(),
            &DecoderConfig::default(),
        )
        .await
        .unwrap();

        let (tx, rx) = unbounded_channel();
        decode_scheduler.schedule_range(
            0..encoded.num_rows,
            &FilterExpression::no_filter(),
            tx,
            io_scheduler,
        );

        let mut decode_stream = create_decode_stream(
            &encoded.schema,
            encoded.num_rows,
            batch_size,
            true,
            true,
            true,
            rx,
            batch_size_bytes,
        )
        .unwrap();

        // Drain the stream, awaiting each decode task in order
        let mut batches = Vec::new();
        while let Some(task) = decode_stream.next().await {
            batches.push(task.task.await.unwrap());
        }
        batches
    }

    #[tokio::test]
    async fn test_byte_sized_batches_fixed_width() {
        use arrow_array::Int32Array;

        let num_rows: i32 = 1000;
        // Four Int32 columns => 16 bytes per row
        let arrays: Vec<Arc<dyn arrow_array::Array>> = (0..4)
            .map(|col| {
                Arc::new(Int32Array::from_iter_values(
                    (0..num_rows).map(move |row| row * 10 + col),
                )) as _
            })
            .collect();

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Int32, false),
            ArrowField::new("c", DataType::Int32, false),
            ArrowField::new("d", DataType::Int32, false),
        ]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        // 1600-byte limit / 16 bytes per row => 100 rows per batch
        let batches =
            decode_batches_with_byte_limit(&input_batch, 1024, Some(1600)).await;

        // 1000 rows at 100 rows per batch => 10 batches
        assert_eq!(batches.len(), 10);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch.num_rows(),
                100,
                "batch {i} should have 100 rows, got {}",
                batch.num_rows()
            );
        }

        // Roundtrip check: concatenated output must match the input exactly
        let all_batches: Vec<&RecordBatch> = batches.iter().collect();
        let concatenated =
            arrow_select::concat::concat_batches(&batches[0].schema(), all_batches.iter().copied())
                .unwrap();
        assert_eq!(concatenated.num_rows(), num_rows as usize);
        for col in 0..4 {
            assert_eq!(
                concatenated.column(col).as_ref(),
                input_batch.column(col).as_ref(),
                "column {col} roundtrip mismatch"
            );
        }
    }

    #[tokio::test]
    async fn test_byte_sized_batches_none_unchanged() {
        use arrow_array::Int32Array;

        let num_rows: i32 = 1000;
        // Two Int32 columns => 8 bytes per row
        let arrays: Vec<Arc<dyn arrow_array::Array>> = (0..2)
            .map(|col| {
                Arc::new(Int32Array::from_iter_values(
                    (0..num_rows).map(move |row| row * 10 + col),
                )) as _
            })
            .collect();

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("x", DataType::Int32, false),
            ArrowField::new("y", DataType::Int32, false),
        ]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        // With no byte limit the row-count batch size (250) is used as-is
        let batches = decode_batches_with_byte_limit(&input_batch, 250, None).await;
        assert_eq!(batches.len(), 4);
        for (i, batch) in batches.iter().enumerate() {
            assert_eq!(
                batch.num_rows(),
                250,
                "batch {i} should have 250 rows, got {}",
                batch.num_rows()
            );
        }
    }

    #[tokio::test]
    async fn test_byte_sized_batches_feedback_convergence() {
        use arrow_array::StringArray;

        let num_rows = 500;
        // Each row holds a 100-byte string, so the estimator's up-front
        // guess (64 bytes per Utf8 row, per test_estimate_bytes_per_row)
        // is wrong and must be corrected from observed batch sizes.
        let value: String = "x".repeat(100);
        let arrays: Vec<Arc<dyn arrow_array::Array>> = vec![Arc::new(StringArray::from(
            (0..num_rows).map(|_| value.as_str()).collect::<Vec<_>>(),
        ))];
        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "s",
            DataType::Utf8,
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, arrays).unwrap();

        // ~100 bytes per row with a 5000-byte target => ~50 rows per batch
        // once the sizing converges
        let target_bytes: u64 = 5000;
        let batches = decode_batches_with_byte_limit(
            &input_batch,
            1024,
            Some(target_bytes),
        )
        .await;

        // Roundtrip check first: no rows lost or corrupted
        let all_batches: Vec<&RecordBatch> = batches.iter().collect();
        let concatenated =
            arrow_select::concat::concat_batches(&batches[0].schema(), all_batches.iter().copied())
                .unwrap();
        assert_eq!(concatenated.num_rows(), num_rows as usize);
        assert_eq!(
            concatenated.column(0).as_ref(),
            input_batch.column(0).as_ref()
        );

        // The first batch may be sized from the inaccurate initial estimate
        assert!(
            batches.len() >= 2,
            "need at least 2 batches to test convergence"
        );
        // After the first batch the true bytes-per-row is observable, so
        // later batches should land near the ideal 50 rows
        if batches.len() >= 3 {
            let second_batch_rows = batches[1].num_rows();
            let third_batch_rows = batches[2].num_rows();
            // Allow some slack around the ideal
            assert!(
                (40..=60).contains(&second_batch_rows),
                "second batch should be near 50 rows, got {second_batch_rows}"
            );
            assert!(
                (40..=60).contains(&third_batch_rows),
                "third batch should be near 50 rows, got {third_batch_rows}"
            );
        }
    }
}