1use std::collections::VecDeque;
216use std::sync::{LazyLock, Once};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use log::{debug, trace, warn};
230use snafu::location;
231use tokio::sync::mpsc::error::SendError;
232use tokio::sync::mpsc::{self, unbounded_channel};
233
234use lance_core::{ArrowResult, Error, Result};
235use tracing::{instrument, Instrument};
236
237use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
238use crate::data::DataBlock;
239use crate::encoder::EncodedBatch;
240use crate::encodings::logical::list::StructuralListScheduler;
241use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
242use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
243use crate::format::pb::{self, column_encoding};
244use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
245use crate::v2::decoder::LogicalPageDecoder;
246use crate::v2::encodings::logical::list::OffsetPageInfo;
247use crate::v2::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
248use crate::v2::encodings::logical::{
249 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
250 primitive::PrimitiveFieldScheduler,
251};
252use crate::version::LanceFileVersion;
253use crate::{BufferScheduler, EncodingsIo};
254
/// Decoded-batch size threshold in bytes (10 MiB).
///
/// NOTE(review): presumably compared against decoded batch sizes to emit the one-time
/// warning tracked by `BatchDecodeStream::emitted_batch_size_warning`; the comparison is
/// not visible here — confirm.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 << 20;
257
258#[derive(Debug)]
265pub enum PageEncoding {
266 Legacy(pb::ArrayEncoding),
267 Structural(pb::PageLayout),
268}
269
270impl PageEncoding {
271 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
272 match self {
273 Self::Legacy(enc) => enc,
274 Self::Structural(_) => panic!("Expected a legacy encoding"),
275 }
276 }
277
278 pub fn as_structural(&self) -> &pb::PageLayout {
279 match self {
280 Self::Structural(enc) => enc,
281 Self::Legacy(_) => panic!("Expected a structural encoding"),
282 }
283 }
284
285 pub fn is_structural(&self) -> bool {
286 matches!(self, Self::Structural(_))
287 }
288}
289
290#[derive(Debug)]
294pub struct PageInfo {
295 pub num_rows: u64,
297 pub priority: u64,
301 pub encoding: PageEncoding,
303 pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
305}
306
307#[derive(Debug, Clone)]
311pub struct ColumnInfo {
312 pub index: u32,
314 pub page_infos: Arc<[PageInfo]>,
316 pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
318 pub encoding: pb::ColumnEncoding,
319}
320
321impl ColumnInfo {
322 pub fn new(
324 index: u32,
325 page_infos: Arc<[PageInfo]>,
326 buffer_offsets_and_sizes: Vec<(u64, u64)>,
327 encoding: pb::ColumnEncoding,
328 ) -> Self {
329 Self {
330 index,
331 page_infos,
332 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
333 encoding,
334 }
335 }
336
337 pub fn is_structural(&self) -> bool {
338 self.page_infos
339 .first()
341 .map(|page| page.encoding.is_structural())
342 .unwrap_or(false)
343 }
344}
345
346enum RootScheduler {
347 Structural(Box<dyn StructuralFieldScheduler>),
348 Legacy(Arc<dyn crate::v2::decoder::FieldScheduler>),
349}
350
351impl RootScheduler {
352 fn as_legacy(&self) -> &Arc<dyn crate::v2::decoder::FieldScheduler> {
353 match self {
354 Self::Structural(_) => panic!("Expected a legacy scheduler"),
355 Self::Legacy(s) => s,
356 }
357 }
358
359 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
360 match self {
361 Self::Structural(s) => s.as_ref(),
362 Self::Legacy(_) => panic!("Expected a structural scheduler"),
363 }
364 }
365}
366
367pub struct DecodeBatchScheduler {
389 root_scheduler: RootScheduler,
390 pub root_fields: Fields,
391 cache: Arc<LanceCache>,
392}
393
394pub struct ColumnInfoIter<'a> {
395 column_infos: Vec<Arc<ColumnInfo>>,
396 column_indices: &'a [u32],
397 column_info_pos: usize,
398 column_indices_pos: usize,
399}
400
401impl<'a> ColumnInfoIter<'a> {
402 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
403 let initial_pos = column_indices[0] as usize;
404 Self {
405 column_infos,
406 column_indices,
407 column_info_pos: initial_pos,
408 column_indices_pos: 0,
409 }
410 }
411
412 pub fn peek(&self) -> &Arc<ColumnInfo> {
413 &self.column_infos[self.column_info_pos]
414 }
415
416 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
417 let column_info = self.column_infos[self.column_info_pos].clone();
418 let transformed = transform(column_info);
419 self.column_infos[self.column_info_pos] = transformed;
420 }
421
422 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
423 self.next().ok_or_else(|| {
424 Error::invalid_input(
425 "there were more fields in the schema than provided column indices / infos",
426 location!(),
427 )
428 })
429 }
430
431 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
432 if self.column_info_pos < self.column_infos.len() {
433 let info = &self.column_infos[self.column_info_pos];
434 self.column_info_pos += 1;
435 Some(info)
436 } else {
437 None
438 }
439 }
440
441 pub(crate) fn next_top_level(&mut self) {
442 self.column_indices_pos += 1;
443 if self.column_indices_pos < self.column_indices.len() {
444 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
445 } else {
446 self.column_info_pos = self.column_infos.len();
447 }
448 }
449}
450
/// File-level buffer locations as `(position, size)` pairs (per the field name).
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    /// Location of each file-level buffer.
    pub positions_and_sizes: &'a [(u64, u64)],
}
456
457#[derive(Clone, Copy, Debug)]
459pub struct ColumnBuffers<'a, 'b> {
460 pub file_buffers: FileBuffers<'a>,
461 pub positions_and_sizes: &'b [(u64, u64)],
462}
463
464#[derive(Clone, Copy, Debug)]
466pub struct PageBuffers<'a, 'b, 'c> {
467 pub column_buffers: ColumnBuffers<'a, 'b>,
468 pub positions_and_sizes: &'c [(u64, u64)],
469}
470
471#[derive(Debug)]
473pub struct CoreFieldDecoderStrategy {
474 pub validate_data: bool,
475 pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
476}
477
478impl Default for CoreFieldDecoderStrategy {
479 fn default() -> Self {
480 Self {
481 validate_data: false,
482 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
483 }
484 }
485}
486
487impl CoreFieldDecoderStrategy {
488 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
491 let column_encoding = column_info
492 .encoding
493 .column_encoding
494 .as_ref()
495 .ok_or_else(|| {
496 Error::invalid_input(
497 format!(
498 "the column at index {} was missing a ColumnEncoding",
499 column_info.index
500 ),
501 location!(),
502 )
503 })?;
504 if matches!(
505 column_encoding,
506 pb::column_encoding::ColumnEncoding::Values(_)
507 ) {
508 Ok(())
509 } else {
510 Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
511 }
512 }
513
514 fn is_primitive(data_type: &DataType) -> bool {
515 if data_type.is_primitive() {
516 true
517 } else {
518 match data_type {
519 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
521 DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
522 _ => false,
523 }
524 }
525 }
526
527 fn create_primitive_scheduler(
528 &self,
529 field: &Field,
530 column: &ColumnInfo,
531 buffers: FileBuffers,
532 ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
533 Self::ensure_values_encoded(column, &field.name)?;
534 let column_buffers = ColumnBuffers {
536 file_buffers: buffers,
537 positions_and_sizes: &column.buffer_offsets_and_sizes,
538 };
539 Ok(Box::new(PrimitiveFieldScheduler::new(
540 column.index,
541 field.data_type(),
542 column.page_infos.clone(),
543 column_buffers,
544 self.validate_data,
545 )))
546 }
547
548 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
550 Self::ensure_values_encoded(column_info, field_name)?;
551 if column_info.page_infos.len() != 1 {
552 return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
553 }
554 let encoding = &column_info.page_infos[0].encoding;
555 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
556 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
557 _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
558 }
559 }
560
561 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
562 let encoding = &column_info.page_infos[0].encoding;
563 matches!(
564 encoding.as_legacy().array_encoding.as_ref().unwrap(),
565 pb::array_encoding::ArrayEncoding::PackedStruct(_)
566 )
567 }
568
569 fn create_list_scheduler(
570 &self,
571 list_field: &Field,
572 column_infos: &mut ColumnInfoIter,
573 buffers: FileBuffers,
574 offsets_column: &ColumnInfo,
575 ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
576 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
577 let offsets_column_buffers = ColumnBuffers {
578 file_buffers: buffers,
579 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
580 };
581 let items_scheduler =
582 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
583
584 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
585 .page_infos
586 .iter()
587 .filter(|offsets_page| offsets_page.num_rows > 0)
588 .map(|offsets_page| {
589 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
590 &offsets_page.encoding.as_legacy().array_encoding
591 {
592 let inner = PageInfo {
593 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
594 encoding: PageEncoding::Legacy(
595 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
596 ),
597 num_rows: offsets_page.num_rows,
598 priority: 0,
599 };
600 (
601 inner,
602 OffsetPageInfo {
603 offsets_in_page: offsets_page.num_rows,
604 null_offset_adjustment: list_encoding.null_offset_adjustment,
605 num_items_referenced_by_page: list_encoding.num_items,
606 },
607 )
608 } else {
609 panic!("Expected a list column");
611 }
612 })
613 .unzip();
614 let inner = Arc::new(PrimitiveFieldScheduler::new(
615 offsets_column.index,
616 DataType::UInt64,
617 Arc::from(inner_infos.into_boxed_slice()),
618 offsets_column_buffers,
619 self.validate_data,
620 )) as Arc<dyn crate::v2::decoder::FieldScheduler>;
621 let items_field = match list_field.data_type() {
622 DataType::List(inner) => inner,
623 DataType::LargeList(inner) => inner,
624 _ => unreachable!(),
625 };
626 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
627 DataType::Int32
628 } else {
629 DataType::Int64
630 };
631 Ok(Box::new(ListFieldScheduler::new(
632 inner,
633 items_scheduler.into(),
634 items_field,
635 offset_type,
636 null_offset_adjustments,
637 )))
638 }
639
640 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
641 if let column_encoding::ColumnEncoding::Blob(blob) =
642 column_info.encoding.column_encoding.as_ref().unwrap()
643 {
644 let mut column_info = column_info.clone();
645 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
646 Some(column_info)
647 } else {
648 None
649 }
650 }
651
652 fn create_structural_field_scheduler(
653 &self,
654 field: &Field,
655 column_infos: &mut ColumnInfoIter,
656 ) -> Result<Box<dyn StructuralFieldScheduler>> {
657 let data_type = field.data_type();
658 if Self::is_primitive(&data_type) {
659 let column_info = column_infos.expect_next()?;
660 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
661 column_info.as_ref(),
662 self.decompressor_strategy.as_ref(),
663 )?);
664
665 column_infos.next_top_level();
667
668 return Ok(scheduler);
669 }
670 match &data_type {
671 DataType::Struct(fields) => {
672 if field.is_packed_struct() {
673 let column_info = column_infos.expect_next()?;
674 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
675 column_info.as_ref(),
676 self.decompressor_strategy.as_ref(),
677 )?);
678
679 column_infos.next_top_level();
681
682 return Ok(scheduler);
683 }
684 let mut child_schedulers = Vec::with_capacity(field.children.len());
685 for field in field.children.iter() {
686 let field_scheduler =
687 self.create_structural_field_scheduler(field, column_infos)?;
688 child_schedulers.push(field_scheduler);
689 }
690
691 let fields = fields.clone();
692 Ok(
693 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
694 as Box<dyn StructuralFieldScheduler>,
695 )
696 }
697 DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => {
698 let column_info = column_infos.expect_next()?;
699 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
700 column_info.as_ref(),
701 self.decompressor_strategy.as_ref(),
702 )?);
703 column_infos.next_top_level();
704 Ok(scheduler)
705 }
706 DataType::List(_) | DataType::LargeList(_) => {
707 let child = field
708 .children
709 .first()
710 .expect("List field must have a child");
711 let child_scheduler =
712 self.create_structural_field_scheduler(child, column_infos)?;
713 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
714 as Box<dyn StructuralFieldScheduler>)
715 }
716 _ => todo!(),
717 }
718 }
719
720 fn create_legacy_field_scheduler(
721 &self,
722 field: &Field,
723 column_infos: &mut ColumnInfoIter,
724 buffers: FileBuffers,
725 ) -> Result<Box<dyn crate::v2::decoder::FieldScheduler>> {
726 let data_type = field.data_type();
727 if Self::is_primitive(&data_type) {
728 let column_info = column_infos.expect_next()?;
729 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
730 return Ok(scheduler);
731 } else if data_type.is_binary_like() {
732 let column_info = column_infos.next().unwrap().clone();
733 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
735 let desc_scheduler =
736 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
737 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
738 return Ok(blob_scheduler);
739 }
740 if let Some(page_info) = column_info.page_infos.first() {
741 if matches!(
742 page_info.encoding.as_legacy(),
743 pb::ArrayEncoding {
744 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
745 }
746 ) {
747 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
748 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
749 } else {
750 DataType::LargeList(Arc::new(ArrowField::new(
751 "item",
752 DataType::UInt8,
753 false,
754 )))
755 };
756 let list_field = Field::try_from(ArrowField::new(
757 field.name.clone(),
758 list_type,
759 field.nullable,
760 ))
761 .unwrap();
762 let list_scheduler = self.create_list_scheduler(
763 &list_field,
764 column_infos,
765 buffers,
766 &column_info,
767 )?;
768 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
769 list_scheduler.into(),
770 field.data_type(),
771 ));
772 return Ok(binary_scheduler);
773 } else {
774 let scheduler =
775 self.create_primitive_scheduler(field, &column_info, buffers)?;
776 return Ok(scheduler);
777 }
778 } else {
779 return self.create_primitive_scheduler(field, &column_info, buffers);
780 }
781 }
782 match &data_type {
783 DataType::FixedSizeList(inner, _dimension) => {
784 if Self::is_primitive(inner.data_type()) {
787 let primitive_col = column_infos.expect_next()?;
788 let scheduler =
789 self.create_primitive_scheduler(field, primitive_col, buffers)?;
790 Ok(scheduler)
791 } else {
792 todo!()
793 }
794 }
795 DataType::Dictionary(_key_type, value_type) => {
796 if Self::is_primitive(value_type) || value_type.is_binary_like() {
797 let primitive_col = column_infos.expect_next()?;
798 let scheduler =
799 self.create_primitive_scheduler(field, primitive_col, buffers)?;
800 Ok(scheduler)
801 } else {
802 Err(Error::NotSupported {
803 source: format!(
804 "No way to decode into a dictionary field of type {}",
805 value_type
806 )
807 .into(),
808 location: location!(),
809 })
810 }
811 }
812 DataType::List(_) | DataType::LargeList(_) => {
813 let offsets_column = column_infos.expect_next()?.clone();
814 column_infos.next_top_level();
815 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
816 }
817 DataType::Struct(fields) => {
818 let column_info = column_infos.expect_next()?;
819
820 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
822 return self.create_primitive_scheduler(field, &blob_col, buffers);
824 }
825
826 if Self::check_packed_struct(column_info) {
827 self.create_primitive_scheduler(field, column_info, buffers)
829 } else {
830 Self::check_simple_struct(column_info, &field.name).unwrap();
832 let num_rows = column_info
833 .page_infos
834 .iter()
835 .map(|page| page.num_rows)
836 .sum();
837 let mut child_schedulers = Vec::with_capacity(field.children.len());
838 for field in &field.children {
839 column_infos.next_top_level();
840 let field_scheduler =
841 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
842 child_schedulers.push(Arc::from(field_scheduler));
843 }
844
845 let fields = fields.clone();
846 Ok(Box::new(SimpleStructScheduler::new(
847 child_schedulers,
848 fields,
849 num_rows,
850 )))
851 }
852 }
853 _ => todo!(),
855 }
856 }
857}
858
859fn root_column(num_rows: u64) -> ColumnInfo {
861 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
862 let final_page_num_rows = num_rows % (u32::MAX as u64);
863 let root_pages = (0..num_root_pages)
864 .map(|i| PageInfo {
865 num_rows: if i == num_root_pages - 1 {
866 final_page_num_rows
867 } else {
868 u64::MAX
869 },
870 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
871 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
872 pb::SimpleStruct {},
873 )),
874 }),
875 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
877 })
878 .collect::<Vec<_>>();
879 ColumnInfo {
880 buffer_offsets_and_sizes: Arc::new([]),
881 encoding: pb::ColumnEncoding {
882 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
883 },
884 index: u32::MAX,
885 page_infos: Arc::from(root_pages),
886 }
887}
888
889pub enum RootDecoder {
890 Structural(StructuralStructDecoder),
891 Legacy(SimpleStructDecoder),
892}
893
894impl RootDecoder {
895 pub fn into_structural(self) -> StructuralStructDecoder {
896 match self {
897 Self::Structural(decoder) => decoder,
898 Self::Legacy(_) => panic!("Expected a structural decoder"),
899 }
900 }
901
902 pub fn into_legacy(self) -> SimpleStructDecoder {
903 match self {
904 Self::Legacy(decoder) => decoder,
905 Self::Structural(_) => panic!("Expected a legacy decoder"),
906 }
907 }
908}
909
910impl DecodeBatchScheduler {
911 #[allow(clippy::too_many_arguments)]
914 pub async fn try_new<'a>(
915 schema: &'a Schema,
916 column_indices: &[u32],
917 column_infos: &[Arc<ColumnInfo>],
918 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
919 num_rows: u64,
920 _decoder_plugins: Arc<DecoderPlugins>,
921 io: Arc<dyn EncodingsIo>,
922 cache: Arc<LanceCache>,
923 filter: &FilterExpression,
924 ) -> Result<Self> {
925 assert!(num_rows > 0);
926 let buffers = FileBuffers {
927 positions_and_sizes: file_buffer_positions_and_sizes,
928 };
929 let arrow_schema = ArrowSchema::from(schema);
930 let root_fields = arrow_schema.fields().clone();
931 let root_type = DataType::Struct(root_fields.clone());
932 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
933 root_field.children.clone_from(&schema.fields);
937 root_field
938 .metadata
939 .insert("__lance_decoder_root".to_string(), "true".to_string());
940
941 if column_infos[0].is_structural() {
942 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
943
944 let mut root_scheduler = CoreFieldDecoderStrategy::default()
945 .create_structural_field_scheduler(&root_field, &mut column_iter)?;
946
947 let context = SchedulerContext::new(io, cache.clone());
948 root_scheduler.initialize(filter, &context).await?;
949
950 Ok(Self {
951 root_scheduler: RootScheduler::Structural(root_scheduler),
952 root_fields,
953 cache,
954 })
955 } else {
956 let mut columns = Vec::with_capacity(column_infos.len() + 1);
959 columns.push(Arc::new(root_column(num_rows)));
960 columns.extend(column_infos.iter().cloned());
961
962 let adjusted_column_indices = [0_u32]
963 .into_iter()
964 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
965 .collect::<Vec<_>>();
966 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
967 let root_scheduler = CoreFieldDecoderStrategy::default()
968 .create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
969
970 let context = SchedulerContext::new(io, cache.clone());
971 root_scheduler.initialize(filter, &context).await?;
972
973 Ok(Self {
974 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
975 root_fields,
976 cache,
977 })
978 }
979 }
980
981 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
982 pub fn from_scheduler(
983 root_scheduler: Arc<dyn crate::v2::decoder::FieldScheduler>,
984 root_fields: Fields,
985 cache: Arc<LanceCache>,
986 ) -> Self {
987 Self {
988 root_scheduler: RootScheduler::Legacy(root_scheduler),
989 root_fields,
990 cache,
991 }
992 }
993
994 fn do_schedule_ranges_structural(
995 &mut self,
996 ranges: &[Range<u64>],
997 filter: &FilterExpression,
998 io: Arc<dyn EncodingsIo>,
999 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1000 ) {
1001 let root_scheduler = self.root_scheduler.as_structural();
1002 let mut context = SchedulerContext::new(io, self.cache.clone());
1003 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1004 if let Err(schedule_ranges_err) = maybe_root_job {
1005 schedule_action(Err(schedule_ranges_err));
1006 return;
1007 }
1008 let mut root_job = maybe_root_job.unwrap();
1009 let mut num_rows_scheduled = 0;
1010 loop {
1011 let maybe_next_scan_line = root_job.schedule_next(&mut context);
1012 if let Err(err) = maybe_next_scan_line {
1013 schedule_action(Err(err));
1014 return;
1015 }
1016 let next_scan_line = maybe_next_scan_line.unwrap();
1017 match next_scan_line {
1018 Some(next_scan_line) => {
1019 trace!(
1020 "Scheduled scan line of {} rows and {} decoders",
1021 next_scan_line.rows_scheduled,
1022 next_scan_line.decoders.len()
1023 );
1024 num_rows_scheduled += next_scan_line.rows_scheduled;
1025 if !schedule_action(Ok(DecoderMessage {
1026 scheduled_so_far: num_rows_scheduled,
1027 decoders: next_scan_line.decoders,
1028 })) {
1029 return;
1031 }
1032 }
1033 None => return,
1034 }
1035 }
1036 }
1037
1038 fn do_schedule_ranges_legacy(
1039 &mut self,
1040 ranges: &[Range<u64>],
1041 filter: &FilterExpression,
1042 io: Arc<dyn EncodingsIo>,
1043 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1044 priority: Option<Box<dyn PriorityRange>>,
1048 ) {
1049 let root_scheduler = self.root_scheduler.as_legacy();
1050 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1051 trace!(
1052 "Scheduling {} ranges across {}..{} ({} rows){}",
1053 ranges.len(),
1054 ranges.first().unwrap().start,
1055 ranges.last().unwrap().end,
1056 rows_requested,
1057 priority
1058 .as_ref()
1059 .map(|p| format!(" (priority={:?})", p))
1060 .unwrap_or_default()
1061 );
1062
1063 let mut context = SchedulerContext::new(io, self.cache.clone());
1064 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1065 if let Err(schedule_ranges_err) = maybe_root_job {
1066 schedule_action(Err(schedule_ranges_err));
1067 return;
1068 }
1069 let mut root_job = maybe_root_job.unwrap();
1070 let mut num_rows_scheduled = 0;
1071 let mut rows_to_schedule = root_job.num_rows();
1072 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1073 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1074 while rows_to_schedule > 0 {
1075 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1076 if let Err(schedule_next_err) = maybe_next_scan_line {
1077 schedule_action(Err(schedule_next_err));
1078 return;
1079 }
1080 let next_scan_line = maybe_next_scan_line.unwrap();
1081 priority.advance(next_scan_line.rows_scheduled);
1082 num_rows_scheduled += next_scan_line.rows_scheduled;
1083 rows_to_schedule -= next_scan_line.rows_scheduled;
1084 trace!(
1085 "Scheduled scan line of {} rows and {} decoders",
1086 next_scan_line.rows_scheduled,
1087 next_scan_line.decoders.len()
1088 );
1089 if !schedule_action(Ok(DecoderMessage {
1090 scheduled_so_far: num_rows_scheduled,
1091 decoders: next_scan_line.decoders,
1092 })) {
1093 return;
1095 }
1096
1097 trace!("Finished scheduling {} ranges", ranges.len());
1098 }
1099 }
1100
1101 fn do_schedule_ranges(
1102 &mut self,
1103 ranges: &[Range<u64>],
1104 filter: &FilterExpression,
1105 io: Arc<dyn EncodingsIo>,
1106 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1107 priority: Option<Box<dyn PriorityRange>>,
1111 ) {
1112 match &self.root_scheduler {
1113 RootScheduler::Legacy(_) => {
1114 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1115 }
1116 RootScheduler::Structural(_) => {
1117 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1118 }
1119 }
1120 }
1121
1122 pub fn schedule_ranges_to_vec(
1125 &mut self,
1126 ranges: &[Range<u64>],
1127 filter: &FilterExpression,
1128 io: Arc<dyn EncodingsIo>,
1129 priority: Option<Box<dyn PriorityRange>>,
1130 ) -> Result<Vec<DecoderMessage>> {
1131 let mut decode_messages = Vec::new();
1132 self.do_schedule_ranges(
1133 ranges,
1134 filter,
1135 io,
1136 |msg| {
1137 decode_messages.push(msg);
1138 true
1139 },
1140 priority,
1141 );
1142 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1143 }
1144
1145 #[instrument(skip_all)]
1155 pub fn schedule_ranges(
1156 &mut self,
1157 ranges: &[Range<u64>],
1158 filter: &FilterExpression,
1159 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1160 scheduler: Arc<dyn EncodingsIo>,
1161 ) {
1162 self.do_schedule_ranges(
1163 ranges,
1164 filter,
1165 scheduler,
1166 |msg| {
1167 match sink.send(msg) {
1168 Ok(_) => true,
1169 Err(SendError { .. }) => {
1170 debug!(
1173 "schedule_ranges aborting early since decoder appears to have been dropped"
1174 );
1175 false
1176 }
1177 }
1178 },
1179 None,
1180 )
1181 }
1182
1183 #[instrument(skip_all)]
1191 pub fn schedule_range(
1192 &mut self,
1193 range: Range<u64>,
1194 filter: &FilterExpression,
1195 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1196 scheduler: Arc<dyn EncodingsIo>,
1197 ) {
1198 self.schedule_ranges(&[range], filter, sink, scheduler)
1199 }
1200
1201 pub fn schedule_take(
1209 &mut self,
1210 indices: &[u64],
1211 filter: &FilterExpression,
1212 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1213 scheduler: Arc<dyn EncodingsIo>,
1214 ) {
1215 debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1216 if indices.is_empty() {
1217 return;
1218 }
1219 trace!("Scheduling take of {} rows", indices.len());
1220 let ranges = Self::indices_to_ranges(indices);
1221 self.schedule_ranges(&ranges, filter, sink, scheduler)
1222 }
1223
/// Groups sorted row indices into half-open ranges of consecutive values.
///
/// A repeated index closes the current range and starts a new one at the same value, so
/// duplicates are preserved (e.g. `[1, 1, 2]` becomes `[1..2, 1..3]`).
///
/// # Panics
///
/// Panics if `indices` is empty.
fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
    let mut ranges: Vec<Range<u64>> = Vec::new();
    let mut run_start = indices[0];
    let mut previous = indices[0];
    for &index in &indices[1..] {
        if index != previous + 1 {
            ranges.push(run_start..previous + 1);
            run_start = index;
        }
        previous = index;
    }
    ranges.push(run_start..previous + 1);
    ranges
}
1239}
1240
1241pub struct ReadBatchTask {
1242 pub task: BoxFuture<'static, Result<RecordBatch>>,
1243 pub num_rows: u32,
1244}
1245
1246pub struct BatchDecodeStream {
1248 context: DecoderContext,
1249 root_decoder: SimpleStructDecoder,
1250 rows_remaining: u64,
1251 rows_per_batch: u32,
1252 rows_scheduled: u64,
1253 rows_drained: u64,
1254 scheduler_exhausted: bool,
1255 emitted_batch_size_warning: Arc<Once>,
1256}
1257
1258impl BatchDecodeStream {
1259 pub fn new(
1270 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1271 rows_per_batch: u32,
1272 num_rows: u64,
1273 root_decoder: SimpleStructDecoder,
1274 ) -> Self {
1275 Self {
1276 context: DecoderContext::new(scheduled),
1277 root_decoder,
1278 rows_remaining: num_rows,
1279 rows_per_batch,
1280 rows_scheduled: 0,
1281 rows_drained: 0,
1282 scheduler_exhausted: false,
1283 emitted_batch_size_warning: Arc::new(Once::new()),
1284 }
1285 }
1286
1287 fn accept_decoder(&mut self, decoder: crate::v2::decoder::DecoderReady) -> Result<()> {
1288 if decoder.path.is_empty() {
1289 Ok(())
1291 } else {
1292 self.root_decoder.accept_child(decoder)
1293 }
1294 }
1295
1296 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1297 if self.scheduler_exhausted {
1298 return Ok(self.rows_scheduled);
1299 }
1300 while self.rows_scheduled < scheduled_need {
1301 let next_message = self.context.source.recv().await;
1302 match next_message {
1303 Some(scan_line) => {
1304 let scan_line = scan_line?;
1305 self.rows_scheduled = scan_line.scheduled_so_far;
1306 for message in scan_line.decoders {
1307 self.accept_decoder(message.into_legacy())?;
1308 }
1309 }
1310 None => {
1311 self.scheduler_exhausted = true;
1315 return Ok(self.rows_scheduled);
1316 }
1317 }
1318 }
1319 Ok(scheduled_need)
1320 }
1321
1322 #[instrument(level = "debug", skip_all)]
1323 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1324 trace!(
1325 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1326 self.rows_remaining,
1327 self.rows_drained,
1328 self.rows_scheduled,
1329 );
1330 if self.rows_remaining == 0 {
1331 return Ok(None);
1332 }
1333
1334 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1335 self.rows_remaining -= to_take;
1336
1337 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1338 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1339 if scheduled_need > 0 {
1340 let desired_scheduled = scheduled_need + self.rows_scheduled;
1341 trace!(
1342 "Draining from scheduler (desire at least {} scheduled rows)",
1343 desired_scheduled
1344 );
1345 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1346 if actually_scheduled < desired_scheduled {
1347 let under_scheduled = desired_scheduled - actually_scheduled;
1348 to_take -= under_scheduled;
1349 }
1350 }
1351
1352 if to_take == 0 {
1353 return Ok(None);
1354 }
1355
1356 let loaded_need = self.rows_drained + to_take - 1;
1358 trace!(
1359 "Waiting for I/O (desire at least {} fully loaded rows)",
1360 loaded_need
1361 );
1362 self.root_decoder.wait_for_loaded(loaded_need).await?;
1363
1364 let next_task = self.root_decoder.drain(to_take)?;
1365 self.rows_drained += to_take;
1366 Ok(Some(next_task))
1367 }
1368
1369 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1370 let stream = futures::stream::unfold(self, |mut slf| async move {
1371 let next_task = slf.next_batch_task().await;
1372 let next_task = next_task.transpose().map(|next_task| {
1373 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1374 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1375 let task = tokio::spawn(
1376 (async move {
1377 let next_task = next_task?;
1378 next_task.into_batch(emitted_batch_size_warning)
1379 })
1380 .in_current_span(),
1381 );
1382 (task, num_rows)
1383 });
1384 next_task.map(|(task, num_rows)| {
1385 let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1386 debug_assert!(num_rows <= u32::MAX as u64);
1388 let next_task = ReadBatchTask {
1389 task,
1390 num_rows: num_rows as u32,
1391 };
1392 (next_task, slf)
1393 })
1394 });
1395 stream.boxed()
1396 }
1397}
1398
1399enum RootDecoderMessage {
1402 LoadedPage(LoadedPage),
1403 LegacyPage(crate::v2::decoder::DecoderReady),
1404}
1405trait RootDecoderType {
1406 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1407 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1408 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1409}
1410impl RootDecoderType for StructuralStructDecoder {
1411 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1412 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1413 unreachable!()
1414 };
1415 self.accept_page(loaded_page)
1416 }
1417 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1418 self.drain_batch_task(num_rows)
1419 }
1420 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1421 Ok(())
1423 }
1424}
1425impl RootDecoderType for SimpleStructDecoder {
1426 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1427 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1428 unreachable!()
1429 };
1430 self.accept_child(legacy_page)
1431 }
1432 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1433 self.drain(num_rows)
1434 }
1435 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1436 runtime.block_on(self.wait_for_loaded(loaded_need))
1437 }
1438}
1439
/// Synchronous (blocking) decoder over scheduler messages handed to it as a queue.
///
/// Each call to `Iterator::next` consumes just enough messages to produce one
/// `RecordBatch`, blocking on `wait_for_io_runtime` whenever I/O has not completed.
struct BatchDecodeIterator<T: RootDecoderType> {
    /// Scheduler output not yet handed to `root_decoder`.
    messages: VecDeque<Result<DecoderMessage>>,
    /// Root (struct) decoder that assembles each batch.
    root_decoder: T,
    /// Rows not yet assigned to a batch.
    rows_remaining: u64,
    /// Maximum number of rows per emitted batch.
    rows_per_batch: u32,
    /// Cumulative rows covered by the messages accepted so far.
    rows_scheduled: u64,
    /// Cumulative rows drained into batches so far.
    rows_drained: u64,
    /// Ensures the "large batch" message is logged at most once per iterator.
    emitted_batch_size_warning: Arc<Once>,
    /// Private current-thread runtime used to block on outstanding I/O futures.
    wait_for_io_runtime: tokio::runtime::Runtime,
    /// Schema reported through `RecordBatchReader::schema`.
    schema: Arc<ArrowSchema>,
}
1455
1456impl<T: RootDecoderType> BatchDecodeIterator<T> {
1457 pub fn new(
1459 messages: VecDeque<Result<DecoderMessage>>,
1460 rows_per_batch: u32,
1461 num_rows: u64,
1462 root_decoder: T,
1463 schema: Arc<ArrowSchema>,
1464 ) -> Self {
1465 Self {
1466 messages,
1467 root_decoder,
1468 rows_remaining: num_rows,
1469 rows_per_batch,
1470 rows_scheduled: 0,
1471 rows_drained: 0,
1472 wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1473 .build()
1474 .unwrap(),
1475 emitted_batch_size_warning: Arc::new(Once::new()),
1476 schema,
1477 }
1478 }
1479
1480 fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1485 match maybe_done(unloaded_page.0) {
1486 MaybeDone::Done(loaded_page) => loaded_page,
1488 MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1490 MaybeDone::Gone => unreachable!(),
1491 }
1492 }
1493
1494 #[instrument(skip_all)]
1499 fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1500 while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1501 let message = self.messages.pop_front().unwrap()?;
1502 self.rows_scheduled = message.scheduled_so_far;
1503 for decoder_message in message.decoders {
1504 match decoder_message {
1505 MessageType::UnloadedPage(unloaded_page) => {
1506 let loaded_page = self.wait_for_page(unloaded_page)?;
1507 self.root_decoder
1508 .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1509 }
1510 MessageType::DecoderReady(decoder_ready) => {
1511 if !decoder_ready.path.is_empty() {
1513 self.root_decoder
1514 .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1515 }
1516 }
1517 }
1518 }
1519 }
1520
1521 let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1522
1523 self.root_decoder
1524 .wait(loaded_need, &self.wait_for_io_runtime)?;
1525 Ok(self.rows_scheduled)
1526 }
1527
1528 #[instrument(level = "debug", skip_all)]
1529 fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1530 trace!(
1531 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1532 self.rows_remaining,
1533 self.rows_drained,
1534 self.rows_scheduled,
1535 );
1536 if self.rows_remaining == 0 {
1537 return Ok(None);
1538 }
1539
1540 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1541 self.rows_remaining -= to_take;
1542
1543 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1544 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1545 if scheduled_need > 0 {
1546 let desired_scheduled = scheduled_need + self.rows_scheduled;
1547 trace!(
1548 "Draining from scheduler (desire at least {} scheduled rows)",
1549 desired_scheduled
1550 );
1551 let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1552 if actually_scheduled < desired_scheduled {
1553 let under_scheduled = desired_scheduled - actually_scheduled;
1554 to_take -= under_scheduled;
1555 }
1556 }
1557
1558 if to_take == 0 {
1559 return Ok(None);
1560 }
1561
1562 let next_task = self.root_decoder.drain_batch(to_take)?;
1563
1564 self.rows_drained += to_take;
1565
1566 let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1567
1568 Ok(Some(batch))
1569 }
1570}
1571
1572impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1573 type Item = ArrowResult<RecordBatch>;
1574
1575 fn next(&mut self) -> Option<Self::Item> {
1576 self.next_batch_task()
1577 .transpose()
1578 .map(|r| r.map_err(ArrowError::from))
1579 }
1580}
1581
1582impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1583 fn schema(&self) -> Arc<ArrowSchema> {
1584 self.schema.clone()
1585 }
1586}
1587
/// Async decode stream for structural (2.1+) data.
///
/// Scheduler messages arrive over `context`; each page is awaited and handed to
/// `root_decoder`, which is then drained in batches of up to `rows_per_batch` rows.
pub struct StructuralBatchDecodeStream {
    /// Receiving end of the scheduler's message channel.
    context: DecoderContext,
    /// Root struct decoder that assembles each batch.
    root_decoder: StructuralStructDecoder,
    /// Rows not yet assigned to a batch.
    rows_remaining: u64,
    /// Maximum number of rows per batch.
    rows_per_batch: u32,
    /// Cumulative rows covered by the scheduler messages received so far.
    rows_scheduled: u64,
    /// Cumulative rows drained into decode tasks so far.
    rows_drained: u64,
    /// Set once the scheduler channel has been observed closed.
    scheduler_exhausted: bool,
    /// Ensures the "large batch" message is logged at most once per stream.
    emitted_batch_size_warning: Arc<Once>,
}
1599
1600impl StructuralBatchDecodeStream {
1601 pub fn new(
1612 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1613 rows_per_batch: u32,
1614 num_rows: u64,
1615 root_decoder: StructuralStructDecoder,
1616 ) -> Self {
1617 Self {
1618 context: DecoderContext::new(scheduled),
1619 root_decoder,
1620 rows_remaining: num_rows,
1621 rows_per_batch,
1622 rows_scheduled: 0,
1623 rows_drained: 0,
1624 scheduler_exhausted: false,
1625 emitted_batch_size_warning: Arc::new(Once::new()),
1626 }
1627 }
1628
1629 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1630 if self.scheduler_exhausted {
1631 return Ok(self.rows_scheduled);
1632 }
1633 while self.rows_scheduled < scheduled_need {
1634 let next_message = self.context.source.recv().await;
1635 match next_message {
1636 Some(scan_line) => {
1637 let scan_line = scan_line?;
1638 self.rows_scheduled = scan_line.scheduled_so_far;
1639 for message in scan_line.decoders {
1640 let unloaded_page = message.into_structural();
1641 let loaded_page = unloaded_page.0.await?;
1642 self.root_decoder.accept_page(loaded_page)?;
1643 }
1644 }
1645 None => {
1646 self.scheduler_exhausted = true;
1650 return Ok(self.rows_scheduled);
1651 }
1652 }
1653 }
1654 Ok(scheduled_need)
1655 }
1656
1657 #[instrument(level = "debug", skip_all)]
1658 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1659 trace!(
1660 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1661 self.rows_remaining,
1662 self.rows_drained,
1663 self.rows_scheduled,
1664 );
1665 if self.rows_remaining == 0 {
1666 return Ok(None);
1667 }
1668
1669 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1670 self.rows_remaining -= to_take;
1671
1672 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1673 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1674 if scheduled_need > 0 {
1675 let desired_scheduled = scheduled_need + self.rows_scheduled;
1676 trace!(
1677 "Draining from scheduler (desire at least {} scheduled rows)",
1678 desired_scheduled
1679 );
1680 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1681 if actually_scheduled < desired_scheduled {
1682 let under_scheduled = desired_scheduled - actually_scheduled;
1683 to_take -= under_scheduled;
1684 }
1685 }
1686
1687 if to_take == 0 {
1688 return Ok(None);
1689 }
1690
1691 let next_task = self.root_decoder.drain_batch_task(to_take)?;
1692 self.rows_drained += to_take;
1693 Ok(Some(next_task))
1694 }
1695
1696 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1697 let stream = futures::stream::unfold(self, |mut slf| async move {
1698 let next_task = slf.next_batch_task().await;
1699 let next_task = next_task.transpose().map(|next_task| {
1700 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1701 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1702 let task = tokio::spawn(async move {
1703 let next_task = next_task?;
1704 next_task.into_batch(emitted_batch_size_warning)
1705 });
1706 (task, num_rows)
1707 });
1708 next_task.map(|(task, num_rows)| {
1709 let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1710 debug_assert!(num_rows <= u32::MAX as u64);
1712 let next_task = ReadBatchTask {
1713 task,
1714 num_rows: num_rows as u32,
1715 };
1716 (next_task, slf)
1717 })
1718 });
1719 stream.boxed()
1720 }
1721}
1722
/// The rows a read should produce: either contiguous row ranges or individual row indices.
#[derive(Debug)]
pub enum RequestedRows {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}

impl RequestedRows {
    /// Total number of requested rows: the summed range lengths, or the number of indices.
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Indices(indices) => indices.len() as u64,
            Self::Ranges(ranges) => {
                let mut total = 0;
                for range in ranges {
                    total += range.end - range.start;
                }
                total
            }
        }
    }
}
1737
/// Configuration for `schedule_and_decode` and `schedule_and_decode_blocking`.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Decoder plugins forwarded to the decode scheduler.
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum number of rows in each emitted batch.
    pub batch_size: u32,
    /// I/O interface passed to the scheduler for reading page data.
    pub io: Arc<dyn EncodingsIo>,
    /// Cache passed to the decode scheduler.
    pub cache: Arc<LanceCache>,
    /// Forwarded to the structural (2.1+) root decoder; the legacy path ignores it.
    pub should_validate: bool,
}
1746
1747fn check_scheduler_on_drop(
1748 stream: BoxStream<'static, ReadBatchTask>,
1749 scheduler_handle: tokio::task::JoinHandle<()>,
1750) -> BoxStream<'static, ReadBatchTask> {
1751 let mut scheduler_handle = Some(scheduler_handle);
1755 let check_scheduler = stream::unfold((), move |_| {
1756 let handle = scheduler_handle.take();
1757 async move {
1758 if let Some(handle) = handle {
1759 handle.await.unwrap();
1760 }
1761 None
1762 }
1763 });
1764 stream.chain(check_scheduler).boxed()
1765}
1766
1767pub fn create_decode_stream(
1768 schema: &Schema,
1769 num_rows: u64,
1770 batch_size: u32,
1771 is_structural: bool,
1772 should_validate: bool,
1773 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1774) -> BoxStream<'static, ReadBatchTask> {
1775 if is_structural {
1776 let arrow_schema = ArrowSchema::from(schema);
1777 let structural_decoder = StructuralStructDecoder::new(
1778 arrow_schema.fields,
1779 should_validate,
1780 true,
1781 );
1782 StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1783 } else {
1784 let arrow_schema = ArrowSchema::from(schema);
1785 let root_fields = arrow_schema.fields;
1786
1787 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1788 BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1789 }
1790}
1791
1792pub fn create_decode_iterator(
1796 schema: &Schema,
1797 num_rows: u64,
1798 batch_size: u32,
1799 should_validate: bool,
1800 is_structural: bool,
1801 messages: VecDeque<Result<DecoderMessage>>,
1802) -> Box<dyn RecordBatchReader + Send + 'static> {
1803 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1804 let root_fields = arrow_schema.fields.clone();
1805 if is_structural {
1806 let simple_struct_decoder =
1807 StructuralStructDecoder::new(root_fields, should_validate, true);
1808 Box::new(BatchDecodeIterator::new(
1809 messages,
1810 batch_size,
1811 num_rows,
1812 simple_struct_decoder,
1813 arrow_schema,
1814 ))
1815 } else {
1816 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1817 Box::new(BatchDecodeIterator::new(
1818 messages,
1819 batch_size,
1820 num_rows,
1821 root_decoder,
1822 arrow_schema,
1823 ))
1824 }
1825}
1826
1827fn create_scheduler_decoder(
1828 column_infos: Vec<Arc<ColumnInfo>>,
1829 requested_rows: RequestedRows,
1830 filter: FilterExpression,
1831 column_indices: Vec<u32>,
1832 target_schema: Arc<Schema>,
1833 config: SchedulerDecoderConfig,
1834) -> Result<BoxStream<'static, ReadBatchTask>> {
1835 let num_rows = requested_rows.num_rows();
1836
1837 let is_structural = column_infos[0].is_structural();
1838
1839 let (tx, rx) = mpsc::unbounded_channel();
1840
1841 let decode_stream = create_decode_stream(
1842 &target_schema,
1843 num_rows,
1844 config.batch_size,
1845 is_structural,
1846 config.should_validate,
1847 rx,
1848 );
1849
1850 let scheduler_handle = tokio::task::spawn(async move {
1851 let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1852 target_schema.as_ref(),
1853 &column_indices,
1854 &column_infos,
1855 &vec![],
1856 num_rows,
1857 config.decoder_plugins,
1858 config.io.clone(),
1859 config.cache,
1860 &filter,
1861 )
1862 .await
1863 {
1864 Ok(scheduler) => scheduler,
1865 Err(e) => {
1866 let _ = tx.send(Err(e));
1867 return;
1868 }
1869 };
1870
1871 match requested_rows {
1872 RequestedRows::Ranges(ranges) => {
1873 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1874 }
1875 RequestedRows::Indices(indices) => {
1876 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1877 }
1878 }
1879 });
1880
1881 Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1882}
1883
1884pub fn schedule_and_decode(
1890 column_infos: Vec<Arc<ColumnInfo>>,
1891 requested_rows: RequestedRows,
1892 filter: FilterExpression,
1893 column_indices: Vec<u32>,
1894 target_schema: Arc<Schema>,
1895 config: SchedulerDecoderConfig,
1896) -> BoxStream<'static, ReadBatchTask> {
1897 if requested_rows.num_rows() == 0 {
1898 return stream::empty().boxed();
1899 }
1900 match create_scheduler_decoder(
1904 column_infos,
1905 requested_rows,
1906 filter,
1907 column_indices,
1908 target_schema,
1909 config,
1910 ) {
1911 Ok(stream) => stream,
1913 Err(e) => stream::once(std::future::ready(ReadBatchTask {
1914 num_rows: 0,
1915 task: std::future::ready(Err(e)).boxed(),
1916 }))
1917 .boxed(),
1918 }
1919}
1920
1921pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
1922 tokio::runtime::Builder::new_current_thread()
1923 .build()
1924 .unwrap()
1925});
1926
1927pub fn schedule_and_decode_blocking(
1942 column_infos: Vec<Arc<ColumnInfo>>,
1943 requested_rows: RequestedRows,
1944 filter: FilterExpression,
1945 column_indices: Vec<u32>,
1946 target_schema: Arc<Schema>,
1947 config: SchedulerDecoderConfig,
1948) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1949 if requested_rows.num_rows() == 0 {
1950 let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
1951 return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
1952 }
1953
1954 let num_rows = requested_rows.num_rows();
1955 let is_structural = column_infos[0].is_structural();
1956
1957 let (tx, mut rx) = mpsc::unbounded_channel();
1958
1959 let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
1962 target_schema.as_ref(),
1963 &column_indices,
1964 &column_infos,
1965 &vec![],
1966 num_rows,
1967 config.decoder_plugins,
1968 config.io.clone(),
1969 config.cache,
1970 &filter,
1971 ))?;
1972
1973 match requested_rows {
1975 RequestedRows::Ranges(ranges) => {
1976 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1977 }
1978 RequestedRows::Indices(indices) => {
1979 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1980 }
1981 }
1982
1983 let mut messages = Vec::new();
1985 while rx
1986 .recv_many(&mut messages, usize::MAX)
1987 .now_or_never()
1988 .unwrap()
1989 != 0
1990 {}
1991
1992 let decode_iterator = create_decode_iterator(
1994 &target_schema,
1995 num_rows,
1996 config.batch_size,
1997 config.should_validate,
1998 is_structural,
1999 messages.into(),
2000 );
2001
2002 Ok(decode_iterator)
2003}
2004
2005pub trait PrimitivePageDecoder: Send + Sync {
2017 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2049}
2050
2051pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2060 fn schedule_ranges(
2072 &self,
2073 ranges: &[Range<u64>],
2074 scheduler: &Arc<dyn EncodingsIo>,
2075 top_level_row: u64,
2076 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2077}
2078
/// Tracks a priority value for a range of rows as decoding advances through it.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Returns the current priority.
    fn current_priority(&self) -> u64;
    /// Advances the range by `num_rows` rows.
    fn advance(&mut self, num_rows: u64);
    /// Clones this range into a new boxed trait object.
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

/// A [`PriorityRange`] whose priority is a plain counter: advancing by `n` adds `n`.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    /// Creates a range whose priority starts at `priority`.
    fn new(priority: u64) -> Self {
        SimplePriorityRange { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(SimplePriorityRange::new(self.priority))
    }
}

/// A [`PriorityRange`] for the child (item) space of a list. It advances a `base` range
/// measured in list rows.
///
/// Positions are tracked in the same units as `offsets`. Each time the position reaches
/// another offset boundary, `base` is advanced by one. Its priority is the priority of
/// `base`.
pub struct ListPriorityRange {
    base: Box<dyn PriorityRange>,
    offsets: Arc<[u64]>,
    cur_index_into_offsets: usize,
    cur_position: u64,
}

impl ListPriorityRange {
    /// Creates a range positioned at offset index 0 and position 0.
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        ListPriorityRange {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}

impl std::fmt::Debug for ListPriorityRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the number of offsets is printed, to keep the output compact.
        let mut out = f.debug_struct("ListPriorityRange");
        out.field("base", &self.base);
        out.field("offsets.len()", &self.offsets.len());
        out.field("cur_index_into_offsets", &self.cur_index_into_offsets);
        out.field("cur_position", &self.cur_position);
        out.finish()
    }
}

impl PriorityRange for ListPriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.cur_position += num_rows;
        let position = self.cur_position;
        // Count the boundaries after the current offset index that `position` has reached.
        // `skip` (rather than slicing) keeps this panic-free for empty `offsets`.
        let boundaries_passed = self
            .offsets
            .iter()
            .skip(self.cur_index_into_offsets + 1)
            .take_while(|&&offset| offset <= position)
            .count();
        self.cur_index_into_offsets += boundaries_passed;
        self.base.advance(boundaries_passed as u64);
    }

    fn current_priority(&self) -> u64 {
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        let cloned = ListPriorityRange {
            base: self.base.box_clone(),
            offsets: Arc::clone(&self.offsets),
            cur_index_into_offsets: self.cur_index_into_offsets,
            cur_position: self.cur_position,
        };
        Box::new(cloned)
    }
}
2185
/// Mutable state threaded through scheduling. It holds the I/O interface, the cache, and the
/// path of the field currently being scheduled.
pub struct SchedulerContext {
    /// NOTE(review): `new` sets this to `None`; when it is `Some`, `path_name` reports a
    /// `TEMP(...)` prefix instead of `ROOT`.
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    /// I/O interface exposed to schedulers via `io()`.
    io: Arc<dyn EncodingsIo>,
    /// Cache exposed to schedulers via `cache()`.
    cache: Arc<LanceCache>,
    /// Name shown by `path_name` when `recv` is set.
    name: String,
    /// Child indices pushed via `push`, root first.
    path: Vec<u32>,
    /// Field names pushed via `push`, parallel to `path`.
    path_names: Vec<String>,
}
2195
2196pub struct ScopedSchedulerContext<'a> {
2197 pub context: &'a mut SchedulerContext,
2198}
2199
2200impl<'a> ScopedSchedulerContext<'a> {
2201 pub fn pop(self) -> &'a mut SchedulerContext {
2202 self.context.pop();
2203 self.context
2204 }
2205}
2206
2207impl SchedulerContext {
2208 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2209 Self {
2210 io,
2211 cache,
2212 recv: None,
2213 name: "".to_string(),
2214 path: Vec::new(),
2215 path_names: Vec::new(),
2216 }
2217 }
2218
2219 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2220 &self.io
2221 }
2222
2223 pub fn cache(&self) -> &Arc<LanceCache> {
2224 &self.cache
2225 }
2226
2227 pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
2228 self.path.push(index);
2229 self.path_names.push(name.to_string());
2230 ScopedSchedulerContext { context: self }
2231 }
2232
2233 pub fn pop(&mut self) {
2234 self.path.pop();
2235 self.path_names.pop();
2236 }
2237
2238 pub fn path_name(&self) -> String {
2239 let path = self.path_names.join("/");
2240 if self.recv.is_some() {
2241 format!("TEMP({}){}", self.name, path)
2242 } else {
2243 format!("ROOT{}", path)
2244 }
2245 }
2246
2247 pub fn current_path(&self) -> VecDeque<u32> {
2248 VecDeque::from_iter(self.path.iter().copied())
2249 }
2250
2251 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2252 pub fn locate_decoder(
2253 &mut self,
2254 decoder: Box<dyn crate::v2::decoder::LogicalPageDecoder>,
2255 ) -> crate::v2::decoder::DecoderReady {
2256 trace!(
2257 "Scheduling decoder of type {:?} for {:?}",
2258 decoder.data_type(),
2259 self.path,
2260 );
2261 crate::v2::decoder::DecoderReady {
2262 decoder,
2263 path: self.current_path(),
2264 }
2265 }
2266}
2267
2268pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2269
2270impl std::fmt::Debug for UnloadedPage {
2271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2272 f.debug_struct("UnloadedPage").finish()
2273 }
2274}
2275
/// The result of one structural scheduling step.
#[derive(Debug)]
pub struct ScheduledScanLine {
    /// NOTE(review): presumably the cumulative number of rows scheduled, mirroring
    /// `DecoderMessage::scheduled_so_far`. Confirm against the scheduler.
    pub rows_scheduled: u64,
    /// Messages (pages or decoders) produced by this step.
    pub decoders: Vec<MessageType>,
}

/// An in-progress structural scheduling job, advanced one scan line at a time.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Schedules the next scan line.
    /// NOTE(review): `Ok(None)` presumably means nothing is left to schedule. Confirm.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>>;
}
2288
2289pub struct FilterExpression(pub Bytes);
2297
2298impl FilterExpression {
2299 pub fn no_filter() -> Self {
2304 Self(Bytes::new())
2305 }
2306
2307 pub fn is_noop(&self) -> bool {
2309 self.0.is_empty()
2310 }
2311}
2312
2313pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
2314 fn initialize<'a>(
2315 &'a mut self,
2316 filter: &'a FilterExpression,
2317 context: &'a SchedulerContext,
2318 ) -> BoxFuture<'a, Result<()>>;
2319 fn schedule_ranges<'a>(
2320 &'a self,
2321 ranges: &[Range<u64>],
2322 filter: &FilterExpression,
2323 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
2324}
2325
2326pub trait DecodeArrayTask: Send {
2328 fn decode(self: Box<Self>) -> Result<ArrayRef>;
2330}
2331
2332impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2333 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2334 StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2335 }
2336}
2337
2338pub struct NextDecodeTask {
2343 pub task: Box<dyn DecodeArrayTask>,
2345 pub num_rows: u64,
2347}
2348
2349impl NextDecodeTask {
2350 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2355 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2356 let struct_arr = self.task.decode();
2357 match struct_arr {
2358 Ok(struct_arr) => {
2359 let batch = RecordBatch::from(struct_arr.as_struct());
2360 let size_bytes = batch.get_array_memory_size() as u64;
2361 if size_bytes > BATCH_SIZE_BYTES_WARNING {
2362 emitted_batch_size_warning.call_once(|| {
2363 let size_mb = size_bytes / 1024 / 1024;
2364 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2365 });
2366 }
2367 Ok(batch)
2368 }
2369 Err(e) => {
2370 let e = Error::Internal {
2371 message: format!("Error decoding batch: {}", e),
2372 location: location!(),
2373 };
2374 Err(e)
2375 }
2376 }
2377 }
2378}
2379
2380#[derive(Debug)]
2384pub enum MessageType {
2385 DecoderReady(crate::v2::decoder::DecoderReady),
2390 UnloadedPage(UnloadedPage),
2394}
2395
2396impl MessageType {
2397 pub fn into_legacy(self) -> crate::v2::decoder::DecoderReady {
2398 match self {
2399 Self::DecoderReady(decoder) => decoder,
2400 Self::UnloadedPage(_) => {
2401 panic!("Expected DecoderReady but got UnloadedPage")
2402 }
2403 }
2404 }
2405
2406 pub fn into_structural(self) -> UnloadedPage {
2407 match self {
2408 Self::UnloadedPage(unloaded) => unloaded,
2409 Self::DecoderReady(_) => {
2410 panic!("Expected UnloadedPage but got DecoderReady")
2411 }
2412 }
2413 }
2414}
2415
2416pub struct DecoderMessage {
2417 pub scheduled_so_far: u64,
2418 pub decoders: Vec<MessageType>,
2419}
2420
2421pub struct DecoderContext {
2422 source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
2423}
2424
2425impl DecoderContext {
2426 pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
2427 Self { source }
2428 }
2429}
2430
2431pub struct DecodedPage {
2432 pub data: DataBlock,
2433 pub repdef: RepDefUnraveler,
2434}
2435
2436pub trait DecodePageTask: Send + std::fmt::Debug {
2437 fn decode(self: Box<Self>) -> Result<DecodedPage>;
2439}
2440
2441pub trait StructuralPageDecoder: std::fmt::Debug + Send {
2442 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
2443 fn num_rows(&self) -> u64;
2444}
2445
/// A structural page whose I/O has completed, ready to be handed to a field decoder.
#[derive(Debug)]
pub struct LoadedPage {
    /// Decoder for the rows of this page.
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// NOTE(review): presumably the field path (child indices from the root, as produced by
    /// `SchedulerContext::current_path`) used to route the page. Confirm.
    pub path: VecDeque<u32>,
    /// NOTE(review): presumably the index of this page within its column. Confirm.
    pub page_index: usize,
}

/// A decoded array together with its composite rep/def information.
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}
2476
2477pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
2478 fn decode(self: Box<Self>) -> Result<DecodedArray>;
2479}
2480
2481pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
2482 fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
2487 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
2489 fn data_type(&self) -> &DataType;
2491}
2492
2493#[derive(Debug, Default)]
2494pub struct DecoderPlugins {}
2495
2496pub async fn decode_batch(
2498 batch: &EncodedBatch,
2499 filter: &FilterExpression,
2500 decoder_plugins: Arc<DecoderPlugins>,
2501 should_validate: bool,
2502 version: LanceFileVersion,
2503 cache: Option<Arc<LanceCache>>,
2504) -> Result<RecordBatch> {
2505 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2510 let cache = cache.unwrap_or_else(|| Arc::new(LanceCache::with_capacity(128 * 1024 * 1024)));
2511 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2512 batch.schema.as_ref(),
2513 &batch.top_level_columns,
2514 &batch.page_table,
2515 &vec![],
2516 batch.num_rows,
2517 decoder_plugins,
2518 io_scheduler.clone(),
2519 cache,
2520 filter,
2521 )
2522 .await?;
2523 let (tx, rx) = unbounded_channel();
2524 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2525 let is_structural = version >= LanceFileVersion::V2_1;
2526 let mut decode_stream = create_decode_stream(
2527 &batch.schema,
2528 batch.num_rows,
2529 batch.num_rows as u32,
2530 is_structural,
2531 should_validate,
2532 rx,
2533 );
2534 decode_stream.next().await.unwrap().task.await
2535}
2536
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let input = vec![1];
        let expected = vec![1..2];
        assert_eq!(DecodeBatchScheduler::indices_to_ranges(&input), expected);
    }

    #[test]
    fn test_coalesce_indices_to_ranges() {
        let input = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let expected = vec![1..10];
        assert_eq!(DecodeBatchScheduler::indices_to_ranges(&input), expected);
    }

    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let input = vec![1, 2, 3, 5, 6, 7, 9];
        let expected = vec![1..4, 5..8, 9..10];
        assert_eq!(DecodeBatchScheduler::indices_to_ranges(&input), expected);
    }
}
2562}