use std::collections::VecDeque;
use std::sync::Once;
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::{CapacityMode, FileMetadataCache};
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;

use crate::buffer::LanceBuffer;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::blob::BlobFieldScheduler;
use crate::encodings::logical::list::{
    ListFieldScheduler, OffsetPageInfo, StructuralListScheduler,
};
use crate::encodings::logical::primitive::{
    PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
};
use crate::encodings::logical::r#struct::{
    SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
};
use crate::encodings::physical::binary::{
    BinaryBlockDecompressor, BinaryMiniBlockDecompressor, VariableDecoder,
};
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fsst::{FsstMiniBlockDecompressor, FsstPerValueDecompressor};
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

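/// A page encoding, expressed either in the legacy style (an
/// [`pb::ArrayEncoding`] tree) or the newer structural style (a
/// [`pb::PageLayout`]).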
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}

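/// Metadata describing a page in a file: the number of rows it holds, the
/// page's priority (the absolute row offset of its first row), its encoding,
/// and the position and size of each of its buffers in the file.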
#[derive(Debug)]
pub struct PageInfo {
    pub num_rows: u64,
    pub priority: u64,
    pub encoding: PageEncoding,
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

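/// Metadata describing a column in a file: the column index, the pages making
/// up the column, the column-level buffers, and the column encoding.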
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    pub index: u32,
    pub page_infos: Arc<[PageInfo]>,
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}

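/// The scheduler for decoding batches of data.
///
/// Builds a tree of field schedulers from the schema and column metadata and
/// then drives scheduling, emitting [`DecoderMessage`]s that a decode stream
/// or iterator consumes.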
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<FileMetadataCache>,
}

pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices[0] as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }

    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}

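/// A decompressor for mini-block encoded chunks.
///
/// Given a compressed buffer and the number of values it contains, produces a
/// decompressed [`DataBlock`].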
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}

pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock>;
    fn bits_per_value(&self) -> u64;
}

pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}

pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: LanceBuffer) -> Result<DataBlock>;
}

pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}

#[derive(Debug, Default)]
pub struct CoreDecompressorStrategy {}

impl DecompressorStrategy for CoreDecompressorStrategy {
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::new(flat)))
            }
            pb::array_encoding::ArrayEncoding::Bitpack2(description) => {
                Ok(Box::new(BitpackMiniBlockDecompressor::new(description)))
            }
            pb::array_encoding::ArrayEncoding::Variable(_) => {
                Ok(Box::new(BinaryMiniBlockDecompressor::default()))
            }
            pb::array_encoding::ArrayEncoding::Fsst(description) => {
                Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
            }
            pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
                Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
                    description,
                )))
            }
            _ => todo!(),
        }
    }

    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::new(flat)))
            }
            _ => todo!("fixed-per-value decompressor for {:?}", description),
        }
    }

    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
        match *description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Variable(variable) => {
                assert!(variable.bits_per_offset < u8::MAX as u32);
                Ok(Box::new(VariableDecoder::default()))
            }
            pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
                Ok(Box::new(FsstPerValueDecompressor::new(
                    LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
                    Box::new(VariableDecoder::default()),
                )))
            }
            _ => todo!("variable-per-value decompressor for {:?}", description),
        }
    }

    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::new(flat)))
            }
            pb::array_encoding::ArrayEncoding::Constant(constant) => {
                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
                Ok(Box::new(ConstantDecompressor::new(
                    scalar,
                    constant.num_values,
                )))
            }
            pb::array_encoding::ArrayEncoding::Variable(_) => {
                Ok(Box::new(BinaryBlockDecompressor::default()))
            }
            _ => todo!(),
        }
    }
}

#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressorStrategy>,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(CoreDecompressorStrategy {}),
        }
    }
}

impl CoreFieldDecoderStrategy {
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
        }
    }

    fn is_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                // DataType::is_primitive doesn't consider these primitive but we do
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
                _ => false,
            }
        }
    }

    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }

    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }

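    /// Returns the number of leaf items per row for fixed-size types.
    ///
    /// For example, a `FixedSizeList<FixedSizeList<Int32, 3>, 2>` has
    /// `3 * 2 = 6` items per row; any other (non-fixed-size-list) type has 1.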
    fn items_per_row(data_type: &DataType) -> u64 {
        match data_type {
            DataType::FixedSizeList(inner, dimension) => {
                Self::items_per_row(inner.data_type()) * *dimension as u64
            }
            _ => 1,
        }
    }

    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let items_per_row = Self::items_per_row(&data_type);
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                items_per_row,
                self.decompressor_strategy.as_ref(),
            )?);

            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        1,
                        self.decompressor_strategy.as_ref(),
                    )?);

                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::Binary | DataType::Utf8 => {
                let column_info = column_infos.expect_next()?;
                let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                    column_info.as_ref(),
                    1,
                    self.decompressor_strategy.as_ref(),
                )?);
                column_infos.next_top_level();
                Ok(scheduler)
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field
                    .children
                    .first()
                    .expect("List field must have a child");
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!(),
        }
    }

    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.next().unwrap().clone();
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                if Self::is_primitive(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                    )))
                }
            }
            _ => todo!(),
        }
    }
}

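/// Synthesizes a [`ColumnInfo`] for the root (a simple struct wrapping all
/// top-level fields), used by the legacy decode path.  The root column is
/// split into pages of at most `u32::MAX` rows.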
fn root_column(num_rows: u64) -> ColumnInfo {
    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
    let final_page_num_rows = num_rows % (u32::MAX as u64);
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                u64::MAX
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0,
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: values_column_encoding(),
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}

pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}

impl DecodeBatchScheduler {
    /// Creates a new decode scheduler with a given decoder plugin registry
    ///
    /// The registry should match the registry that was used to encode the data.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        filter: &FilterExpression,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // root_field.children and schema.fields should be identical at this point but
        // the latter carries the Lance field ids and metadata, so restore them here
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let mut root_scheduler = CoreFieldDecoderStrategy::default()
                .create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // The legacy path expects a synthetic root column at index 0
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let root_scheduler = CoreFieldDecoderStrategy::default()
                .create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    pub fn from_scheduler(
        root_scheduler: Arc<dyn FieldScheduler>,
        root_fields: Fields,
        cache: Arc<FileMetadataCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_line = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_line {
                schedule_action(Err(err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            match next_scan_line {
                Some(next_scan_line) => {
                    trace!(
                        "Scheduled scan line of {} rows and {} decoders",
                        next_scan_line.rows_scheduled,
                        next_scan_line.decoders.len()
                    );
                    num_rows_scheduled += next_scan_line.rows_scheduled;
                    if !schedule_action(Ok(DecoderMessage {
                        scheduled_so_far: num_rows_scheduled,
                        decoders: next_scan_line.decoders,
                    })) {
                        // The decoder has disconnected; stop scheduling
                        return;
                    }
                }
                None => return,
            }
        }
    }

    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, used as the starting priority for scheduling
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                // The decoder has disconnected; stop scheduling
                return;
            }

            trace!("Finished scheduling {} ranges", ranges.len());
        }
    }

    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        // If specified, used as the starting priority for scheduling (legacy path only)
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    // Same as schedule_ranges but collects the decoder messages into a vector
    // instead of sending them to a channel
    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }

    /// Schedules the load of multiple ranges of rows
    ///
    /// Ranges must be sorted and non-overlapping
    #[instrument(skip_all)]
    pub fn schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.do_schedule_ranges(
            ranges,
            filter,
            scheduler,
            |msg| {
                match sink.send(msg) {
                    Ok(_) => true,
                    Err(SendError { .. }) => {
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
                        false
                    }
                }
            },
            None,
        )
    }

    /// Schedules the load of a single range of rows
    #[instrument(skip_all)]
    pub fn schedule_range(
        &mut self,
        range: Range<u64>,
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.schedule_ranges(&[range], filter, sink, scheduler)
    }

    /// Schedules the load of selected rows
    ///
    /// The indices must be sorted in ascending order
    pub fn schedule_take(
        &mut self,
        indices: &[u64],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
        if indices.is_empty() {
            return;
        }
        trace!("Scheduling take of {} rows", indices.len());
        let ranges = indices
            .iter()
            .map(|&idx| idx..(idx + 1))
            .collect::<Vec<_>>();
        self.schedule_ranges(&ranges, filter, sink, scheduler)
    }
}

pub struct ReadBatchTask {
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    pub num_rows: u32,
}

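/// A stream of record batch tasks for the legacy (non-structural) decode path.
///
/// Pulls decoder messages from the scheduler channel and drains the root
/// decoder one batch at a time.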
pub struct BatchDecodeStream {
    context: DecoderContext,
    root_decoder: SimpleStructDecoder,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    scheduler_exhausted: bool,
    emitted_batch_size_warning: Arc<Once>,
}

impl BatchDecodeStream {
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    fn accept_decoder(&mut self, decoder: DecoderReady) -> Result<()> {
        if decoder.path.is_empty() {
            Ok(())
        } else {
            self.root_decoder.accept_child(decoder)
        }
    }

    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = tokio::spawn(async move {
                    let next_task = next_task?;
                    next_task.into_batch(emitted_batch_size_warning)
                });
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task,
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}

enum RootDecoderMessage {
    LoadedPage(LoadedPage),
    LegacyPage(DecoderReady),
}

trait RootDecoderType {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}

impl RootDecoderType for StructuralStructDecoder {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
            unreachable!()
        };
        self.accept_page(loaded_page)
    }
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.drain_batch_task(num_rows)
    }
    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
        Ok(())
    }
}

impl RootDecoderType for SimpleStructDecoder {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
            unreachable!()
        };
        self.accept_child(legacy_page)
    }
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.drain(num_rows)
    }
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
        runtime.block_on(self.wait_for_loaded(loaded_need))
    }
}

struct BatchDecodeIterator<T: RootDecoderType> {
    messages: VecDeque<Result<DecoderMessage>>,
    root_decoder: T,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    emitted_batch_size_warning: Arc<Once>,
    wait_for_io_runtime: tokio::runtime::Runtime,
    schema: Arc<ArrowSchema>,
}

impl<T: RootDecoderType> BatchDecodeIterator<T> {
    pub fn new(
        messages: VecDeque<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: T,
        schema: Arc<ArrowSchema>,
    ) -> Self {
        Self {
            messages,
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap(),
            emitted_batch_size_warning: Arc::new(Once::new()),
            schema,
        }
    }

    fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
        match maybe_done(unloaded_page.0) {
            MaybeDone::Done(loaded_page) => loaded_page,
            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
            MaybeDone::Gone => unreachable!(),
        }
    }

    #[instrument(skip_all)]
    fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
            let message = self.messages.pop_front().unwrap()?;
            self.rows_scheduled = message.scheduled_so_far;
            for decoder_message in message.decoders {
                match decoder_message {
                    MessageType::UnloadedPage(unloaded_page) => {
                        let loaded_page = self.wait_for_page(unloaded_page)?;
                        self.root_decoder
                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
                    }
                    MessageType::DecoderReady(decoder_ready) => {
                        if !decoder_ready.path.is_empty() {
                            self.root_decoder
                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
                        }
                    }
                }
            }
        }

        let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;

        self.root_decoder
            .wait(loaded_need, &self.wait_for_io_runtime)?;
        Ok(self.rows_scheduled)
    }

    #[instrument(level = "debug", skip_all)]
    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_io(desired_scheduled)?;
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch(to_take)?;

        self.rows_drained += to_take;

        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;

        Ok(Some(batch))
    }
}

impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
    type Item = ArrowResult<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_batch_task()
            .transpose()
            .map(|r| r.map_err(ArrowError::from))
    }
}

impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
    fn schema(&self) -> Arc<ArrowSchema> {
        self.schema.clone()
    }
}

pub struct StructuralBatchDecodeStream {
    context: DecoderContext,
    root_decoder: StructuralStructDecoder,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    scheduler_exhausted: bool,
    emitted_batch_size_warning: Arc<Once>,
}

impl StructuralBatchDecodeStream {
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        let unloaded_page = message.into_structural();
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = tokio::spawn(async move {
                    let next_task = next_task?;
                    next_task.into_batch(emitted_batch_size_warning)
                });
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task,
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}

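/// A request for rows to decode, either as ranges or as row offsets.
///
/// For example, `RequestedRows::Ranges(vec![0..10, 40..50])` and
/// `RequestedRows::Indices((0..20).collect())` both request 20 rows.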
#[derive(Debug)]
pub enum RequestedRows {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}

impl RequestedRows {
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
            Self::Indices(indices) => indices.len() as u64,
        }
    }
}

#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    pub decoder_plugins: Arc<DecoderPlugins>,
    pub batch_size: u32,
    pub io: Arc<dyn EncodingsIo>,
    pub cache: Arc<FileMetadataCache>,
    pub should_validate: bool,
}

fn check_scheduler_on_drop(
    stream: BoxStream<'static, ReadBatchTask>,
    scheduler_handle: tokio::task::JoinHandle<()>,
) -> BoxStream<'static, ReadBatchTask> {
    // Chain an empty stream that joins the scheduler task once the data stream
    // is exhausted, propagating any panic from the scheduler
    let mut scheduler_handle = Some(scheduler_handle);
    let check_scheduler = stream::unfold((), move |_| {
        let handle = scheduler_handle.take();
        async move {
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
            None
        }
    });
    stream.chain(check_scheduler).boxed()
}

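/// Creates a stream of record batch tasks from a channel of decoder messages,
/// using the structural decode path when `is_structural` is true and the
/// legacy path otherwise.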
pub fn create_decode_stream(
    schema: &Schema,
    num_rows: u64,
    batch_size: u32,
    is_structural: bool,
    should_validate: bool,
    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
) -> BoxStream<'static, ReadBatchTask> {
    if is_structural {
        let arrow_schema = ArrowSchema::from(schema);
        let structural_decoder = StructuralStructDecoder::new(
            arrow_schema.fields,
            should_validate,
            true,
        );
        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
    } else {
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields;

        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
    }
}

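/// Creates a blocking [`RecordBatchReader`] from already-collected decoder
/// messages; the synchronous counterpart to [`create_decode_stream`].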
pub fn create_decode_iterator(
    schema: &Schema,
    num_rows: u64,
    batch_size: u32,
    should_validate: bool,
    is_structural: bool,
    messages: VecDeque<Result<DecoderMessage>>,
) -> Box<dyn RecordBatchReader> {
    let arrow_schema = Arc::new(ArrowSchema::from(schema));
    let root_fields = arrow_schema.fields.clone();
    if is_structural {
        let simple_struct_decoder =
            StructuralStructDecoder::new(root_fields, should_validate, true);
        Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            simple_struct_decoder,
            arrow_schema,
        ))
    } else {
        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
        Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            root_decoder,
            arrow_schema,
        ))
    }
}

fn create_scheduler_decoder(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    let num_rows = requested_rows.num_rows();

    let is_structural = column_infos[0].is_structural();

    let (tx, rx) = mpsc::unbounded_channel();

    let decode_stream = create_decode_stream(
        &target_schema,
        num_rows,
        config.batch_size,
        is_structural,
        config.should_validate,
        rx,
    );

    let scheduler_handle = tokio::task::spawn(async move {
        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
            target_schema.as_ref(),
            &column_indices,
            &column_infos,
            &vec![],
            num_rows,
            config.decoder_plugins,
            config.io.clone(),
            config.cache,
            &filter,
        )
        .await
        {
            Ok(scheduler) => scheduler,
            Err(e) => {
                let _ = tx.send(Err(e));
                return;
            }
        };

        match requested_rows {
            RequestedRows::Ranges(ranges) => {
                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
            }
            RequestedRows::Indices(indices) => {
                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
            }
        }
    });

    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
}

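/// Launches a scheduling task on the current tokio runtime and returns a
/// stream of record batch tasks for the requested rows.
///
/// A minimal usage sketch; the variable names (`column_infos`,
/// `column_indices`, `schema`, `config`) are placeholders for values obtained
/// from a file reader:
///
/// ```ignore
/// let mut stream = schedule_and_decode(
///     column_infos,
///     RequestedRows::Ranges(vec![0..100]),
///     FilterExpression::no_filter(),
///     column_indices,
///     schema,
///     config,
/// );
/// while let Some(batch_task) = stream.next().await {
///     let batch = batch_task.task.await?;
/// }
/// ```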
pub fn schedule_and_decode(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> BoxStream<'static, ReadBatchTask> {
    if requested_rows.num_rows() == 0 {
        return stream::empty().boxed();
    }
    match create_scheduler_decoder(
        column_infos,
        requested_rows,
        filter,
        column_indices,
        target_schema,
        config,
    ) {
        Ok(stream) => stream,
        // If the scheduler cannot be created, return a one-item stream carrying the error
        Err(e) => stream::once(std::future::ready(ReadBatchTask {
            num_rows: 0,
            task: std::future::ready(Err(e)).boxed(),
        }))
        .boxed(),
    }
}

lazy_static::lazy_static! {
    pub static ref WAITER_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
}

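/// A blocking variant of [`schedule_and_decode`] that runs scheduling on the
/// calling thread and returns a synchronous [`RecordBatchReader`].  All
/// scheduler messages are drained eagerly before decoding begins.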
pub fn schedule_and_decode_blocking(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<Box<dyn RecordBatchReader>> {
    if requested_rows.num_rows() == 0 {
        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
    }

    let num_rows = requested_rows.num_rows();
    let is_structural = column_infos[0].is_structural();

    let (tx, mut rx) = mpsc::unbounded_channel();

    // Initialization is async; run it to completion on the shared current-thread runtime
    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
    ))?;

    match requested_rows {
        RequestedRows::Ranges(ranges) => {
            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
        }
        RequestedRows::Indices(indices) => {
            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
        }
    }

    // Scheduling has already completed so all messages are immediately available
    let mut messages = Vec::new();
    while rx
        .recv_many(&mut messages, usize::MAX)
        .now_or_never()
        .unwrap()
        != 0
    {}

    let decode_iterator = create_decode_iterator(
        &target_schema,
        num_rows,
        config.batch_size,
        config.should_validate,
        is_structural,
        messages.into(),
    );

    Ok(decode_iterator)
}

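/// A decoder for a page of primitive data that has already been scheduled.
///
/// `decode` skips `rows_to_skip` rows and then decodes the next `num_rows`
/// rows into a [`DataBlock`].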
pub trait PrimitivePageDecoder: Send + Sync {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}

pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}

pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    fn advance(&mut self, num_rows: u64);
    fn current_priority(&self) -> u64;
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            priority: self.priority,
        })
    }
}

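/// A priority range for list items.
///
/// List items are scheduled by item position but priority must be expressed in
/// terms of top-level rows.  This wrapper uses the list offsets to translate
/// the number of items advanced into the number of rows advanced in the base
/// range.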
2220pub struct ListPriorityRange {
2233 base: Box<dyn PriorityRange>,
2234 offsets: Arc<[u64]>,
2235 cur_index_into_offsets: usize,
2236 cur_position: u64,
2237}
2238
2239impl ListPriorityRange {
2240 pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2241 Self {
2242 base,
2243 offsets,
2244 cur_index_into_offsets: 0,
2245 cur_position: 0,
2246 }
2247 }
2248}
2249
2250impl std::fmt::Debug for ListPriorityRange {
2251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2252 f.debug_struct("ListPriorityRange")
2253 .field("base", &self.base)
2254 .field("offsets.len()", &self.offsets.len())
2255 .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2256 .field("cur_position", &self.cur_position)
2257 .finish()
2258 }
2259}
2260
2261impl PriorityRange for ListPriorityRange {
2262 fn advance(&mut self, num_rows: u64) {
2263 self.cur_position += num_rows;
2266 let mut idx_into_offsets = self.cur_index_into_offsets;
2267 while idx_into_offsets + 1 < self.offsets.len()
2268 && self.offsets[idx_into_offsets + 1] <= self.cur_position
2269 {
2270 idx_into_offsets += 1;
2271 }
2272 let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2273 self.cur_index_into_offsets = idx_into_offsets;
2274 self.base.advance(base_rows_advanced as u64);
2275 }
2276
2277 fn current_priority(&self) -> u64 {
2278 self.base.current_priority()
2279 }
2280
2281 fn box_clone(&self) -> Box<dyn PriorityRange> {
2282 Box::new(Self {
2283 base: self.base.box_clone(),
2284 offsets: self.offsets.clone(),
2285 cur_index_into_offsets: self.cur_index_into_offsets,
2286 cur_position: self.cur_position,
2287 })
2288 }
2289}

/// Contains the context for a scheduling operation: the I/O scheduler,
/// the metadata cache, and the current position in the field tree
pub struct SchedulerContext {
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<FileMetadataCache>,
    name: String,
    path: Vec<u32>,
    path_names: Vec<String>,
}

/// A guard returned by [`SchedulerContext::push`]; call [`Self::pop`]
/// to return to the parent scope
pub struct ScopedSchedulerContext<'a> {
    pub context: &'a mut SchedulerContext,
}

impl<'a> ScopedSchedulerContext<'a> {
    pub fn pop(self) -> &'a mut SchedulerContext {
        self.context.pop();
        self.context
    }
}

impl SchedulerContext {
    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<FileMetadataCache>) -> Self {
        Self {
            io,
            cache,
            recv: None,
            name: "".to_string(),
            path: Vec::new(),
            path_names: Vec::new(),
        }
    }

    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
        &self.io
    }

    pub fn cache(&self) -> &Arc<FileMetadataCache> {
        &self.cache
    }

    pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
        self.path.push(index);
        self.path_names.push(name.to_string());
        ScopedSchedulerContext { context: self }
    }

    pub fn pop(&mut self) {
        self.path.pop();
        self.path_names.pop();
    }

    pub fn path_name(&self) -> String {
        let path = self.path_names.join("/");
        if self.recv.is_some() {
            format!("TEMP({}){}", self.name, path)
        } else {
            format!("ROOT{}", path)
        }
    }

    pub fn current_path(&self) -> VecDeque<u32> {
        VecDeque::from_iter(self.path.iter().copied())
    }

    pub fn locate_decoder(&mut self, decoder: Box<dyn LogicalPageDecoder>) -> DecoderReady {
        trace!(
            "Scheduling decoder of type {:?} for {:?}",
            decoder.data_type(),
            self.path,
        );
        DecoderReady {
            decoder,
            path: self.current_path(),
        }
    }
}
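
// Illustrative sketch (not part of the original source): `push` and `pop`
// track the current position in the field tree while scheduling nested
// fields, and `ScopedSchedulerContext::pop` restores the parent scope:
//
//     let scoped = ctx.push("points", 0);       // now at column 0 ("points")
//     let scoped = scoped.context.push("x", 1); // now at path [0, 1]
//     let msg = scoped.context.locate_decoder(decoder); // msg.path is [0, 1]
//     let ctx = scoped.pop();                   // back to [0]
//     ctx.pop();                                // back to the root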

/// A scheduled page whose data has not yet been loaded; resolving the
/// future yields the loaded page
pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);

impl std::fmt::Debug for UnloadedPage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UnloadedPage").finish()
    }
}

#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}

pub trait SchedulingJob: std::fmt::Debug {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine>;

    fn num_rows(&self) -> u64;
}

pub trait StructuralSchedulingJob: std::fmt::Debug {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>>;
}

/// A filter expression to apply to the data as it is read
///
/// The representation is opaque to the core decoders; an empty
/// expression means no filtering is applied.
pub struct FilterExpression(pub Bytes);

impl FilterExpression {
    /// Creates a filter expression that does not filter any data
    pub fn no_filter() -> Self {
        Self(Bytes::new())
    }

    /// Returns true if the filter is the same as the [`Self::no_filter`] filter
    pub fn is_noop(&self) -> bool {
        self.0.is_empty()
    }
}
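
// Illustrative sketch (not part of the original source): a no-op filter is
// represented by an empty byte buffer:
//
//     let filter = FilterExpression::no_filter();
//     assert!(filter.is_noop());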

/// A scheduler for a single field's worth of data
///
/// Implementations schedule the I/O needed to read the requested ranges
/// of one (potentially nested) field.
pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
    /// Called once at the start of scheduling to initialize the scheduler
    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Schedules I/O for the requested ranges of rows
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>>;
    /// The number of rows in this field
    fn num_rows(&self) -> u64;
}

/// The structural (2.1+) counterpart to [`FieldScheduler`]
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
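
// Illustrative sketch (not part of the original source): a field scheduler
// is typically driven by asking the job for scan lines until the requested
// rows have all been scheduled, forwarding each scan line's decoders to the
// decode stream:
//
//     let mut job = scheduler.schedule_ranges(&[0..100], &filter)?;
//     let mut rows_scheduled = 0;
//     while rows_scheduled < job.num_rows() {
//         let scan_line = job.schedule_next(&mut context, priority.as_ref())?;
//         rows_scheduled += scan_line.rows_scheduled;
//         // scan_line.decoders are sent over the decoder channel here
//     }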

/// A task to decode data into an Arrow array
pub trait DecodeArrayTask: Send {
    /// Decodes the data into an Arrow array
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}

impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
    }
}

/// The next decode task for a field
pub struct NextDecodeTask {
    /// The decode task itself
    pub task: Box<dyn DecodeArrayTask>,
    /// The number of rows that will be created
    pub num_rows: u64,
}

impl NextDecodeTask {
    /// Runs the task and converts the result into a record batch,
    /// warning once per stream if the batch is unexpectedly large
    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
        let struct_arr = self.task.decode();
        match struct_arr {
            Ok(struct_arr) => {
                let batch = RecordBatch::from(struct_arr.as_struct());
                let size_bytes = batch.get_array_memory_size() as u64;
                if size_bytes > BATCH_SIZE_BYTES_WARNING {
                    emitted_batch_size_warning.call_once(|| {
                        let size_mb = size_bytes / 1024 / 1024;
                        debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
                    });
                }
                Ok(batch)
            }
            Err(e) => {
                let e = Error::Internal {
                    message: format!("Error decoding batch: {}", e),
                    location: location!(),
                };
                Err(e)
            }
        }
    }
}

/// A decoder that is ready to be added to the decode tree
#[derive(Debug)]
pub struct DecoderReady {
    /// The decoder that is ready to be used
    pub decoder: Box<dyn LogicalPageDecoder>,
    /// The path to the decoder, relative to the root of the schema,
    /// used to route the decoder to the correct parent
    pub path: VecDeque<u32>,
}

/// A message sent from the scheduler to the decode stream
#[derive(Debug)]
pub enum MessageType {
    /// A ready decoder, used by the legacy decode path
    DecoderReady(DecoderReady),
    /// A scheduled page, used by the structural (2.1+) decode path
    UnloadedPage(UnloadedPage),
}

impl MessageType {
    pub fn into_legacy(self) -> DecoderReady {
        match self {
            Self::DecoderReady(decoder) => decoder,
            Self::UnloadedPage(_) => {
                panic!("Expected DecoderReady but got UnloadedPage")
            }
        }
    }

    pub fn into_structural(self) -> UnloadedPage {
        match self {
            Self::UnloadedPage(unloaded) => unloaded,
            Self::DecoderReady(_) => {
                panic!("Expected UnloadedPage but got DecoderReady")
            }
        }
    }
}

pub struct DecoderMessage {
    pub scheduled_so_far: u64,
    pub decoders: Vec<MessageType>,
}

pub struct DecoderContext {
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}

/// A decoder for a field's worth of data
pub trait LogicalPageDecoder: std::fmt::Debug + Send {
    /// Adds a newly scheduled child decoder
    ///
    /// The default implementation does not expect children and returns
    /// an error.
    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
        Err(Error::Internal {
            message: format!(
                "The decoder {:?} does not expect children but received a child",
                self
            ),
            location: location!(),
        })
    }
    /// Waits until at least `loaded_need` rows are loaded
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>>;
    /// The number of rows loaded so far
    fn rows_loaded(&self) -> u64;
    /// The number of rows that have not yet been loaded
    fn rows_unloaded(&self) -> u64 {
        self.num_rows() - self.rows_loaded()
    }
    /// The total number of rows in the field
    fn num_rows(&self) -> u64;
    /// The number of rows drained so far
    fn rows_drained(&self) -> u64;
    /// The number of rows that have not yet been drained
    fn rows_left(&self) -> u64 {
        self.num_rows() - self.rows_drained()
    }
    /// Creates a task to decode `num_rows` of data into an array
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// The data type of the decoded data
    fn data_type(&self) -> &DataType;
}
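
// Illustrative sketch (not part of the original source): consumers of a
// `LogicalPageDecoder` alternate between waiting for rows to load and
// draining them into decode tasks.  Treating `loaded_need` as a 0-based
// row index is an assumption of this sketch:
//
//     while decoder.rows_left() > 0 {
//         let to_take = batch_size.min(decoder.rows_left());
//         let need = decoder.rows_drained() + to_take;
//         if decoder.rows_loaded() < need {
//             decoder.wait_for_loaded(need - 1).await?;
//         }
//         let task = decoder.drain(to_take)?;
//         // task.task.decode() produces the ArrayRef, often on a separate
//         // compute thread
//     }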

/// A decoded page of data along with its repetition/definition levels
pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}

/// A task to decode a page of data into a [`DecodedPage`]
pub trait DecodePageTask: Send + std::fmt::Debug {
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}

/// A decoder for a page of data in the structural (2.1+) decode path
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    fn num_rows(&self) -> u64;
}

/// A page that has been loaded and is ready to be drained
#[derive(Debug)]
pub struct LoadedPage {
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// The path to the page, relative to the root of the schema,
    /// used to route the page to the correct field decoder
    pub path: VecDeque<u32>,
    pub page_index: usize,
}

/// A decoded array along with its repetition/definition levels
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}

pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}

/// The structural (2.1+) counterpart to [`LogicalPageDecoder`]
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Adds a newly loaded page to the decoder
    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
    /// Creates a task to decode `num_rows` of data into an array
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The data type of the decoded data
    fn data_type(&self) -> &DataType;
}

/// Plugins to customize the decoding process (currently empty)
#[derive(Debug, Default)]
pub struct DecoderPlugins {}

/// Decodes an [`EncodedBatch`] that was encoded into memory back into
/// a [`RecordBatch`]
pub async fn decode_batch(
    batch: &EncodedBatch,
    filter: &FilterExpression,
    decoder_plugins: Arc<DecoderPlugins>,
    should_validate: bool,
    version: LanceFileVersion,
    cache: Option<Arc<FileMetadataCache>>,
) -> Result<RecordBatch> {
    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
    // If the caller didn't provide a cache, create a small temporary one
    // for this decode.
    let cache = cache.unwrap_or_else(|| {
        Arc::new(FileMetadataCache::with_capacity(
            128 * 1024 * 1024,
            CapacityMode::Bytes,
        ))
    });
    let mut decode_scheduler = DecodeBatchScheduler::try_new(
        batch.schema.as_ref(),
        &batch.top_level_columns,
        &batch.page_table,
        &vec![],
        batch.num_rows,
        decoder_plugins,
        io_scheduler.clone(),
        cache,
        filter,
    )
    .await?;
    let (tx, rx) = unbounded_channel();
    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
    let is_structural = version >= LanceFileVersion::V2_1;
    let mut decode_stream = create_decode_stream(
        &batch.schema,
        batch.num_rows,
        batch.num_rows as u32,
        is_structural,
        should_validate,
        rx,
    );
    decode_stream.next().await.unwrap().task.await
}
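
// Illustrative sketch (not part of the original source): round-tripping an
// in-memory batch, assuming `encoded` is an `EncodedBatch` produced by the
// matching encoder:
//
//     let batch = decode_batch(
//         &encoded,
//         &FilterExpression::no_filter(),
//         Arc::new(DecoderPlugins::default()),
//         false,                  // should_validate
//         LanceFileVersion::V2_1,
//         None,                   // use a default temporary cache
//     )
//     .await?;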