1use std::collections::VecDeque;
216use std::sync::{LazyLock, Once};
217use std::{ops::Range, sync::Arc};
218
219use arrow_array::cast::AsArray;
220use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
221use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
222use bytes::Bytes;
223use futures::future::{maybe_done, BoxFuture, MaybeDone};
224use futures::stream::{self, BoxStream};
225use futures::{FutureExt, StreamExt};
226use lance_arrow::DataTypeExt;
227use lance_core::cache::LanceCache;
228use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
229use log::{debug, trace, warn};
230use snafu::location;
231use tokio::sync::mpsc::error::SendError;
232use tokio::sync::mpsc::{self, unbounded_channel};
233
234use lance_core::{ArrowResult, Error, Result};
235use tracing::instrument;
236
237use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
238use crate::data::DataBlock;
239use crate::encoder::EncodedBatch;
240use crate::encodings::logical::list::StructuralListScheduler;
241use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
242use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
243use crate::format::pb::{self, column_encoding};
244use crate::format::pb21;
245use crate::previous::decoder::LogicalPageDecoder;
246use crate::previous::encodings::logical::list::OffsetPageInfo;
247use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
248use crate::previous::encodings::logical::{
249 binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
250 primitive::PrimitiveFieldScheduler,
251};
252use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
253use crate::version::LanceFileVersion;
254use crate::{BufferScheduler, EncodingsIo};
255
/// A size threshold of 10 MiB (`10 * 1024 * 1024` bytes).
///
/// NOTE(review): judging by the name this is presumably the batch size above which a
/// warning is logged; the code that reads it is not visible here -- confirm.
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
258
/// The encoding description attached to a single page.
///
/// A page is described either by a legacy [`pb::ArrayEncoding`] or by a structural
/// [`pb21::PageLayout`].
#[derive(Debug)]
pub enum PageEncoding {
    /// The page is described by a legacy array encoding.
    Legacy(pb::ArrayEncoding),
    /// The page is described by a structural page layout.
    Structural(pb21::PageLayout),
}
270
271impl PageEncoding {
272 pub fn as_legacy(&self) -> &pb::ArrayEncoding {
273 match self {
274 Self::Legacy(enc) => enc,
275 Self::Structural(_) => panic!("Expected a legacy encoding"),
276 }
277 }
278
279 pub fn as_structural(&self) -> &pb21::PageLayout {
280 match self {
281 Self::Structural(enc) => enc,
282 Self::Legacy(_) => panic!("Expected a structural encoding"),
283 }
284 }
285
286 pub fn is_structural(&self) -> bool {
287 matches!(self, Self::Structural(_))
288 }
289}
290
/// Metadata describing one page of a column.
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page.
    pub num_rows: u64,
    /// A priority value associated with the page.
    ///
    /// NOTE(review): the exact meaning of the priority is not visible in this part of the
    /// file -- confirm against the scheduler that consumes it.
    pub priority: u64,
    /// How the page is encoded (legacy or structural).
    pub encoding: PageEncoding,
    /// One `(u64, u64)` pair per page buffer; by the field name these are (offset, size).
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
307
/// Metadata describing one column: its pages, its column-level buffers and its encoding.
///
/// Cloning is cheap for the page and buffer lists because they are held behind `Arc`.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column.
    pub index: u32,
    /// The pages that make up the column.
    pub page_infos: Arc<[PageInfo]>,
    /// One `(u64, u64)` pair per column-level buffer; by the field name these are
    /// (offset, size).
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// The column-level encoding description.
    pub encoding: pb::ColumnEncoding,
}
321
322impl ColumnInfo {
323 pub fn new(
325 index: u32,
326 page_infos: Arc<[PageInfo]>,
327 buffer_offsets_and_sizes: Vec<(u64, u64)>,
328 encoding: pb::ColumnEncoding,
329 ) -> Self {
330 Self {
331 index,
332 page_infos,
333 buffer_offsets_and_sizes: buffer_offsets_and_sizes.into_boxed_slice().into(),
334 encoding,
335 }
336 }
337
338 pub fn is_structural(&self) -> bool {
339 self.page_infos
340 .first()
342 .map(|page| page.encoding.is_structural())
343 .unwrap_or(false)
344 }
345}
346
/// The scheduler for the root of the schema, in either its structural or legacy form.
enum RootScheduler {
    /// A scheduler implementing [`StructuralFieldScheduler`].
    Structural(Box<dyn StructuralFieldScheduler>),
    /// A scheduler implementing the legacy `FieldScheduler` trait.
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}
351
352impl RootScheduler {
353 fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
354 match self {
355 Self::Structural(_) => panic!("Expected a legacy scheduler"),
356 Self::Legacy(s) => s,
357 }
358 }
359
360 fn as_structural(&self) -> &dyn StructuralFieldScheduler {
361 match self {
362 Self::Structural(s) => s.as_ref(),
363 Self::Legacy(_) => panic!("Expected a structural scheduler"),
364 }
365 }
366}
367
/// Schedules the decoding of row ranges by driving a root field scheduler.
///
/// Scheduled work is reported as `DecoderMessage`s through the `schedule_*` methods.
pub struct DecodeBatchScheduler {
    /// The scheduler for the root of the schema (structural or legacy).
    root_scheduler: RootScheduler,
    /// The Arrow fields of the root struct.
    pub root_fields: Fields,
    /// Cache handed to every `SchedulerContext` created while scheduling.
    cache: Arc<LanceCache>,
}
394
/// A cursor over a list of column infos that can also jump between the positions listed in
/// `column_indices`.
pub struct ColumnInfoIter<'a> {
    /// All column infos the cursor can visit.
    column_infos: Vec<Arc<ColumnInfo>>,
    /// Positions (into `column_infos`) that `next_top_level` jumps to, in order.
    column_indices: &'a [u32],
    /// Position in `column_infos` of the next info to hand out.
    column_info_pos: usize,
    /// Position in `column_indices` of the entry the cursor last jumped to.
    column_indices_pos: usize,
}
401
402impl<'a> ColumnInfoIter<'a> {
403 pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
404 let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
405 Self {
406 column_infos,
407 column_indices,
408 column_info_pos: initial_pos,
409 column_indices_pos: 0,
410 }
411 }
412
413 pub fn peek(&self) -> &Arc<ColumnInfo> {
414 &self.column_infos[self.column_info_pos]
415 }
416
417 pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
418 let column_info = self.column_infos[self.column_info_pos].clone();
419 let transformed = transform(column_info);
420 self.column_infos[self.column_info_pos] = transformed;
421 }
422
423 pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
424 self.next().ok_or_else(|| {
425 Error::invalid_input(
426 "there were more fields in the schema than provided column indices / infos",
427 location!(),
428 )
429 })
430 }
431
432 fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
433 if self.column_info_pos < self.column_infos.len() {
434 let info = &self.column_infos[self.column_info_pos];
435 self.column_info_pos += 1;
436 Some(info)
437 } else {
438 None
439 }
440 }
441
442 pub(crate) fn next_top_level(&mut self) {
443 self.column_indices_pos += 1;
444 if self.column_indices_pos < self.column_indices.len() {
445 self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
446 } else {
447 self.column_info_pos = self.column_infos.len();
448 }
449 }
450}
451
/// A borrowed view of the buffers that belong to the file as a whole.
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    /// One `(u64, u64)` pair per buffer; by the field name these are (position, size).
    pub positions_and_sizes: &'a [(u64, u64)],
}
457
/// A borrowed view of the buffers that belong to a column, together with the file-level
/// buffers.
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    /// The file-level buffers.
    pub file_buffers: FileBuffers<'a>,
    /// One `(u64, u64)` pair per column buffer; by the field name these are
    /// (position, size).
    pub positions_and_sizes: &'b [(u64, u64)],
}
464
/// A borrowed view of the buffers that belong to a page, together with the column-level
/// (and, through them, file-level) buffers.
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    /// The column-level buffers.
    pub column_buffers: ColumnBuffers<'a, 'b>,
    /// One `(u64, u64)` pair per page buffer; by the field name these are (position, size).
    pub positions_and_sizes: &'c [(u64, u64)],
}
471
/// Builds field schedulers (structural or legacy) for the fields of a schema.
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    /// Forwarded to every legacy `PrimitiveFieldScheduler` this strategy creates.
    pub validate_data: bool,
    /// Forwarded to every `StructuralPrimitiveFieldScheduler` this strategy creates.
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    /// Forwarded to every `StructuralPrimitiveFieldScheduler` this strategy creates.
    pub cache_repetition_index: bool,
}
479
480impl Default for CoreFieldDecoderStrategy {
481 fn default() -> Self {
482 Self {
483 validate_data: false,
484 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
485 cache_repetition_index: false,
486 }
487 }
488}
489
490impl CoreFieldDecoderStrategy {
491 pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
493 self.cache_repetition_index = cache_repetition_index;
494 self
495 }
496
497 pub fn from_decoder_config(config: &DecoderConfig) -> Self {
499 Self {
500 validate_data: config.validate_on_decode,
501 decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
502 cache_repetition_index: config.cache_repetition_index,
503 }
504 }
505
506 fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
509 let column_encoding = column_info
510 .encoding
511 .column_encoding
512 .as_ref()
513 .ok_or_else(|| {
514 Error::invalid_input(
515 format!(
516 "the column at index {} was missing a ColumnEncoding",
517 column_info.index
518 ),
519 location!(),
520 )
521 })?;
522 if matches!(
523 column_encoding,
524 pb::column_encoding::ColumnEncoding::Values(_)
525 ) {
526 Ok(())
527 } else {
528 Err(Error::invalid_input(format!("the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it", column_info.index, field_name, column_encoding), location!()))
529 }
530 }
531
532 fn is_primitive(data_type: &DataType) -> bool {
533 if data_type.is_primitive() {
534 true
535 } else {
536 match data_type {
537 DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
539 DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
540 _ => false,
541 }
542 }
543 }
544
545 fn create_primitive_scheduler(
546 &self,
547 field: &Field,
548 column: &ColumnInfo,
549 buffers: FileBuffers,
550 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
551 Self::ensure_values_encoded(column, &field.name)?;
552 let column_buffers = ColumnBuffers {
554 file_buffers: buffers,
555 positions_and_sizes: &column.buffer_offsets_and_sizes,
556 };
557 Ok(Box::new(PrimitiveFieldScheduler::new(
558 column.index,
559 field.data_type(),
560 column.page_infos.clone(),
561 column_buffers,
562 self.validate_data,
563 )))
564 }
565
566 fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
568 Self::ensure_values_encoded(column_info, field_name)?;
569 if column_info.page_infos.len() != 1 {
570 return Err(Error::InvalidInput { source: format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into(), location: location!() });
571 }
572 let encoding = &column_info.page_infos[0].encoding;
573 match encoding.as_legacy().array_encoding.as_ref().unwrap() {
574 pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
575 _ => Err(Error::InvalidInput { source: format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into(), location: location!() }),
576 }
577 }
578
579 fn check_packed_struct(column_info: &ColumnInfo) -> bool {
580 let encoding = &column_info.page_infos[0].encoding;
581 matches!(
582 encoding.as_legacy().array_encoding.as_ref().unwrap(),
583 pb::array_encoding::ArrayEncoding::PackedStruct(_)
584 )
585 }
586
587 fn create_list_scheduler(
588 &self,
589 list_field: &Field,
590 column_infos: &mut ColumnInfoIter,
591 buffers: FileBuffers,
592 offsets_column: &ColumnInfo,
593 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
594 Self::ensure_values_encoded(offsets_column, &list_field.name)?;
595 let offsets_column_buffers = ColumnBuffers {
596 file_buffers: buffers,
597 positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
598 };
599 let items_scheduler =
600 self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
601
602 let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
603 .page_infos
604 .iter()
605 .filter(|offsets_page| offsets_page.num_rows > 0)
606 .map(|offsets_page| {
607 if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
608 &offsets_page.encoding.as_legacy().array_encoding
609 {
610 let inner = PageInfo {
611 buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
612 encoding: PageEncoding::Legacy(
613 list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
614 ),
615 num_rows: offsets_page.num_rows,
616 priority: 0,
617 };
618 (
619 inner,
620 OffsetPageInfo {
621 offsets_in_page: offsets_page.num_rows,
622 null_offset_adjustment: list_encoding.null_offset_adjustment,
623 num_items_referenced_by_page: list_encoding.num_items,
624 },
625 )
626 } else {
627 panic!("Expected a list column");
629 }
630 })
631 .unzip();
632 let inner = Arc::new(PrimitiveFieldScheduler::new(
633 offsets_column.index,
634 DataType::UInt64,
635 Arc::from(inner_infos.into_boxed_slice()),
636 offsets_column_buffers,
637 self.validate_data,
638 )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
639 let items_field = match list_field.data_type() {
640 DataType::List(inner) => inner,
641 DataType::LargeList(inner) => inner,
642 _ => unreachable!(),
643 };
644 let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
645 DataType::Int32
646 } else {
647 DataType::Int64
648 };
649 Ok(Box::new(ListFieldScheduler::new(
650 inner,
651 items_scheduler.into(),
652 items_field,
653 offset_type,
654 null_offset_adjustments,
655 )))
656 }
657
658 fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
659 if let column_encoding::ColumnEncoding::Blob(blob) =
660 column_info.encoding.column_encoding.as_ref().unwrap()
661 {
662 let mut column_info = column_info.clone();
663 column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
664 Some(column_info)
665 } else {
666 None
667 }
668 }
669
670 fn create_structural_field_scheduler(
671 &self,
672 field: &Field,
673 column_infos: &mut ColumnInfoIter,
674 ) -> Result<Box<dyn StructuralFieldScheduler>> {
675 let data_type = field.data_type();
676 if Self::is_primitive(&data_type) {
677 let column_info = column_infos.expect_next()?;
678 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
679 column_info.as_ref(),
680 self.decompressor_strategy.as_ref(),
681 self.cache_repetition_index,
682 )?);
683
684 column_infos.next_top_level();
686
687 return Ok(scheduler);
688 }
689 match &data_type {
690 DataType::Struct(fields) => {
691 if field.is_packed_struct() {
692 let column_info = column_infos.expect_next()?;
693 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
694 column_info.as_ref(),
695 self.decompressor_strategy.as_ref(),
696 self.cache_repetition_index,
697 )?);
698
699 column_infos.next_top_level();
701
702 return Ok(scheduler);
703 }
704 let mut child_schedulers = Vec::with_capacity(field.children.len());
705 for field in field.children.iter() {
706 let field_scheduler =
707 self.create_structural_field_scheduler(field, column_infos)?;
708 child_schedulers.push(field_scheduler);
709 }
710
711 let fields = fields.clone();
712 Ok(
713 Box::new(StructuralStructScheduler::new(child_schedulers, fields))
714 as Box<dyn StructuralFieldScheduler>,
715 )
716 }
717 DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => {
718 let column_info = column_infos.expect_next()?;
719 let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
720 column_info.as_ref(),
721 self.decompressor_strategy.as_ref(),
722 self.cache_repetition_index,
723 )?);
724 column_infos.next_top_level();
725 Ok(scheduler)
726 }
727 DataType::List(_) | DataType::LargeList(_) => {
728 let child = field
729 .children
730 .first()
731 .expect("List field must have a child");
732 let child_scheduler =
733 self.create_structural_field_scheduler(child, column_infos)?;
734 Ok(Box::new(StructuralListScheduler::new(child_scheduler))
735 as Box<dyn StructuralFieldScheduler>)
736 }
737 _ => todo!(),
738 }
739 }
740
741 fn create_legacy_field_scheduler(
742 &self,
743 field: &Field,
744 column_infos: &mut ColumnInfoIter,
745 buffers: FileBuffers,
746 ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
747 let data_type = field.data_type();
748 if Self::is_primitive(&data_type) {
749 let column_info = column_infos.expect_next()?;
750 let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
751 return Ok(scheduler);
752 } else if data_type.is_binary_like() {
753 let column_info = column_infos.next().unwrap().clone();
754 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
756 let desc_scheduler =
757 self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
758 let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
759 return Ok(blob_scheduler);
760 }
761 if let Some(page_info) = column_info.page_infos.first() {
762 if matches!(
763 page_info.encoding.as_legacy(),
764 pb::ArrayEncoding {
765 array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
766 }
767 ) {
768 let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
769 DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
770 } else {
771 DataType::LargeList(Arc::new(ArrowField::new(
772 "item",
773 DataType::UInt8,
774 false,
775 )))
776 };
777 let list_field = Field::try_from(ArrowField::new(
778 field.name.clone(),
779 list_type,
780 field.nullable,
781 ))
782 .unwrap();
783 let list_scheduler = self.create_list_scheduler(
784 &list_field,
785 column_infos,
786 buffers,
787 &column_info,
788 )?;
789 let binary_scheduler = Box::new(BinaryFieldScheduler::new(
790 list_scheduler.into(),
791 field.data_type(),
792 ));
793 return Ok(binary_scheduler);
794 } else {
795 let scheduler =
796 self.create_primitive_scheduler(field, &column_info, buffers)?;
797 return Ok(scheduler);
798 }
799 } else {
800 return self.create_primitive_scheduler(field, &column_info, buffers);
801 }
802 }
803 match &data_type {
804 DataType::FixedSizeList(inner, _dimension) => {
805 if Self::is_primitive(inner.data_type()) {
808 let primitive_col = column_infos.expect_next()?;
809 let scheduler =
810 self.create_primitive_scheduler(field, primitive_col, buffers)?;
811 Ok(scheduler)
812 } else {
813 todo!()
814 }
815 }
816 DataType::Dictionary(_key_type, value_type) => {
817 if Self::is_primitive(value_type) || value_type.is_binary_like() {
818 let primitive_col = column_infos.expect_next()?;
819 let scheduler =
820 self.create_primitive_scheduler(field, primitive_col, buffers)?;
821 Ok(scheduler)
822 } else {
823 Err(Error::NotSupported {
824 source: format!(
825 "No way to decode into a dictionary field of type {}",
826 value_type
827 )
828 .into(),
829 location: location!(),
830 })
831 }
832 }
833 DataType::List(_) | DataType::LargeList(_) => {
834 let offsets_column = column_infos.expect_next()?.clone();
835 column_infos.next_top_level();
836 self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
837 }
838 DataType::Struct(fields) => {
839 let column_info = column_infos.expect_next()?;
840
841 if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
843 return self.create_primitive_scheduler(field, &blob_col, buffers);
845 }
846
847 if Self::check_packed_struct(column_info) {
848 self.create_primitive_scheduler(field, column_info, buffers)
850 } else {
851 Self::check_simple_struct(column_info, &field.name).unwrap();
853 let num_rows = column_info
854 .page_infos
855 .iter()
856 .map(|page| page.num_rows)
857 .sum();
858 let mut child_schedulers = Vec::with_capacity(field.children.len());
859 for field in &field.children {
860 column_infos.next_top_level();
861 let field_scheduler =
862 self.create_legacy_field_scheduler(field, column_infos, buffers)?;
863 child_schedulers.push(Arc::from(field_scheduler));
864 }
865
866 let fields = fields.clone();
867 Ok(Box::new(SimpleStructScheduler::new(
868 child_schedulers,
869 fields,
870 num_rows,
871 )))
872 }
873 }
874 _ => todo!(),
876 }
877 }
878}
879
880fn root_column(num_rows: u64) -> ColumnInfo {
882 let num_root_pages = num_rows.div_ceil(u32::MAX as u64);
883 let final_page_num_rows = num_rows % (u32::MAX as u64);
884 let root_pages = (0..num_root_pages)
885 .map(|i| PageInfo {
886 num_rows: if i == num_root_pages - 1 {
887 final_page_num_rows
888 } else {
889 u64::MAX
890 },
891 encoding: PageEncoding::Legacy(pb::ArrayEncoding {
892 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
893 pb::SimpleStruct {},
894 )),
895 }),
896 priority: 0, buffer_offsets_and_sizes: Arc::new([]),
898 })
899 .collect::<Vec<_>>();
900 ColumnInfo {
901 buffer_offsets_and_sizes: Arc::new([]),
902 encoding: pb::ColumnEncoding {
903 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
904 },
905 index: u32::MAX,
906 page_infos: Arc::from(root_pages),
907 }
908}
909
/// The decoder for the root of the schema, in either its structural or legacy form.
pub enum RootDecoder {
    /// A structural root decoder.
    Structural(StructuralStructDecoder),
    /// A legacy root decoder.
    Legacy(SimpleStructDecoder),
}
914
915impl RootDecoder {
916 pub fn into_structural(self) -> StructuralStructDecoder {
917 match self {
918 Self::Structural(decoder) => decoder,
919 Self::Legacy(_) => panic!("Expected a structural decoder"),
920 }
921 }
922
923 pub fn into_legacy(self) -> SimpleStructDecoder {
924 match self {
925 Self::Legacy(decoder) => decoder,
926 Self::Structural(_) => panic!("Expected a legacy decoder"),
927 }
928 }
929}
930
931impl DecodeBatchScheduler {
932 #[allow(clippy::too_many_arguments)]
935 pub async fn try_new<'a>(
936 schema: &'a Schema,
937 column_indices: &[u32],
938 column_infos: &[Arc<ColumnInfo>],
939 file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
940 num_rows: u64,
941 _decoder_plugins: Arc<DecoderPlugins>,
942 io: Arc<dyn EncodingsIo>,
943 cache: Arc<LanceCache>,
944 filter: &FilterExpression,
945 decoder_config: &DecoderConfig,
946 ) -> Result<Self> {
947 assert!(num_rows > 0);
948 let buffers = FileBuffers {
949 positions_and_sizes: file_buffer_positions_and_sizes,
950 };
951 let arrow_schema = ArrowSchema::from(schema);
952 let root_fields = arrow_schema.fields().clone();
953 let root_type = DataType::Struct(root_fields.clone());
954 let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
955 root_field.children.clone_from(&schema.fields);
959 root_field
960 .metadata
961 .insert("__lance_decoder_root".to_string(), "true".to_string());
962
963 if column_infos.is_empty() || column_infos[0].is_structural() {
964 let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
965
966 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
967 let mut root_scheduler =
968 strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
969
970 let context = SchedulerContext::new(io, cache.clone());
971 root_scheduler.initialize(filter, &context).await?;
972
973 Ok(Self {
974 root_scheduler: RootScheduler::Structural(root_scheduler),
975 root_fields,
976 cache,
977 })
978 } else {
979 let mut columns = Vec::with_capacity(column_infos.len() + 1);
982 columns.push(Arc::new(root_column(num_rows)));
983 columns.extend(column_infos.iter().cloned());
984
985 let adjusted_column_indices = [0_u32]
986 .into_iter()
987 .chain(column_indices.iter().map(|i| i.saturating_add(1)))
988 .collect::<Vec<_>>();
989 let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
990 let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
991 let root_scheduler =
992 strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
993
994 let context = SchedulerContext::new(io, cache.clone());
995 root_scheduler.initialize(filter, &context).await?;
996
997 Ok(Self {
998 root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
999 root_fields,
1000 cache,
1001 })
1002 }
1003 }
1004
1005 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
1006 pub fn from_scheduler(
1007 root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
1008 root_fields: Fields,
1009 cache: Arc<LanceCache>,
1010 ) -> Self {
1011 Self {
1012 root_scheduler: RootScheduler::Legacy(root_scheduler),
1013 root_fields,
1014 cache,
1015 }
1016 }
1017
1018 fn do_schedule_ranges_structural(
1019 &mut self,
1020 ranges: &[Range<u64>],
1021 filter: &FilterExpression,
1022 io: Arc<dyn EncodingsIo>,
1023 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1024 ) {
1025 let root_scheduler = self.root_scheduler.as_structural();
1026 let mut context = SchedulerContext::new(io, self.cache.clone());
1027 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1028 if let Err(schedule_ranges_err) = maybe_root_job {
1029 schedule_action(Err(schedule_ranges_err));
1030 return;
1031 }
1032 let mut root_job = maybe_root_job.unwrap();
1033 let mut num_rows_scheduled = 0;
1034 loop {
1035 let maybe_next_scan_line = root_job.schedule_next(&mut context);
1036 if let Err(err) = maybe_next_scan_line {
1037 schedule_action(Err(err));
1038 return;
1039 }
1040 let next_scan_line = maybe_next_scan_line.unwrap();
1041 match next_scan_line {
1042 Some(next_scan_line) => {
1043 trace!(
1044 "Scheduled scan line of {} rows and {} decoders",
1045 next_scan_line.rows_scheduled,
1046 next_scan_line.decoders.len()
1047 );
1048 num_rows_scheduled += next_scan_line.rows_scheduled;
1049 if !schedule_action(Ok(DecoderMessage {
1050 scheduled_so_far: num_rows_scheduled,
1051 decoders: next_scan_line.decoders,
1052 })) {
1053 return;
1055 }
1056 }
1057 None => return,
1058 }
1059 }
1060 }
1061
1062 fn do_schedule_ranges_legacy(
1063 &mut self,
1064 ranges: &[Range<u64>],
1065 filter: &FilterExpression,
1066 io: Arc<dyn EncodingsIo>,
1067 mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1068 priority: Option<Box<dyn PriorityRange>>,
1072 ) {
1073 let root_scheduler = self.root_scheduler.as_legacy();
1074 let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1075 trace!(
1076 "Scheduling {} ranges across {}..{} ({} rows){}",
1077 ranges.len(),
1078 ranges.first().unwrap().start,
1079 ranges.last().unwrap().end,
1080 rows_requested,
1081 priority
1082 .as_ref()
1083 .map(|p| format!(" (priority={:?})", p))
1084 .unwrap_or_default()
1085 );
1086
1087 let mut context = SchedulerContext::new(io, self.cache.clone());
1088 let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
1089 if let Err(schedule_ranges_err) = maybe_root_job {
1090 schedule_action(Err(schedule_ranges_err));
1091 return;
1092 }
1093 let mut root_job = maybe_root_job.unwrap();
1094 let mut num_rows_scheduled = 0;
1095 let mut rows_to_schedule = root_job.num_rows();
1096 let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
1097 trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
1098 while rows_to_schedule > 0 {
1099 let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
1100 if let Err(schedule_next_err) = maybe_next_scan_line {
1101 schedule_action(Err(schedule_next_err));
1102 return;
1103 }
1104 let next_scan_line = maybe_next_scan_line.unwrap();
1105 priority.advance(next_scan_line.rows_scheduled);
1106 num_rows_scheduled += next_scan_line.rows_scheduled;
1107 rows_to_schedule -= next_scan_line.rows_scheduled;
1108 trace!(
1109 "Scheduled scan line of {} rows and {} decoders",
1110 next_scan_line.rows_scheduled,
1111 next_scan_line.decoders.len()
1112 );
1113 if !schedule_action(Ok(DecoderMessage {
1114 scheduled_so_far: num_rows_scheduled,
1115 decoders: next_scan_line.decoders,
1116 })) {
1117 return;
1119 }
1120
1121 trace!("Finished scheduling {} ranges", ranges.len());
1122 }
1123 }
1124
1125 fn do_schedule_ranges(
1126 &mut self,
1127 ranges: &[Range<u64>],
1128 filter: &FilterExpression,
1129 io: Arc<dyn EncodingsIo>,
1130 schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
1131 priority: Option<Box<dyn PriorityRange>>,
1135 ) {
1136 match &self.root_scheduler {
1137 RootScheduler::Legacy(_) => {
1138 self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
1139 }
1140 RootScheduler::Structural(_) => {
1141 self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
1142 }
1143 }
1144 }
1145
1146 pub fn schedule_ranges_to_vec(
1149 &mut self,
1150 ranges: &[Range<u64>],
1151 filter: &FilterExpression,
1152 io: Arc<dyn EncodingsIo>,
1153 priority: Option<Box<dyn PriorityRange>>,
1154 ) -> Result<Vec<DecoderMessage>> {
1155 let mut decode_messages = Vec::new();
1156 self.do_schedule_ranges(
1157 ranges,
1158 filter,
1159 io,
1160 |msg| {
1161 decode_messages.push(msg);
1162 true
1163 },
1164 priority,
1165 );
1166 decode_messages.into_iter().collect::<Result<Vec<_>>>()
1167 }
1168
1169 #[instrument(skip_all)]
1179 pub fn schedule_ranges(
1180 &mut self,
1181 ranges: &[Range<u64>],
1182 filter: &FilterExpression,
1183 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1184 scheduler: Arc<dyn EncodingsIo>,
1185 ) {
1186 self.do_schedule_ranges(
1187 ranges,
1188 filter,
1189 scheduler,
1190 |msg| {
1191 match sink.send(msg) {
1192 Ok(_) => true,
1193 Err(SendError { .. }) => {
1194 debug!(
1197 "schedule_ranges aborting early since decoder appears to have been dropped"
1198 );
1199 false
1200 }
1201 }
1202 },
1203 None,
1204 )
1205 }
1206
1207 #[instrument(skip_all)]
1215 pub fn schedule_range(
1216 &mut self,
1217 range: Range<u64>,
1218 filter: &FilterExpression,
1219 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1220 scheduler: Arc<dyn EncodingsIo>,
1221 ) {
1222 self.schedule_ranges(&[range], filter, sink, scheduler)
1223 }
1224
1225 pub fn schedule_take(
1233 &mut self,
1234 indices: &[u64],
1235 filter: &FilterExpression,
1236 sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
1237 scheduler: Arc<dyn EncodingsIo>,
1238 ) {
1239 debug_assert!(indices.windows(2).all(|w| w[0] <= w[1]));
1240 if indices.is_empty() {
1241 return;
1242 }
1243 trace!("Scheduling take of {} rows", indices.len());
1244 let ranges = Self::indices_to_ranges(indices);
1245 self.schedule_ranges(&ranges, filter, sink, scheduler)
1246 }
1247
/// Coalesces sorted indices into half-open ranges.
///
/// Each run of consecutive indices (`i, i + 1, i + 2, ...`) becomes one range. Any other
/// step, including a repeated index, starts a new range. An empty input yields no ranges.
fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
    let (Some(&first), Some(&last)) = (indices.first(), indices.last()) else {
        // Nothing to coalesce
        return Vec::new();
    };
    let mut ranges = Vec::new();
    let mut start = first;

    for window in indices.windows(2) {
        if window[1] != window[0] + 1 {
            // The run ended at window[0]; close it and start a new one at window[1]
            ranges.push(start..window[0] + 1);
            start = window[1];
        }
    }

    ranges.push(start..last + 1);
    ranges
}
1263}
1264
/// A pending batch: a future that produces the batch plus the number of rows it covers.
pub struct ReadBatchTask {
    /// Future that resolves to the record batch (or to an error).
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// The number of rows in the batch.
    pub num_rows: u32,
}
1269
/// Turns the messages produced by a scheduler into a stream of batch decode tasks, using a
/// legacy root decoder.
pub struct BatchDecodeStream {
    /// Holds the receiving end of the scheduler's message channel.
    context: DecoderContext,
    /// The root decoder that receives child decoders and is drained to produce batches.
    root_decoder: SimpleStructDecoder,
    /// Rows that have not yet been assigned to a batch.
    rows_remaining: u64,
    /// The maximum number of rows per batch.
    rows_per_batch: u32,
    /// Total rows the scheduler has reported as scheduled so far.
    rows_scheduled: u64,
    /// Total rows drained from the root decoder so far.
    rows_drained: u64,
    /// Set once the scheduler's channel has closed.
    scheduler_exhausted: bool,
    /// Shared `Once` handed to every batch task.
    ///
    /// NOTE(review): presumably ensures the batch-size warning is logged at most once;
    /// the code using it is not visible here -- confirm.
    emitted_batch_size_warning: Arc<Once>,
}
1281
1282impl BatchDecodeStream {
1283 pub fn new(
1294 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1295 rows_per_batch: u32,
1296 num_rows: u64,
1297 root_decoder: SimpleStructDecoder,
1298 ) -> Self {
1299 Self {
1300 context: DecoderContext::new(scheduled),
1301 root_decoder,
1302 rows_remaining: num_rows,
1303 rows_per_batch,
1304 rows_scheduled: 0,
1305 rows_drained: 0,
1306 scheduler_exhausted: false,
1307 emitted_batch_size_warning: Arc::new(Once::new()),
1308 }
1309 }
1310
1311 fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
1312 if decoder.path.is_empty() {
1313 Ok(())
1315 } else {
1316 self.root_decoder.accept_child(decoder)
1317 }
1318 }
1319
1320 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1321 if self.scheduler_exhausted {
1322 return Ok(self.rows_scheduled);
1323 }
1324 while self.rows_scheduled < scheduled_need {
1325 let next_message = self.context.source.recv().await;
1326 match next_message {
1327 Some(scan_line) => {
1328 let scan_line = scan_line?;
1329 self.rows_scheduled = scan_line.scheduled_so_far;
1330 for message in scan_line.decoders {
1331 self.accept_decoder(message.into_legacy())?;
1332 }
1333 }
1334 None => {
1335 self.scheduler_exhausted = true;
1339 return Ok(self.rows_scheduled);
1340 }
1341 }
1342 }
1343 Ok(scheduled_need)
1344 }
1345
1346 #[instrument(level = "debug", skip_all)]
1347 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1348 trace!(
1349 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1350 self.rows_remaining,
1351 self.rows_drained,
1352 self.rows_scheduled,
1353 );
1354 if self.rows_remaining == 0 {
1355 return Ok(None);
1356 }
1357
1358 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1359 self.rows_remaining -= to_take;
1360
1361 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1362 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1363 if scheduled_need > 0 {
1364 let desired_scheduled = scheduled_need + self.rows_scheduled;
1365 trace!(
1366 "Draining from scheduler (desire at least {} scheduled rows)",
1367 desired_scheduled
1368 );
1369 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1370 if actually_scheduled < desired_scheduled {
1371 let under_scheduled = desired_scheduled - actually_scheduled;
1372 to_take -= under_scheduled;
1373 }
1374 }
1375
1376 if to_take == 0 {
1377 return Ok(None);
1378 }
1379
1380 let loaded_need = self.rows_drained + to_take - 1;
1382 trace!(
1383 "Waiting for I/O (desire at least {} fully loaded rows)",
1384 loaded_need
1385 );
1386 self.root_decoder.wait_for_loaded(loaded_need).await?;
1387
1388 let next_task = self.root_decoder.drain(to_take)?;
1389 self.rows_drained += to_take;
1390 Ok(Some(next_task))
1391 }
1392
1393 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1394 let stream = futures::stream::unfold(self, |mut slf| async move {
1395 let next_task = slf.next_batch_task().await;
1396 let next_task = next_task.transpose().map(|next_task| {
1397 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1398 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1399 let task = async move {
1400 let next_task = next_task?;
1401 next_task.into_batch(emitted_batch_size_warning)
1402 };
1403 (task, num_rows)
1404 });
1405 next_task.map(|(task, num_rows)| {
1406 debug_assert!(num_rows <= u32::MAX as u64);
1408 let next_task = ReadBatchTask {
1409 task: task.boxed(),
1410 num_rows: num_rows as u32,
1411 };
1412 (next_task, slf)
1413 })
1414 });
1415 stream.boxed()
1416 }
1417}
1418
/// A message delivered to a [`RootDecoderType`] by [`BatchDecodeIterator`].
enum RootDecoderMessage {
    /// A structural page; `BatchDecodeIterator` resolves the page's load
    /// future before wrapping it in this variant.
    LoadedPage(LoadedPage),
    /// A decoder produced by the legacy (`crate::previous`) decode path.
    LegacyPage(crate::previous::decoder::DecoderReady),
}
/// Operations [`BatchDecodeIterator`] needs from a root decoder, so that one
/// iterator implementation can drive both the structural and the legacy
/// root decoders.
trait RootDecoderType {
    /// Feeds one scheduler message to the decoder.
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Creates the decode task for the next `num_rows` rows.
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Synchronously waits for rows to be loaded, using `runtime` to drive
    /// any pending future. `loaded_need` is passed through to the
    /// implementation unchanged.
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
1430impl RootDecoderType for StructuralStructDecoder {
1431 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1432 let RootDecoderMessage::LoadedPage(loaded_page) = message else {
1433 unreachable!()
1434 };
1435 self.accept_page(loaded_page)
1436 }
1437 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1438 self.drain_batch_task(num_rows)
1439 }
1440 fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
1441 Ok(())
1443 }
1444}
1445impl RootDecoderType for SimpleStructDecoder {
1446 fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
1447 let RootDecoderMessage::LegacyPage(legacy_page) = message else {
1448 unreachable!()
1449 };
1450 self.accept_child(legacy_page)
1451 }
1452 fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
1453 self.drain(num_rows)
1454 }
1455 fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
1456 runtime.block_on(self.wait_for_loaded(loaded_need))
1457 }
1458}
1459
/// A synchronous batch reader that consumes already-collected scheduler
/// messages and decodes them into record batches.
struct BatchDecodeIterator<T: RootDecoderType> {
    /// Scheduler output, consumed from the front as more rows are needed.
    messages: VecDeque<Result<DecoderMessage>>,
    /// Decoder that receives pages / legacy decoders and produces batches.
    root_decoder: T,
    /// Rows not yet claimed by a batch.
    rows_remaining: u64,
    /// Maximum number of rows in one batch.
    rows_per_batch: u32,
    /// Latest `scheduled_so_far` value seen in `messages`.
    rows_scheduled: u64,
    /// Rows already drained from `root_decoder`.
    rows_drained: u64,
    /// Ensures the large-batch log message is emitted at most once.
    emitted_batch_size_warning: Arc<Once>,
    /// Current-thread runtime used to block on I/O futures.
    wait_for_io_runtime: tokio::runtime::Runtime,
    /// Schema reported through `RecordBatchReader::schema`.
    schema: Arc<ArrowSchema>,
}
1475
1476impl<T: RootDecoderType> BatchDecodeIterator<T> {
1477 pub fn new(
1479 messages: VecDeque<Result<DecoderMessage>>,
1480 rows_per_batch: u32,
1481 num_rows: u64,
1482 root_decoder: T,
1483 schema: Arc<ArrowSchema>,
1484 ) -> Self {
1485 Self {
1486 messages,
1487 root_decoder,
1488 rows_remaining: num_rows,
1489 rows_per_batch,
1490 rows_scheduled: 0,
1491 rows_drained: 0,
1492 wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
1493 .build()
1494 .unwrap(),
1495 emitted_batch_size_warning: Arc::new(Once::new()),
1496 schema,
1497 }
1498 }
1499
1500 fn wait_for_page(&self, unloaded_page: UnloadedPage) -> Result<LoadedPage> {
1505 match maybe_done(unloaded_page.0) {
1506 MaybeDone::Done(loaded_page) => loaded_page,
1508 MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
1510 MaybeDone::Gone => unreachable!(),
1511 }
1512 }
1513
1514 #[instrument(skip_all)]
1519 fn wait_for_io(&mut self, scheduled_need: u64) -> Result<u64> {
1520 while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
1521 let message = self.messages.pop_front().unwrap()?;
1522 self.rows_scheduled = message.scheduled_so_far;
1523 for decoder_message in message.decoders {
1524 match decoder_message {
1525 MessageType::UnloadedPage(unloaded_page) => {
1526 let loaded_page = self.wait_for_page(unloaded_page)?;
1527 self.root_decoder
1528 .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
1529 }
1530 MessageType::DecoderReady(decoder_ready) => {
1531 if !decoder_ready.path.is_empty() {
1533 self.root_decoder
1534 .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
1535 }
1536 }
1537 }
1538 }
1539 }
1540
1541 let loaded_need = self.rows_drained + self.rows_per_batch as u64 - 1;
1542
1543 self.root_decoder
1544 .wait(loaded_need, &self.wait_for_io_runtime)?;
1545 Ok(self.rows_scheduled)
1546 }
1547
1548 #[instrument(level = "debug", skip_all)]
1549 fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
1550 trace!(
1551 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1552 self.rows_remaining,
1553 self.rows_drained,
1554 self.rows_scheduled,
1555 );
1556 if self.rows_remaining == 0 {
1557 return Ok(None);
1558 }
1559
1560 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1561 self.rows_remaining -= to_take;
1562
1563 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1564 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1565 if scheduled_need > 0 {
1566 let desired_scheduled = scheduled_need + self.rows_scheduled;
1567 trace!(
1568 "Draining from scheduler (desire at least {} scheduled rows)",
1569 desired_scheduled
1570 );
1571 let actually_scheduled = self.wait_for_io(desired_scheduled)?;
1572 if actually_scheduled < desired_scheduled {
1573 let under_scheduled = desired_scheduled - actually_scheduled;
1574 to_take -= under_scheduled;
1575 }
1576 }
1577
1578 if to_take == 0 {
1579 return Ok(None);
1580 }
1581
1582 let next_task = self.root_decoder.drain_batch(to_take)?;
1583
1584 self.rows_drained += to_take;
1585
1586 let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
1587
1588 Ok(Some(batch))
1589 }
1590}
1591
1592impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
1593 type Item = ArrowResult<RecordBatch>;
1594
1595 fn next(&mut self) -> Option<Self::Item> {
1596 self.next_batch_task()
1597 .transpose()
1598 .map(|r| r.map_err(ArrowError::from))
1599 }
1600}
1601
1602impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
1603 fn schema(&self) -> Arc<ArrowSchema> {
1604 self.schema.clone()
1605 }
1606}
1607
/// An asynchronous stream of decode tasks for the structural decode path.
pub struct StructuralBatchDecodeStream {
    /// Wraps the channel on which the scheduler delivers its messages.
    context: DecoderContext,
    /// Decoder that receives loaded pages and produces batch tasks.
    root_decoder: StructuralStructDecoder,
    /// Rows not yet claimed by a batch.
    rows_remaining: u64,
    /// Maximum number of rows in one batch.
    rows_per_batch: u32,
    /// Latest `scheduled_so_far` value received from the scheduler.
    rows_scheduled: u64,
    /// Rows already drained from `root_decoder`.
    rows_drained: u64,
    /// Set once the scheduler channel has closed.
    scheduler_exhausted: bool,
    /// Ensures the large-batch log message is emitted at most once.
    emitted_batch_size_warning: Arc<Once>,
}
1619
1620impl StructuralBatchDecodeStream {
1621 pub fn new(
1632 scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1633 rows_per_batch: u32,
1634 num_rows: u64,
1635 root_decoder: StructuralStructDecoder,
1636 ) -> Self {
1637 Self {
1638 context: DecoderContext::new(scheduled),
1639 root_decoder,
1640 rows_remaining: num_rows,
1641 rows_per_batch,
1642 rows_scheduled: 0,
1643 rows_drained: 0,
1644 scheduler_exhausted: false,
1645 emitted_batch_size_warning: Arc::new(Once::new()),
1646 }
1647 }
1648
1649 async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
1650 if self.scheduler_exhausted {
1651 return Ok(self.rows_scheduled);
1652 }
1653 while self.rows_scheduled < scheduled_need {
1654 let next_message = self.context.source.recv().await;
1655 match next_message {
1656 Some(scan_line) => {
1657 let scan_line = scan_line?;
1658 self.rows_scheduled = scan_line.scheduled_so_far;
1659 for message in scan_line.decoders {
1660 let unloaded_page = message.into_structural();
1661 let loaded_page = unloaded_page.0.await?;
1662 self.root_decoder.accept_page(loaded_page)?;
1663 }
1664 }
1665 None => {
1666 self.scheduler_exhausted = true;
1670 return Ok(self.rows_scheduled);
1671 }
1672 }
1673 }
1674 Ok(scheduled_need)
1675 }
1676
1677 #[instrument(level = "debug", skip_all)]
1678 async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
1679 trace!(
1680 "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
1681 self.rows_remaining,
1682 self.rows_drained,
1683 self.rows_scheduled,
1684 );
1685 if self.rows_remaining == 0 {
1686 return Ok(None);
1687 }
1688
1689 let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
1690 self.rows_remaining -= to_take;
1691
1692 let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
1693 trace!("scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}", scheduled_need, self.rows_drained, to_take, self.rows_scheduled);
1694 if scheduled_need > 0 {
1695 let desired_scheduled = scheduled_need + self.rows_scheduled;
1696 trace!(
1697 "Draining from scheduler (desire at least {} scheduled rows)",
1698 desired_scheduled
1699 );
1700 let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
1701 if actually_scheduled < desired_scheduled {
1702 let under_scheduled = desired_scheduled - actually_scheduled;
1703 to_take -= under_scheduled;
1704 }
1705 }
1706
1707 if to_take == 0 {
1708 return Ok(None);
1709 }
1710
1711 let next_task = self.root_decoder.drain_batch_task(to_take)?;
1712 self.rows_drained += to_take;
1713 Ok(Some(next_task))
1714 }
1715
1716 pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
1717 let stream = futures::stream::unfold(self, |mut slf| async move {
1718 let next_task = slf.next_batch_task().await;
1719 let next_task = next_task.transpose().map(|next_task| {
1720 let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
1721 let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
1722 let task = async move {
1723 let next_task = next_task?;
1724 next_task.into_batch(emitted_batch_size_warning)
1725 };
1726 (task, num_rows)
1727 });
1728 next_task.map(|(task, num_rows)| {
1729 debug_assert!(num_rows <= u32::MAX as u64);
1731 let next_task = ReadBatchTask {
1732 task: task.boxed(),
1733 num_rows: num_rows as u32,
1734 };
1735 (next_task, slf)
1736 })
1737 });
1738 stream.boxed()
1739 }
1740}
1741
/// The rows a caller wants to read, given either as ranges or as individual
/// row indices.
#[derive(Debug)]
pub enum RequestedRows {
    /// Half-open row ranges.
    Ranges(Vec<Range<u64>>),
    /// Individual row indices.
    Indices(Vec<u64>),
}

impl RequestedRows {
    /// Returns the total number of requested rows.
    ///
    /// For `Ranges` this is the sum of the range lengths. A range whose
    /// `start` is greater than its `end` contributes zero rows, matching
    /// [`Range::is_empty`] (and therefore [`Self::trim_empty_ranges`]) rather
    /// than overflowing the subtraction. For `Indices` it is the number of
    /// indices.
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges
                .iter()
                .map(|r| r.end.saturating_sub(r.start))
                .sum(),
            Self::Indices(indices) => indices.len() as u64,
        }
    }

    /// Removes every empty range from a `Ranges` request; an `Indices`
    /// request is returned unchanged.
    pub fn trim_empty_ranges(mut self) -> Self {
        if let Self::Ranges(ranges) = &mut self {
            ranges.retain(|r| !r.is_empty());
        }
        self
    }
}
1763
/// Options that tune decoding. Both flags default to `false`.
#[derive(Debug, Clone, Default)]
pub struct DecoderConfig {
    /// NOTE(review): presumably controls whether repetition indices are kept
    /// in the cache; the flag is not read in this part of the file - confirm.
    pub cache_repetition_index: bool,
    /// Passed as `should_validate` when the decode stream is created.
    pub validate_on_decode: bool,
}
1772
/// Everything `schedule_and_decode` / `schedule_and_decode_blocking` need
/// besides the description of what to read.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Plugins handed to the decode scheduler.
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum number of rows per emitted batch.
    pub batch_size: u32,
    /// I/O service handed to the scheduler.
    pub io: Arc<dyn EncodingsIo>,
    /// Cache handed to the decode scheduler.
    pub cache: Arc<LanceCache>,
    /// Decoder tuning options.
    pub decoder_config: DecoderConfig,
}
1782
1783fn check_scheduler_on_drop(
1784 stream: BoxStream<'static, ReadBatchTask>,
1785 scheduler_handle: tokio::task::JoinHandle<()>,
1786) -> BoxStream<'static, ReadBatchTask> {
1787 let mut scheduler_handle = Some(scheduler_handle);
1791 let check_scheduler = stream::unfold((), move |_| {
1792 let handle = scheduler_handle.take();
1793 async move {
1794 if let Some(handle) = handle {
1795 handle.await.unwrap();
1796 }
1797 None
1798 }
1799 });
1800 stream.chain(check_scheduler).boxed()
1801}
1802
1803pub fn create_decode_stream(
1804 schema: &Schema,
1805 num_rows: u64,
1806 batch_size: u32,
1807 is_structural: bool,
1808 should_validate: bool,
1809 rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
1810) -> BoxStream<'static, ReadBatchTask> {
1811 if is_structural {
1812 let arrow_schema = ArrowSchema::from(schema);
1813 let structural_decoder = StructuralStructDecoder::new(
1814 arrow_schema.fields,
1815 should_validate,
1816 true,
1817 );
1818 StructuralBatchDecodeStream::new(rx, batch_size, num_rows, structural_decoder).into_stream()
1819 } else {
1820 let arrow_schema = ArrowSchema::from(schema);
1821 let root_fields = arrow_schema.fields;
1822
1823 let simple_struct_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1824 BatchDecodeStream::new(rx, batch_size, num_rows, simple_struct_decoder).into_stream()
1825 }
1826}
1827
1828pub fn create_decode_iterator(
1832 schema: &Schema,
1833 num_rows: u64,
1834 batch_size: u32,
1835 should_validate: bool,
1836 is_structural: bool,
1837 messages: VecDeque<Result<DecoderMessage>>,
1838) -> Box<dyn RecordBatchReader + Send + 'static> {
1839 let arrow_schema = Arc::new(ArrowSchema::from(schema));
1840 let root_fields = arrow_schema.fields.clone();
1841 if is_structural {
1842 let simple_struct_decoder =
1843 StructuralStructDecoder::new(root_fields, should_validate, true);
1844 Box::new(BatchDecodeIterator::new(
1845 messages,
1846 batch_size,
1847 num_rows,
1848 simple_struct_decoder,
1849 arrow_schema,
1850 ))
1851 } else {
1852 let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
1853 Box::new(BatchDecodeIterator::new(
1854 messages,
1855 batch_size,
1856 num_rows,
1857 root_decoder,
1858 arrow_schema,
1859 ))
1860 }
1861}
1862
1863fn create_scheduler_decoder(
1864 column_infos: Vec<Arc<ColumnInfo>>,
1865 requested_rows: RequestedRows,
1866 filter: FilterExpression,
1867 column_indices: Vec<u32>,
1868 target_schema: Arc<Schema>,
1869 config: SchedulerDecoderConfig,
1870) -> Result<BoxStream<'static, ReadBatchTask>> {
1871 let num_rows = requested_rows.num_rows();
1872
1873 let is_structural = column_infos[0].is_structural();
1874
1875 let (tx, rx) = mpsc::unbounded_channel();
1876
1877 let decode_stream = create_decode_stream(
1878 &target_schema,
1879 num_rows,
1880 config.batch_size,
1881 is_structural,
1882 config.decoder_config.validate_on_decode,
1883 rx,
1884 );
1885
1886 let scheduler_handle = tokio::task::spawn(async move {
1887 let mut decode_scheduler = match DecodeBatchScheduler::try_new(
1888 target_schema.as_ref(),
1889 &column_indices,
1890 &column_infos,
1891 &vec![],
1892 num_rows,
1893 config.decoder_plugins,
1894 config.io.clone(),
1895 config.cache,
1896 &filter,
1897 &config.decoder_config,
1898 )
1899 .await
1900 {
1901 Ok(scheduler) => scheduler,
1902 Err(e) => {
1903 let _ = tx.send(Err(e));
1904 return;
1905 }
1906 };
1907
1908 match requested_rows {
1909 RequestedRows::Ranges(ranges) => {
1910 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
1911 }
1912 RequestedRows::Indices(indices) => {
1913 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
1914 }
1915 }
1916 });
1917
1918 Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
1919}
1920
1921pub fn schedule_and_decode(
1927 column_infos: Vec<Arc<ColumnInfo>>,
1928 requested_rows: RequestedRows,
1929 filter: FilterExpression,
1930 column_indices: Vec<u32>,
1931 target_schema: Arc<Schema>,
1932 config: SchedulerDecoderConfig,
1933) -> BoxStream<'static, ReadBatchTask> {
1934 if requested_rows.num_rows() == 0 {
1935 return stream::empty().boxed();
1936 }
1937
1938 let requested_rows = requested_rows.trim_empty_ranges();
1941
1942 match create_scheduler_decoder(
1946 column_infos,
1947 requested_rows,
1948 filter,
1949 column_indices,
1950 target_schema,
1951 config,
1952 ) {
1953 Ok(stream) => stream,
1955 Err(e) => stream::once(std::future::ready(ReadBatchTask {
1956 num_rows: 0,
1957 task: std::future::ready(Err(e)).boxed(),
1958 }))
1959 .boxed(),
1960 }
1961}
1962
/// A lazily created current-thread Tokio runtime, built without enabling the
/// I/O or time drivers.
///
/// `schedule_and_decode_blocking` uses it to block on the construction of
/// the decode scheduler. Initialization panics if the runtime cannot be
/// built.
pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});
1968
1969pub fn schedule_and_decode_blocking(
1984 column_infos: Vec<Arc<ColumnInfo>>,
1985 requested_rows: RequestedRows,
1986 filter: FilterExpression,
1987 column_indices: Vec<u32>,
1988 target_schema: Arc<Schema>,
1989 config: SchedulerDecoderConfig,
1990) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
1991 if requested_rows.num_rows() == 0 {
1992 let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
1993 return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
1994 }
1995
1996 let num_rows = requested_rows.num_rows();
1997 let is_structural = column_infos[0].is_structural();
1998
1999 let (tx, mut rx) = mpsc::unbounded_channel();
2000
2001 let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
2004 target_schema.as_ref(),
2005 &column_indices,
2006 &column_infos,
2007 &vec![],
2008 num_rows,
2009 config.decoder_plugins,
2010 config.io.clone(),
2011 config.cache,
2012 &filter,
2013 &config.decoder_config,
2014 ))?;
2015
2016 match requested_rows {
2018 RequestedRows::Ranges(ranges) => {
2019 decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
2020 }
2021 RequestedRows::Indices(indices) => {
2022 decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
2023 }
2024 }
2025
2026 let mut messages = Vec::new();
2028 while rx
2029 .recv_many(&mut messages, usize::MAX)
2030 .now_or_never()
2031 .unwrap()
2032 != 0
2033 {}
2034
2035 let decode_iterator = create_decode_iterator(
2037 &target_schema,
2038 num_rows,
2039 config.batch_size,
2040 config.decoder_config.validate_on_decode,
2041 is_structural,
2042 messages.into(),
2043 );
2044
2045 Ok(decode_iterator)
2046}
2047
/// A decoder that turns the data of one page into a [`DataBlock`].
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes part of the page into a [`DataBlock`].
    ///
    /// NOTE(review): presumably skips the first `rows_to_skip` rows and then
    /// decodes `num_rows` rows - confirm against the implementations.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
2093
/// Schedules the I/O for parts of a page and produces a decoder for the
/// result.
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules the given row `ranges` and returns a future that resolves to
    /// a decoder for them.
    ///
    /// * `ranges` - the rows to schedule
    /// * `scheduler` - the I/O service used to issue reads
    /// * `top_level_row` - NOTE(review): presumably the top-level row number
    ///   the ranges belong to (e.g. for prioritizing I/O) - confirm
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
2121
/// A priority that moves forward as rows are scheduled.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Records that `num_rows` more rows have been scheduled.
    fn advance(&mut self, num_rows: u64);
    /// Returns the current priority.
    fn current_priority(&self) -> u64;
    /// Returns an independent boxed copy of this range.
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

/// A [`PriorityRange`] whose priority grows by exactly the number of rows it
/// is advanced by.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    /// Creates a range starting at `priority`.
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        let copy = Self::new(self.priority);
        Box::new(copy)
    }
}
2157
2158pub struct ListPriorityRange {
2171 base: Box<dyn PriorityRange>,
2172 offsets: Arc<[u64]>,
2173 cur_index_into_offsets: usize,
2174 cur_position: u64,
2175}
2176
2177impl ListPriorityRange {
2178 pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
2179 Self {
2180 base,
2181 offsets,
2182 cur_index_into_offsets: 0,
2183 cur_position: 0,
2184 }
2185 }
2186}
2187
2188impl std::fmt::Debug for ListPriorityRange {
2189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2190 f.debug_struct("ListPriorityRange")
2191 .field("base", &self.base)
2192 .field("offsets.len()", &self.offsets.len())
2193 .field("cur_index_into_offsets", &self.cur_index_into_offsets)
2194 .field("cur_position", &self.cur_position)
2195 .finish()
2196 }
2197}
2198
2199impl PriorityRange for ListPriorityRange {
2200 fn advance(&mut self, num_rows: u64) {
2201 self.cur_position += num_rows;
2204 let mut idx_into_offsets = self.cur_index_into_offsets;
2205 while idx_into_offsets + 1 < self.offsets.len()
2206 && self.offsets[idx_into_offsets + 1] <= self.cur_position
2207 {
2208 idx_into_offsets += 1;
2209 }
2210 let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
2211 self.cur_index_into_offsets = idx_into_offsets;
2212 self.base.advance(base_rows_advanced as u64);
2213 }
2214
2215 fn current_priority(&self) -> u64 {
2216 self.base.current_priority()
2217 }
2218
2219 fn box_clone(&self) -> Box<dyn PriorityRange> {
2220 Box::new(Self {
2221 base: self.base.box_clone(),
2222 offsets: self.offsets.clone(),
2223 cur_index_into_offsets: self.cur_index_into_offsets,
2224 cur_position: self.cur_position,
2225 })
2226 }
2227}
2228
/// State shared with field schedulers while scheduling: the I/O service, the
/// cache, and the path of the field currently being scheduled.
pub struct SchedulerContext {
    /// Optional message receiver; when present, `path_name` reports the
    /// context as `TEMP(<name>)` instead of `ROOT`.
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    /// I/O service used to issue reads.
    io: Arc<dyn EncodingsIo>,
    /// Cache available to schedulers.
    cache: Arc<LanceCache>,
    /// Name used by `path_name` for non-root contexts.
    name: String,
    /// Field indices from the root down to the current field.
    path: Vec<u32>,
    /// Field names matching `path`, one per level.
    path_names: Vec<String>,
}
2238
/// A [`SchedulerContext`] with one extra path level pushed, as returned by
/// `SchedulerContext::push`. Call `pop` to remove that level again.
pub struct ScopedSchedulerContext<'a> {
    /// The underlying context, including the pushed level.
    pub context: &'a mut SchedulerContext,
}
2242
2243impl<'a> ScopedSchedulerContext<'a> {
2244 pub fn pop(self) -> &'a mut SchedulerContext {
2245 self.context.pop();
2246 self.context
2247 }
2248}
2249
2250impl SchedulerContext {
2251 pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
2252 Self {
2253 io,
2254 cache,
2255 recv: None,
2256 name: "".to_string(),
2257 path: Vec::new(),
2258 path_names: Vec::new(),
2259 }
2260 }
2261
2262 pub fn io(&self) -> &Arc<dyn EncodingsIo> {
2263 &self.io
2264 }
2265
2266 pub fn cache(&self) -> &Arc<LanceCache> {
2267 &self.cache
2268 }
2269
2270 pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
2271 self.path.push(index);
2272 self.path_names.push(name.to_string());
2273 ScopedSchedulerContext { context: self }
2274 }
2275
2276 pub fn pop(&mut self) {
2277 self.path.pop();
2278 self.path_names.pop();
2279 }
2280
2281 pub fn path_name(&self) -> String {
2282 let path = self.path_names.join("/");
2283 if self.recv.is_some() {
2284 format!("TEMP({}){}", self.name, path)
2285 } else {
2286 format!("ROOT{}", path)
2287 }
2288 }
2289
2290 pub fn current_path(&self) -> VecDeque<u32> {
2291 VecDeque::from_iter(self.path.iter().copied())
2292 }
2293
2294 #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
2295 pub fn locate_decoder(
2296 &mut self,
2297 decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
2298 ) -> crate::previous::decoder::DecoderReady {
2299 trace!(
2300 "Scheduling decoder of type {:?} for {:?}",
2301 decoder.data_type(),
2302 self.path,
2303 );
2304 crate::previous::decoder::DecoderReady {
2305 decoder,
2306 path: self.current_path(),
2307 }
2308 }
2309}
2310
2311pub struct UnloadedPage(pub BoxFuture<'static, Result<LoadedPage>>);
2312
2313impl std::fmt::Debug for UnloadedPage {
2314 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2315 f.debug_struct("UnloadedPage").finish()
2316 }
2317}
2318
/// The result of one scheduling step of a [`StructuralSchedulingJob`].
#[derive(Debug)]
pub struct ScheduledScanLine {
    /// Number of rows reported as scheduled by this step.
    pub rows_scheduled: u64,
    /// Messages for the decode side produced by this step.
    pub decoders: Vec<MessageType>,
}
2324
/// An in-progress scheduling operation that is advanced one step at a time.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Performs the next scheduling step.
    ///
    /// NOTE(review): `Ok(None)` presumably means there is nothing left to
    /// schedule - confirm against the implementations.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>>;
}
2331
/// A filter passed to the schedulers, stored as raw bytes. Empty bytes mean
/// "no filter".
pub struct FilterExpression(pub Bytes);

impl FilterExpression {
    /// Creates a filter that does nothing (empty bytes).
    pub fn no_filter() -> Self {
        Self(Bytes::new())
    }

    /// Returns `true` if this filter holds no bytes, i.e. it is equivalent to
    /// [`Self::no_filter`].
    pub fn is_noop(&self) -> bool {
        self.0.is_empty()
    }
}
2355
/// Scheduler for one field on the structural decode path.
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// Asynchronously prepares the scheduler for use with the given `filter`
    /// and `context`.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Creates a scheduling job for the given row `ranges`. The job borrows
    /// this scheduler for its whole lifetime.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
2368
/// A unit of decode work that produces one array.
pub trait DecodeArrayTask: Send {
    /// Consumes the task and decodes it into an array.
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}
2374
2375impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
2376 fn decode(self: Box<Self>) -> Result<ArrayRef> {
2377 StructuralDecodeArrayTask::decode(*self).map(|decoded_array| decoded_array.array)
2378 }
2379}
2380
/// The decode work for one batch, as produced by a root decoder.
pub struct NextDecodeTask {
    /// Task that decodes the batch; `into_batch` treats its output as a
    /// struct array.
    pub task: Box<dyn DecodeArrayTask>,
    /// Number of rows in the batch.
    pub num_rows: u64,
}
2391
2392impl NextDecodeTask {
2393 #[instrument(name = "task_to_batch", level = "debug", skip_all)]
2398 fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
2399 let struct_arr = self.task.decode();
2400 match struct_arr {
2401 Ok(struct_arr) => {
2402 let batch = RecordBatch::from(struct_arr.as_struct());
2403 let size_bytes = batch.get_array_memory_size() as u64;
2404 if size_bytes > BATCH_SIZE_BYTES_WARNING {
2405 emitted_batch_size_warning.call_once(|| {
2406 let size_mb = size_bytes / 1024 / 1024;
2407 debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
2408 });
2409 }
2410 Ok(batch)
2411 }
2412 Err(e) => {
2413 let e = Error::Internal {
2414 message: format!("Error decoding batch: {}", e),
2415 location: location!(),
2416 };
2417 Err(e)
2418 }
2419 }
2420 }
2421}
2422
/// The payload of a scheduler message; which variant is used depends on the
/// decode path.
#[derive(Debug)]
pub enum MessageType {
    /// A decoder for the legacy decode path.
    DecoderReady(crate::previous::decoder::DecoderReady),
    /// A page, still loading, for the structural decode path.
    UnloadedPage(UnloadedPage),
}
2438
2439impl MessageType {
2440 pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
2441 match self {
2442 Self::DecoderReady(decoder) => decoder,
2443 Self::UnloadedPage(_) => {
2444 panic!("Expected DecoderReady but got UnloadedPage")
2445 }
2446 }
2447 }
2448
2449 pub fn into_structural(self) -> UnloadedPage {
2450 match self {
2451 Self::UnloadedPage(unloaded) => unloaded,
2452 Self::DecoderReady(_) => {
2453 panic!("Expected UnloadedPage but got DecoderReady")
2454 }
2455 }
2456 }
2457}
2458
/// One message from the scheduler to the decode side.
pub struct DecoderMessage {
    /// Total number of rows scheduled up to and including this message;
    /// receivers store it as their `rows_scheduled`.
    pub scheduled_so_far: u64,
    /// Decoders / pages delivered with this message.
    pub decoders: Vec<MessageType>,
}
2463
/// Decode-side state: the channel on which scheduler messages arrive.
pub struct DecoderContext {
    /// Receiving end of the scheduler channel.
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    /// Wraps the receiving end of the scheduler channel.
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}
2473
/// The output of decoding (part of) a page.
pub struct DecodedPage {
    /// The decoded values.
    pub data: DataBlock,
    /// The [`RepDefUnraveler`] that accompanies `data`.
    pub repdef: RepDefUnraveler,
}
2478
/// A unit of decode work that produces a [`DecodedPage`].
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Consumes the task and decodes it.
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}
2483
/// A decoder for one loaded page on the structural decode path.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Creates a task that decodes the next `num_rows` rows of the page.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// Returns a row count for the page.
    ///
    /// NOTE(review): whether this is the total or the not-yet-drained count
    /// cannot be told from here - confirm against the implementations.
    fn num_rows(&self) -> u64;
}
2488
/// A page whose data has been loaded and is ready to be decoded.
#[derive(Debug)]
pub struct LoadedPage {
    /// Decoder for the page's data.
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Field indices identifying which field decoder the page belongs to.
    pub path: VecDeque<u32>,
    /// Index of the page.
    ///
    /// NOTE(review): presumably the page's position within its column -
    /// confirm.
    pub page_index: usize,
}
2514
/// The output of a [`StructuralDecodeArrayTask`].
pub struct DecodedArray {
    /// The decoded array.
    pub array: ArrayRef,
    /// The [`CompositeRepDefUnraveler`] that accompanies `array`.
    pub repdef: CompositeRepDefUnraveler,
}
2519
/// A unit of decode work on the structural path that produces a
/// [`DecodedArray`].
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    /// Consumes the task and decodes it.
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}
2523
/// Decoder for one field on the structural decode path.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Hands a loaded page to this decoder.
    fn accept_page(&mut self, _child: LoadedPage) -> Result<()>;
    /// Creates a task that decodes the next `num_rows` rows.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// Returns the Arrow data type of this decoder's field.
    fn data_type(&self) -> &DataType;
}
2535
/// Decoder plugins passed to the decode scheduler. The struct currently has
/// no fields.
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
2538
2539pub async fn decode_batch(
2541 batch: &EncodedBatch,
2542 filter: &FilterExpression,
2543 decoder_plugins: Arc<DecoderPlugins>,
2544 should_validate: bool,
2545 version: LanceFileVersion,
2546 cache: Option<Arc<LanceCache>>,
2547) -> Result<RecordBatch> {
2548 let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
2553 let cache = if let Some(cache) = cache {
2554 cache
2555 } else {
2556 Arc::new(lance_core::cache::LanceCache::with_capacity(
2557 128 * 1024 * 1024,
2558 ))
2559 };
2560 let mut decode_scheduler = DecodeBatchScheduler::try_new(
2561 batch.schema.as_ref(),
2562 &batch.top_level_columns,
2563 &batch.page_table,
2564 &vec![],
2565 batch.num_rows,
2566 decoder_plugins,
2567 io_scheduler.clone(),
2568 cache,
2569 filter,
2570 &DecoderConfig::default(),
2571 )
2572 .await?;
2573 let (tx, rx) = unbounded_channel();
2574 decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
2575 let is_structural = version >= LanceFileVersion::V2_1;
2576 let mut decode_stream = create_decode_stream(
2577 &batch.schema,
2578 batch.num_rows,
2579 batch.num_rows as u32,
2580 is_structural,
2581 should_validate,
2582 rx,
2583 );
2584 decode_stream.next().await.unwrap().task.await
2585}
2586
#[cfg(test)]
mod tests {
    use super::*;

    /// A single index becomes a single one-row range.
    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let expected = vec![1..2];
        let actual = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(actual, expected);
    }

    /// Consecutive indices collapse into one range.
    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let expected = vec![1..10];
        let actual = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(actual, expected);
    }

    /// Every gap in the indices starts a new range.
    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let expected = vec![1..4, 5..8, 9..10];
        let actual = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(actual, expected);
    }
}