use std::collections::VecDeque;
use std::sync::Once;
use std::{ops::Range, sync::Arc};

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{maybe_done, BoxFuture, MaybeDone};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::{CapacityMode, FileMetadataCache};
use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
use log::{debug, trace, warn};
use snafu::location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};

use lance_core::{ArrowResult, Error, Result};
use tracing::{instrument, Instrument};

use crate::buffer::LanceBuffer;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::blob::BlobFieldScheduler;
use crate::encodings::logical::list::{
    ListFieldScheduler, OffsetPageInfo, StructuralListScheduler,
};
use crate::encodings::logical::primitive::{
    PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
};
use crate::encodings::logical::r#struct::{
    SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
};
use crate::encodings::physical::binary::{
    BinaryBlockDecompressor, BinaryMiniBlockDecompressor, VariableDecoder,
};
use crate::encodings::physical::bitpack_fastlanes::InlineBitpacking;
use crate::encodings::physical::block_compress::CompressedBufferEncoder;
use crate::encodings::physical::fsst::{FsstMiniBlockDecompressor, FsstPerValueDecompressor};
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};

const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;

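/// The encoding of a page of data, either a legacy array encoding or a structural page layout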
#[derive(Debug)]
pub enum PageEncoding {
    Legacy(pb::ArrayEncoding),
    Structural(pb::PageLayout),
}

impl PageEncoding {
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        match self {
            Self::Legacy(enc) => enc,
            Self::Structural(_) => panic!("Expected a legacy encoding"),
        }
    }

    pub fn as_structural(&self) -> &pb::PageLayout {
        match self {
            Self::Structural(enc) => enc,
            Self::Legacy(_) => panic!("Expected a structural encoding"),
        }
    }

    pub fn is_structural(&self) -> bool {
        matches!(self, Self::Structural(_))
    }
}

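/// Metadata describing a single page of encoded data within a column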
#[derive(Debug)]
pub struct PageInfo {
    pub num_rows: u64,
    pub priority: u64,
    pub encoding: PageEncoding,
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}

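/// Metadata describing a column of encoded data, including its pages and column-level buffers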
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    pub index: u32,
    pub page_infos: Arc<[PageInfo]>,
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    pub encoding: pb::ColumnEncoding,
}

impl ColumnInfo {
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
            encoding,
        }
    }

    pub fn is_structural(&self) -> bool {
        self.page_infos
            .first()
            .map(|page| page.encoding.is_structural())
            .unwrap_or(false)
    }
}

enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn FieldScheduler>),
}

impl RootScheduler {
    fn as_legacy(&self) -> &Arc<dyn FieldScheduler> {
        match self {
            Self::Structural(_) => panic!("Expected a legacy scheduler"),
            Self::Legacy(s) => s,
        }
    }

    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        match self {
            Self::Structural(s) => s.as_ref(),
            Self::Legacy(_) => panic!("Expected a structural scheduler"),
        }
    }
}

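/// A scheduler for decoding batches of data
///
/// Builds a tree of field schedulers from the schema and column metadata and then walks the
/// requested ranges, emitting `DecoderMessage`s as I/O is scheduled.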
pub struct DecodeBatchScheduler {
    root_scheduler: RootScheduler,
    pub root_fields: Fields,
    cache: Arc<FileMetadataCache>,
}

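/// An iterator over column infos that tracks both the overall column position and the position
/// within the caller-provided top-level column indices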
pub struct ColumnInfoIter<'a> {
    column_infos: Vec<Arc<ColumnInfo>>,
    column_indices: &'a [u32],
    column_info_pos: usize,
    column_indices_pos: usize,
}

impl<'a> ColumnInfoIter<'a> {
    pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
        let initial_pos = column_indices[0] as usize;
        Self {
            column_infos,
            column_indices,
            column_info_pos: initial_pos,
            column_indices_pos: 0,
        }
    }

    pub fn peek(&self) -> &Arc<ColumnInfo> {
        &self.column_infos[self.column_info_pos]
    }

    pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
        let column_info = self.column_infos[self.column_info_pos].clone();
        let transformed = transform(column_info);
        self.column_infos[self.column_info_pos] = transformed;
    }

    pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
        self.next().ok_or_else(|| {
            Error::invalid_input(
                "there were more fields in the schema than provided column indices / infos",
                location!(),
            )
        })
    }

    fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
        if self.column_info_pos < self.column_infos.len() {
            let info = &self.column_infos[self.column_info_pos];
            self.column_info_pos += 1;
            Some(info)
        } else {
            None
        }
    }

    pub(crate) fn next_top_level(&mut self) {
        self.column_indices_pos += 1;
        if self.column_indices_pos < self.column_indices.len() {
            self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
        } else {
            self.column_info_pos = self.column_infos.len();
        }
    }
}

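/// A decompressor that decodes an entire mini-block chunk in one call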
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}

pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;

    fn bits_per_value(&self) -> u64;
}

pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}

pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}

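/// A strategy for creating decompressors from their protobuf encoding descriptions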
pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}

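/// The default decompressor strategy, covering the encodings built into this crate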
#[derive(Debug, Default)]
pub struct CoreDecompressorStrategy {}

impl DecompressorStrategy for CoreDecompressorStrategy {
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn MiniBlockDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::from_flat(flat)))
            }
            pb::array_encoding::ArrayEncoding::InlineBitpacking(description) => {
                Ok(Box::new(InlineBitpacking::from_description(description)))
            }
            pb::array_encoding::ArrayEncoding::Variable(_) => {
                Ok(Box::new(BinaryMiniBlockDecompressor::default()))
            }
            pb::array_encoding::ArrayEncoding::Fsst(description) => {
                Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
            }
            pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
                Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
                    description,
                )))
            }
            pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
            }
            _ => todo!(),
        }
    }

    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::from_flat(flat)))
            }
            pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
            }
            _ => todo!("fixed-per-value decompressor for {:?}", description),
        }
    }

    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
        match *description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Variable(variable) => {
                assert!(variable.bits_per_offset < u8::MAX as u32);
                Ok(Box::new(VariableDecoder::default()))
            }
            pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
                Ok(Box::new(FsstPerValueDecompressor::new(
                    LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
                    Box::new(VariableDecoder::default()),
                )))
            }
            pb::array_encoding::ArrayEncoding::Block(ref block) => Ok(Box::new(
                CompressedBufferEncoder::from_scheme(&block.scheme)?,
            )),
            _ => todo!("variable-per-value decompressor for {:?}", description),
        }
    }

    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>> {
        match description.array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Flat(flat) => {
                Ok(Box::new(ValueDecompressor::from_flat(flat)))
            }
            pb::array_encoding::ArrayEncoding::Constant(constant) => {
                let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
                Ok(Box::new(ConstantDecompressor::new(scalar)))
            }
            pb::array_encoding::ArrayEncoding::Variable(_) => {
                Ok(Box::new(BinaryBlockDecompressor::default()))
            }
            _ => todo!(),
        }
    }
}

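/// The core decoder strategy, which maps fields and column encodings to field schedulers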
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    pub validate_data: bool,
    pub decompressor_strategy: Arc<dyn DecompressorStrategy>,
}

impl Default for CoreFieldDecoderStrategy {
    fn default() -> Self {
        Self {
            validate_data: false,
            decompressor_strategy: Arc::new(CoreDecompressorStrategy {}),
        }
    }
}

impl CoreFieldDecoderStrategy {
    fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        let column_encoding = column_info
            .encoding
            .column_encoding
            .as_ref()
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "the column at index {} was missing a ColumnEncoding",
                        column_info.index
                    ),
                    location!(),
                )
            })?;
        if matches!(
            column_encoding,
            pb::column_encoding::ColumnEncoding::Values(_)
        ) {
            Ok(())
        } else {
            Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
        }
    }

    fn is_primitive(data_type: &DataType) -> bool {
        if data_type.is_primitive() {
            true
        } else {
            match data_type {
                DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
                DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
                _ => false,
            }
        }
    }

    fn create_primitive_scheduler(
        &self,
        field: &Field,
        column: &ColumnInfo,
        buffers: FileBuffers,
    ) -> Result<Box<dyn FieldScheduler>> {
        Self::ensure_values_encoded(column, &field.name)?;
        let column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &column.buffer_offsets_and_sizes,
        };
        Ok(Box::new(PrimitiveFieldScheduler::new(
            column.index,
            field.data_type(),
            column.page_infos.clone(),
            column_buffers,
            self.validate_data,
        )))
    }

    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        if column_info.page_infos.len() != 1 {
            return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
        }
        let encoding = &column_info.page_infos[0].encoding;
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
        }
    }

    fn check_packed_struct(column_info: &ColumnInfo) -> bool {
        let encoding = &column_info.page_infos[0].encoding;
        matches!(
            encoding.as_legacy().array_encoding.as_ref().unwrap(),
            pb::array_encoding::ArrayEncoding::PackedStruct(_)
        )
    }

    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;

        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    panic!("Expected a list column");
                }
            })
            .unzip();
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }

    fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
        if let column_encoding::ColumnEncoding::Blob(blob) =
            column_info.encoding.column_encoding.as_ref().unwrap()
        {
            let mut column_info = column_info.clone();
            column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
            Some(column_info)
        } else {
            None
        }
    }

    fn create_structural_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
    ) -> Result<Box<dyn StructuralFieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                column_info.as_ref(),
                self.decompressor_strategy.as_ref(),
            )?);

            column_infos.next_top_level();

            return Ok(scheduler);
        }
        match &data_type {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    let column_info = column_infos.expect_next()?;
                    let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                        column_info.as_ref(),
                        self.decompressor_strategy.as_ref(),
                    )?);

                    column_infos.next_top_level();

                    return Ok(scheduler);
                }
                let mut child_schedulers = Vec::with_capacity(field.children.len());
                for field in field.children.iter() {
                    let field_scheduler =
                        self.create_structural_field_scheduler(field, column_infos)?;
                    child_schedulers.push(field_scheduler);
                }

                let fields = fields.clone();
                Ok(
                    Box::new(StructuralStructScheduler::new(child_schedulers, fields))
                        as Box<dyn StructuralFieldScheduler>,
                )
            }
            DataType::Binary | DataType::Utf8 => {
                let column_info = column_infos.expect_next()?;
                let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
                    column_info.as_ref(),
                    self.decompressor_strategy.as_ref(),
                )?);
                column_infos.next_top_level();
                Ok(scheduler)
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let child = field
                    .children
                    .first()
                    .expect("List field must have a child");
                let child_scheduler =
                    self.create_structural_field_scheduler(child, column_infos)?;
                Ok(Box::new(StructuralListScheduler::new(child_scheduler))
                    as Box<dyn StructuralFieldScheduler>)
            }
            _ => todo!(),
        }
    }

    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive(&data_type) {
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.next().unwrap().clone();
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                if Self::is_primitive(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::NotSupported {
                        source: format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;

                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }

                if Self::check_packed_struct(column_info) {
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }

                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            _ => todo!(),
        }
    }
}

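/// Creates a synthetic ColumnInfo for the implicit root struct column used by the legacy path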
fn root_column(num_rows: u64) -> ColumnInfo {
    let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
    let final_page_num_rows = num_rows % (u32::MAX as u64);
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                u64::MAX
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0,
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: values_column_encoding(),
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}

pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}

impl RootDecoder {
    pub fn into_structural(self) -> StructuralStructDecoder {
        match self {
            Self::Structural(decoder) => decoder,
            Self::Legacy(_) => panic!("Expected a structural decoder"),
        }
    }

    pub fn into_legacy(self) -> SimpleStructDecoder {
        match self {
            Self::Legacy(decoder) => decoder,
            Self::Structural(_) => panic!("Expected a legacy decoder"),
        }
    }
}

impl DecodeBatchScheduler {
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<FileMetadataCache>,
        filter: &FilterExpression,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());

        if column_infos[0].is_structural() {
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);

            let mut root_scheduler = CoreFieldDecoderStrategy::default()
                .create_structural_field_scheduler(&root_field, &mut column_iter)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());

            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let root_scheduler = CoreFieldDecoderStrategy::default()
                .create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;

            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;

            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }

    pub fn from_scheduler(
        root_scheduler: Arc<dyn FieldScheduler>,
        root_fields: Fields,
        cache: Arc<FileMetadataCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }

    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_line = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_line {
                schedule_action(Err(err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            match next_scan_line {
                Some(next_scan_line) => {
                    trace!(
                        "Scheduled scan line of {} rows and {} decoders",
                        next_scan_line.rows_scheduled,
                        next_scan_line.decoders.len()
                    );
                    num_rows_scheduled += next_scan_line.rows_scheduled;
                    if !schedule_action(Ok(DecoderMessage {
                        scheduled_so_far: num_rows_scheduled,
                        decoders: next_scan_line.decoders,
                    })) {
                        return;
                    }
                }
                None => return,
            }
        }
    }

    fn do_schedule_ranges_legacy(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        let root_scheduler = self.root_scheduler.as_legacy();
        let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        trace!(
            "Scheduling {} ranges across {}..{} ({} rows){}",
            ranges.len(),
            ranges.first().unwrap().start,
            ranges.last().unwrap().end,
            rows_requested,
            priority
                .as_ref()
                .map(|p| format!(" (priority={:?})", p))
                .unwrap_or_default()
        );

        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        let mut rows_to_schedule = root_job.num_rows();
        let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
        trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
        while rows_to_schedule > 0 {
            let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
            if let Err(schedule_next_err) = maybe_next_scan_line {
                schedule_action(Err(schedule_next_err));
                return;
            }
            let next_scan_line = maybe_next_scan_line.unwrap();
            priority.advance(next_scan_line.rows_scheduled);
            num_rows_scheduled += next_scan_line.rows_scheduled;
            rows_to_schedule -= next_scan_line.rows_scheduled;
            trace!(
                "Scheduled scan line of {} rows and {} decoders",
                next_scan_line.rows_scheduled,
                next_scan_line.decoders.len()
            );
            if !schedule_action(Ok(DecoderMessage {
                scheduled_so_far: num_rows_scheduled,
                decoders: next_scan_line.decoders,
            })) {
                return;
            }

            trace!("Finished scheduling {} ranges", ranges.len());
        }
    }

    fn do_schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
        priority: Option<Box<dyn PriorityRange>>,
    ) {
        match &self.root_scheduler {
            RootScheduler::Legacy(_) => {
                self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
            }
            RootScheduler::Structural(_) => {
                self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
            }
        }
    }

    pub fn schedule_ranges_to_vec(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        priority: Option<Box<dyn PriorityRange>>,
    ) -> Result<Vec<DecoderMessage>> {
        let mut decode_messages = Vec::new();
        self.do_schedule_ranges(
            ranges,
            filter,
            io,
            |msg| {
                decode_messages.push(msg);
                true
            },
            priority,
        );
        decode_messages.into_iter().collect::<Result<Vec<_>>>()
    }

    #[instrument(skip_all)]
    pub fn schedule_ranges(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.do_schedule_ranges(
            ranges,
            filter,
            scheduler,
            |msg| {
                match sink.send(msg) {
                    Ok(_) => true,
                    Err(SendError { .. }) => {
                        debug!(
                            "schedule_ranges aborting early since decoder appears to have been dropped"
                        );
                        false
                    }
                }
            },
            None,
        )
    }

    #[instrument(skip_all)]
    pub fn schedule_range(
        &mut self,
        range: Range<u64>,
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        self.schedule_ranges(&[range], filter, sink, scheduler)
    }

    pub fn schedule_take(
        &mut self,
        indices: &[u64],
        filter: &FilterExpression,
        sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
        scheduler: Arc<dyn EncodingsIo>,
    ) {
        debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
        if indices.is_empty() {
            return;
        }
        trace!("Scheduling take of {} rows", indices.len());
        let ranges = Self::indices_to_ranges(indices);
        self.schedule_ranges(&ranges, filter, sink, scheduler)
    }

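    /// Coalesces a sorted list of indices into contiguous ranges, e.g. `[1, 2, 5, 6]` becomes
    /// `[1..3, 5..7]`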
    fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
        let mut ranges = Vec::new();
        let mut start = indices[0];

        for window in indices.windows(2) {
            if window[1] != window[0] + 1 {
                ranges.push(start..window[0] + 1);
                start = window[1];
            }
        }

        ranges.push(start..*indices.last().unwrap() + 1);
        ranges
    }
}

pub struct ReadBatchTask {
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    pub num_rows: u32,
}

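/// A stream that takes scheduled decoder messages (legacy path) and drains them into batches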
pub struct BatchDecodeStream {
    context: DecoderContext,
    root_decoder: SimpleStructDecoder,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    scheduler_exhausted: bool,
    emitted_batch_size_warning: Arc<Once>,
}

impl BatchDecodeStream {
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    fn accept_decoder(&mut self, decoder: DecoderReady) -> Result<()> {
        if decoder.path.is_empty() {
            Ok(())
        } else {
            self.root_decoder.accept_child(decoder)
        }
    }

    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;

        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = tokio::spawn(
                    (async move {
                        let next_task = next_task?;
                        next_task.into_batch(emitted_batch_size_warning)
                    })
                    .in_current_span(),
                );
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task,
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}

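// Glue used by the blocking iterator below so it can drive either the structural or the legacy
// root decoder through a single code path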
enum RootDecoderMessage {
    LoadedPage(LoadedPage),
    LegacyPage(DecoderReady),
}
trait RootDecoderType {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
impl RootDecoderType for StructuralStructDecoder {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
        let RootDecoderMessage::LoadedPage(loaded_page) = message else {
            unreachable!()
        };
        self.accept_page(loaded_page)
    }
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.drain_batch_task(num_rows)
    }
    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
        Ok(())
    }
}
impl RootDecoderType for SimpleStructDecoder {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
        let RootDecoderMessage::LegacyPage(legacy_page) = message else {
            unreachable!()
        };
        self.accept_child(legacy_page)
    }
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.drain(num_rows)
    }
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
        runtime.block_on(self.wait_for_loaded(loaded_need))
    }
}

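/// A blocking iterator that drains already-scheduled decoder messages into record batches,
/// using a current-thread tokio runtime for any remaining I/O waits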
struct BatchDecodeIterator<T: RootDecoderType> {
    messages: VecDeque<Result<DecoderMessage>>,
    root_decoder: T,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    emitted_batch_size_warning: Arc<Once>,
    wait_for_io_runtime: tokio::runtime::Runtime,
    schema: Arc<ArrowSchema>,
}

impl<T: RootDecoderType> BatchDecodeIterator<T> {
    pub fn new(
        messages: VecDeque<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: T,
        schema: Arc<ArrowSchema>,
    ) -> Self {
        Self {
            messages,
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap(),
            emitted_batch_size_warning: Arc::new(Once::new()),
            schema,
        }
    }

    fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
        match maybe_done(unloaded_page.0) {
            MaybeDone::Done(loaded_page) => loaded_page,
            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
            MaybeDone::Gone => unreachable!(),
        }
    }

    #[instrument(skip_all)]
    fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
            let message = self.messages.pop_front().unwrap()?;
            self.rows_scheduled = message.scheduled_so_far;
            for decoder_message in message.decoders {
                match decoder_message {
                    MessageType::UnloadedPage(unloaded_page) => {
                        let loaded_page = self.wait_for_page(unloaded_page)?;
                        self.root_decoder
                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
                    }
                    MessageType::DecoderReady(decoder_ready) => {
                        if !decoder_ready.path.is_empty() {
                            self.root_decoder
                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
                        }
                    }
                }
            }
        }

        let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;

        self.root_decoder
            .wait(loaded_need, &self.wait_for_io_runtime)?;
        Ok(self.rows_scheduled)
    }

    #[instrument(level = "debug", skip_all)]
    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_io(desired_scheduled)?;
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch(to_take)?;

        self.rows_drained += to_take;

        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;

        Ok(Some(batch))
    }
}

impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
    type Item = ArrowResult<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_batch_task()
            .transpose()
            .map(|r| r.map_err(ArrowError::from))
    }
}

impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
    fn schema(&self) -> Arc<ArrowSchema> {
        self.schema.clone()
    }
}

pub struct StructuralBatchDecodeStream {
    context: DecoderContext,
    root_decoder: StructuralStructDecoder,
    rows_remaining: u64,
    rows_per_batch: u32,
    rows_scheduled: u64,
    rows_drained: u64,
    scheduler_exhausted: bool,
    emitted_batch_size_warning: Arc<Once>,
}

impl StructuralBatchDecodeStream {
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }

    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        let unloaded_page = message.into_structural();
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining,
            self.rows_drained,
            self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }

        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;

        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }

        if to_take == 0 {
            return Ok(None);
        }

        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = tokio::spawn(async move {
                    let next_task = next_task?;
                    next_task.into_batch(emitted_batch_size_warning)
                });
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                let task = task.map(|join_wrapper| join_wrapper.unwrap()).boxed();
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task,
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}

#[derive(Debug)]
pub enum RequestedRows {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}

impl RequestedRows {
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges.iter().map(|r| r.end - r.start).sum(),
            Self::Indices(indices) => indices.len() as u64,
        }
    }
}

#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    pub decoder_plugins: Arc<DecoderPlugins>,
    pub batch_size: u32,
    pub io: Arc<dyn EncodingsIo>,
    pub cache: Arc<FileMetadataCache>,
    pub should_validate: bool,
}

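// Appends a final stage to the stream that awaits the scheduler task, so that once the stream
// is fully drained any panic from the scheduler is surfaced instead of being silently dropped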
fn check_scheduler_on_drop(
    stream: BoxStream<'static, ReadBatchTask>,
    scheduler_handle: tokio::task::JoinHandle<()>,
) -> BoxStream<'static, ReadBatchTask> {
    let mut scheduler_handle = Some(scheduler_handle);
    let check_scheduler = stream::unfold((), move |_| {
        let handle = scheduler_handle.take();
        async move {
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
            None
        }
    });
    stream.chain(check_scheduler).boxed()
}

pub fn create_decode_stream(
    schema: &Schema,
    num_rows: u64,
    batch_size: u32,
    is_structural: bool,
    should_validate: bool,
    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
) -> BoxStream<'static, ReadBatchTask> {
    if is_structural {
        let arrow_schema = ArrowSchema::from(schema);
        let structural_decoder =
            StructuralStructDecoder::new(arrow_schema.fields, should_validate, true);
        StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
    } else {
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields;

        let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
        BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
    }
}

pub fn create_decode_iterator(
    schema: &Schema,
    num_rows: u64,
    batch_size: u32,
    should_validate: bool,
    is_structural: bool,
    messages: VecDeque<Result<DecoderMessage>>,
) -> Box<dyn RecordBatchReader + Send + 'static> {
    let arrow_schema = Arc::new(ArrowSchema::from(schema));
    let root_fields = arrow_schema.fields.clone();
    if is_structural {
        let simple_struct_decoder =
            StructuralStructDecoder::new(root_fields, should_validate, true);
        Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            simple_struct_decoder,
            arrow_schema,
        ))
    } else {
        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
        Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            root_decoder,
            arrow_schema,
        ))
    }
}

fn create_scheduler_decoder(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    let num_rows = requested_rows.num_rows();

    let is_structural = column_infos[0].is_structural();

    let (tx, rx) = mpsc::unbounded_channel();

    let decode_stream = create_decode_stream(
        &target_schema,
        num_rows,
        config.batch_size,
        is_structural,
        config.should_validate,
        rx,
    );

    let scheduler_handle = tokio::task::spawn(async move {
        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
            target_schema.as_ref(),
            &column_indices,
            &column_infos,
            &vec![],
            num_rows,
            config.decoder_plugins,
            config.io.clone(),
            config.cache,
            &filter,
        )
        .await
        {
            Ok(scheduler) => scheduler,
            Err(e) => {
                let _ = tx.send(Err(e));
                return;
            }
        };

        match requested_rows {
            RequestedRows::Ranges(ranges) => {
                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
            }
            RequestedRows::Indices(indices) => {
                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
            }
        }
    });

    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
}

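/// Schedules and decodes the requested rows, spawning the scheduling work onto a background
/// tokio task and returning a stream of batch decode tasks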
pub fn schedule_and_decode(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> BoxStream<'static, ReadBatchTask> {
    if requested_rows.num_rows() == 0 {
        return stream::empty().boxed();
    }
    match create_scheduler_decoder(
        column_infos,
        requested_rows,
        filter,
        column_indices,
        target_schema,
        config,
    ) {
        Ok(stream) => stream,
        Err(e) => stream::once(std::future::ready(ReadBatchTask {
            num_rows: 0,
            task: std::future::ready(Err(e)).boxed(),
        }))
        .boxed(),
    }
}

lazy_static::lazy_static! {
    pub static ref WAITER_RT: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
}

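/// A blocking variant of [`schedule_and_decode`]
///
/// Scheduling runs on the calling thread (using a lightweight current-thread runtime) and all
/// scheduled messages are drained up front; the returned reader then performs any remaining
/// waits synchronously as it is iterated.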
pub fn schedule_and_decode_blocking(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
    if requested_rows.num_rows() == 0 {
        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
    }

    let num_rows = requested_rows.num_rows();
    let is_structural = column_infos[0].is_structural();

    let (tx, mut rx) = mpsc::unbounded_channel();

    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
    ))?;

    match requested_rows {
        RequestedRows::Ranges(ranges) => {
            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
        }
        RequestedRows::Indices(indices) => {
            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
        }
    }

    let mut messages = Vec::new();
    while rx
        .recv_many(&mut messages, usize::MAX)
        .now_or_never()
        .unwrap()
        != 0
    {}

    let decode_iterator = create_decode_iterator(
        &target_schema,
        num_rows,
        config.batch_size,
        config.should_validate,
        is_structural,
        messages.into(),
    );

    Ok(decode_iterator)
}

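/// A decoder for a page of primitive data that can materialize an arbitrary slice of its rows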
pub trait PrimitivePageDecoder: Send + Sync {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}

pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}

pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    fn advance(&mut self, num_rows: u64);
    fn current_priority(&self) -> u64;
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            priority: self.priority,
        })
    }
}

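/// A priority range for list data that tracks progress through a list's items and advances the
/// underlying (row) priority as item positions cross the list offsets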
pub struct ListPriorityRange {
    base: Box<dyn PriorityRange>,
    offsets: Arc<[u64]>,
    cur_index_into_offsets: usize,
    cur_position: u64,
}

impl ListPriorityRange {
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}

impl std::fmt::Debug for ListPriorityRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ListPriorityRange")
            .field("base", &self.base)
            .field("offsets.len()", &self.offsets.len())
            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
            .field("cur_position", &self.cur_position)
            .finish()
    }
}

impl PriorityRange for ListPriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.cur_position += num_rows;
        let mut idx_into_offsets = self.cur_index_into_offsets;
        while idx_into_offsets + 1 < self.offsets.len()
            && self.offsets[idx_into_offsets + 1] <= self.cur_position
        {
            idx_into_offsets += 1;
        }
        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
        self.cur_index_into_offsets = idx_into_offsets;
        self.base.advance(base_rows_advanced as u64);
    }

    fn current_priority(&self) -> u64 {
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            base: self.base.box_clone(),
            offsets: self.offsets.clone(),
            cur_index_into_offsets: self.cur_index_into_offsets,
            cur_position: self.cur_position,
        })
    }
}

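/// State shared by schedulers while walking the field tree, tracking the current field path and
/// providing access to I/O and the file metadata cache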
2309pub struct SchedulerContext {
2311 recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
2312 io: Arc<dyn EncodingsIo>,
2313 cache: Arc<FileMetadataCache>,
2314 name: String,
2315 path: Vec<u32>,
2316 path_names: Vec<String>,
2317}
2318
2319pub struct ScopedSchedulerContext<'a> {
2320 pub context: &'a mut SchedulerContext,
2321}
2322
2323impl<'a> ScopedSchedulerContext<'a> {
2324 pub fn pop(self) -> &'a mut SchedulerContext {
2325 self.context.pop();
2326 self.context
2327 }
2328}
2329
impl SchedulerContext {
    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<FileMetadataCache>) -> Self {
        Self {
            io,
            cache,
            recv: None,
            name: "".to_string(),
            path: Vec::new(),
            path_names: Vec::new(),
        }
    }

    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
        &self.io
    }

    pub fn cache(&self) -> &Arc<FileMetadataCache> {
        &self.cache
    }

    pub fn push(&mut self, name: &str, index: u32) -> ScopedSchedulerContext {
        self.path.push(index);
        self.path_names.push(name.to_string());
        ScopedSchedulerContext { context: self }
    }

    pub fn pop(&mut self) {
        self.path.pop();
        self.path_names.pop();
    }

    pub fn path_name(&self) -> String {
        let path = self.path_names.join("/");
        if self.recv.is_some() {
            format!("TEMP({}){}", self.name, path)
        } else {
            format!("ROOT{}", path)
        }
    }

    pub fn current_path(&self) -> VecDeque<u32> {
        VecDeque::from_iter(self.path.iter().copied())
    }

    pub fn locate_decoder(&mut self, decoder: Box<dyn LogicalPageDecoder>) -> DecoderReady {
        trace!(
            "Scheduling decoder of type {:?} for {:?}",
            decoder.data_type(),
            self.path,
        );
        DecoderReady {
            decoder,
            path: self.current_path(),
        }
    }
}

/// A page that has been scheduled but whose I/O has not yet completed; the future
/// resolves to a [`LoadedPage`] once the data arrives.
pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);

impl std::fmt::Debug for UnloadedPage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UnloadedPage").finish()
    }
}

/// The result of one scheduling step: how many rows were scheduled and the decoder
/// messages produced for them.
#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}

/// An in-progress scheduling task for a field (legacy path).  Each call to
/// `schedule_next` schedules some additional rows.
pub trait SchedulingJob: std::fmt::Debug {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine>;

    fn num_rows(&self) -> u64;
}

/// The structural (2.1+) counterpart of [`SchedulingJob`]; returns `None` when there
/// is nothing left to schedule.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>>;
}

/// An opaque filter expression passed to field schedulers
///
/// The encoding of the bytes is not interpreted here; an empty payload means
/// "no filter".
pub struct FilterExpression(pub Bytes);

impl FilterExpression {
    /// Creates a filter that does not filter out any rows
    pub fn no_filter() -> Self {
        Self(Bytes::new())
    }

    /// Returns true if this filter does not filter out any rows
    pub fn is_noop(&self) -> bool {
        self.0.is_empty()
    }
}

/// Schedules I/O for a single field (legacy path)
///
/// `initialize` performs any asynchronous setup needed before `schedule_ranges` is
/// used to create a job that schedules the given row ranges.
pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>>;
    fn num_rows(&self) -> u64;
}

/// The structural (2.1+) counterpart of [`FieldScheduler`]
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}

/// A CPU-only task that decodes already-loaded data into an Arrow array
pub trait DecodeArrayTask: Send {
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}

impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
    }
}

/// A decode task along with the number of rows it will produce
pub struct NextDecodeTask {
    pub task: Box<dyn DecodeArrayTask>,
    pub num_rows: u64,
}

impl NextDecodeTask {
    // Run the task and convert the resulting struct array into a RecordBatch,
    // emitting (at most once) a debug message if the batch is unexpectedly large.
    #[instrument(name = "task_to_batch", level = "debug", skip_all)]
    fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
        let struct_arr = self.task.decode();
        match struct_arr {
            Ok(struct_arr) => {
                let batch = RecordBatch::from(struct_arr.as_struct());
                let size_bytes = batch.get_array_memory_size() as u64;
                if size_bytes > BATCH_SIZE_BYTES_WARNING {
                    emitted_batch_size_warning.call_once(|| {
                        let size_mb = size_bytes / 1024 / 1024;
                        debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
                    });
                }
                Ok(batch)
            }
            Err(e) => {
                let e = Error::Internal {
                    message: format!("Error decoding batch: {}", e),
                    location: location!(),
                };
                Err(e)
            }
        }
    }
}

/// A decoder, created during scheduling, ready to be added to the decoder tree
/// (legacy path)
#[derive(Debug)]
pub struct DecoderReady {
    pub decoder: Box<dyn LogicalPageDecoder>,
    /// The path of child indices from the root to the field this decoder belongs to
    pub path: VecDeque<u32>,
}

/// A message from the scheduler to the decode stream: either a ready legacy decoder
/// or a structural page whose I/O is still in flight
#[derive(Debug)]
pub enum MessageType {
    DecoderReady(DecoderReady),
    UnloadedPage(UnloadedPage),
}

impl MessageType {
    pub fn into_legacy(self) -> DecoderReady {
        match self {
            Self::DecoderReady(decoder) => decoder,
            Self::UnloadedPage(_) => {
                panic!("Expected DecoderReady but got UnloadedPage")
            }
        }
    }

    pub fn into_structural(self) -> UnloadedPage {
        match self {
            Self::UnloadedPage(unloaded) => unloaded,
            Self::DecoderReady(_) => {
                panic!("Expected UnloadedPage but got DecoderReady")
            }
        }
    }
}

/// A batch of decoder messages along with the total number of rows scheduled so far
pub struct DecoderMessage {
    pub scheduled_so_far: u64,
    pub decoders: Vec<MessageType>,
}

/// The receiving end of the scheduler-to-decoder channel
pub struct DecoderContext {
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}

/// A decoder for a field (legacy path)
///
/// Decoding is a two step process: `wait_for_loaded` waits until enough rows have
/// been loaded and `drain` then produces CPU-only tasks that decode those rows.
pub trait LogicalPageDecoder: std::fmt::Debug + Send {
    /// Adds a child decoder; the default implementation errors since most decoders
    /// do not expect children
    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
        Err(Error::Internal {
            message: format!(
                "The decoder {:?} does not expect children but received a child",
                self
            ),
            location: location!(),
        })
    }
    /// Waits until at least `loaded_need` rows have been loaded
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>>;
    fn rows_loaded(&self) -> u64;
    fn rows_unloaded(&self) -> u64 {
        self.num_rows() - self.rows_loaded()
    }
    fn num_rows(&self) -> u64;
    fn rows_drained(&self) -> u64;
    fn rows_left(&self) -> u64 {
        self.num_rows() - self.rows_drained()
    }
    /// Creates a task to decode the next `num_rows` rows
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// The data type of the decoded data
    fn data_type(&self) -> &DataType;
}

/// The decoded output for one page: the data block plus its repetition/definition levels
pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}

/// A CPU-only task that decodes loaded page data into a [`DecodedPage`]
pub trait DecodePageTask: Send + std::fmt::Debug {
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}

/// A decoder for a single page of data (structural path)
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    fn num_rows(&self) -> u64;
}

/// A page whose I/O has completed and which is ready to be decoded (structural path)
#[derive(Debug)]
pub struct LoadedPage {
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// The path of child indices from the root to the field this page belongs to
    pub path: VecDeque<u32>,
    pub page_index: usize,
}

/// A decoded array along with its unraveled repetition/definition information
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}

/// A CPU-only task that decodes loaded data into a [`DecodedArray`]
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}

/// A decoder for a field (structural path)
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Adds a loaded page to this decoder
    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
    /// Creates a task to decode the next `num_rows` rows
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The data type of the decoded data
    fn data_type(&self) -> &DataType;
}

/// Extension point for custom decoders (currently a placeholder)
#[derive(Debug, Default)]
pub struct DecoderPlugins {}

/// A utility function that schedules and decodes an entire [`EncodedBatch`] (whose data
/// is fully resident in memory) into a [`RecordBatch`] in one shot.
pub async fn decode_batch(
    batch: &EncodedBatch,
    filter: &FilterExpression,
    decoder_plugins: Arc<DecoderPlugins>,
    should_validate: bool,
    version: LanceFileVersion,
    cache: Option<Arc<FileMetadataCache>>,
) -> Result<RecordBatch> {
    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
    // If the caller did not supply a metadata cache, create a default bounded one
    let cache = cache.unwrap_or_else(|| {
        Arc::new(FileMetadataCache::with_capacity(
            128 * 1024 * 1024,
            CapacityMode::Bytes,
        ))
    });
    let mut decode_scheduler = DecodeBatchScheduler::try_new(
        batch.schema.as_ref(),
        &batch.top_level_columns,
        &batch.page_table,
        &vec![],
        batch.num_rows,
        decoder_plugins,
        io_scheduler.clone(),
        cache,
        filter,
    )
    .await?;
    let (tx, rx) = unbounded_channel();
    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
    let is_structural = version >= LanceFileVersion::V2_1;
    let mut decode_stream = create_decode_stream(
        &batch.schema,
        batch.num_rows,
        batch.num_rows as u32,
        is_structural,
        should_validate,
        rx,
    );
    decode_stream.next().await.unwrap().task.await
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }
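
    // Illustrative tests of the priority range and filter helpers defined above.
    // The starting priority, row counts, and filter bytes are arbitrary example values.
    #[test]
    fn test_simple_priority_range_advance() {
        let mut priority = SimplePriorityRange::new(7);
        assert_eq!(priority.current_priority(), 7);
        // A simple priority range advances one priority unit per row
        priority.advance(3);
        assert_eq!(priority.current_priority(), 10);
        // Cloning preserves the current position
        let cloned = priority.box_clone();
        assert_eq!(cloned.current_priority(), 10);
    }

    #[test]
    fn test_no_filter_is_noop() {
        assert!(FilterExpression::no_filter().is_noop());
        assert!(!FilterExpression(Bytes::from_static(b"expr")).is_noop());
    }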

    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }
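
    // Illustrative sketch of how ListPriorityRange maps item rows back to list rows.
    // The offsets layout assumed here (list row i spans items offsets[i]..offsets[i + 1])
    // and the concrete values are hypothetical example data.
    #[test]
    fn test_list_priority_range_advance() {
        let offsets: Arc<[u64]> = vec![0, 10, 25, 30].into();
        let mut priority = ListPriorityRange::new(Box::new(SimplePriorityRange::new(0)), offsets);
        assert_eq!(priority.current_priority(), 0);
        // Advancing 12 item rows crosses the boundary at offset 10, so the base
        // priority advances by one list row
        priority.advance(12);
        assert_eq!(priority.current_priority(), 1);
        // Advancing 15 more item rows (to position 27) crosses the boundary at 25
        priority.advance(15);
        assert_eq!(priority.current_priority(), 2);
    }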

    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }
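
    // Illustrative sketch of SchedulerContext path tracking.  The BufferScheduler over
    // empty bytes and the cache capacity are stand-in values for the example.
    #[test]
    fn test_scheduler_context_path() {
        let io = Arc::new(BufferScheduler::new(Bytes::new())) as Arc<dyn EncodingsIo>;
        let cache = Arc::new(FileMetadataCache::with_capacity(
            1024 * 1024,
            CapacityMode::Bytes,
        ));
        let mut context = SchedulerContext::new(io, cache);
        assert_eq!(context.path_name(), "ROOT");
        let scoped = context.push("outer", 2);
        assert_eq!(scoped.context.current_path(), VecDeque::from(vec![2u32]));
        // Popping the scope restores the original (empty) path
        let context = scoped.pop();
        assert_eq!(context.path_name(), "ROOT");
        assert!(context.current_path().is_empty());
    }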
}