1use std::collections::VecDeque;
216use std::sync::Once;
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::{CapacityMode, FileMetadataCache};
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use log::{debug, trace, warn};
230use snafu::location;
231use tokio::sync::mpsc::error::SendError;
232use tokio::sync::mpsc::{self, unbounded_channel};
233
234use lance_core::{ArrowResult, Error, Result};
235use tracing::instrument;
236
237use crate::buffer::LanceBuffer;
238use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
239use crate::encoder::{values_column_encoding, EncodedBatch};
240use crate::encodings::logical::binary::BinaryFieldScheduler;
241use crate::encodings::logical::blob::BlobFieldScheduler;
242use crate::encodings::logical::list::{
243 ListFieldScheduler, OffsetPageInfo, StructuralListScheduler,
244};
245use crate::encodings::logical::primitive::{
246 PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
247};
248use crate::encodings::logical::r#struct::{
249 SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
250};
251use crate::encodings::physical::binary::{
252 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, VariableDecoder,
253};
254use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
255use crate::encodings::physical::fsst::{FsstMiniBlockDecompressor, FsstPerValueDecompressor};
256use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
257use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
258use crate::encodings::physical::{ColumnBuffers, FileBuffers};
259use crate::format::pb::{self, column_encoding};
260use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
261use crate::version::LanceFileVersion;
262use crate::{BufferScheduler, EncodingsIo};
263
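/// If a single decoded batch exceeds this size (10 MiB) we log a one-time
/// suggestion to reduce the batch size.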
264const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
266
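/// How a page's data is encoded: either a legacy array encoding or a
/// structural page layout.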
267#[derive(Debug)]
274pub enum PageEncoding {
275 Legacy(pb::ArrayEncoding),
276 Structural(pb::PageLayout),
277}
278
279impl PageEncoding {
280 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
281 match self {
282 Self::Legacy(enc) => enc,
283 Self::Structural(_) => panic!("Expected a legacy encoding"),
284 }
285 }
286
287 pub fn as_structural(&self) -> &pb::PageLayout {
288 match self {
289 Self::Structural(enc) => enc,
290 Self::Legacy(_) => panic!("Expected a structural encoding"),
291 }
292 }
293
294 pub fn is_structural(&self) -> bool {
295 matches!(self, Self::Structural(_))
296 }
297}
298
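/// Metadata for a single page of a column: its row count, scheduling
/// priority, encoding, and the (offset, size) pairs of its buffers.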
299#[derive(Debug)]
303pub struct PageInfo {
304 pub num_rows: u64,
306 pub priority: u64,
310 pub encoding: PageEncoding,
312 pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
314}
315
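/// Metadata for a column: its index in the file, the pages it contains,
/// any column-level buffers, and its column encoding.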
316#[derive(Debug, Clone)]
320pub struct ColumnInfo {
321 pub index: u32,
323 pub page_infos: Arc<[PageInfo]>,
325 pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
327 pub encoding: pb::ColumnEncoding,
328}
329
330impl ColumnInfo {
331 pub fn new(
333 index: u32,
334 page_infos: Arc<[PageInfo]>,
335 buffer_offsets_and_sizes: Vec<(u64, u64)>,
336 encoding: pb::ColumnEncoding,
337 ) -> Self {
338 Self {
339 index,
340 page_infos,
341 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
342 encoding,
343 }
344 }
345
346 pub fn is_structural(&self) -> bool {
347 self.page_infos
348 .first()
350 .map(|page| page.encoding.is_structural())
351 .unwrap_or(false)
352 }
353}
354
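/// The field scheduler for the synthetic root struct, which is either
/// structural or legacy depending on the file.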
355enum RootScheduler {
356 Structural(Box<dyn StructuralFieldScheduler>),
357 Legacy(Arc<dyn FieldScheduler>),
358}
359
360impl RootScheduler {
361 fn as_legacy(&self) -> &Arc<dyn FieldScheduler> {
362 match self {
363 Self::Structural(_) => panic!("Expected a legacy scheduler"),
364 Self::Legacy(s) => s,
365 }
366 }
367
368 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
369 match self {
370 Self::Structural(s) => s.as_ref(),
371 Self::Legacy(_) => panic!("Expected a structural scheduler"),
372 }
373 }
374}
375
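/// Schedules the I/O needed to decode the requested rows.
///
/// The requested fields are wrapped in a synthetic root struct and a field
/// scheduler is built for that root; as ranges are scheduled, decoder
/// messages are sent over a channel to the decode stream.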
376pub struct DecodeBatchScheduler {
398 root_scheduler: RootScheduler,
399 pub root_fields: Fields,
400 cache: Arc<FileMetadataCache>,
401}
402
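/// An iterator over the column infos for the requested fields.
///
/// `column_indices` holds the starting column of each requested top-level
/// field; [`Self::next_top_level`] jumps the cursor to the next of these.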
403pub struct ColumnInfoIter<'a> {
404 column_infos: Vec<Arc<ColumnInfo>>,
405 column_indices: &'a [u32],
406 column_info_pos: usize,
407 column_indices_pos: usize,
408}
409
410impl<'a> ColumnInfoIter<'a> {
411 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
412 let initial_pos = column_indices[0] as usize;
413 Self {
414 column_infos,
415 column_indices,
416 column_info_pos: initial_pos,
417 column_indices_pos: 0,
418 }
419 }
420
421 pub fn peek(&self) -> &Arc<ColumnInfo> {
422 &self.column_infos[self.column_info_pos]
423 }
424
425 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
426 let column_info = self.column_infos[self.column_info_pos].clone();
427 let transformed = transform(column_info);
428 self.column_infos[self.column_info_pos] = transformed;
429 }
430
431 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
432 self.next().ok_or_else(|| {
433 Error::invalid_input(
434 "there were more fields in the schema than provided column indices / infos",
435 location!(),
436 )
437 })
438 }
439
440 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
441 if self.column_info_pos < self.column_infos.len() {
442 let info = &self.column_infos[self.column_info_pos];
443 self.column_info_pos += 1;
444 Some(info)
445 } else {
446 None
447 }
448 }
449
450 pub(crate) fn next_top_level(&mut self) {
451 self.column_indices_pos += 1;
452 if self.column_indices_pos < self.column_indices.len() {
453 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
454 } else {
455 self.column_info_pos = self.column_infos.len();
456 }
457 }
458}
459
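/// Decompresses one mini-block chunk into a [`DataBlock`] of `num_values`
/// values.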
460pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
461 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
462}
463
464pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
465 fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock>;
467 fn bits_per_value(&self) -> u64;
471}
472
473pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
474 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
476}
477
478pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
479 fn decompress(&self, data: LanceBuffer) -> Result<DataBlock>;
480}
481
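/// Creates decompressors from their protobuf [`pb::ArrayEncoding`]
/// descriptions.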
482pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
483 fn create_miniblock_decompressor(
484 &self,
485 description: &pb::ArrayEncoding,
486 ) -> Result<Box<dyn MiniBlockDecompressor>>;
487
488 fn create_fixed_per_value_decompressor(
489 &self,
490 description: &pb::ArrayEncoding,
491 ) -> Result<Box<dyn FixedPerValueDecompressor>>;
492
493 fn create_variable_per_value_decompressor(
494 &self,
495 description: &pb::ArrayEncoding,
496 ) -> Result<Box<dyn VariablePerValueDecompressor>>;
497
498 fn create_block_decompressor(
499 &self,
500 description: &pb::ArrayEncoding,
501 ) -> Result<Box<dyn BlockDecompressor>>;
502}
503
504#[derive(Debug, Default)]
505pub struct CoreDecompressorStrategy {}
506
507impl DecompressorStrategy for CoreDecompressorStrategy {
508 fn create_miniblock_decompressor(
509 &self,
510 description: &pb::ArrayEncoding,
511 ) -> Result<Box<dyn MiniBlockDecompressor>> {
512 match description.array_encoding.as_ref().unwrap() {
513 pb::array_encoding::ArrayEncoding::Flat(flat) => {
514 Ok(Box::new(ValueDecompressor::new(flat)))
515 }
516 pb::array_encoding::ArrayEncoding::Bitpack2(description) => {
517 Ok(Box::new(BitpackMiniBlockDecompressor::new(description)))
518 }
519 pb::array_encoding::ArrayEncoding::Variable(_) => {
520 Ok(Box::new(BinaryMiniBlockDecompressor::default()))
521 }
522 pb::array_encoding::ArrayEncoding::Fsst(description) => {
523 Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
524 }
525 pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
526 Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
527 description,
528 )))
529 }
530 _ => todo!(),
531 }
532 }
533
534 fn create_fixed_per_value_decompressor(
535 &self,
536 description: &pb::ArrayEncoding,
537 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
538 match description.array_encoding.as_ref().unwrap() {
539 pb::array_encoding::ArrayEncoding::Flat(flat) => {
540 Ok(Box::new(ValueDecompressor::new(flat)))
541 }
542 _ => todo!("fixed-per-value decompressor for {:?}", description),
543 }
544 }
545
546 fn create_variable_per_value_decompressor(
547 &self,
548 description: &pb::ArrayEncoding,
549 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
550 match *description.array_encoding.as_ref().unwrap() {
551 pb::array_encoding::ArrayEncoding::Variable(variable) => {
552 assert!(variable.bits_per_offset < u8::MAX as u32);
553 Ok(Box::new(VariableDecoder::default()))
554 }
555 pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
556 Ok(Box::new(FsstPerValueDecompressor::new(
557 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
558 Box::new(VariableDecoder::default()),
559 )))
560 }
561 _ => todo!("variable-per-value decompressor for {:?}", description),
562 }
563 }
564
565 fn create_block_decompressor(
566 &self,
567 description: &pb::ArrayEncoding,
568 ) -> Result<Box<dyn BlockDecompressor>> {
569 match description.array_encoding.as_ref().unwrap() {
570 pb::array_encoding::ArrayEncoding::Flat(flat) => {
571 Ok(Box::new(ValueDecompressor::new(flat)))
572 }
573 pb::array_encoding::ArrayEncoding::Constant(constant) => {
574 let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
575 Ok(Box::new(ConstantDecompressor::new(
576 scalar,
577 constant.num_values,
578 )))
579 }
580 pb::array_encoding::ArrayEncoding::Variable(_) => {
581 Ok(Box::new(BinaryBlockDecompressor::default()))
582 }
583 _ => todo!(),
584 }
585 }
586}
587
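/// The default strategy for turning fields and column metadata into field
/// schedulers, optionally validating the decoded data.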
588#[derive(Debug)]
590pub struct CoreFieldDecoderStrategy {
591 pub validate_data: bool,
592 pub decompressor_strategy: Arc<dyn DecompressorStrategy>,
593}
594
595impl Default for CoreFieldDecoderStrategy {
596 fn default() -> Self {
597 Self {
598 validate_data: false,
599 decompressor_strategy: Arc::new(CoreDecompressorStrategy {}),
600 }
601 }
602}
603
604impl CoreFieldDecoderStrategy {
605 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
608 let column_encoding = column_info
609 .encoding
610 .column_encoding
611 .as_ref()
612 .ok_or_else(|| {
613 Error::invalid_input(
614 format!(
615 "the column at index {} was missing a ColumnEncoding",
616 column_info.index
617 ),
618 location!(),
619 )
620 })?;
621 if matches!(
622 column_encoding,
623 pb::column_encoding::ColumnEncoding::Values(_)
624 ) {
625 Ok(())
626 } else {
627 Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
628 }
629 }
630
631 fn is_primitive(data_type: &DataType) -> bool {
632 if data_type.is_primitive() {
633 true
634 } else {
635 match data_type {
636 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
638 DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
639 _ => false,
640 }
641 }
642 }
643
644 fn create_primitive_scheduler(
645 &self,
646 field: &Field,
647 column: &ColumnInfo,
648 buffers: FileBuffers,
649 ) -> Result<Box<dyn FieldScheduler>> {
650 Self::ensure_values_encoded(column, &field.name)?;
651 let column_buffers = ColumnBuffers {
653 file_buffers: buffers,
654 positions_and_sizes: &column.buffer_offsets_and_sizes,
655 };
656 Ok(Box::new(PrimitiveFieldScheduler::new(
657 column.index,
658 field.data_type(),
659 column.page_infos.clone(),
660 column_buffers,
661 self.validate_data,
662 )))
663 }
664
665 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
667 Self::ensure_values_encoded(column_info, field_name)?;
668 if column_info.page_infos.len() != 1 {
669 return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
670 }
671 let encoding = &column_info.page_infos[0].encoding;
672 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
673 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
674 _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
675 }
676 }
677
678 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
679 let encoding = &column_info.page_infos[0].encoding;
680 matches!(
681 encoding.as_legacy().array_encoding.as_ref().unwrap(),
682 pb::array_encoding::ArrayEncoding::PackedStruct(_)
683 )
684 }
685
686 fn create_list_scheduler(
687 &self,
688 list_field: &Field,
689 column_infos: &mut ColumnInfoIter,
690 buffers: FileBuffers,
691 offsets_column: &ColumnInfo,
692 ) -> Result<Box<dyn FieldScheduler>> {
693 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
694 let offsets_column_buffers = ColumnBuffers {
695 file_buffers: buffers,
696 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
697 };
698 let items_scheduler =
699 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
700
701 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
702 .page_infos
703 .iter()
704 .filter(|offsets_page| offsets_page.num_rows > 0)
705 .map(|offsets_page| {
706 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
707 &offsets_page.encoding.as_legacy().array_encoding
708 {
709 let inner = PageInfo {
710 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
711 encoding: PageEncoding::Legacy(
712 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
713 ),
714 num_rows: offsets_page.num_rows,
715 priority: 0,
716 };
717 (
718 inner,
719 OffsetPageInfo {
720 offsets_in_page: offsets_page.num_rows,
721 null_offset_adjustment: list_encoding.null_offset_adjustment,
722 num_items_referenced_by_page: list_encoding.num_items,
723 },
724 )
725 } else {
726 panic!("Expected a list column");
728 }
729 })
730 .unzip();
731 let inner = Arc::new(PrimitiveFieldScheduler::new(
732 offsets_column.index,
733 DataType::UInt64,
734 Arc::from(inner_infos.into_boxed_slice()),
735 offsets_column_buffers,
736 self.validate_data,
737 )) as Arc<dyn FieldScheduler>;
738 let items_field = match list_field.data_type() {
739 DataType::List(inner) => inner,
740 DataType::LargeList(inner) => inner,
741 _ => unreachable!(),
742 };
743 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
744 DataType::Int32
745 } else {
746 DataType::Int64
747 };
748 Ok(Box::new(ListFieldScheduler::new(
749 inner,
750 items_scheduler.into(),
751 items_field,
752 offset_type,
753 null_offset_adjustments,
754 )))
755 }
756
757 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
758 if let column_encoding::ColumnEncoding::Blob(blob) =
759 column_info.encoding.column_encoding.as_ref().unwrap()
760 {
761 let mut column_info = column_info.clone();
762 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
763 Some(column_info)
764 } else {
765 None
766 }
767 }
768
769 fn items_per_row(data_type: &DataType) -> u64 {
770 match data_type {
771 DataType::FixedSizeList(inner, dimension) => {
772 Self::items_per_row(inner.data_type()) * *dimension as u64
773 }
774 _ => 1,
775 }
776 }
777
778 fn create_structural_field_scheduler(
779 &self,
780 field: &Field,
781 column_infos: &mut ColumnInfoIter,
782 ) -> Result<Box<dyn StructuralFieldScheduler>> {
783 let data_type = field.data_type();
784 if Self::is_primitive(&data_type) {
785 let column_info = column_infos.expect_next()?;
786 let items_per_row = Self::items_per_row(&data_type);
787 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
788 column_info.as_ref(),
789 items_per_row,
790 self.decompressor_strategy.as_ref(),
791 )?);
792
793 column_infos.next_top_level();
795
796 return Ok(scheduler);
797 }
798 match &data_type {
799 DataType::Struct(fields) => {
800 if field.is_packed_struct() {
801 let column_info = column_infos.expect_next()?;
802 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
803 column_info.as_ref(),
                        1,
                        self.decompressor_strategy.as_ref(),
806 )?);
807
808 column_infos.next_top_level();
810
811 return Ok(scheduler);
812 }
813 let mut child_schedulers = Vec::with_capacity(field.children.len());
814 for field in field.children.iter() {
815 let field_scheduler =
816 self.create_structural_field_scheduler(field, column_infos)?;
817 child_schedulers.push(field_scheduler);
818 }
819
820 let fields = fields.clone();
821 Ok(
822 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
823 as Box<dyn StructuralFieldScheduler>,
824 )
825 }
826 DataType::Binary | DataType::Utf8 => {
827 let column_info = column_infos.expect_next()?;
828 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
829 column_info.as_ref(),
830 1,
831 self.decompressor_strategy.as_ref(),
832 )?);
833 column_infos.next_top_level();
834 Ok(scheduler)
835 }
836 DataType::List(_) | DataType::LargeList(_) => {
837 let child = field
838 .children
839 .first()
840 .expect("List field must have a child");
841 let child_scheduler =
842 self.create_structural_field_scheduler(child, column_infos)?;
843 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
844 as Box<dyn StructuralFieldScheduler>)
845 }
846 _ => todo!(),
847 }
848 }
849
850 fn create_legacy_field_scheduler(
851 &self,
852 field: &Field,
853 column_infos: &mut ColumnInfoIter,
854 buffers: FileBuffers,
855 ) -> Result<Box<dyn FieldScheduler>> {
856 let data_type = field.data_type();
857 if Self::is_primitive(&data_type) {
858 let column_info = column_infos.expect_next()?;
859 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
860 return Ok(scheduler);
861 } else if data_type.is_binary_like() {
862 let column_info = column_infos.next().unwrap().clone();
863 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
865 let desc_scheduler =
866 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
867 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
868 return Ok(blob_scheduler);
869 }
870 if let Some(page_info) = column_info.page_infos.first() {
871 if matches!(
872 page_info.encoding.as_legacy(),
873 pb::ArrayEncoding {
874 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
875 }
876 ) {
877 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
878 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
879 } else {
880 DataType::LargeList(Arc::new(ArrowField::new(
881 "item",
882 DataType::UInt8,
883 false,
884 )))
885 };
886 let list_field = Field::try_from(ArrowField::new(
887 field.name.clone(),
888 list_type,
889 field.nullable,
890 ))
891 .unwrap();
892 let list_scheduler = self.create_list_scheduler(
893 &list_field,
894 column_infos,
895 buffers,
896 &column_info,
897 )?;
898 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
899 list_scheduler.into(),
900 field.data_type(),
901 ));
902 return Ok(binary_scheduler);
903 } else {
904 let scheduler =
905 self.create_primitive_scheduler(field, &column_info, buffers)?;
906 return Ok(scheduler);
907 }
908 } else {
909 return self.create_primitive_scheduler(field, &column_info, buffers);
910 }
911 }
912 match &data_type {
913 DataType::FixedSizeList(inner, _dimension) => {
914 if Self::is_primitive(inner.data_type()) {
917 let primitive_col = column_infos.expect_next()?;
918 let scheduler =
919 self.create_primitive_scheduler(field, primitive_col, buffers)?;
920 Ok(scheduler)
921 } else {
922 todo!()
923 }
924 }
925 DataType::Dictionary(_key_type, value_type) => {
926 if Self::is_primitive(value_type) || value_type.is_binary_like() {
927 let primitive_col = column_infos.expect_next()?;
928 let scheduler =
929 self.create_primitive_scheduler(field, primitive_col, buffers)?;
930 Ok(scheduler)
931 } else {
932 Err(Error::NotSupported {
933 source: format!(
934 "No way to decode into a dictionary field of type {}",
935 value_type
936 )
937 .into(),
938 location: location!(),
939 })
940 }
941 }
942 DataType::List(_) | DataType::LargeList(_) => {
943 let offsets_column = column_infos.expect_next()?.clone();
944 column_infos.next_top_level();
945 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
946 }
947 DataType::Struct(fields) => {
948 let column_info = column_infos.expect_next()?;
949
950 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
952 return self.create_primitive_scheduler(field, &blob_col, buffers);
954 }
955
956 if Self::check_packed_struct(column_info) {
957 self.create_primitive_scheduler(field, column_info, buffers)
959 } else {
960 Self::check_simple_struct(column_info, &field.name).unwrap();
962 let num_rows = column_info
963 .page_infos
964 .iter()
965 .map(|page| page.num_rows)
966 .sum();
967 let mut child_schedulers = Vec::with_capacity(field.children.len());
968 for field in &field.children {
969 column_infos.next_top_level();
970 let field_scheduler =
971 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
972 child_schedulers.push(Arc::from(field_scheduler));
973 }
974
975 let fields = fields.clone();
976 Ok(Box::new(SimpleStructScheduler::new(
977 child_schedulers,
978 fields,
979 num_rows,
980 )))
981 }
982 }
983 _ => todo!(),
985 }
986 }
987}
988
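/// Builds a synthetic [`ColumnInfo`] for the root struct column used by the
/// legacy decode path.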
989fn root_column(num_rows: u64) -> ColumnInfo {
991 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
992 let final_page_num_rows = num_rows % (u32::MAX as u64);
993 let root_pages = (0..num_root_pages)
994 .map(|i| PageInfo {
995 num_rows: if i == num_root_pages - 1 {
996 final_page_num_rows
997 } else {
998 u64::MAX
999 },
1000 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
1001 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
1002 pb::SimpleStruct {},
1003 )),
1004 }),
            priority: 0,
            buffer_offsets_and_sizes: Arc::new([]),
1007 })
1008 .collect::<Vec<_>>();
1009 ColumnInfo {
1010 buffer_offsets_and_sizes: Arc::new([]),
1011 encoding: values_column_encoding(),
1012 index: u32::MAX,
1013 page_infos: Arc::from(root_pages),
1014 }
1015}
1016
1017pub enum RootDecoder {
1018 Structural(StructuralStructDecoder),
1019 Legacy(SimpleStructDecoder),
1020}
1021
1022impl RootDecoder {
1023 pub fn into_structural(self) -> StructuralStructDecoder {
1024 match self {
1025 Self::Structural(decoder) => decoder,
1026 Self::Legacy(_) => panic!("Expected a structural decoder"),
1027 }
1028 }
1029
1030 pub fn into_legacy(self) -> SimpleStructDecoder {
1031 match self {
1032 Self::Legacy(decoder) => decoder,
1033 Self::Structural(_) => panic!("Expected a legacy decoder"),
1034 }
1035 }
1036}
1037
1038impl DecodeBatchScheduler {
1039 #[allow(clippy::too_many_arguments)]
1042 pub async fn try_new<'a>(
1043 schema: &'a Schema,
1044 column_indices: &[u32],
1045 column_infos: &[Arc<ColumnInfo>],
1046 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
1047 num_rows: u64,
1048 _decoder_plugins: Arc<DecoderPlugins>,
1049 io: Arc<dyn EncodingsIo>,
1050 cache: Arc<FileMetadataCache>,
1051 filter: &FilterExpression,
1052 ) -> Result<Self> {
1053 assert!(num_rows > 0);
1054 let buffers = FileBuffers {
1055 positions_and_sizes: file_buffer_positions_and_sizes,
1056 };
1057 let arrow_schema = ArrowSchema::from(schema);
1058 let root_fields = arrow_schema.fields().clone();
1059 let root_type = DataType::Struct(root_fields.clone());
1060 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
1061 root_field.children.clone_from(&schema.fields);
1065 root_field
1066 .metadata
1067 .insert("__lance_decoder_root".to_string(), "true".to_string());
1068
1069 if column_infos[0].is_structural() {
1070 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
1071
1072 let mut root_scheduler = CoreFieldDecoderStrategy::default()
1073 .create_structural_field_scheduler(&root_field, &mut column_iter)?;
1074
1075 let context = SchedulerContext::new(io, cache.clone());
1076 root_scheduler.initialize(filter, &context).await?;
1077
1078 Ok(Self {
1079 root_scheduler: RootScheduler::Structural(root_scheduler),
1080 root_fields,
1081 cache,
1082 })
1083 } else {
1084 let mut columns = Vec::with_capacity(column_infos.len() + 1);
1087 columns.push(Arc::new(root_column(num_rows)));
1088 columns.extend(column_infos.iter().cloned());
1089
1090 let adjusted_column_indices = [0_u32]
1091 .into_iter()
1092 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
1093 .collect::<Vec<_>>();
1094 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
1095 let root_scheduler = CoreFieldDecoderStrategy::default()
1096 .create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
1097
1098 let context = SchedulerContext::new(io, cache.clone());
1099 root_scheduler.initialize(filter, &context).await?;
1100
1101 Ok(Self {
1102 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
1103 root_fields,
1104 cache,
1105 })
1106 }
1107 }
1108
1109 pub fn from_scheduler(
1110 root_scheduler: Arc<dyn FieldScheduler>,
1111 root_fields: Fields,
1112 cache: Arc<FileMetadataCache>,
1113 ) -> Self {
1114 Self {
1115 root_scheduler: RootScheduler::Legacy(root_scheduler),
1116 root_fields,
1117 cache,
1118 }
1119 }
1120
1121 fn do_schedule_ranges_structural(
1122 &mut self,
1123 ranges: &[Range<u64>],
1124 filter: &FilterExpression,
1125 io: Arc<dyn EncodingsIo>,
1126 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1127 ) {
1128 let root_scheduler = self.root_scheduler.as_structural();
1129 let mut context = SchedulerContext::new(io, self.cache.clone());
1130 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1131 if let Err(schedule_ranges_err) = maybe_root_job {
1132 schedule_action(Err(schedule_ranges_err));
1133 return;
1134 }
1135 let mut root_job = maybe_root_job.unwrap();
1136 let mut num_rows_scheduled = 0;
1137 loop {
1138 let maybe_next_scan_line = root_job.schedule_next(&mut context);
1139 if let Err(err) = maybe_next_scan_line {
1140 schedule_action(Err(err));
1141 return;
1142 }
1143 let next_scan_line = maybe_next_scan_line.unwrap();
1144 match next_scan_line {
1145 Some(next_scan_line) => {
1146 trace!(
1147 "Scheduled scan line of {} rows and {} decoders",
1148 next_scan_line.rows_scheduled,
1149 next_scan_line.decoders.len()
1150 );
1151 num_rows_scheduled += next_scan_line.rows_scheduled;
1152 if !schedule_action(Ok(DecoderMessage {
1153 scheduled_so_far: num_rows_scheduled,
1154 decoders: next_scan_line.decoders,
1155 })) {
1156 return;
1158 }
1159 }
1160 None => return,
1161 }
1162 }
1163 }
1164
1165 fn do_schedule_ranges_legacy(
1166 &mut self,
1167 ranges: &[Range<u64>],
1168 filter: &FilterExpression,
1169 io: Arc<dyn EncodingsIo>,
1170 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1171 priority: Option<Box<dyn PriorityRange>>,
1175 ) {
1176 let root_scheduler = self.root_scheduler.as_legacy();
1177 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1178 trace!(
1179 "Scheduling {} ranges across {}..{} ({} rows){}",
1180 ranges.len(),
1181 ranges.first().unwrap().start,
1182 ranges.last().unwrap().end,
1183 rows_requested,
1184 priority
1185 .as_ref()
1186 .map(|p| format!(" (priority={:?})", p))
1187 .unwrap_or_default()
1188 );
1189
1190 let mut context = SchedulerContext::new(io, self.cache.clone());
1191 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1192 if let Err(schedule_ranges_err) = maybe_root_job {
1193 schedule_action(Err(schedule_ranges_err));
1194 return;
1195 }
1196 let mut root_job = maybe_root_job.unwrap();
1197 let mut num_rows_scheduled = 0;
1198 let mut rows_to_schedule = root_job.num_rows();
1199 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1200 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1201 while rows_to_schedule > 0 {
1202 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1203 if let Err(schedule_next_err) = maybe_next_scan_line {
1204 schedule_action(Err(schedule_next_err));
1205 return;
1206 }
1207 let next_scan_line = maybe_next_scan_line.unwrap();
1208 priority.advance(next_scan_line.rows_scheduled);
1209 num_rows_scheduled += next_scan_line.rows_scheduled;
1210 rows_to_schedule -= next_scan_line.rows_scheduled;
1211 trace!(
1212 "Scheduled scan line of {} rows and {} decoders",
1213 next_scan_line.rows_scheduled,
1214 next_scan_line.decoders.len()
1215 );
1216 if !schedule_action(Ok(DecoderMessage {
1217 scheduled_so_far: num_rows_scheduled,
1218 decoders: next_scan_line.decoders,
1219 })) {
1220 return;
1222 }
1223
1224 trace!("Finished scheduling {} ranges", ranges.len());
1225 }
1226 }
1227
1228 fn do_schedule_ranges(
1229 &mut self,
1230 ranges: &[Range<u64>],
1231 filter: &FilterExpression,
1232 io: Arc<dyn EncodingsIo>,
1233 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1234 priority: Option<Box<dyn PriorityRange>>,
1238 ) {
1239 match &self.root_scheduler {
1240 RootScheduler::Legacy(_) => {
1241 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1242 }
1243 RootScheduler::Structural(_) => {
1244 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1245 }
1246 }
1247 }
1248
1249 pub fn schedule_ranges_to_vec(
1252 &mut self,
1253 ranges: &[Range<u64>],
1254 filter: &FilterExpression,
1255 io: Arc<dyn EncodingsIo>,
1256 priority: Option<Box<dyn PriorityRange>>,
1257 ) -> Result<Vec<DecoderMessage>> {
1258 let mut decode_messages = Vec::new();
1259 self.do_schedule_ranges(
1260 ranges,
1261 filter,
1262 io,
1263 |msg| {
1264 decode_messages.push(msg);
1265 true
1266 },
1267 priority,
1268 );
1269 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1270 }
1271
1272 #[instrument(skip_all)]
1282 pub fn schedule_ranges(
1283 &mut self,
1284 ranges: &[Range<u64>],
1285 filter: &FilterExpression,
1286 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1287 scheduler: Arc<dyn EncodingsIo>,
1288 ) {
1289 self.do_schedule_ranges(
1290 ranges,
1291 filter,
1292 scheduler,
1293 |msg| {
1294 match sink.send(msg) {
1295 Ok(_) => true,
1296 Err(SendError { .. }) => {
1297 debug!(
1300 "schedule_ranges aborting early since decoder appears to have been dropped"
1301 );
1302 false
1303 }
1304 }
1305 },
1306 None,
1307 )
1308 }
1309
1310 #[instrument(skip_all)]
1318 pub fn schedule_range(
1319 &mut self,
1320 range: Range<u64>,
1321 filter: &FilterExpression,
1322 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1323 scheduler: Arc<dyn EncodingsIo>,
1324 ) {
1325 self.schedule_ranges(&[range], filter, sink, scheduler)
1326 }
1327
1328 pub fn schedule_take(
1336 &mut self,
1337 indices: &[u64],
1338 filter: &FilterExpression,
1339 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1340 scheduler: Arc<dyn EncodingsIo>,
1341 ) {
1342 debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1343 if indices.is_empty() {
1344 return;
1345 }
1346 trace!("Scheduling take of {} rows", indices.len());
1347 let ranges = Self::indices_to_ranges(indices);
1348 self.schedule_ranges(&ranges, filter, sink, scheduler)
1349 }
1350
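    /// Coalesces a sorted list of indices into contiguous ranges, e.g.
    /// `[1, 2, 3, 7, 8]` becomes `[1..4, 7..9]`.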
1351 fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
1353 let mut ranges = Vec::new();
1354 let mut start = indices[0];
1355
1356 for window in indices.windows(2) {
1357 if window[1] != window[0] + 1 {
1358 ranges.push(start..window[0] + 1);
1359 start = window[1];
1360 }
1361 }
1362
1363 ranges.push(start..*indices.last().unwrap() + 1);
1364 ranges
1365 }
1366}
1367
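/// A task that, when awaited, yields a single decoded [`RecordBatch`] of
/// `num_rows` rows.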
1368pub struct ReadBatchTask {
1369 pub task: BoxFuture<'static, Result<RecordBatch>>,
1370 pub num_rows: u32,
1371}
1372
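/// Drives the legacy root decoder, turning scheduled decoder messages into a
/// stream of batch decode tasks.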
1373pub struct BatchDecodeStream {
1375 context: DecoderContext,
1376 root_decoder: SimpleStructDecoder,
1377 rows_remaining: u64,
1378 rows_per_batch: u32,
1379 rows_scheduled: u64,
1380 rows_drained: u64,
1381 scheduler_exhausted: bool,
1382 emitted_batch_size_warning: Arc<Once>,
1383}
1384
1385impl BatchDecodeStream {
1386 pub fn new(
1397 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1398 rows_per_batch: u32,
1399 num_rows: u64,
1400 root_decoder: SimpleStructDecoder,
1401 ) -> Self {
1402 Self {
1403 context: DecoderContext::new(scheduled),
1404 root_decoder,
1405 rows_remaining: num_rows,
1406 rows_per_batch,
1407 rows_scheduled: 0,
1408 rows_drained: 0,
1409 scheduler_exhausted: false,
1410 emitted_batch_size_warning: Arc::new(Once::new()),
1411 }
1412 }
1413
1414 fn accept_decoder(&mut self, decoder: DecoderReady) -> Result<()> {
1415 if decoder.path.is_empty() {
1416 Ok(())
1418 } else {
1419 self.root_decoder.accept_child(decoder)
1420 }
1421 }
1422
1423 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1424 if self.scheduler_exhausted {
1425 return Ok(self.rows_scheduled);
1426 }
1427 while self.rows_scheduled < scheduled_need {
1428 let next_message = self.context.source.recv().await;
1429 match next_message {
1430 Some(scan_line) => {
1431 let scan_line = scan_line?;
1432 self.rows_scheduled = scan_line.scheduled_so_far;
1433 for message in scan_line.decoders {
1434 self.accept_decoder(message.into_legacy())?;
1435 }
1436 }
1437 None => {
1438 self.scheduler_exhausted = true;
1442 return Ok(self.rows_scheduled);
1443 }
1444 }
1445 }
1446 Ok(scheduled_need)
1447 }
1448
1449 #[instrument(level = "debug", skip_all)]
1450 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1451 trace!(
1452 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1453 self.rows_remaining,
1454 self.rows_drained,
1455 self.rows_scheduled,
1456 );
1457 if self.rows_remaining == 0 {
1458 return Ok(None);
1459 }
1460
1461 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1462 self.rows_remaining -= to_take;
1463
1464 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1465 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1466 if scheduled_need > 0 {
1467 let desired_scheduled = scheduled_need + self.rows_scheduled;
1468 trace!(
1469 "Draining from scheduler (desire at least {} scheduled rows)",
1470 desired_scheduled
1471 );
1472 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1473 if actually_scheduled < desired_scheduled {
1474 let under_scheduled = desired_scheduled - actually_scheduled;
1475 to_take -= under_scheduled;
1476 }
1477 }
1478
1479 if to_take == 0 {
1480 return Ok(None);
1481 }
1482
1483 let loaded_need = self.rows_drained + to_take - 1;
1485 trace!(
1486 "Waiting for I/O (desire at least {} fully loaded rows)",
1487 loaded_need
1488 );
1489 self.root_decoder.wait_for_loaded(loaded_need).await?;
1490
1491 let next_task = self.root_decoder.drain(to_take)?;
1492 self.rows_drained += to_take;
1493 Ok(Some(next_task))
1494 }
1495
1496 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1497 let stream = futures::stream::unfold(self, |mut slf| async move {
1498 let next_task = slf.next_batch_task().await;
1499 let next_task = next_task.transpose().map(|next_task| {
1500 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1501 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1502 let task = tokio::spawn(async move {
1503 let next_task = next_task?;
1504 next_task.into_batch(emitted_batch_size_warning)
1505 });
1506 (task, num_rows)
1507 });
1508 next_task.map(|(task, num_rows)| {
1509 let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1510 debug_assert!(num_rows <= u32::MAX as u64);
1512 let next_task = ReadBatchTask {
1513 task,
1514 num_rows: num_rows as u32,
1515 };
1516 (next_task, slf)
1517 })
1518 });
1519 stream.boxed()
1520 }
1521}
1522
1523enum RootDecoderMessage {
1526 LoadedPage(LoadedPage),
1527 LegacyPage(DecoderReady),
1528}
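/// Unifies the legacy and structural root decoders so [`BatchDecodeIterator`]
/// can drive either one.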
1529trait RootDecoderType {
1530 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
1531 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
1532 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
1533}
1534impl RootDecoderType for StructuralStructDecoder {
1535 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1536 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1537 unreachable!()
1538 };
1539 self.accept_page(loaded_page)
1540 }
1541 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1542 self.drain_batch_task(num_rows)
1543 }
1544 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1545 Ok(())
1547 }
1548}
1549impl RootDecoderType for SimpleStructDecoder {
1550 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1551 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1552 unreachable!()
1553 };
1554 self.accept_child(legacy_page)
1555 }
1556 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1557 self.drain(num_rows)
1558 }
1559 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1560 runtime.block_on(self.wait_for_loaded(loaded_need))
1561 }
1562}
1563
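/// A blocking version of [`BatchDecodeStream`] that implements
/// [`RecordBatchReader`]; any I/O that has not already completed is waited on
/// using a small current-thread runtime.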
1564struct BatchDecodeIterator<T: RootDecoderType> {
1566 messages: VecDeque<Result<DecoderMessage>>,
1567 root_decoder: T,
1568 rows_remaining: u64,
1569 rows_per_batch: u32,
1570 rows_scheduled: u64,
1571 rows_drained: u64,
1572 emitted_batch_size_warning: Arc<Once>,
1573 wait_for_io_runtime: tokio::runtime::Runtime,
1577 schema: Arc<ArrowSchema>,
1578}
1579
1580impl<T: RootDecoderType> BatchDecodeIterator<T> {
1581 pub fn new(
1583 messages: VecDeque<Result<DecoderMessage>>,
1584 rows_per_batch: u32,
1585 num_rows: u64,
1586 root_decoder: T,
1587 schema: Arc<ArrowSchema>,
1588 ) -> Self {
1589 Self {
1590 messages,
1591 root_decoder,
1592 rows_remaining: num_rows,
1593 rows_per_batch,
1594 rows_scheduled: 0,
1595 rows_drained: 0,
1596 wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1597 .build()
1598 .unwrap(),
1599 emitted_batch_size_warning: Arc::new(Once::new()),
1600 schema,
1601 }
1602 }
1603
1604 fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1609 match maybe_done(unloaded_page.0) {
1610 MaybeDone::Done(loaded_page) => loaded_page,
1612 MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1614 MaybeDone::Gone => unreachable!(),
1615 }
1616 }
1617
1618 #[instrument(skip_all)]
1623 fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1624 while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1625 let message = self.messages.pop_front().unwrap()?;
1626 self.rows_scheduled = message.scheduled_so_far;
1627 for decoder_message in message.decoders {
1628 match decoder_message {
1629 MessageType::UnloadedPage(unloaded_page) => {
1630 let loaded_page = self.wait_for_page(unloaded_page)?;
1631 self.root_decoder
1632 .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1633 }
1634 MessageType::DecoderReady(decoder_ready) => {
1635 if !decoder_ready.path.is_empty() {
1637 self.root_decoder
1638 .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1639 }
1640 }
1641 }
1642 }
1643 }
1644
1645 let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1646
1647 self.root_decoder
1648 .wait(loaded_need, &self.wait_for_io_runtime)?;
1649 Ok(self.rows_scheduled)
1650 }
1651
1652 #[instrument(level = "debug", skip_all)]
1653 fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1654 trace!(
1655 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1656 self.rows_remaining,
1657 self.rows_drained,
1658 self.rows_scheduled,
1659 );
1660 if self.rows_remaining == 0 {
1661 return Ok(None);
1662 }
1663
1664 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1665 self.rows_remaining -= to_take;
1666
1667 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1668 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1669 if scheduled_need > 0 {
1670 let desired_scheduled = scheduled_need + self.rows_scheduled;
1671 trace!(
1672 "Draining from scheduler (desire at least {} scheduled rows)",
1673 desired_scheduled
1674 );
1675 let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1676 if actually_scheduled < desired_scheduled {
1677 let under_scheduled = desired_scheduled - actually_scheduled;
1678 to_take -= under_scheduled;
1679 }
1680 }
1681
1682 if to_take == 0 {
1683 return Ok(None);
1684 }
1685
1686 let next_task = self.root_decoder.drain_batch(to_take)?;
1687
1688 self.rows_drained += to_take;
1689
1690 let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1691
1692 Ok(Some(batch))
1693 }
1694}
1695
1696impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1697 type Item = ArrowResult<RecordBatch>;
1698
1699 fn next(&mut self) -> Option<Self::Item> {
1700 self.next_batch_task()
1701 .transpose()
1702 .map(|r| r.map_err(ArrowError::from))
1703 }
1704}
1705
1706impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1707 fn schema(&self) -> Arc<ArrowSchema> {
1708 self.schema.clone()
1709 }
1710}
1711
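/// The structural counterpart of [`BatchDecodeStream`]: scheduled pages are
/// awaited and fed to a [`StructuralStructDecoder`] as they arrive.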
1712pub struct StructuralBatchDecodeStream {
1714 context: DecoderContext,
1715 root_decoder: StructuralStructDecoder,
1716 rows_remaining: u64,
1717 rows_per_batch: u32,
1718 rows_scheduled: u64,
1719 rows_drained: u64,
1720 scheduler_exhausted: bool,
1721 emitted_batch_size_warning: Arc<Once>,
1722}
1723
1724impl StructuralBatchDecodeStream {
1725 pub fn new(
1736 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1737 rows_per_batch: u32,
1738 num_rows: u64,
1739 root_decoder: StructuralStructDecoder,
1740 ) -> Self {
1741 Self {
1742 context: DecoderContext::new(scheduled),
1743 root_decoder,
1744 rows_remaining: num_rows,
1745 rows_per_batch,
1746 rows_scheduled: 0,
1747 rows_drained: 0,
1748 scheduler_exhausted: false,
1749 emitted_batch_size_warning: Arc::new(Once::new()),
1750 }
1751 }
1752
1753 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1754 if self.scheduler_exhausted {
1755 return Ok(self.rows_scheduled);
1756 }
1757 while self.rows_scheduled < scheduled_need {
1758 let next_message = self.context.source.recv().await;
1759 match next_message {
1760 Some(scan_line) => {
1761 let scan_line = scan_line?;
1762 self.rows_scheduled = scan_line.scheduled_so_far;
1763 for message in scan_line.decoders {
1764 let unloaded_page = message.into_structural();
1765 let loaded_page = unloaded_page.0.await?;
1766 self.root_decoder.accept_page(loaded_page)?;
1767 }
1768 }
1769 None => {
1770 self.scheduler_exhausted = true;
1774 return Ok(self.rows_scheduled);
1775 }
1776 }
1777 }
1778 Ok(scheduled_need)
1779 }
1780
1781 #[instrument(level = "debug", skip_all)]
1782 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1783 trace!(
1784 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1785 self.rows_remaining,
1786 self.rows_drained,
1787 self.rows_scheduled,
1788 );
1789 if self.rows_remaining == 0 {
1790 return Ok(None);
1791 }
1792
1793 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1794 self.rows_remaining -= to_take;
1795
1796 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1797 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1798 if scheduled_need > 0 {
1799 let desired_scheduled = scheduled_need + self.rows_scheduled;
1800 trace!(
1801 "Draining from scheduler (desire at least {} scheduled rows)",
1802 desired_scheduled
1803 );
1804 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1805 if actually_scheduled < desired_scheduled {
1806 let under_scheduled = desired_scheduled - actually_scheduled;
1807 to_take -= under_scheduled;
1808 }
1809 }
1810
1811 if to_take == 0 {
1812 return Ok(None);
1813 }
1814
1815 let next_task = self.root_decoder.drain_batch_task(to_take)?;
1816 self.rows_drained += to_take;
1817 Ok(Some(next_task))
1818 }
1819
1820 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1821 let stream = futures::stream::unfold(self, |mut slf| async move {
1822 let next_task = slf.next_batch_task().await;
1823 let next_task = next_task.transpose().map(|next_task| {
1824 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1825 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1826 let task = tokio::spawn(async move {
1827 let next_task = next_task?;
1828 next_task.into_batch(emitted_batch_size_warning)
1829 });
1830 (task, num_rows)
1831 });
1832 next_task.map(|(task, num_rows)| {
1833 let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
1834 debug_assert!(num_rows <= u32::MAX as u64);
1836 let next_task = ReadBatchTask {
1837 task,
1838 num_rows: num_rows as u32,
1839 };
1840 (next_task, slf)
1841 })
1842 });
1843 stream.boxed()
1844 }
1845}
1846
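/// The rows requested by a read, expressed either as row ranges or as row
/// indices.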
1847#[derive(Debug)]
1848pub enum RequestedRows {
1849 Ranges(Vec<Range<u64>>),
1850 Indices(Vec<u64>),
1851}
1852
1853impl RequestedRows {
1854 pub fn num_rows(&self) -> u64 {
1855 match self {
1856 Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
1857 Self::Indices(indices) => indices.len() as u64,
1858 }
1859 }
1860}
1861
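/// Configuration shared by the scheduling and decoding halves of a read.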
1862#[derive(Debug, Clone)]
1863pub struct SchedulerDecoderConfig {
1864 pub decoder_plugins: Arc<DecoderPlugins>,
1865 pub batch_size: u32,
1866 pub io: Arc<dyn EncodingsIo>,
1867 pub cache: Arc<FileMetadataCache>,
1868 pub should_validate: bool,
1869}
1870
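/// Chains a final (empty) stage onto the batch stream that joins the
/// scheduler task, so a panic in the scheduler is surfaced to the stream's
/// consumer.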
1871fn check_scheduler_on_drop(
1872 stream: BoxStream<'static, ReadBatchTask>,
1873 scheduler_handle: tokio::task::JoinHandle<()>,
1874) -> BoxStream<'static, ReadBatchTask> {
1875 let mut scheduler_handle = Some(scheduler_handle);
1879 let check_scheduler = stream::unfold((), move |_| {
1880 let handle = scheduler_handle.take();
1881 async move {
1882 if let Some(handle) = handle {
1883 handle.await.unwrap();
1884 }
1885 None
1886 }
1887 });
1888 stream.chain(check_scheduler).boxed()
1889}
1890
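/// Creates a stream of [`ReadBatchTask`]s that decodes the messages received
/// on `rx` into batches of `batch_size` rows, using the structural or legacy
/// decoder as requested.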
1891pub fn create_decode_stream(
1892 schema: &Schema,
1893 num_rows: u64,
1894 batch_size: u32,
1895 is_structural: bool,
1896 should_validate: bool,
1897 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1898) -> BoxStream<'static, ReadBatchTask> {
1899 if is_structural {
1900 let arrow_schema = ArrowSchema::from(schema);
1901 let structural_decoder = StructuralStructDecoder::new(
1902 arrow_schema.fields,
1903 should_validate,
1904 true,
1905 );
1906 StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1907 } else {
1908 let arrow_schema = ArrowSchema::from(schema);
1909 let root_fields = arrow_schema.fields;
1910
1911 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1912 BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1913 }
1914}
1915
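/// Blocking counterpart of [`create_decode_stream`]: decodes an
/// already-collected queue of messages through a [`RecordBatchReader`].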
1916pub fn create_decode_iterator(
1920 schema: &Schema,
1921 num_rows: u64,
1922 batch_size: u32,
1923 should_validate: bool,
1924 is_structural: bool,
1925 messages: VecDeque<Result<DecoderMessage>>,
1926) -> Box<dyn RecordBatchReader + Send + 'static> {
1927 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1928 let root_fields = arrow_schema.fields.clone();
1929 if is_structural {
1930 let simple_struct_decoder =
1931 StructuralStructDecoder::new(root_fields, should_validate, true);
1932 Box::new(BatchDecodeIterator::new(
1933 messages,
1934 batch_size,
1935 num_rows,
1936 simple_struct_decoder,
1937 arrow_schema,
1938 ))
1939 } else {
1940 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1941 Box::new(BatchDecodeIterator::new(
1942 messages,
1943 batch_size,
1944 num_rows,
1945 root_decoder,
1946 arrow_schema,
1947 ))
1948 }
1949}
1950
1951fn create_scheduler_decoder(
1952 column_infos: Vec<Arc<ColumnInfo>>,
1953 requested_rows: RequestedRows,
1954 filter: FilterExpression,
1955 column_indices: Vec<u32>,
1956 target_schema: Arc<Schema>,
1957 config: SchedulerDecoderConfig,
1958) -> Result<BoxStream<'static, ReadBatchTask>> {
1959 let num_rows = requested_rows.num_rows();
1960
1961 let is_structural = column_infos[0].is_structural();
1962
1963 let (tx, rx) = mpsc::unbounded_channel();
1964
1965 let decode_stream = create_decode_stream(
1966 &target_schema,
1967 num_rows,
1968 config.batch_size,
1969 is_structural,
1970 config.should_validate,
1971 rx,
1972 );
1973
1974 let scheduler_handle = tokio::task::spawn(async move {
1975 let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1976 target_schema.as_ref(),
1977 &column_indices,
1978 &column_infos,
1979 &vec![],
1980 num_rows,
1981 config.decoder_plugins,
1982 config.io.clone(),
1983 config.cache,
1984 &filter,
1985 )
1986 .await
1987 {
1988 Ok(scheduler) => scheduler,
1989 Err(e) => {
1990 let _ = tx.send(Err(e));
1991 return;
1992 }
1993 };
1994
1995 match requested_rows {
1996 RequestedRows::Ranges(ranges) => {
1997 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1998 }
1999 RequestedRows::Indices(indices) => {
2000 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2001 }
2002 }
2003 });
2004
2005 Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
2006}
2007
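/// Launches scheduling on a background tokio task and returns a stream of
/// batch decode tasks.
///
/// An empty request yields an empty stream; if the scheduler cannot be
/// created, the error is surfaced as a single failed task.
///
/// A sketch of a typical call (the surrounding values come from an opened
/// file and are assumed to be in scope):
///
/// ```ignore
/// let stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..100]),
///     FilterExpression::no_filter(),
///     column_indices,
///     schema,
///     config,
/// );
/// ```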
2008pub fn schedule_and_decode(
2014 column_infos: Vec<Arc<ColumnInfo>>,
2015 requested_rows: RequestedRows,
2016 filter: FilterExpression,
2017 column_indices: Vec<u32>,
2018 target_schema: Arc<Schema>,
2019 config: SchedulerDecoderConfig,
2020) -> BoxStream<'static, ReadBatchTask> {
2021 if requested_rows.num_rows() == 0 {
2022 return stream::empty().boxed();
2023 }
2024 match create_scheduler_decoder(
2028 column_infos,
2029 requested_rows,
2030 filter,
2031 column_indices,
2032 target_schema,
2033 config,
2034 ) {
2035 Ok(stream) => stream,
2037 Err(e) => stream::once(std::future::ready(ReadBatchTask {
2038 num_rows: 0,
2039 task: std::future::ready(Err(e)).boxed(),
2040 }))
2041 .boxed(),
2042 }
2043}
2044
2045lazy_static::lazy_static! {
2046 pub static ref WAITER_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
2047 .build()
2048 .unwrap();
2049}
2050
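/// A blocking version of [`schedule_and_decode`].
///
/// All scheduling is performed eagerly on the calling thread (blocking on a
/// small internal runtime where needed) and the returned reader decodes the
/// scheduled data as it is drained.  Because it blocks, this should not be
/// called from within an async context.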
2051pub fn schedule_and_decode_blocking(
2066 column_infos: Vec<Arc<ColumnInfo>>,
2067 requested_rows: RequestedRows,
2068 filter: FilterExpression,
2069 column_indices: Vec<u32>,
2070 target_schema: Arc<Schema>,
2071 config: SchedulerDecoderConfig,
2072) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
2073 if requested_rows.num_rows() == 0 {
2074 let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
2075 return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
2076 }
2077
2078 let num_rows = requested_rows.num_rows();
2079 let is_structural = column_infos[0].is_structural();
2080
2081 let (tx, mut rx) = mpsc::unbounded_channel();
2082
2083 let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2086 target_schema.as_ref(),
2087 &column_indices,
2088 &column_infos,
2089 &vec![],
2090 num_rows,
2091 config.decoder_plugins,
2092 config.io.clone(),
2093 config.cache,
2094 &filter,
2095 ))?;
2096
2097 match requested_rows {
2099 RequestedRows::Ranges(ranges) => {
2100 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2101 }
2102 RequestedRows::Indices(indices) => {
2103 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2104 }
2105 }
2106
2107 let mut messages = Vec::new();
2109 while rx
2110 .recv_many(&mut messages, usize::MAX)
2111 .now_or_never()
2112 .unwrap()
2113 != 0
2114 {}
2115
2116 let decode_iterator = create_decode_iterator(
2118 &target_schema,
2119 num_rows,
2120 config.batch_size,
2121 config.should_validate,
2122 is_structural,
2123 messages.into(),
2124 );
2125
2126 Ok(decode_iterator)
2127}
2128
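/// Decodes a page of data that has already been loaded into memory.
///
/// `decode` skips `rows_to_skip` rows and returns the following `num_rows`
/// rows as a [`DataBlock`].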
2129pub trait PrimitivePageDecoder: Send + Sync {
2141 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
2173}
2174
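/// Schedules I/O for ranges of values within a single page, returning a
/// future that resolves to a [`PrimitivePageDecoder`] once the data has been
/// loaded.  The `top_level_row` is used to prioritize the I/O.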
2175pub trait PageScheduler: Send + Sync + std::fmt::Debug {
2184 fn schedule_ranges(
2196 &self,
2197 ranges: &[Range<u64>],
2198 scheduler: &Arc<dyn EncodingsIo>,
2199 top_level_row: u64,
2200 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
2201}
2202
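/// Tracks the current scheduling priority as rows are scheduled.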
2203pub trait PriorityRange: std::fmt::Debug + Send + Sync {
2205 fn advance(&mut self, num_rows: u64);
2206 fn current_priority(&self) -> u64;
2207 fn box_clone(&self) -> Box<dyn PriorityRange>;
2208}
2209
2210#[derive(Debug)]
2213pub struct SimplePriorityRange {
2214 priority: u64,
2215}
2216
2217impl SimplePriorityRange {
2218 fn new(priority: u64) -> Self {
2219 Self { priority }
2220 }
2221}
2222
2223impl PriorityRange for SimplePriorityRange {
2224 fn advance(&mut self, num_rows: u64) {
2225 self.priority += num_rows;
2226 }
2227
2228 fn current_priority(&self) -> u64 {
2229 self.priority
2230 }
2231
2232 fn box_clone(&self) -> Box<dyn PriorityRange> {
2233 Box::new(Self {
2234 priority: self.priority,
2235 })
2236 }
2237}
2238
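/// A [`PriorityRange`] for the items of a list column: positions in the
/// items column are translated back, via the list offsets, into the rows of
/// the lists that contain them.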
2239pub struct ListPriorityRange {
2252 base: Box<dyn PriorityRange>,
2253 offsets: Arc<[u64]>,
2254 cur_index_into_offsets: usize,
2255 cur_position: u64,
2256}
2257
2258impl ListPriorityRange {
2259 pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2260 Self {
2261 base,
2262 offsets,
2263 cur_index_into_offsets: 0,
2264 cur_position: 0,
2265 }
2266 }
2267}
2268
2269impl std::fmt::Debug for ListPriorityRange {
2270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2271 f.debug_struct("ListPriorityRange")
2272 .field("base", &self.base)
2273 .field("offsets.len()", &self.offsets.len())
2274 .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2275 .field("cur_position", &self.cur_position)
2276 .finish()
2277 }
2278}
2279
2280impl PriorityRange for ListPriorityRange {
2281 fn advance(&mut self, num_rows: u64) {
2282 self.cur_position += num_rows;
2285 let mut idx_into_offsets = self.cur_index_into_offsets;
2286 while idx_into_offsets + 1 < self.offsets.len()
2287 && self.offsets[idx_into_offsets + 1] <= self.cur_position
2288 {
2289 idx_into_offsets += 1;
2290 }
2291 let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2292 self.cur_index_into_offsets = idx_into_offsets;
2293 self.base.advance(base_rows_advanced as u64);
2294 }
2295
2296 fn current_priority(&self) -> u64 {
2297 self.base.current_priority()
2298 }
2299
2300 fn box_clone(&self) -> Box<dyn PriorityRange> {
2301 Box::new(Self {
2302 base: self.base.box_clone(),
2303 offsets: self.offsets.clone(),
2304 cur_index_into_offsets: self.cur_index_into_offsets,
2305 cur_position: self.cur_position,
2306 })
2307 }
2308}
2309
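/// State threaded through scheduling: the I/O handle, the metadata cache,
/// and the current path of field indices, which is attached to each decoder
/// so it can be routed to the right spot in the decoder tree.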
pub struct SchedulerContext {
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<FileMetadataCache>,
    name: String,
    path: Vec<u32>,
    path_names: Vec<String>,
}

pub struct ScopedSchedulerContext<'a> {
    pub context: &'a mut SchedulerContext,
}

impl<'a> ScopedSchedulerContext<'a> {
    pub fn pop(self) -> &'a mut SchedulerContext {
        self.context.pop();
        self.context
    }
}

impl SchedulerContext {
    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<FileMetadataCache>) -> Self {
        Self {
            io,
            cache,
            recv: None,
            name: "".to_string(),
            path: Vec::new(),
            path_names: Vec::new(),
        }
    }

    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
        &self.io
    }

    pub fn cache(&self) -> &Arc<FileMetadataCache> {
        &self.cache
    }

    pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
        self.path.push(index);
        self.path_names.push(name.to_string());
        ScopedSchedulerContext { context: self }
    }

    pub fn pop(&mut self) {
        self.path.pop();
        self.path_names.pop();
    }

    pub fn path_name(&self) -> String {
        let path = self.path_names.join("/");
        if self.recv.is_some() {
            format!("TEMP({}){}", self.name, path)
        } else {
            format!("ROOT{}", path)
        }
    }

    pub fn current_path(&self) -> VecDeque<u32> {
        VecDeque::from_iter(self.path.iter().copied())
    }

    pub fn locate_decoder(&mut self, decoder: Box<dyn LogicalPageDecoder>) -> DecoderReady {
        trace!(
            "Scheduling decoder of type {:?} for {:?}",
            decoder.data_type(),
            self.path,
        );
        DecoderReady {
            decoder,
            path: self.current_path(),
        }
    }
}

pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);

impl std::fmt::Debug for UnloadedPage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UnloadedPage").finish()
    }
}

#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}

pub trait SchedulingJob: std::fmt::Debug {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine>;

    fn num_rows(&self) -> u64;
}

pub trait StructuralSchedulingJob: std::fmt::Debug {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>>;
}

pub struct FilterExpression(pub Bytes);

impl FilterExpression {
    pub fn no_filter() -> Self {
        Self(Bytes::new())
    }

    pub fn is_noop(&self) -> bool {
        self.0.is_empty()
    }
}

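/// Schedules I/O for a single field; a structural counterpart,
/// [`StructuralFieldScheduler`], follows below.
///
/// `initialize` performs any up-front work before scheduling begins,
/// `schedule_ranges` produces a [`SchedulingJob`] covering the requested row
/// ranges, and `num_rows` reports how many rows the scheduler spans.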
pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>>;
    fn num_rows(&self) -> u64;
}

pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}

pub trait DecodeArrayTask: Send {
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}

impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
    }
}

pub struct NextDecodeTask {
    pub task: Box<dyn DecodeArrayTask>,
    pub num_rows: u64,
}

impl NextDecodeTask {
    // Run the decode task and convert the result into a RecordBatch, emitting
    // a one-time warning if the batch exceeds BATCH_SIZE_BYTES_WARNING.
    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
        let struct_arr = self.task.decode();
        match struct_arr {
            Ok(struct_arr) => {
                let batch = RecordBatch::from(struct_arr.as_struct());
                let size_bytes = batch.get_array_memory_size() as u64;
                if size_bytes > BATCH_SIZE_BYTES_WARNING {
                    emitted_batch_size_warning.call_once(|| {
                        let size_mb = size_bytes / 1024 / 1024;
                        debug!("Lance read a single batch containing more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
                    });
                }
                Ok(batch)
            }
            Err(e) => {
                let e = Error::Internal {
                    message: format!("Error decoding batch: {}", e),
                    location: location!(),
                };
                Err(e)
            }
        }
    }
}

#[derive(Debug)]
pub struct DecoderReady {
    pub decoder: Box<dyn LogicalPageDecoder>,
    pub path: VecDeque<u32>,
}

#[derive(Debug)]
pub enum MessageType {
    DecoderReady(DecoderReady),
    UnloadedPage(UnloadedPage),
}

impl MessageType {
    pub fn into_legacy(self) -> DecoderReady {
        match self {
            Self::DecoderReady(decoder) => decoder,
            Self::UnloadedPage(_) => {
                panic!("Expected DecoderReady but got UnloadedPage")
            }
        }
    }

    pub fn into_structural(self) -> UnloadedPage {
        match self {
            Self::UnloadedPage(unloaded) => unloaded,
            Self::DecoderReady(_) => {
                panic!("Expected UnloadedPage but got DecoderReady")
            }
        }
    }
}

pub struct DecoderMessage {
    pub scheduled_so_far: u64,
    pub decoders: Vec<MessageType>,
}

pub struct DecoderContext {
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}

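/// A page-level decoder for a logical field.
///
/// Callers wait for rows to be loaded (`wait_for_loaded`) and then drain them
/// into decode tasks (`drain`). `rows_unloaded` and `rows_left` are
/// convenience helpers derived from `num_rows`; the default `accept_child`
/// returns an error for decoders that do not expect children.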
pub trait LogicalPageDecoder: std::fmt::Debug + Send {
    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
        Err(Error::Internal {
            message: format!(
                "The decoder {:?} does not expect children but received a child",
                self
            ),
            location: location!(),
        })
    }
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>>;
    fn rows_loaded(&self) -> u64;
    fn rows_unloaded(&self) -> u64 {
        self.num_rows() - self.rows_loaded()
    }
    fn num_rows(&self) -> u64;
    fn rows_drained(&self) -> u64;
    fn rows_left(&self) -> u64 {
        self.num_rows() - self.rows_drained()
    }
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    fn data_type(&self) -> &DataType;
}

pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}

pub trait DecodePageTask: Send + std::fmt::Debug {
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}

pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    fn num_rows(&self) -> u64;
}

#[derive(Debug)]
pub struct LoadedPage {
    pub decoder: Box<dyn StructuralPageDecoder>,
    pub path: VecDeque<u32>,
    pub page_index: usize,
}

pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}

pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}

pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    fn data_type(&self) -> &DataType;
}

#[derive(Debug, Default)]
pub struct DecoderPlugins {}

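/// Decode an [`EncodedBatch`] into a single [`RecordBatch`].
///
/// A minimal usage sketch (assumes an `EncodedBatch` named `encoded` is
/// already in hand; the version and `should_validate` values are only
/// illustrative):
///
/// ```ignore
/// let batch = decode_batch(
///     &encoded,
///     &FilterExpression::no_filter(),
///     Arc::new(DecoderPlugins::default()),
///     /*should_validate=*/ false,
///     LanceFileVersion::V2_1,
///     None, // use the default 128 MiB metadata cache
/// )
/// .await?;
/// ```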
pub async fn decode_batch(
    batch: &EncodedBatch,
    filter: &FilterExpression,
    decoder_plugins: Arc<DecoderPlugins>,
    should_validate: bool,
    version: LanceFileVersion,
    cache: Option<Arc<FileMetadataCache>>,
) -> Result<RecordBatch> {
    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
    let cache = cache.unwrap_or_else(|| {
        Arc::new(FileMetadataCache::with_capacity(
            128 * 1024 * 1024,
            CapacityMode::Bytes,
        ))
    });
    let mut decode_scheduler = DecodeBatchScheduler::try_new(
        batch.schema.as_ref(),
        &batch.top_level_columns,
        &batch.page_table,
        &vec![],
        batch.num_rows,
        decoder_plugins,
        io_scheduler.clone(),
        cache,
        filter,
    )
    .await?;
    let (tx, rx) = unbounded_channel();
    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
    let is_structural = version >= LanceFileVersion::V2_1;
    let mut decode_stream = create_decode_stream(
        &batch.schema,
        batch.num_rows,
        batch.num_rows as u32,
        is_structural,
        should_validate,
        rx,
    );
    decode_stream.next().await.unwrap().task.await
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }

    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }

    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }
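
    // Example coverage for the PriorityRange implementations defined above.
    // The offsets used here are illustrative values, not taken from any real file.
    #[test]
    fn test_simple_priority_range_advance() {
        let mut range = SimplePriorityRange::new(5);
        assert_eq!(range.current_priority(), 5);
        range.advance(3);
        assert_eq!(range.current_priority(), 8);
    }

    #[test]
    fn test_list_priority_range_advances_base_by_rows() {
        // Cumulative offsets: list row 0 covers items [0, 2), row 1 covers
        // [2, 5), and row 2 covers [5, 9).
        let offsets: Arc<[u64]> = vec![0, 2, 5, 9].into();
        let mut range = ListPriorityRange::new(Box::new(SimplePriorityRange::new(0)), offsets);
        // Advancing by 3 items crosses the boundary at 2, so the base range
        // (and thus the reported priority) advances by one row.
        range.advance(3);
        assert_eq!(range.current_priority(), 1);
        // Advancing by 2 more items reaches position 5, crossing the boundary
        // at 5 and advancing the base by one more row.
        range.advance(2);
        assert_eq!(range.current_priority(), 2);
    }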
}