1use std::collections::VecDeque;
216use std::sync::{LazyLock, Once};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use log::{debug, trace, warn};
230use snafu::location;
231use tokio::sync::mpsc::error::SendError;
232use tokio::sync::mpsc::{self, unbounded_channel};
233
234use lance_core::{ArrowResult, Error, Result};
235use tracing::instrument;
236
237use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
238use crate::data::DataBlock;
239use crate::encoder::EncodedBatch;
240use crate::encodings::logical::list::StructuralListScheduler;
241use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
242use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
243use crate::format::pb::{self, column_encoding};
244use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
245use crate::v2::decoder::LogicalPageDecoder;
246use crate::v2::encodings::logical::list::OffsetPageInfo;
247use crate::v2::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
248use crate::v2::encodings::logical::{
249 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
250 primitive::PrimitiveFieldScheduler,
251};
252use crate::version::LanceFileVersion;
253use crate::{BufferScheduler, EncodingsIo};
254
/// Byte-size threshold (10 MiB) associated with batch-size warnings.
///
/// NOTE(review): the code that reads this constant is outside the visible part of the
/// file; presumably a warning is logged when a decoded batch exceeds this many bytes —
/// confirm against the use site.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
257
/// The encoding description attached to a single page.
///
/// A page is described either by a legacy array encoding or by a structural page
/// layout. The `as_legacy` / `as_structural` accessors panic when the other flavor
/// is stored.
#[derive(Debug)]
pub enum PageEncoding {
    /// Page described by a legacy `pb::ArrayEncoding`.
    Legacy(pb::ArrayEncoding),
    /// Page described by a structural `pb::PageLayout`.
    Structural(pb::PageLayout),
}
269
270impl PageEncoding {
271 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
272 match self {
273 Self::Legacy(enc) => enc,
274 Self::Structural(_) => panic!("Expected a legacy encoding"),
275 }
276 }
277
278 pub fn as_structural(&self) -> &pb::PageLayout {
279 match self {
280 Self::Structural(enc) => enc,
281 Self::Legacy(_) => panic!("Expected a structural encoding"),
282 }
283 }
284
285 pub fn is_structural(&self) -> bool {
286 matches!(self, Self::Structural(_))
287 }
288}
289
/// Metadata describing one page of a column.
#[derive(Debug)]
pub struct PageInfo {
    /// Number of rows stored in the page.
    pub num_rows: u64,
    /// Priority value attached to the page.
    ///
    /// NOTE(review): how schedulers interpret this value is not visible here — confirm
    /// against the code that consumes it.
    pub priority: u64,
    /// How the page is encoded (legacy array encoding or structural layout).
    pub encoding: PageEncoding,
    /// One `(u64, u64)` pair per buffer of this page — presumably `(offset, size)` as the
    /// field name suggests; confirm against the file reader.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
306
/// Metadata describing one column: its pages, its column-level buffers and its
/// column encoding.
///
/// Cloning is cheap for the page and buffer lists because they are held behind `Arc`.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// Index of the column.
    pub index: u32,
    /// The pages that make up the column.
    pub page_infos: Arc<[PageInfo]>,
    /// One `(u64, u64)` pair per column-level buffer — presumably `(offset, size)` as the
    /// field name suggests; confirm against the file reader.
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// The column-level encoding description.
    pub encoding: pb::ColumnEncoding,
}
320
321impl ColumnInfo {
322 pub fn new(
324 index: u32,
325 page_infos: Arc<[PageInfo]>,
326 buffer_offsets_and_sizes: Vec<(u64, u64)>,
327 encoding: pb::ColumnEncoding,
328 ) -> Self {
329 Self {
330 index,
331 page_infos,
332 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
333 encoding,
334 }
335 }
336
337 pub fn is_structural(&self) -> bool {
338 self.page_infos
339 .first()
341 .map(|page| page.encoding.is_structural())
342 .unwrap_or(false)
343 }
344}
345
/// The root field scheduler, in either of the two supported flavors.
enum RootScheduler {
    /// Root scheduler for the structural decoding path.
    Structural(Box<dyn StructuralFieldScheduler>),
    /// Root scheduler for the legacy (v2) decoding path.
    Legacy(Arc<dyn crate::v2::decoder::FieldScheduler>),
}
350
351impl RootScheduler {
352 fn as_legacy(&self) -> &Arc<dyn crate::v2::decoder::FieldScheduler> {
353 match self {
354 Self::Structural(_) => panic!("Expected a legacy scheduler"),
355 Self::Legacy(s) => s,
356 }
357 }
358
359 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
360 match self {
361 Self::Structural(s) => s.as_ref(),
362 Self::Legacy(_) => panic!("Expected a structural scheduler"),
363 }
364 }
365}
366
/// Schedules the decoding work for requested row ranges.
///
/// It owns a root scheduler (structural or legacy) and forwards the resulting
/// `DecoderMessage`s to a caller-supplied sink; see `schedule_ranges`.
pub struct DecodeBatchScheduler {
    /// The root scheduler that all scheduling requests are delegated to.
    root_scheduler: RootScheduler,
    /// The Arrow fields of the root (top-level) struct.
    pub root_fields: Fields,
    /// Cache handed to every `SchedulerContext` created while scheduling.
    cache: Arc<LanceCache>,
}
393
/// A cursor over column infos that can also jump between top-level columns.
///
/// `next` walks `column_infos` sequentially, while `next_top_level` repositions the
/// cursor at the column named by the next entry of `column_indices`.
pub struct ColumnInfoIter<'a> {
    /// All column infos the cursor can visit.
    column_infos: Vec<Arc<ColumnInfo>>,
    /// Positions (into `column_infos`) of the top-level columns, in visiting order.
    column_indices: &'a [u32],
    /// Position in `column_infos` of the info returned by the next `next` / `peek`.
    column_info_pos: usize,
    /// Position in `column_indices` of the current top-level column.
    column_indices_pos: usize,
}
400
401impl<'a> ColumnInfoIter<'a> {
402 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
403 let initial_pos = column_indices[0] as usize;
404 Self {
405 column_infos,
406 column_indices,
407 column_info_pos: initial_pos,
408 column_indices_pos: 0,
409 }
410 }
411
412 pub fn peek(&self) -> &Arc<ColumnInfo> {
413 &self.column_infos[self.column_info_pos]
414 }
415
416 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
417 let column_info = self.column_infos[self.column_info_pos].clone();
418 let transformed = transform(column_info);
419 self.column_infos[self.column_info_pos] = transformed;
420 }
421
422 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
423 self.next().ok_or_else(|| {
424 Error::invalid_input(
425 "there were more fields in the schema than provided column indices / infos",
426 location!(),
427 )
428 })
429 }
430
431 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
432 if self.column_info_pos < self.column_infos.len() {
433 let info = &self.column_infos[self.column_info_pos];
434 self.column_info_pos += 1;
435 Some(info)
436 } else {
437 None
438 }
439 }
440
441 pub(crate) fn next_top_level(&mut self) {
442 self.column_indices_pos += 1;
443 if self.column_indices_pos < self.column_indices.len() {
444 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
445 } else {
446 self.column_info_pos = self.column_infos.len();
447 }
448 }
449}
450
/// Borrowed view of the file-level buffers.
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    /// One `(u64, u64)` pair per file-level buffer — presumably `(position, size)` as the
    /// field name suggests; confirm against the file reader.
    pub positions_and_sizes: &'a [(u64, u64)],
}
456
/// Borrowed view of a column's buffers, together with the enclosing file's buffers.
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    /// The buffers of the file this column belongs to.
    pub file_buffers: FileBuffers<'a>,
    /// One `(u64, u64)` pair per column-level buffer — presumably `(position, size)` as
    /// the field name suggests; confirm against the file reader.
    pub positions_and_sizes: &'b [(u64, u64)],
}
463
/// Borrowed view of a page's buffers, together with the enclosing column's buffers.
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    /// The buffers of the column (and, transitively, the file) this page belongs to.
    pub column_buffers: ColumnBuffers<'a, 'b>,
    /// One `(u64, u64)` pair per page-level buffer — presumably `(position, size)` as the
    /// field name suggests; confirm against the file reader.
    pub positions_and_sizes: &'c [(u64, u64)],
}
470
/// Builds field schedulers (structural or legacy) from a schema field and the
/// matching column infos.
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    /// Forwarded to the legacy `PrimitiveFieldScheduler`s this strategy creates.
    pub validate_data: bool,
    /// Forwarded to the `StructuralPrimitiveFieldScheduler`s this strategy creates.
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    /// Forwarded to the `StructuralPrimitiveFieldScheduler`s this strategy creates.
    pub cache_repetition_index: bool,
}
478
479impl Default for CoreFieldDecoderStrategy {
480 fn default() -> Self {
481 Self {
482 validate_data: false,
483 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
484 cache_repetition_index: false,
485 }
486 }
487}
488
489impl CoreFieldDecoderStrategy {
490 pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
492 self.cache_repetition_index = cache_repetition_index;
493 self
494 }
495
496 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
499 let column_encoding = column_info
500 .encoding
501 .column_encoding
502 .as_ref()
503 .ok_or_else(|| {
504 Error::invalid_input(
505 format!(
506 "the column at index {} was missing a ColumnEncoding",
507 column_info.index
508 ),
509 location!(),
510 )
511 })?;
512 if matches!(
513 column_encoding,
514 pb::column_encoding::ColumnEncoding::Values(_)
515 ) {
516 Ok(())
517 } else {
518 Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
519 }
520 }
521
522 fn is_primitive(data_type: &DataType) -> bool {
523 if data_type.is_primitive() {
524 true
525 } else {
526 match data_type {
527 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
529 DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
530 _ => false,
531 }
532 }
533 }
534
535 fn create_primitive_scheduler(
536 &self,
537 field: &Field,
538 column: &ColumnInfo,
539 buffers: FileBuffers,
540 ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
541 Self::ensure_values_encoded(column, &field.name)?;
542 let column_buffers = ColumnBuffers {
544 file_buffers: buffers,
545 positions_and_sizes: &column.buffer_offsets_and_sizes,
546 };
547 Ok(Box::new(PrimitiveFieldScheduler::new(
548 column.index,
549 field.data_type(),
550 column.page_infos.clone(),
551 column_buffers,
552 self.validate_data,
553 )))
554 }
555
556 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
558 Self::ensure_values_encoded(column_info, field_name)?;
559 if column_info.page_infos.len() != 1 {
560 return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
561 }
562 let encoding = &column_info.page_infos[0].encoding;
563 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
564 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
565 _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
566 }
567 }
568
569 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
570 let encoding = &column_info.page_infos[0].encoding;
571 matches!(
572 encoding.as_legacy().array_encoding.as_ref().unwrap(),
573 pb::array_encoding::ArrayEncoding::PackedStruct(_)
574 )
575 }
576
577 fn create_list_scheduler(
578 &self,
579 list_field: &Field,
580 column_infos: &mut ColumnInfoIter,
581 buffers: FileBuffers,
582 offsets_column: &ColumnInfo,
583 ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
584 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
585 let offsets_column_buffers = ColumnBuffers {
586 file_buffers: buffers,
587 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
588 };
589 let items_scheduler =
590 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
591
592 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
593 .page_infos
594 .iter()
595 .filter(|offsets_page| offsets_page.num_rows > 0)
596 .map(|offsets_page| {
597 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
598 &offsets_page.encoding.as_legacy().array_encoding
599 {
600 let inner = PageInfo {
601 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
602 encoding: PageEncoding::Legacy(
603 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
604 ),
605 num_rows: offsets_page.num_rows,
606 priority: 0,
607 };
608 (
609 inner,
610 OffsetPageInfo {
611 offsets_in_page: offsets_page.num_rows,
612 null_offset_adjustment: list_encoding.null_offset_adjustment,
613 num_items_referenced_by_page: list_encoding.num_items,
614 },
615 )
616 } else {
617 panic!("Expected a list column");
619 }
620 })
621 .unzip();
622 let inner = Arc::new(PrimitiveFieldScheduler::new(
623 offsets_column.index,
624 DataType::UInt64,
625 Arc::from(inner_infos.into_boxed_slice()),
626 offsets_column_buffers,
627 self.validate_data,
628 )) as Arc<dyn crate::v2::decoder::FieldScheduler>;
629 let items_field = match list_field.data_type() {
630 DataType::List(inner) => inner,
631 DataType::LargeList(inner) => inner,
632 _ => unreachable!(),
633 };
634 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
635 DataType::Int32
636 } else {
637 DataType::Int64
638 };
639 Ok(Box::new(ListFieldScheduler::new(
640 inner,
641 items_scheduler.into(),
642 items_field,
643 offset_type,
644 null_offset_adjustments,
645 )))
646 }
647
648 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
649 if let column_encoding::ColumnEncoding::Blob(blob) =
650 column_info.encoding.column_encoding.as_ref().unwrap()
651 {
652 let mut column_info = column_info.clone();
653 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
654 Some(column_info)
655 } else {
656 None
657 }
658 }
659
660 fn create_structural_field_scheduler(
661 &self,
662 field: &Field,
663 column_infos: &mut ColumnInfoIter,
664 ) -> Result<Box<dyn StructuralFieldScheduler>> {
665 let data_type = field.data_type();
666 if Self::is_primitive(&data_type) {
667 let column_info = column_infos.expect_next()?;
668 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
669 column_info.as_ref(),
670 self.decompressor_strategy.as_ref(),
671 self.cache_repetition_index,
672 )?);
673
674 column_infos.next_top_level();
676
677 return Ok(scheduler);
678 }
679 match &data_type {
680 DataType::Struct(fields) => {
681 if field.is_packed_struct() {
682 let column_info = column_infos.expect_next()?;
683 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
684 column_info.as_ref(),
685 self.decompressor_strategy.as_ref(),
686 self.cache_repetition_index,
687 )?);
688
689 column_infos.next_top_level();
691
692 return Ok(scheduler);
693 }
694 let mut child_schedulers = Vec::with_capacity(field.children.len());
695 for field in field.children.iter() {
696 let field_scheduler =
697 self.create_structural_field_scheduler(field, column_infos)?;
698 child_schedulers.push(field_scheduler);
699 }
700
701 let fields = fields.clone();
702 Ok(
703 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
704 as Box<dyn StructuralFieldScheduler>,
705 )
706 }
707 DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => {
708 let column_info = column_infos.expect_next()?;
709 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
710 column_info.as_ref(),
711 self.decompressor_strategy.as_ref(),
712 self.cache_repetition_index,
713 )?);
714 column_infos.next_top_level();
715 Ok(scheduler)
716 }
717 DataType::List(_) | DataType::LargeList(_) => {
718 let child = field
719 .children
720 .first()
721 .expect("List field must have a child");
722 let child_scheduler =
723 self.create_structural_field_scheduler(child, column_infos)?;
724 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
725 as Box<dyn StructuralFieldScheduler>)
726 }
727 _ => todo!(),
728 }
729 }
730
731 fn create_legacy_field_scheduler(
732 &self,
733 field: &Field,
734 column_infos: &mut ColumnInfoIter,
735 buffers: FileBuffers,
736 ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
737 let data_type = field.data_type();
738 if Self::is_primitive(&data_type) {
739 let column_info = column_infos.expect_next()?;
740 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
741 return Ok(scheduler);
742 } else if data_type.is_binary_like() {
743 let column_info = column_infos.next().unwrap().clone();
744 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
746 let desc_scheduler =
747 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
748 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
749 return Ok(blob_scheduler);
750 }
751 if let Some(page_info) = column_info.page_infos.first() {
752 if matches!(
753 page_info.encoding.as_legacy(),
754 pb::ArrayEncoding {
755 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
756 }
757 ) {
758 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
759 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
760 } else {
761 DataType::LargeList(Arc::new(ArrowField::new(
762 "item",
763 DataType::UInt8,
764 false,
765 )))
766 };
767 let list_field = Field::try_from(ArrowField::new(
768 field.name.clone(),
769 list_type,
770 field.nullable,
771 ))
772 .unwrap();
773 let list_scheduler = self.create_list_scheduler(
774 &list_field,
775 column_infos,
776 buffers,
777 &column_info,
778 )?;
779 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
780 list_scheduler.into(),
781 field.data_type(),
782 ));
783 return Ok(binary_scheduler);
784 } else {
785 let scheduler =
786 self.create_primitive_scheduler(field, &column_info, buffers)?;
787 return Ok(scheduler);
788 }
789 } else {
790 return self.create_primitive_scheduler(field, &column_info, buffers);
791 }
792 }
793 match &data_type {
794 DataType::FixedSizeList(inner, _dimension) => {
795 if Self::is_primitive(inner.data_type()) {
798 let primitive_col = column_infos.expect_next()?;
799 let scheduler =
800 self.create_primitive_scheduler(field, primitive_col, buffers)?;
801 Ok(scheduler)
802 } else {
803 todo!()
804 }
805 }
806 DataType::Dictionary(_key_type, value_type) => {
807 if Self::is_primitive(value_type) || value_type.is_binary_like() {
808 let primitive_col = column_infos.expect_next()?;
809 let scheduler =
810 self.create_primitive_scheduler(field, primitive_col, buffers)?;
811 Ok(scheduler)
812 } else {
813 Err(Error::NotSupported {
814 source: format!(
815 "No way to decode into a dictionary field of type {}",
816 value_type
817 )
818 .into(),
819 location: location!(),
820 })
821 }
822 }
823 DataType::List(_) | DataType::LargeList(_) => {
824 let offsets_column = column_infos.expect_next()?.clone();
825 column_infos.next_top_level();
826 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
827 }
828 DataType::Struct(fields) => {
829 let column_info = column_infos.expect_next()?;
830
831 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
833 return self.create_primitive_scheduler(field, &blob_col, buffers);
835 }
836
837 if Self::check_packed_struct(column_info) {
838 self.create_primitive_scheduler(field, column_info, buffers)
840 } else {
841 Self::check_simple_struct(column_info, &field.name).unwrap();
843 let num_rows = column_info
844 .page_infos
845 .iter()
846 .map(|page| page.num_rows)
847 .sum();
848 let mut child_schedulers = Vec::with_capacity(field.children.len());
849 for field in &field.children {
850 column_infos.next_top_level();
851 let field_scheduler =
852 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
853 child_schedulers.push(Arc::from(field_scheduler));
854 }
855
856 let fields = fields.clone();
857 Ok(Box::new(SimpleStructScheduler::new(
858 child_schedulers,
859 fields,
860 num_rows,
861 )))
862 }
863 }
864 _ => todo!(),
866 }
867 }
868}
869
870fn root_column(num_rows: u64) -> ColumnInfo {
872 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
873 let final_page_num_rows = num_rows % (u32::MAX as u64);
874 let root_pages = (0..num_root_pages)
875 .map(|i| PageInfo {
876 num_rows: if i == num_root_pages - 1 {
877 final_page_num_rows
878 } else {
879 u64::MAX
880 },
881 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
882 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
883 pb::SimpleStruct {},
884 )),
885 }),
886 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
888 })
889 .collect::<Vec<_>>();
890 ColumnInfo {
891 buffer_offsets_and_sizes: Arc::new([]),
892 encoding: pb::ColumnEncoding {
893 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
894 },
895 index: u32::MAX,
896 page_infos: Arc::from(root_pages),
897 }
898}
899
/// The root decoder, in either of the two supported flavors.
pub enum RootDecoder {
    /// Root decoder for the structural decoding path.
    Structural(StructuralStructDecoder),
    /// Root decoder for the legacy (v2) decoding path.
    Legacy(SimpleStructDecoder),
}
904
905impl RootDecoder {
906 pub fn into_structural(self) -> StructuralStructDecoder {
907 match self {
908 Self::Structural(decoder) => decoder,
909 Self::Legacy(_) => panic!("Expected a structural decoder"),
910 }
911 }
912
913 pub fn into_legacy(self) -> SimpleStructDecoder {
914 match self {
915 Self::Legacy(decoder) => decoder,
916 Self::Structural(_) => panic!("Expected a legacy decoder"),
917 }
918 }
919}
920
921impl DecodeBatchScheduler {
922 #[allow(clippy::too_many_arguments)]
925 pub async fn try_new<'a>(
926 schema: &'a Schema,
927 column_indices: &[u32],
928 column_infos: &[Arc<ColumnInfo>],
929 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
930 num_rows: u64,
931 _decoder_plugins: Arc<DecoderPlugins>,
932 io: Arc<dyn EncodingsIo>,
933 cache: Arc<LanceCache>,
934 filter: &FilterExpression,
935 cache_repetition_index: bool,
936 ) -> Result<Self> {
937 assert!(num_rows > 0);
938 let buffers = FileBuffers {
939 positions_and_sizes: file_buffer_positions_and_sizes,
940 };
941 let arrow_schema = ArrowSchema::from(schema);
942 let root_fields = arrow_schema.fields().clone();
943 let root_type = DataType::Struct(root_fields.clone());
944 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
945 root_field.children.clone_from(&schema.fields);
949 root_field
950 .metadata
951 .insert("__lance_decoder_root".to_string(), "true".to_string());
952
953 if column_infos[0].is_structural() {
954 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
955
956 let strategy = CoreFieldDecoderStrategy::default()
957 .with_cache_repetition_index(cache_repetition_index);
958 let mut root_scheduler =
959 strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
960
961 let context = SchedulerContext::new(io, cache.clone());
962 root_scheduler.initialize(filter, &context).await?;
963
964 Ok(Self {
965 root_scheduler: RootScheduler::Structural(root_scheduler),
966 root_fields,
967 cache,
968 })
969 } else {
970 let mut columns = Vec::with_capacity(column_infos.len() + 1);
973 columns.push(Arc::new(root_column(num_rows)));
974 columns.extend(column_infos.iter().cloned());
975
976 let adjusted_column_indices = [0_u32]
977 .into_iter()
978 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
979 .collect::<Vec<_>>();
980 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
981 let strategy = CoreFieldDecoderStrategy::default()
982 .with_cache_repetition_index(cache_repetition_index);
983 let root_scheduler =
984 strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
985
986 let context = SchedulerContext::new(io, cache.clone());
987 root_scheduler.initialize(filter, &context).await?;
988
989 Ok(Self {
990 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
991 root_fields,
992 cache,
993 })
994 }
995 }
996
997 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
998 pub fn from_scheduler(
999 root_scheduler: Arc<dyn crate::v2::decoder::FieldScheduler>,
1000 root_fields: Fields,
1001 cache: Arc<LanceCache>,
1002 ) -> Self {
1003 Self {
1004 root_scheduler: RootScheduler::Legacy(root_scheduler),
1005 root_fields,
1006 cache,
1007 }
1008 }
1009
1010 fn do_schedule_ranges_structural(
1011 &mut self,
1012 ranges: &[Range<u64>],
1013 filter: &FilterExpression,
1014 io: Arc<dyn EncodingsIo>,
1015 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1016 ) {
1017 let root_scheduler = self.root_scheduler.as_structural();
1018 let mut context = SchedulerContext::new(io, self.cache.clone());
1019 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1020 if let Err(schedule_ranges_err) = maybe_root_job {
1021 schedule_action(Err(schedule_ranges_err));
1022 return;
1023 }
1024 let mut root_job = maybe_root_job.unwrap();
1025 let mut num_rows_scheduled = 0;
1026 loop {
1027 let maybe_next_scan_line = root_job.schedule_next(&mut context);
1028 if let Err(err) = maybe_next_scan_line {
1029 schedule_action(Err(err));
1030 return;
1031 }
1032 let next_scan_line = maybe_next_scan_line.unwrap();
1033 match next_scan_line {
1034 Some(next_scan_line) => {
1035 trace!(
1036 "Scheduled scan line of {} rows and {} decoders",
1037 next_scan_line.rows_scheduled,
1038 next_scan_line.decoders.len()
1039 );
1040 num_rows_scheduled += next_scan_line.rows_scheduled;
1041 if !schedule_action(Ok(DecoderMessage {
1042 scheduled_so_far: num_rows_scheduled,
1043 decoders: next_scan_line.decoders,
1044 })) {
1045 return;
1047 }
1048 }
1049 None => return,
1050 }
1051 }
1052 }
1053
1054 fn do_schedule_ranges_legacy(
1055 &mut self,
1056 ranges: &[Range<u64>],
1057 filter: &FilterExpression,
1058 io: Arc<dyn EncodingsIo>,
1059 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1060 priority: Option<Box<dyn PriorityRange>>,
1064 ) {
1065 let root_scheduler = self.root_scheduler.as_legacy();
1066 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1067 trace!(
1068 "Scheduling {} ranges across {}..{} ({} rows){}",
1069 ranges.len(),
1070 ranges.first().unwrap().start,
1071 ranges.last().unwrap().end,
1072 rows_requested,
1073 priority
1074 .as_ref()
1075 .map(|p| format!(" (priority={:?})", p))
1076 .unwrap_or_default()
1077 );
1078
1079 let mut context = SchedulerContext::new(io, self.cache.clone());
1080 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1081 if let Err(schedule_ranges_err) = maybe_root_job {
1082 schedule_action(Err(schedule_ranges_err));
1083 return;
1084 }
1085 let mut root_job = maybe_root_job.unwrap();
1086 let mut num_rows_scheduled = 0;
1087 let mut rows_to_schedule = root_job.num_rows();
1088 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1089 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1090 while rows_to_schedule > 0 {
1091 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1092 if let Err(schedule_next_err) = maybe_next_scan_line {
1093 schedule_action(Err(schedule_next_err));
1094 return;
1095 }
1096 let next_scan_line = maybe_next_scan_line.unwrap();
1097 priority.advance(next_scan_line.rows_scheduled);
1098 num_rows_scheduled += next_scan_line.rows_scheduled;
1099 rows_to_schedule -= next_scan_line.rows_scheduled;
1100 trace!(
1101 "Scheduled scan line of {} rows and {} decoders",
1102 next_scan_line.rows_scheduled,
1103 next_scan_line.decoders.len()
1104 );
1105 if !schedule_action(Ok(DecoderMessage {
1106 scheduled_so_far: num_rows_scheduled,
1107 decoders: next_scan_line.decoders,
1108 })) {
1109 return;
1111 }
1112
1113 trace!("Finished scheduling {} ranges", ranges.len());
1114 }
1115 }
1116
1117 fn do_schedule_ranges(
1118 &mut self,
1119 ranges: &[Range<u64>],
1120 filter: &FilterExpression,
1121 io: Arc<dyn EncodingsIo>,
1122 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1123 priority: Option<Box<dyn PriorityRange>>,
1127 ) {
1128 match &self.root_scheduler {
1129 RootScheduler::Legacy(_) => {
1130 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1131 }
1132 RootScheduler::Structural(_) => {
1133 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1134 }
1135 }
1136 }
1137
1138 pub fn schedule_ranges_to_vec(
1141 &mut self,
1142 ranges: &[Range<u64>],
1143 filter: &FilterExpression,
1144 io: Arc<dyn EncodingsIo>,
1145 priority: Option<Box<dyn PriorityRange>>,
1146 ) -> Result<Vec<DecoderMessage>> {
1147 let mut decode_messages = Vec::new();
1148 self.do_schedule_ranges(
1149 ranges,
1150 filter,
1151 io,
1152 |msg| {
1153 decode_messages.push(msg);
1154 true
1155 },
1156 priority,
1157 );
1158 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1159 }
1160
1161 #[instrument(skip_all)]
1171 pub fn schedule_ranges(
1172 &mut self,
1173 ranges: &[Range<u64>],
1174 filter: &FilterExpression,
1175 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1176 scheduler: Arc<dyn EncodingsIo>,
1177 ) {
1178 self.do_schedule_ranges(
1179 ranges,
1180 filter,
1181 scheduler,
1182 |msg| {
1183 match sink.send(msg) {
1184 Ok(_) => true,
1185 Err(SendError { .. }) => {
1186 debug!(
1189 "schedule_ranges aborting early since decoder appears to have been dropped"
1190 );
1191 false
1192 }
1193 }
1194 },
1195 None,
1196 )
1197 }
1198
1199 #[instrument(skip_all)]
1207 pub fn schedule_range(
1208 &mut self,
1209 range: Range<u64>,
1210 filter: &FilterExpression,
1211 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1212 scheduler: Arc<dyn EncodingsIo>,
1213 ) {
1214 self.schedule_ranges(&[range], filter, sink, scheduler)
1215 }
1216
1217 pub fn schedule_take(
1225 &mut self,
1226 indices: &[u64],
1227 filter: &FilterExpression,
1228 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1229 scheduler: Arc<dyn EncodingsIo>,
1230 ) {
1231 debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1232 if indices.is_empty() {
1233 return;
1234 }
1235 trace!("Scheduling take of {} rows", indices.len());
1236 let ranges = Self::indices_to_ranges(indices);
1237 self.schedule_ranges(&ranges, filter, sink, scheduler)
1238 }
1239
/// Groups row indices into half-open ranges of consecutive values.
///
/// For example `[1, 2, 3, 7, 8, 10]` becomes `[1..4, 7..9, 10..11]`. A new range starts
/// whenever an index is not exactly one more than the previous index, so a repeated
/// index yields a separate single-row range. An empty slice yields no ranges.
///
/// Because range ends are exclusive, the index `u64::MAX` cannot be represented.
fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
    let Some((&first, rest)) = indices.split_first() else {
        return Vec::new();
    };

    let mut ranges = Vec::new();
    let mut start = first;
    let mut previous = first;
    for &index in rest {
        if index != previous + 1 {
            // The run of consecutive indices ended at `previous`.
            ranges.push(start..previous + 1);
            start = index;
        }
        previous = index;
    }

    ranges.push(start..previous + 1);
    ranges
}
1255}
1256
/// A pending decode of one record batch, paired with the batch's row count.
pub struct ReadBatchTask {
    /// Future that resolves to the decoded `RecordBatch` (or an error).
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// Number of rows in the batch that `task` produces.
    pub num_rows: u32,
}
1261
/// Turns scheduled decoder messages into a stream of batch decode tasks using the
/// legacy root decoder.
pub struct BatchDecodeStream {
    /// Holds the receiving end of the scheduler's message channel.
    context: DecoderContext,
    /// Root decoder that receives the child decoders and is drained for batches.
    root_decoder: SimpleStructDecoder,
    /// Rows not yet claimed by a batch task.
    rows_remaining: u64,
    /// Maximum number of rows per emitted batch.
    rows_per_batch: u32,
    /// Rows scheduled so far, as reported by the most recent scheduler message.
    rows_scheduled: u64,
    /// Rows already drained from the root decoder.
    rows_drained: u64,
    /// Set once the scheduler's channel has been closed.
    scheduler_exhausted: bool,
    /// Shared `Once` handed to every batch task.
    ///
    /// NOTE(review): presumably used so the batch-size warning is logged at most once —
    /// confirm in `NextDecodeTask::into_batch`.
    emitted_batch_size_warning: Arc<Once>,
}
1273
1274impl BatchDecodeStream {
1275 pub fn new(
1286 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1287 rows_per_batch: u32,
1288 num_rows: u64,
1289 root_decoder: SimpleStructDecoder,
1290 ) -> Self {
1291 Self {
1292 context: DecoderContext::new(scheduled),
1293 root_decoder,
1294 rows_remaining: num_rows,
1295 rows_per_batch,
1296 rows_scheduled: 0,
1297 rows_drained: 0,
1298 scheduler_exhausted: false,
1299 emitted_batch_size_warning: Arc::new(Once::new()),
1300 }
1301 }
1302
1303 fn accept_decoder(&mut self, decoder: crate::v2::decoder::DecoderReady) -> Result<()> {
1304 if decoder.path.is_empty() {
1305 Ok(())
1307 } else {
1308 self.root_decoder.accept_child(decoder)
1309 }
1310 }
1311
1312 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1313 if self.scheduler_exhausted {
1314 return Ok(self.rows_scheduled);
1315 }
1316 while self.rows_scheduled < scheduled_need {
1317 let next_message = self.context.source.recv().await;
1318 match next_message {
1319 Some(scan_line) => {
1320 let scan_line = scan_line?;
1321 self.rows_scheduled = scan_line.scheduled_so_far;
1322 for message in scan_line.decoders {
1323 self.accept_decoder(message.into_legacy())?;
1324 }
1325 }
1326 None => {
1327 self.scheduler_exhausted = true;
1331 return Ok(self.rows_scheduled);
1332 }
1333 }
1334 }
1335 Ok(scheduled_need)
1336 }
1337
1338 #[instrument(level = "debug", skip_all)]
1339 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1340 trace!(
1341 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1342 self.rows_remaining,
1343 self.rows_drained,
1344 self.rows_scheduled,
1345 );
1346 if self.rows_remaining == 0 {
1347 return Ok(None);
1348 }
1349
1350 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1351 self.rows_remaining -= to_take;
1352
1353 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1354 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1355 if scheduled_need > 0 {
1356 let desired_scheduled = scheduled_need + self.rows_scheduled;
1357 trace!(
1358 "Draining from scheduler (desire at least {} scheduled rows)",
1359 desired_scheduled
1360 );
1361 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1362 if actually_scheduled < desired_scheduled {
1363 let under_scheduled = desired_scheduled - actually_scheduled;
1364 to_take -= under_scheduled;
1365 }
1366 }
1367
1368 if to_take == 0 {
1369 return Ok(None);
1370 }
1371
1372 let loaded_need = self.rows_drained + to_take - 1;
1374 trace!(
1375 "Waiting for I/O (desire at least {} fully loaded rows)",
1376 loaded_need
1377 );
1378 self.root_decoder.wait_for_loaded(loaded_need).await?;
1379
1380 let next_task = self.root_decoder.drain(to_take)?;
1381 self.rows_drained += to_take;
1382 Ok(Some(next_task))
1383 }
1384
1385 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1386 let stream = futures::stream::unfold(self, |mut slf| async move {
1387 let next_task = slf.next_batch_task().await;
1388 let next_task = next_task.transpose().map(|next_task| {
1389 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1390 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1391 let task = async move {
1392 let next_task = next_task?;
1393 next_task.into_batch(emitted_batch_size_warning)
1394 };
1395 (task, num_rows)
1396 });
1397 next_task.map(|(task, num_rows)| {
1398 debug_assert!(num_rows <= u32::MAX as u64);
1400 let next_task = ReadBatchTask {
1401 task: task.boxed(),
1402 num_rows: num_rows as u32,
1403 };
1404 (next_task, slf)
1405 })
1406 });
1407 stream.boxed()
1408 }
1409}
1410
/// A message handed to the root decoder of a [`BatchDecodeIterator`].
///
/// Each [`RootDecoderType`] implementation in this file accepts exactly one of
/// the variants and treats the other as unreachable.
enum RootDecoderMessage {
    /// A loaded page; accepted by `StructuralStructDecoder`.
    LoadedPage(LoadedPage),
    /// A legacy decoder-ready message; accepted by `SimpleStructDecoder`.
    LegacyPage(crate::v2::decoder::DecoderReady),
}
/// Common interface over the two root decoders that [`BatchDecodeIterator`]
/// can drive.
trait RootDecoderType {
    /// Feeds one message into the decoder.
    ///
    /// The implementations in this file hit `unreachable!()` when given the
    /// variant meant for the other decoder kind.
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Drains `num_rows` rows from the decoder as a single decode task.
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Blocks, using `runtime` if needed, until the decoder is ready for
    /// `loaded_need`. The structural implementation does nothing here.
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1422impl RootDecoderType for StructuralStructDecoder {
1423 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1424 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1425 unreachable!()
1426 };
1427 self.accept_page(loaded_page)
1428 }
1429 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1430 self.drain_batch_task(num_rows)
1431 }
1432 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1433 Ok(())
1435 }
1436}
1437impl RootDecoderType for SimpleStructDecoder {
1438 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1439 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1440 unreachable!()
1441 };
1442 self.accept_child(legacy_page)
1443 }
1444 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1445 self.drain(num_rows)
1446 }
1447 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1448 runtime.block_on(self.wait_for_loaded(loaded_need))
1449 }
1450}
1451
/// A blocking (synchronous) batch decoder that implements
/// `Iterator<Item = ArrowResult<RecordBatch>>` and `RecordBatchReader`.
struct BatchDecodeIterator<T: RootDecoderType> {
    // Scheduler messages, consumed front-to-back by `wait_for_io`
    messages: VecDeque<Result<DecoderMessage>>,
    // The decoder that accepts messages and is drained into batches
    root_decoder: T,
    // Rows not yet reserved for a batch
    rows_remaining: u64,
    // Upper bound on the number of rows in one emitted batch
    rows_per_batch: u32,
    // `scheduled_so_far` of the most recently consumed message
    rows_scheduled: u64,
    // Rows already drained from `root_decoder`
    rows_drained: u64,
    // Shared `Once` passed to `NextDecodeTask::into_batch` so the large-batch
    // log message is emitted at most once
    emitted_batch_size_warning: Arc<Once>,
    // Current-thread runtime used to block on I/O futures
    wait_for_io_runtime: tokio::runtime::Runtime,
    // Schema reported through `RecordBatchReader::schema`
    schema: Arc<ArrowSchema>,
}
1467
1468impl<T: RootDecoderType> BatchDecodeIterator<T> {
1469 pub fn new(
1471 messages: VecDeque<Result<DecoderMessage>>,
1472 rows_per_batch: u32,
1473 num_rows: u64,
1474 root_decoder: T,
1475 schema: Arc<ArrowSchema>,
1476 ) -> Self {
1477 Self {
1478 messages,
1479 root_decoder,
1480 rows_remaining: num_rows,
1481 rows_per_batch,
1482 rows_scheduled: 0,
1483 rows_drained: 0,
1484 wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1485 .build()
1486 .unwrap(),
1487 emitted_batch_size_warning: Arc::new(Once::new()),
1488 schema,
1489 }
1490 }
1491
1492 fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1497 match maybe_done(unloaded_page.0) {
1498 MaybeDone::Done(loaded_page) => loaded_page,
1500 MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1502 MaybeDone::Gone => unreachable!(),
1503 }
1504 }
1505
1506 #[instrument(skip_all)]
1511 fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1512 while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1513 let message = self.messages.pop_front().unwrap()?;
1514 self.rows_scheduled = message.scheduled_so_far;
1515 for decoder_message in message.decoders {
1516 match decoder_message {
1517 MessageType::UnloadedPage(unloaded_page) => {
1518 let loaded_page = self.wait_for_page(unloaded_page)?;
1519 self.root_decoder
1520 .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1521 }
1522 MessageType::DecoderReady(decoder_ready) => {
1523 if !decoder_ready.path.is_empty() {
1525 self.root_decoder
1526 .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1527 }
1528 }
1529 }
1530 }
1531 }
1532
1533 let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1534
1535 self.root_decoder
1536 .wait(loaded_need, &self.wait_for_io_runtime)?;
1537 Ok(self.rows_scheduled)
1538 }
1539
1540 #[instrument(level = "debug", skip_all)]
1541 fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1542 trace!(
1543 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1544 self.rows_remaining,
1545 self.rows_drained,
1546 self.rows_scheduled,
1547 );
1548 if self.rows_remaining == 0 {
1549 return Ok(None);
1550 }
1551
1552 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1553 self.rows_remaining -= to_take;
1554
1555 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1556 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1557 if scheduled_need > 0 {
1558 let desired_scheduled = scheduled_need + self.rows_scheduled;
1559 trace!(
1560 "Draining from scheduler (desire at least {} scheduled rows)",
1561 desired_scheduled
1562 );
1563 let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1564 if actually_scheduled < desired_scheduled {
1565 let under_scheduled = desired_scheduled - actually_scheduled;
1566 to_take -= under_scheduled;
1567 }
1568 }
1569
1570 if to_take == 0 {
1571 return Ok(None);
1572 }
1573
1574 let next_task = self.root_decoder.drain_batch(to_take)?;
1575
1576 self.rows_drained += to_take;
1577
1578 let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1579
1580 Ok(Some(batch))
1581 }
1582}
1583
1584impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1585 type Item = ArrowResult<RecordBatch>;
1586
1587 fn next(&mut self) -> Option<Self::Item> {
1588 self.next_batch_task()
1589 .transpose()
1590 .map(|r| r.map_err(ArrowError::from))
1591 }
1592}
1593
1594impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1595 fn schema(&self) -> Arc<ArrowSchema> {
1596 self.schema.clone()
1597 }
1598}
1599
/// An asynchronous batch decoder built around a `StructuralStructDecoder`.
///
/// It receives scheduler messages over a channel and is turned into a stream
/// of `ReadBatchTask`s with `into_stream`.
pub struct StructuralBatchDecodeStream {
    // Holds the receiving end of the scheduler channel
    context: DecoderContext,
    // The decoder that accepts loaded pages and is drained into batches
    root_decoder: StructuralStructDecoder,
    // Rows not yet reserved for a batch
    rows_remaining: u64,
    // Upper bound on the number of rows in one emitted batch
    rows_per_batch: u32,
    // `scheduled_so_far` of the most recently received message
    rows_scheduled: u64,
    // Rows already drained from `root_decoder`
    rows_drained: u64,
    // Set once the scheduler channel has been closed
    scheduler_exhausted: bool,
    // Shared `Once` passed to `NextDecodeTask::into_batch` so the large-batch
    // log message is emitted at most once
    emitted_batch_size_warning: Arc<Once>,
}
1611
1612impl StructuralBatchDecodeStream {
1613 pub fn new(
1624 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1625 rows_per_batch: u32,
1626 num_rows: u64,
1627 root_decoder: StructuralStructDecoder,
1628 ) -> Self {
1629 Self {
1630 context: DecoderContext::new(scheduled),
1631 root_decoder,
1632 rows_remaining: num_rows,
1633 rows_per_batch,
1634 rows_scheduled: 0,
1635 rows_drained: 0,
1636 scheduler_exhausted: false,
1637 emitted_batch_size_warning: Arc::new(Once::new()),
1638 }
1639 }
1640
1641 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1642 if self.scheduler_exhausted {
1643 return Ok(self.rows_scheduled);
1644 }
1645 while self.rows_scheduled < scheduled_need {
1646 let next_message = self.context.source.recv().await;
1647 match next_message {
1648 Some(scan_line) => {
1649 let scan_line = scan_line?;
1650 self.rows_scheduled = scan_line.scheduled_so_far;
1651 for message in scan_line.decoders {
1652 let unloaded_page = message.into_structural();
1653 let loaded_page = unloaded_page.0.await?;
1654 self.root_decoder.accept_page(loaded_page)?;
1655 }
1656 }
1657 None => {
1658 self.scheduler_exhausted = true;
1662 return Ok(self.rows_scheduled);
1663 }
1664 }
1665 }
1666 Ok(scheduled_need)
1667 }
1668
1669 #[instrument(level = "debug", skip_all)]
1670 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1671 trace!(
1672 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1673 self.rows_remaining,
1674 self.rows_drained,
1675 self.rows_scheduled,
1676 );
1677 if self.rows_remaining == 0 {
1678 return Ok(None);
1679 }
1680
1681 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1682 self.rows_remaining -= to_take;
1683
1684 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1685 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1686 if scheduled_need > 0 {
1687 let desired_scheduled = scheduled_need + self.rows_scheduled;
1688 trace!(
1689 "Draining from scheduler (desire at least {} scheduled rows)",
1690 desired_scheduled
1691 );
1692 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1693 if actually_scheduled < desired_scheduled {
1694 let under_scheduled = desired_scheduled - actually_scheduled;
1695 to_take -= under_scheduled;
1696 }
1697 }
1698
1699 if to_take == 0 {
1700 return Ok(None);
1701 }
1702
1703 let next_task = self.root_decoder.drain_batch_task(to_take)?;
1704 self.rows_drained += to_take;
1705 Ok(Some(next_task))
1706 }
1707
1708 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1709 let stream = futures::stream::unfold(self, |mut slf| async move {
1710 let next_task = slf.next_batch_task().await;
1711 let next_task = next_task.transpose().map(|next_task| {
1712 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1713 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1714 let task = async move {
1715 let next_task = next_task?;
1716 next_task.into_batch(emitted_batch_size_warning)
1717 };
1718 (task, num_rows)
1719 });
1720 next_task.map(|(task, num_rows)| {
1721 debug_assert!(num_rows <= u32::MAX as u64);
1723 let next_task = ReadBatchTask {
1724 task: task.boxed(),
1725 num_rows: num_rows as u32,
1726 };
1727 (next_task, slf)
1728 })
1729 });
1730 stream.boxed()
1731 }
1732}
1733
/// The rows a caller wants to read, given either as ranges or as individual
/// row indices.
#[derive(Debug)]
pub enum RequestedRows {
    /// Half-open row ranges.
    Ranges(Vec<Range<u64>>),
    /// Individual row indices.
    Indices(Vec<u64>),
}

impl RequestedRows {
    /// Returns the total number of rows requested.
    ///
    /// For `Ranges` this is the sum of the range lengths. A range whose `end`
    /// is below its `start` is empty and contributes zero rows rather than
    /// overflowing the subtraction. For `Indices` it is the number of indices.
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges
                .iter()
                .map(|range| range.end.saturating_sub(range.start))
                .sum(),
            Self::Indices(indices) => indices.len() as u64,
        }
    }
}
1748
/// Settings shared by `schedule_and_decode` and `schedule_and_decode_blocking`.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Forwarded to `DecodeBatchScheduler::try_new`.
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum number of rows per decoded batch.
    pub batch_size: u32,
    /// I/O service handed to the scheduler.
    pub io: Arc<dyn EncodingsIo>,
    /// Cache forwarded to `DecodeBatchScheduler::try_new`.
    pub cache: Arc<LanceCache>,
    /// Forwarded to `StructuralStructDecoder::new` on the structural path.
    pub should_validate: bool,
    /// Forwarded to `DecodeBatchScheduler::try_new`.
    pub cache_repetition_index: bool,
}
1759
1760fn check_scheduler_on_drop(
1761 stream: BoxStream<'static, ReadBatchTask>,
1762 scheduler_handle: tokio::task::JoinHandle<()>,
1763) -> BoxStream<'static, ReadBatchTask> {
1764 let mut scheduler_handle = Some(scheduler_handle);
1768 let check_scheduler = stream::unfold((), move |_| {
1769 let handle = scheduler_handle.take();
1770 async move {
1771 if let Some(handle) = handle {
1772 handle.await.unwrap();
1773 }
1774 None
1775 }
1776 });
1777 stream.chain(check_scheduler).boxed()
1778}
1779
1780pub fn create_decode_stream(
1781 schema: &Schema,
1782 num_rows: u64,
1783 batch_size: u32,
1784 is_structural: bool,
1785 should_validate: bool,
1786 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1787) -> BoxStream<'static, ReadBatchTask> {
1788 if is_structural {
1789 let arrow_schema = ArrowSchema::from(schema);
1790 let structural_decoder = StructuralStructDecoder::new(
1791 arrow_schema.fields,
1792 should_validate,
1793 true,
1794 );
1795 StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1796 } else {
1797 let arrow_schema = ArrowSchema::from(schema);
1798 let root_fields = arrow_schema.fields;
1799
1800 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1801 BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1802 }
1803}
1804
1805pub fn create_decode_iterator(
1809 schema: &Schema,
1810 num_rows: u64,
1811 batch_size: u32,
1812 should_validate: bool,
1813 is_structural: bool,
1814 messages: VecDeque<Result<DecoderMessage>>,
1815) -> Box<dyn RecordBatchReader + Send + 'static> {
1816 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1817 let root_fields = arrow_schema.fields.clone();
1818 if is_structural {
1819 let simple_struct_decoder =
1820 StructuralStructDecoder::new(root_fields, should_validate, true);
1821 Box::new(BatchDecodeIterator::new(
1822 messages,
1823 batch_size,
1824 num_rows,
1825 simple_struct_decoder,
1826 arrow_schema,
1827 ))
1828 } else {
1829 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1830 Box::new(BatchDecodeIterator::new(
1831 messages,
1832 batch_size,
1833 num_rows,
1834 root_decoder,
1835 arrow_schema,
1836 ))
1837 }
1838}
1839
1840fn create_scheduler_decoder(
1841 column_infos: Vec<Arc<ColumnInfo>>,
1842 requested_rows: RequestedRows,
1843 filter: FilterExpression,
1844 column_indices: Vec<u32>,
1845 target_schema: Arc<Schema>,
1846 config: SchedulerDecoderConfig,
1847) -> Result<BoxStream<'static, ReadBatchTask>> {
1848 let num_rows = requested_rows.num_rows();
1849
1850 let is_structural = column_infos[0].is_structural();
1851
1852 let (tx, rx) = mpsc::unbounded_channel();
1853
1854 let decode_stream = create_decode_stream(
1855 &target_schema,
1856 num_rows,
1857 config.batch_size,
1858 is_structural,
1859 config.should_validate,
1860 rx,
1861 );
1862
1863 let scheduler_handle = tokio::task::spawn(async move {
1864 let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1865 target_schema.as_ref(),
1866 &column_indices,
1867 &column_infos,
1868 &vec![],
1869 num_rows,
1870 config.decoder_plugins,
1871 config.io.clone(),
1872 config.cache,
1873 &filter,
1874 config.cache_repetition_index,
1875 )
1876 .await
1877 {
1878 Ok(scheduler) => scheduler,
1879 Err(e) => {
1880 let _ = tx.send(Err(e));
1881 return;
1882 }
1883 };
1884
1885 match requested_rows {
1886 RequestedRows::Ranges(ranges) => {
1887 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1888 }
1889 RequestedRows::Indices(indices) => {
1890 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1891 }
1892 }
1893 });
1894
1895 Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1896}
1897
1898pub fn schedule_and_decode(
1904 column_infos: Vec<Arc<ColumnInfo>>,
1905 requested_rows: RequestedRows,
1906 filter: FilterExpression,
1907 column_indices: Vec<u32>,
1908 target_schema: Arc<Schema>,
1909 config: SchedulerDecoderConfig,
1910) -> BoxStream<'static, ReadBatchTask> {
1911 if requested_rows.num_rows() == 0 {
1912 return stream::empty().boxed();
1913 }
1914 match create_scheduler_decoder(
1918 column_infos,
1919 requested_rows,
1920 filter,
1921 column_indices,
1922 target_schema,
1923 config,
1924 ) {
1925 Ok(stream) => stream,
1927 Err(e) => stream::once(std::future::ready(ReadBatchTask {
1928 num_rows: 0,
1929 task: std::future::ready(Err(e)).boxed(),
1930 }))
1931 .boxed(),
1932 }
1933}
1934
/// A lazily created, process-wide current-thread tokio runtime.
///
/// `schedule_and_decode_blocking` uses it to block on scheduler creation.
/// Building the runtime panics on first access if tokio cannot create it.
pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});
1940
1941pub fn schedule_and_decode_blocking(
1956 column_infos: Vec<Arc<ColumnInfo>>,
1957 requested_rows: RequestedRows,
1958 filter: FilterExpression,
1959 column_indices: Vec<u32>,
1960 target_schema: Arc<Schema>,
1961 config: SchedulerDecoderConfig,
1962) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1963 if requested_rows.num_rows() == 0 {
1964 let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
1965 return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
1966 }
1967
1968 let num_rows = requested_rows.num_rows();
1969 let is_structural = column_infos[0].is_structural();
1970
1971 let (tx, mut rx) = mpsc::unbounded_channel();
1972
1973 let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
1976 target_schema.as_ref(),
1977 &column_indices,
1978 &column_infos,
1979 &vec![],
1980 num_rows,
1981 config.decoder_plugins,
1982 config.io.clone(),
1983 config.cache,
1984 &filter,
1985 config.cache_repetition_index,
1986 ))?;
1987
1988 match requested_rows {
1990 RequestedRows::Ranges(ranges) => {
1991 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1992 }
1993 RequestedRows::Indices(indices) => {
1994 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1995 }
1996 }
1997
1998 let mut messages = Vec::new();
2000 while rx
2001 .recv_many(&mut messages, usize::MAX)
2002 .now_or_never()
2003 .unwrap()
2004 != 0
2005 {}
2006
2007 let decode_iterator = create_decode_iterator(
2009 &target_schema,
2010 num_rows,
2011 config.batch_size,
2012 config.should_validate,
2013 is_structural,
2014 messages.into(),
2015 );
2016
2017 Ok(decode_iterator)
2018}
2019
/// A decoder that turns rows of a page into a [`DataBlock`].
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes part of the page into a [`DataBlock`].
    ///
    /// NOTE(review): going by the parameter names, `rows_to_skip` rows are
    /// skipped and the following `num_rows` rows are decoded - confirm against
    /// the implementors.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
2065
/// Schedules the I/O for a page and produces a decoder for the loaded data.
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules the given row `ranges` and returns a future that resolves to
    /// a [`PrimitivePageDecoder`].
    ///
    /// `scheduler` is the I/O service used to issue the reads.
    /// NOTE(review): `ranges` are presumably relative to the page and
    /// `top_level_row` presumably feeds I/O prioritization - confirm against
    /// the implementors.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2093
/// A cursor that reports a priority value and can be moved forward by a
/// number of rows.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Moves the cursor forward by `num_rows` rows.
    fn advance(&mut self, num_rows: u64);
    /// Returns the priority at the cursor's current position.
    fn current_priority(&self) -> u64;
    /// Clones this range into a new boxed trait object.
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}
2100
/// A [`PriorityRange`] whose priority grows by exactly the number of rows it
/// is advanced by.
#[derive(Debug)]
pub struct SimplePriorityRange {
    // The current priority value
    priority: u64,
}
2107
impl SimplePriorityRange {
    /// Creates a range whose current priority is `priority`.
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}
2113
2114impl PriorityRange for SimplePriorityRange {
2115 fn advance(&mut self, num_rows: u64) {
2116 self.priority += num_rows;
2117 }
2118
2119 fn current_priority(&self) -> u64 {
2120 self.priority
2121 }
2122
2123 fn box_clone(&self) -> Box<dyn PriorityRange> {
2124 Box::new(Self {
2125 priority: self.priority,
2126 })
2127 }
2128}
2129
/// A [`PriorityRange`] layered on top of another one through a table of
/// offsets.
///
/// Advancing moves a position forward; each offset the position reaches or
/// passes advances the `base` range by one row. The priority reported is the
/// priority of `base`.
pub struct ListPriorityRange {
    // The underlying range that supplies the priority
    base: Box<dyn PriorityRange>,
    // Offsets compared against `cur_position`; the scan in `advance` assumes
    // they are in ascending order - TODO confirm with the constructor's callers
    offsets: Arc<[u64]>,
    // Index of the last offset that `cur_position` has reached
    cur_index_into_offsets: usize,
    // Total number of rows this range has been advanced by
    cur_position: u64,
}
2148
impl ListPriorityRange {
    /// Creates a range over `offsets` that starts at position zero, pointing
    /// at the first offset, and forwards its priority to `base`.
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}
2159
2160impl std::fmt::Debug for ListPriorityRange {
2161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2162 f.debug_struct("ListPriorityRange")
2163 .field("base", &self.base)
2164 .field("offsets.len()", &self.offsets.len())
2165 .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2166 .field("cur_position", &self.cur_position)
2167 .finish()
2168 }
2169}
2170
2171impl PriorityRange for ListPriorityRange {
2172 fn advance(&mut self, num_rows: u64) {
2173 self.cur_position += num_rows;
2176 let mut idx_into_offsets = self.cur_index_into_offsets;
2177 while idx_into_offsets + 1 < self.offsets.len()
2178 && self.offsets[idx_into_offsets + 1] <= self.cur_position
2179 {
2180 idx_into_offsets += 1;
2181 }
2182 let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2183 self.cur_index_into_offsets = idx_into_offsets;
2184 self.base.advance(base_rows_advanced as u64);
2185 }
2186
2187 fn current_priority(&self) -> u64 {
2188 self.base.current_priority()
2189 }
2190
2191 fn box_clone(&self) -> Box<dyn PriorityRange> {
2192 Box::new(Self {
2193 base: self.base.box_clone(),
2194 offsets: self.offsets.clone(),
2195 cur_index_into_offsets: self.cur_index_into_offsets,
2196 cur_position: self.cur_position,
2197 })
2198 }
2199}
2200
/// State carried while scheduling: the I/O service, the cache, and the path
/// to the field currently being scheduled.
pub struct SchedulerContext {
    // When present, `path_name` labels the context "TEMP(<name>)" instead of "ROOT"
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    // I/O service exposed through `io()`
    io: Arc<dyn EncodingsIo>,
    // Cache exposed through `cache()`
    cache: Arc<LanceCache>,
    // Name used in the "TEMP(<name>)" label
    name: String,
    // Indices pushed by `push`, outermost first
    path: Vec<u32>,
    // Names pushed by `push`, parallel to `path`
    path_names: Vec<String>,
}
2210
/// Borrow of a [`SchedulerContext`] returned by `SchedulerContext::push`.
///
/// Calling `pop` removes the pushed path entry and hands the context back.
pub struct ScopedSchedulerContext<'a> {
    /// The context whose path was extended.
    pub context: &'a mut SchedulerContext,
}
2214
2215impl<'a> ScopedSchedulerContext<'a> {
2216 pub fn pop(self) -> &'a mut SchedulerContext {
2217 self.context.pop();
2218 self.context
2219 }
2220}
2221
2222impl SchedulerContext {
2223 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2224 Self {
2225 io,
2226 cache,
2227 recv: None,
2228 name: "".to_string(),
2229 path: Vec::new(),
2230 path_names: Vec::new(),
2231 }
2232 }
2233
2234 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2235 &self.io
2236 }
2237
2238 pub fn cache(&self) -> &Arc<LanceCache> {
2239 &self.cache
2240 }
2241
2242 pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
2243 self.path.push(index);
2244 self.path_names.push(name.to_string());
2245 ScopedSchedulerContext { context: self }
2246 }
2247
2248 pub fn pop(&mut self) {
2249 self.path.pop();
2250 self.path_names.pop();
2251 }
2252
2253 pub fn path_name(&self) -> String {
2254 let path = self.path_names.join("/");
2255 if self.recv.is_some() {
2256 format!("TEMP({}){}", self.name, path)
2257 } else {
2258 format!("ROOT{}", path)
2259 }
2260 }
2261
2262 pub fn current_path(&self) -> VecDeque<u32> {
2263 VecDeque::from_iter(self.path.iter().copied())
2264 }
2265
2266 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2267 pub fn locate_decoder(
2268 &mut self,
2269 decoder: Box<dyn crate::v2::decoder::LogicalPageDecoder>,
2270 ) -> crate::v2::decoder::DecoderReady {
2271 trace!(
2272 "Scheduling decoder of type {:?} for {:?}",
2273 decoder.data_type(),
2274 self.path,
2275 );
2276 crate::v2::decoder::DecoderReady {
2277 decoder,
2278 path: self.current_path(),
2279 }
2280 }
2281}
2282
/// A page whose data is not available yet: a boxed future that resolves to a
/// [`LoadedPage`] or to an error.
pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2284
/// Hand-written `Debug` that prints only the type name and nothing about the
/// wrapped future.
impl std::fmt::Debug for UnloadedPage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UnloadedPage").finish()
    }
}
2290
/// The result of one scheduling step of a [`StructuralSchedulingJob`].
#[derive(Debug)]
pub struct ScheduledScanLine {
    /// Number of rows scheduled.
    /// NOTE(review): presumably the rows covered by this step alone rather
    /// than a running total - confirm against the producers.
    pub rows_scheduled: u64,
    /// Decoder messages produced by this step.
    pub decoders: Vec<MessageType>,
}
2296
/// A scheduling job that is advanced one step at a time.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Schedules the next piece of work.
    ///
    /// NOTE(review): `Ok(None)` presumably signals that the job has nothing
    /// left to schedule - confirm against the implementors.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>>;
}
2303
/// A filter expression carried as opaque bytes; empty bytes mean "no filter"
/// (see `no_filter` and `is_noop`).
pub struct FilterExpression(pub Bytes);
2312
impl FilterExpression {
    /// Creates a filter that holds no bytes, i.e. one for which `is_noop`
    /// returns `true`.
    pub fn no_filter() -> Self {
        Self(Bytes::new())
    }

    /// Returns `true` when the filter holds no bytes.
    pub fn is_noop(&self) -> bool {
        self.0.is_empty()
    }
}
2327
/// A scheduler for one field on the structural decode path.
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// Asynchronous setup step; the returned future borrows the scheduler,
    /// `filter` and `context` for its whole lifetime.
    ///
    /// NOTE(review): presumably has to complete before `schedule_ranges` is
    /// called - confirm against the callers.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Creates a scheduling job covering `ranges`; the job may borrow from the
    /// scheduler.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
2340
/// A unit of decode work that produces an Arrow array and can be sent to
/// another thread.
pub trait DecodeArrayTask: Send {
    /// Consumes the task and decodes it into an array.
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}
2346
2347impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2348 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2349 StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2350 }
2351}
2352
/// The pending decode work for one batch.
pub struct NextDecodeTask {
    /// The task that decodes the batch; `into_batch` reads its result as a
    /// struct array.
    pub task: Box<dyn DecodeArrayTask>,
    /// Number of rows in the batch.
    pub num_rows: u64,
}
2363
2364impl NextDecodeTask {
2365 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2370 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2371 let struct_arr = self.task.decode();
2372 match struct_arr {
2373 Ok(struct_arr) => {
2374 let batch = RecordBatch::from(struct_arr.as_struct());
2375 let size_bytes = batch.get_array_memory_size() as u64;
2376 if size_bytes > BATCH_SIZE_BYTES_WARNING {
2377 emitted_batch_size_warning.call_once(|| {
2378 let size_mb = size_bytes / 1024 / 1024;
2379 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2380 });
2381 }
2382 Ok(batch)
2383 }
2384 Err(e) => {
2385 let e = Error::Internal {
2386 message: format!("Error decoding batch: {}", e),
2387 location: location!(),
2388 };
2389 Err(e)
2390 }
2391 }
2392 }
2393}
2394
/// The payload of a scheduler message: either a legacy decoder or a page that
/// still has to load.
#[derive(Debug)]
pub enum MessageType {
    /// A legacy decoder together with its path; unwrapped by `into_legacy`.
    DecoderReady(crate::v2::decoder::DecoderReady),
    /// A structural page that has not been loaded yet; unwrapped by
    /// `into_structural`.
    UnloadedPage(UnloadedPage),
}
2410
2411impl MessageType {
2412 pub fn into_legacy(self) -> crate::v2::decoder::DecoderReady {
2413 match self {
2414 Self::DecoderReady(decoder) => decoder,
2415 Self::UnloadedPage(_) => {
2416 panic!("Expected DecoderReady but got UnloadedPage")
2417 }
2418 }
2419 }
2420
2421 pub fn into_structural(self) -> UnloadedPage {
2422 match self {
2423 Self::UnloadedPage(unloaded) => unloaded,
2424 Self::DecoderReady(_) => {
2425 panic!("Expected UnloadedPage but got DecoderReady")
2426 }
2427 }
2428 }
2429}
2430
/// One message sent from the scheduler to a decoder.
pub struct DecoderMessage {
    /// Running count of scheduled rows; the decoders in this file overwrite
    /// their `rows_scheduled` with this value.
    pub scheduled_so_far: u64,
    /// The decoder payloads delivered with this message.
    pub decoders: Vec<MessageType>,
}
2435
/// Holds the receiving end of the scheduler-to-decoder channel.
pub struct DecoderContext {
    // Channel on which the scheduler's messages (or its error) arrive
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}
2439
impl DecoderContext {
    /// Wraps the receiver on which scheduler messages arrive.
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}
2445
/// The output of a [`DecodePageTask`].
pub struct DecodedPage {
    /// The decoded data.
    pub data: DataBlock,
    /// The `RepDefUnraveler` that accompanies `data`.
    /// NOTE(review): presumably carries the page's repetition/definition
    /// levels - confirm against `crate::repdef`.
    pub repdef: RepDefUnraveler,
}
2450
/// A unit of decode work that produces a [`DecodedPage`].
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Consumes the task and decodes it.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}
2455
/// A page decoder on the structural path that is drained in pieces.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Removes the next `num_rows` rows and returns a task that decodes them.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// Returns a row count for the page.
    /// NOTE(review): presumably the total number of rows in the page rather
    /// than the rows still undrained - confirm against the implementors.
    fn num_rows(&self) -> u64;
}
2460
/// A page that has finished loading and is ready to be handed to a field
/// decoder.
#[derive(Debug)]
pub struct LoadedPage {
    /// The decoder for the page's data.
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Path of indices to the field decoder that should receive the page.
    /// NOTE(review): presumably relative to the root of the decoder tree -
    /// confirm against `accept_page` implementations.
    pub path: VecDeque<u32>,
    /// Index of the page.
    /// NOTE(review): presumably the page's position within its column -
    /// confirm against the producers.
    pub page_index: usize,
}
2486
/// The output of a [`StructuralDecodeArrayTask`].
pub struct DecodedArray {
    /// The decoded Arrow array.
    pub array: ArrayRef,
    /// The `CompositeRepDefUnraveler` that accompanies `array`.
    pub repdef: CompositeRepDefUnraveler,
}
2491
/// A unit of decode work on the structural path that produces a
/// [`DecodedArray`].
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Consumes the task and decodes it.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}
2495
/// A decoder for one field on the structural path: it accepts loaded pages
/// and is drained into decode tasks.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Hands a loaded page to this decoder.
    /// NOTE(review): decoders with children presumably route the page by its
    /// `path` - confirm against the implementors.
    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
    /// Removes the next `num_rows` rows and returns a task that decodes them.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// Returns the Arrow data type this decoder produces.
    fn data_type(&self) -> &DataType;
}
2507
/// Decoder plugin registry; it currently has no fields and is passed through
/// to `DecodeBatchScheduler::try_new`.
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2510
2511pub async fn decode_batch(
2513 batch: &EncodedBatch,
2514 filter: &FilterExpression,
2515 decoder_plugins: Arc<DecoderPlugins>,
2516 should_validate: bool,
2517 version: LanceFileVersion,
2518 cache: Option<Arc<LanceCache>>,
2519) -> Result<RecordBatch> {
2520 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2525 let cache = cache.unwrap_or_else(|| Arc::new(LanceCache::with_capacity(128 * 1024 * 1024)));
2526 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2527 batch.schema.as_ref(),
2528 &batch.top_level_columns,
2529 &batch.page_table,
2530 &vec![],
2531 batch.num_rows,
2532 decoder_plugins,
2533 io_scheduler.clone(),
2534 cache,
2535 filter,
2536 false, )
2538 .await?;
2539 let (tx, rx) = unbounded_channel();
2540 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2541 let is_structural = version >= LanceFileVersion::V2_1;
2542 let mut decode_stream = create_decode_stream(
2543 &batch.schema,
2544 batch.num_rows,
2545 batch.num_rows as u32,
2546 is_structural,
2547 should_validate,
2548 rx,
2549 );
2550 decode_stream.next().await.unwrap().task.await
2551}
2552
// Unit tests for `DecodeBatchScheduler::indices_to_ranges`.
#[cfg(test)]
mod tests {
    use super::*;

    // A lone index becomes a single one-row range.
    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }

    // A run of consecutive indices collapses into one range.
    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }

    // Every gap in the indices starts a new range.
    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }
}