use std::collections::VecDeque;
use std::sync::{LazyLock, Once, OnceLock};
use std::{ops::Range, sync::Arc};
use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::future::{BoxFuture, MaybeDone, maybe_done};
use futures::stream::{self, BoxStream};
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{BLOB_DESC_LANCE_FIELD, Field, Schema};
use lance_core::utils::futures::FinallyStreamExt;
use lance_core::utils::parse::parse_env_as_bool;
use log::{debug, trace, warn};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, unbounded_channel};
use lance_core::error::LanceOptionExt;
use lance_core::{ArrowResult, Error, Result};
use tracing::instrument;
use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy};
use crate::data::DataBlock;
use crate::encoder::EncodedBatch;
use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler;
use crate::encodings::logical::list::StructuralListScheduler;
use crate::encodings::logical::map::StructuralMapScheduler;
use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler;
use crate::encodings::logical::r#struct::{StructuralStructDecoder, StructuralStructScheduler};
use crate::format::pb::{self, column_encoding};
use crate::format::pb21;
use crate::previous::decoder::LogicalPageDecoder;
use crate::previous::encodings::logical::list::OffsetPageInfo;
use crate::previous::encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler};
use crate::previous::encodings::logical::{
binary::BinaryFieldScheduler, blob::BlobFieldScheduler, list::ListFieldScheduler,
primitive::PrimitiveFieldScheduler,
};
use crate::repdef::{CompositeRepDefUnraveler, RepDefUnraveler};
use crate::version::LanceFileVersion;
use crate::{BufferScheduler, EncodingsIo};
// Threshold (10 MiB) above which a decoded batch presumably triggers a one-time
// size warning — TODO confirm against the usage site (not visible in this chunk).
const BATCH_SIZE_BYTES_WARNING: u64 = 10 * 1024 * 1024;
// Env var selecting how structural batch-decode tasks are spawned.
const ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE: &str =
    "LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE";
// Env var toggling caching of the repetition index on read (parsed as bool, default true).
const ENV_LANCE_READ_CACHE_REPETITION_INDEX: &str = "LANCE_READ_CACHE_REPETITION_INDEX";
/// Returns the process-wide default for caching the repetition index.
///
/// The value is read once from `LANCE_READ_CACHE_REPETITION_INDEX`
/// (defaulting to `true`) and memoized for the lifetime of the process.
fn default_cache_repetition_index() -> bool {
    static CACHED: OnceLock<bool> = OnceLock::new();
    let value =
        CACHED.get_or_init(|| parse_env_as_bool(ENV_LANCE_READ_CACHE_REPETITION_INDEX, true));
    *value
}
/// The encoding metadata of a page, in either the legacy (`pb`) or the newer
/// structural (`pb21`) protobuf representation.
#[derive(Debug)]
pub enum PageEncoding {
    /// Legacy array encoding
    Legacy(pb::ArrayEncoding),
    /// Structural page layout
    Structural(pb21::PageLayout),
}
impl PageEncoding {
    /// Returns the legacy encoding.
    ///
    /// # Panics
    /// Panics if this is a structural encoding.
    pub fn as_legacy(&self) -> &pb::ArrayEncoding {
        if let Self::Legacy(enc) = self {
            enc
        } else {
            panic!("Expected a legacy encoding")
        }
    }

    /// Returns the structural page layout.
    ///
    /// # Panics
    /// Panics if this is a legacy encoding.
    pub fn as_structural(&self) -> &pb21::PageLayout {
        if let Self::Structural(enc) = self {
            enc
        } else {
            panic!("Expected a structural encoding")
        }
    }

    /// True when this page uses the structural representation.
    pub fn is_structural(&self) -> bool {
        !matches!(self, Self::Legacy(_))
    }
}
/// Metadata describing a single page of data within a column.
#[derive(Debug)]
pub struct PageInfo {
    /// The number of rows in the page
    pub num_rows: u64,
    /// Scheduling priority of the page (0 for synthetic legacy root pages)
    pub priority: u64,
    /// The encoding of the page (legacy or structural)
    pub encoding: PageEncoding,
    /// Per-buffer (offset, size) pairs locating the page's buffers in the file
    /// (presumably byte offsets/sizes — TODO confirm against the writer)
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
}
/// Metadata describing a column, including all of its pages.
#[derive(Debug, Clone)]
pub struct ColumnInfo {
    /// The index of the column in the file
    pub index: u32,
    /// The metadata for each page in the column
    pub page_infos: Arc<[PageInfo]>,
    /// Column-level buffer (offset, size) pairs
    pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// The column-level encoding
    pub encoding: pb::ColumnEncoding,
}
impl ColumnInfo {
    /// Creates a new instance from its parts.
    pub fn new(
        index: u32,
        page_infos: Arc<[PageInfo]>,
        buffer_offsets_and_sizes: Vec<(u64, u64)>,
        encoding: pb::ColumnEncoding,
    ) -> Self {
        let buffer_offsets_and_sizes: Arc<[(u64, u64)]> = buffer_offsets_and_sizes.into();
        Self {
            index,
            page_infos,
            buffer_offsets_and_sizes,
            encoding,
        }
    }

    /// True when the column's pages use the structural encoding.
    ///
    /// A column with no pages is reported as non-structural.
    pub fn is_structural(&self) -> bool {
        match self.page_infos.first() {
            Some(page) => page.encoding.is_structural(),
            None => false,
        }
    }
}
/// The root scheduler for a file: either the newer structural scheduler or the
/// legacy (pre-structural) field scheduler.
enum RootScheduler {
    Structural(Box<dyn StructuralFieldScheduler>),
    Legacy(Arc<dyn crate::previous::decoder::FieldScheduler>),
}
impl RootScheduler {
    /// Returns the legacy scheduler; panics if this is structural.
    fn as_legacy(&self) -> &Arc<dyn crate::previous::decoder::FieldScheduler> {
        if let Self::Legacy(s) = self {
            s
        } else {
            panic!("Expected a legacy scheduler")
        }
    }

    /// Returns the structural scheduler; panics if this is legacy.
    fn as_structural(&self) -> &dyn StructuralFieldScheduler {
        if let Self::Structural(s) = self {
            s.as_ref()
        } else {
            panic!("Expected a structural scheduler")
        }
    }
}
/// Drives scheduling of decode work for a file.
///
/// Built from the file's column metadata; schedules I/O for requested row
/// ranges / takes and emits `DecoderMessage`s to the decode side.
pub struct DecodeBatchScheduler {
    // Legacy or structural, chosen from the column metadata at construction
    root_scheduler: RootScheduler,
    /// The top-level Arrow fields of the schema being decoded
    pub root_fields: Fields,
    // Cache shared with the scheduler context
    cache: Arc<LanceCache>,
}
/// Cursor over a file's column infos that tracks both the flat position in
/// `column_infos` and the position within the requested top-level
/// `column_indices` (nested fields consume columns between top-level jumps).
pub struct ColumnInfoIter<'a> {
    // All column infos, in file order
    column_infos: Vec<Arc<ColumnInfo>>,
    // The (top-level) column indices the caller asked for
    column_indices: &'a [u32],
    // Current position in `column_infos`
    column_info_pos: usize,
    // Current position in `column_indices`
    column_indices_pos: usize,
}
impl<'a> ColumnInfoIter<'a> {
pub fn new(column_infos: Vec<Arc<ColumnInfo>>, column_indices: &'a [u32]) -> Self {
let initial_pos = column_indices.first().copied().unwrap_or(0) as usize;
Self {
column_infos,
column_indices,
column_info_pos: initial_pos,
column_indices_pos: 0,
}
}
pub fn peek(&self) -> &Arc<ColumnInfo> {
&self.column_infos[self.column_info_pos]
}
pub fn peek_transform(&mut self, transform: impl FnOnce(Arc<ColumnInfo>) -> Arc<ColumnInfo>) {
let column_info = self.column_infos[self.column_info_pos].clone();
let transformed = transform(column_info);
self.column_infos[self.column_info_pos] = transformed;
}
pub fn expect_next(&mut self) -> Result<&Arc<ColumnInfo>> {
self.next().ok_or_else(|| {
Error::invalid_input(
"there were more fields in the schema than provided column indices / infos",
)
})
}
fn next(&mut self) -> Option<&Arc<ColumnInfo>> {
if self.column_info_pos < self.column_infos.len() {
let info = &self.column_infos[self.column_info_pos];
self.column_info_pos += 1;
Some(info)
} else {
None
}
}
pub(crate) fn next_top_level(&mut self) {
self.column_indices_pos += 1;
if self.column_indices_pos < self.column_indices.len() {
self.column_info_pos = self.column_indices[self.column_indices_pos] as usize;
} else {
self.column_info_pos = self.column_infos.len();
}
}
}
/// (offset, size) pairs for buffers declared at the file level.
#[derive(Clone, Copy, Debug)]
pub struct FileBuffers<'a> {
    pub positions_and_sizes: &'a [(u64, u64)],
}
/// Column-level buffer locations, together with the file-level buffers.
#[derive(Clone, Copy, Debug)]
pub struct ColumnBuffers<'a, 'b> {
    pub file_buffers: FileBuffers<'a>,
    pub positions_and_sizes: &'b [(u64, u64)],
}
/// Page-level buffer locations, together with the column- and file-level buffers.
#[derive(Clone, Copy, Debug)]
pub struct PageBuffers<'a, 'b, 'c> {
    pub column_buffers: ColumnBuffers<'a, 'b>,
    pub positions_and_sizes: &'c [(u64, u64)],
}
/// The core decoder strategy: builds field schedulers (legacy or structural)
/// from column metadata.
#[derive(Debug)]
pub struct CoreFieldDecoderStrategy {
    /// Whether decoded data should be validated
    pub validate_data: bool,
    /// Strategy used to pick decompressors for structural pages
    pub decompressor_strategy: Arc<dyn DecompressionStrategy>,
    /// Whether the repetition index should be cached (structural path)
    pub cache_repetition_index: bool,
}
impl Default for CoreFieldDecoderStrategy {
    /// Defaults: no data validation, default decompression strategy, no
    /// repetition-index caching.
    fn default() -> Self {
        let decompressor_strategy = Arc::new(DefaultDecompressionStrategy {});
        Self {
            validate_data: false,
            decompressor_strategy,
            cache_repetition_index: false,
        }
    }
}
impl CoreFieldDecoderStrategy {
pub fn with_cache_repetition_index(mut self, cache_repetition_index: bool) -> Self {
self.cache_repetition_index = cache_repetition_index;
self
}
pub fn from_decoder_config(config: &DecoderConfig) -> Self {
Self {
validate_data: config.validate_on_decode,
decompressor_strategy: Arc::new(DefaultDecompressionStrategy {}),
cache_repetition_index: config.cache_repetition_index,
}
}
fn ensure_values_encoded(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
let column_encoding = column_info
.encoding
.column_encoding
.as_ref()
.ok_or_else(|| {
Error::invalid_input(format!(
"the column at index {} was missing a ColumnEncoding",
column_info.index
))
})?;
if matches!(
column_encoding,
pb::column_encoding::ColumnEncoding::Values(_)
) {
Ok(())
} else {
Err(Error::invalid_input(format!(
"the column at index {} mapping to the input field {} has column encoding {:?} and no decoder is registered to handle it",
column_info.index, field_name, column_encoding
)))
}
}
fn is_structural_primitive(data_type: &DataType) -> bool {
if data_type.is_primitive() {
true
} else {
match data_type {
DataType::Dictionary(_, value_type) => Self::is_structural_primitive(value_type),
DataType::Boolean
| DataType::Null
| DataType::FixedSizeBinary(_)
| DataType::Binary
| DataType::LargeBinary
| DataType::Utf8
| DataType::LargeUtf8 => true,
DataType::FixedSizeList(inner, _) => {
Self::is_structural_primitive(inner.data_type())
}
_ => false,
}
}
}
fn is_primitive_legacy(data_type: &DataType) -> bool {
if data_type.is_primitive() {
true
} else {
match data_type {
DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
DataType::FixedSizeList(inner, _) => Self::is_primitive_legacy(inner.data_type()),
_ => false,
}
}
}
fn create_primitive_scheduler(
&self,
field: &Field,
column: &ColumnInfo,
buffers: FileBuffers,
) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
Self::ensure_values_encoded(column, &field.name)?;
let column_buffers = ColumnBuffers {
file_buffers: buffers,
positions_and_sizes: &column.buffer_offsets_and_sizes,
};
Ok(Box::new(PrimitiveFieldScheduler::new(
column.index,
field.data_type(),
column.page_infos.clone(),
column_buffers,
self.validate_data,
)))
}
    /// Validates that the column can be decoded as a legacy (non-packed) struct.
    ///
    /// # Errors
    /// Fails if the column is not `Values`-encoded, has more than one page, or
    /// the single page is not a struct encoding.
    fn check_simple_struct(column_info: &ColumnInfo, field_name: &str) -> Result<()> {
        Self::ensure_values_encoded(column_info, field_name)?;
        // Multi-page legacy struct columns are not supported
        if column_info.page_infos.len() != 1 {
            return Err(Error::invalid_input_source(format!("Due to schema we expected a struct column but we received a column with {} pages and right now we only support struct columns with 1 page", column_info.page_infos.len()).into()));
        }
        let encoding = &column_info.page_infos[0].encoding;
        // NOTE(review): `as_legacy()` and `unwrap()` panic (rather than error)
        // on a structural page or a missing array encoding — presumably
        // unreachable for valid legacy files; confirm against callers.
        match encoding.as_legacy().array_encoding.as_ref().unwrap() {
            pb::array_encoding::ArrayEncoding::Struct(_) => Ok(()),
            _ => Err(Error::invalid_input_source(format!("Expected a struct encoding because we have a struct field in the schema but got the encoding {:?}", encoding).into())),
        }
    }
fn check_packed_struct(column_info: &ColumnInfo) -> bool {
let encoding = &column_info.page_infos[0].encoding;
matches!(
encoding.as_legacy().array_encoding.as_ref().unwrap(),
pb::array_encoding::ArrayEncoding::PackedStruct(_)
)
}
    /// Builds a legacy list scheduler: an offsets scheduler built from the
    /// offsets column plus an items scheduler built from the child field's
    /// column(s).
    ///
    /// # Errors
    /// Fails when the offsets column is not `Values`-encoded or when building
    /// the items scheduler fails.  Panics if a page of the offsets column is
    /// not list-encoded.
    fn create_list_scheduler(
        &self,
        list_field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
        offsets_column: &ColumnInfo,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        Self::ensure_values_encoded(offsets_column, &list_field.name)?;
        let offsets_column_buffers = ColumnBuffers {
            file_buffers: buffers,
            positions_and_sizes: &offsets_column.buffer_offsets_and_sizes,
        };
        // The items live in the column(s) after the offsets column
        let items_scheduler =
            self.create_legacy_field_scheduler(&list_field.children[0], column_infos, buffers)?;
        // Split each list page into a synthetic offsets page (decoded as
        // UInt64) plus the list bookkeeping (null adjustment / item counts).
        // Empty pages are skipped.
        let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
            .page_infos
            .iter()
            .filter(|offsets_page| offsets_page.num_rows > 0)
            .map(|offsets_page| {
                if let Some(pb::array_encoding::ArrayEncoding::List(list_encoding)) =
                    &offsets_page.encoding.as_legacy().array_encoding
                {
                    let inner = PageInfo {
                        buffer_offsets_and_sizes: offsets_page.buffer_offsets_and_sizes.clone(),
                        encoding: PageEncoding::Legacy(
                            list_encoding.offsets.as_ref().unwrap().as_ref().clone(),
                        ),
                        num_rows: offsets_page.num_rows,
                        priority: 0,
                    };
                    (
                        inner,
                        OffsetPageInfo {
                            offsets_in_page: offsets_page.num_rows,
                            null_offset_adjustment: list_encoding.null_offset_adjustment,
                            num_items_referenced_by_page: list_encoding.num_items,
                        },
                    )
                } else {
                    panic!("Expected a list column");
                }
            })
            .unzip();
        // Offsets are always decoded as UInt64 regardless of the list type
        let inner = Arc::new(PrimitiveFieldScheduler::new(
            offsets_column.index,
            DataType::UInt64,
            Arc::from(inner_infos.into_boxed_slice()),
            offsets_column_buffers,
            self.validate_data,
        )) as Arc<dyn crate::previous::decoder::FieldScheduler>;
        let items_field = match list_field.data_type() {
            DataType::List(inner) => inner,
            DataType::LargeList(inner) => inner,
            _ => unreachable!(),
        };
        // List -> i32 offsets in the decoded array, LargeList -> i64
        let offset_type = if matches!(list_field.data_type(), DataType::List(_)) {
            DataType::Int32
        } else {
            DataType::Int64
        };
        Ok(Box::new(ListFieldScheduler::new(
            inner,
            items_scheduler.into(),
            items_field,
            offset_type,
            null_offset_adjustments,
        )))
    }
fn unwrap_blob(column_info: &ColumnInfo) -> Option<ColumnInfo> {
if let column_encoding::ColumnEncoding::Blob(blob) =
column_info.encoding.column_encoding.as_ref().unwrap()
{
let mut column_info = column_info.clone();
column_info.encoding = blob.inner.as_ref().unwrap().as_ref().clone();
Some(column_info)
} else {
None
}
}
fn create_structural_field_scheduler(
&self,
field: &Field,
column_infos: &mut ColumnInfoIter,
) -> Result<Box<dyn StructuralFieldScheduler>> {
let data_type = field.data_type();
if Self::is_structural_primitive(&data_type) {
let column_info = column_infos.expect_next()?;
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
self.cache_repetition_index,
field,
)?);
column_infos.next_top_level();
return Ok(scheduler);
}
match &data_type {
DataType::Struct(fields) => {
if field.is_packed_struct() {
let column_info = column_infos.expect_next()?;
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
self.cache_repetition_index,
field,
)?);
column_infos.next_top_level();
return Ok(scheduler);
}
if field.is_blob() {
let column_info = column_infos.peek();
if column_info.page_infos.iter().any(|page| {
matches!(
page.encoding,
PageEncoding::Structural(pb21::PageLayout {
layout: Some(pb21::page_layout::Layout::BlobLayout(_))
})
)
}) {
let column_info = column_infos.expect_next()?;
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
self.cache_repetition_index,
field,
)?);
column_infos.next_top_level();
return Ok(scheduler);
}
}
let mut child_schedulers = Vec::with_capacity(field.children.len());
for field in field.children.iter() {
let field_scheduler =
self.create_structural_field_scheduler(field, column_infos)?;
child_schedulers.push(field_scheduler);
}
let fields = fields.clone();
Ok(
Box::new(StructuralStructScheduler::new(child_schedulers, fields))
as Box<dyn StructuralFieldScheduler>,
)
}
DataType::List(_) | DataType::LargeList(_) => {
let child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(child, column_infos)?;
Ok(Box::new(StructuralListScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)
}
DataType::FixedSizeList(inner, dimension)
if matches!(inner.data_type(), DataType::Struct(_)) =>
{
let child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(child, column_infos)?;
Ok(Box::new(StructuralFixedSizeListScheduler::new(
child_scheduler,
*dimension,
)) as Box<dyn StructuralFieldScheduler>)
}
DataType::Map(_, keys_sorted) => {
if *keys_sorted {
return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", *keys_sorted).into()));
}
let entries_child = field.children.first().expect_ok()?;
let child_scheduler =
self.create_structural_field_scheduler(entries_child, column_infos)?;
Ok(Box::new(StructuralMapScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)
}
_ => todo!("create_structural_field_scheduler for {}", data_type),
}
}
    /// Recursively builds the legacy scheduler tree for `field`, consuming
    /// column infos as it descends.
    ///
    /// # Errors
    /// Propagates errors from child scheduler construction; unsupported
    /// dictionary value types produce `not_supported`.  Unsupported data
    /// types hit `todo!`.
    fn create_legacy_field_scheduler(
        &self,
        field: &Field,
        column_infos: &mut ColumnInfoIter,
        buffers: FileBuffers,
    ) -> Result<Box<dyn crate::previous::decoder::FieldScheduler>> {
        let data_type = field.data_type();
        if Self::is_primitive_legacy(&data_type) {
            // Simple case: one primitive column
            let column_info = column_infos.expect_next()?;
            let scheduler = self.create_primitive_scheduler(field, column_info, buffers)?;
            return Ok(scheduler);
        } else if data_type.is_binary_like() {
            let column_info = column_infos.expect_next()?.clone();
            // Blob-encoded binary: decode the descriptor column, then the blobs
            if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                let desc_scheduler =
                    self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
                let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
                return Ok(blob_scheduler);
            }
            if let Some(page_info) = column_info.page_infos.first() {
                if matches!(
                    page_info.encoding.as_legacy(),
                    pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(..))
                    }
                ) {
                    // List-encoded binary: model the column as a list of u8
                    // (List for Utf8/Binary, LargeList for the large variants)
                    // and wrap it in a binary scheduler
                    let list_type = if matches!(data_type, DataType::Utf8 | DataType::Binary) {
                        DataType::List(Arc::new(ArrowField::new("item", DataType::UInt8, false)))
                    } else {
                        DataType::LargeList(Arc::new(ArrowField::new(
                            "item",
                            DataType::UInt8,
                            false,
                        )))
                    };
                    let list_field = Field::try_from(ArrowField::new(
                        field.name.clone(),
                        list_type,
                        field.nullable,
                    ))
                    .unwrap();
                    let list_scheduler = self.create_list_scheduler(
                        &list_field,
                        column_infos,
                        buffers,
                        &column_info,
                    )?;
                    let binary_scheduler = Box::new(BinaryFieldScheduler::new(
                        list_scheduler.into(),
                        field.data_type(),
                    ));
                    return Ok(binary_scheduler);
                } else {
                    // Binary stored directly as a primitive column
                    let scheduler =
                        self.create_primitive_scheduler(field, &column_info, buffers)?;
                    return Ok(scheduler);
                }
            } else {
                // No pages at all: fall back to a primitive scheduler
                return self.create_primitive_scheduler(field, &column_info, buffers);
            }
        }
        match &data_type {
            DataType::FixedSizeList(inner, _dimension) => {
                // A fixed size list column could either be a physical or a logical
                // decoder depending on the child data type.
                if Self::is_primitive_legacy(inner.data_type()) {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    todo!()
                }
            }
            DataType::Dictionary(_key_type, value_type) => {
                if Self::is_primitive_legacy(value_type) || value_type.is_binary_like() {
                    let primitive_col = column_infos.expect_next()?;
                    let scheduler =
                        self.create_primitive_scheduler(field, primitive_col, buffers)?;
                    Ok(scheduler)
                } else {
                    Err(Error::not_supported_source(
                        format!(
                            "No way to decode into a dictionary field of type {}",
                            value_type
                        )
                        .into(),
                    ))
                }
            }
            DataType::List(_) | DataType::LargeList(_) => {
                // The offsets column precedes the items column(s)
                let offsets_column = column_infos.expect_next()?.clone();
                column_infos.next_top_level();
                self.create_list_scheduler(field, column_infos, buffers, &offsets_column)
            }
            DataType::Struct(fields) => {
                let column_info = column_infos.expect_next()?;
                // Structs can themselves be blob- or packed-encoded, in which
                // case they are decoded as a single primitive column
                if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
                    return self.create_primitive_scheduler(field, &blob_col, buffers);
                }
                if Self::check_packed_struct(column_info) {
                    // use packed struct encoding
                    self.create_primitive_scheduler(field, column_info, buffers)
                } else {
                    // use default struct encoding
                    Self::check_simple_struct(column_info, &field.name).unwrap();
                    let num_rows = column_info
                        .page_infos
                        .iter()
                        .map(|page| page.num_rows)
                        .sum();
                    let mut child_schedulers = Vec::with_capacity(field.children.len());
                    for field in &field.children {
                        column_infos.next_top_level();
                        let field_scheduler =
                            self.create_legacy_field_scheduler(field, column_infos, buffers)?;
                        child_schedulers.push(Arc::from(field_scheduler));
                    }
                    let fields = fields.clone();
                    Ok(Box::new(SimpleStructScheduler::new(
                        child_schedulers,
                        fields,
                        num_rows,
                    )))
                }
            }
            _ => todo!(),
        }
    }
}
/// Creates the synthetic root struct column used by the legacy decode path.
///
/// The root column has no buffers of its own; it only carries page row counts
/// so the legacy struct scheduler knows the total row count.  Pages are capped
/// at `u32::MAX` rows each.
fn root_column(num_rows: u64) -> ColumnInfo {
    let rows_per_page = u32::MAX as u64;
    let num_root_pages = num_rows.div_ceil(rows_per_page);
    // Rows in the final page.  Computed by subtraction rather than
    // `num_rows % rows_per_page`, which would incorrectly yield 0 when
    // num_rows is an exact multiple of the page size.
    let final_page_num_rows = num_rows - num_root_pages.saturating_sub(1) * rows_per_page;
    let root_pages = (0..num_root_pages)
        .map(|i| PageInfo {
            num_rows: if i == num_root_pages - 1 {
                final_page_num_rows
            } else {
                // Non-final pages are full.  (Previously `u64::MAX`, which
                // would overflow the row-count sum in the struct scheduler.)
                rows_per_page
            },
            encoding: PageEncoding::Legacy(pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                    pb::SimpleStruct {},
                )),
            }),
            priority: 0,
            buffer_offsets_and_sizes: Arc::new([]),
        })
        .collect::<Vec<_>>();
    ColumnInfo {
        buffer_offsets_and_sizes: Arc::new([]),
        encoding: pb::ColumnEncoding {
            column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
        },
        index: u32::MAX,
        page_infos: Arc::from(root_pages),
    }
}
/// The root decoder for a decode job: structural (2.1+) or legacy struct decoder.
pub enum RootDecoder {
    Structural(StructuralStructDecoder),
    Legacy(SimpleStructDecoder),
}
impl RootDecoder {
    /// Unwraps the structural decoder.
    ///
    /// # Panics
    /// Panics if this is a legacy decoder.
    pub fn into_structural(self) -> StructuralStructDecoder {
        if let Self::Structural(decoder) = self {
            decoder
        } else {
            panic!("Expected a structural decoder")
        }
    }

    /// Unwraps the legacy decoder.
    ///
    /// # Panics
    /// Panics if this is a structural decoder.
    pub fn into_legacy(self) -> SimpleStructDecoder {
        if let Self::Legacy(decoder) = self {
            decoder
        } else {
            panic!("Expected a legacy decoder")
        }
    }
}
impl DecodeBatchScheduler {
    /// Creates a new decode scheduler from the schema and the file's column
    /// metadata.
    ///
    /// Uses the structural path when there are no columns or the first column
    /// is structurally encoded; otherwise builds the legacy path with a
    /// synthetic root struct column prepended (all column indices shift by 1).
    ///
    /// # Panics
    /// Asserts `num_rows > 0`.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new<'a>(
        schema: &'a Schema,
        column_indices: &[u32],
        column_infos: &[Arc<ColumnInfo>],
        file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
        num_rows: u64,
        _decoder_plugins: Arc<DecoderPlugins>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        filter: &FilterExpression,
        decoder_config: &DecoderConfig,
    ) -> Result<Self> {
        assert!(num_rows > 0);
        let buffers = FileBuffers {
            positions_and_sizes: file_buffer_positions_and_sizes,
        };
        // Wrap the whole schema in a synthetic "root" struct field
        let arrow_schema = ArrowSchema::from(schema);
        let root_fields = arrow_schema.fields().clone();
        let root_type = DataType::Struct(root_fields.clone());
        let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
        // Children are cloned from the Lance schema (presumably to keep the
        // Lance-specific field info the Arrow round-trip drops — TODO confirm)
        root_field.children.clone_from(&schema.fields);
        root_field
            .metadata
            .insert("__lance_decoder_root".to_string(), "true".to_string());
        if column_infos.is_empty() || column_infos[0].is_structural() {
            // Structural (2.1+) path
            let mut column_iter = ColumnInfoIter::new(column_infos.to_vec(), column_indices);
            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let mut root_scheduler =
                strategy.create_structural_field_scheduler(&root_field, &mut column_iter)?;
            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;
            Ok(Self {
                root_scheduler: RootScheduler::Structural(root_scheduler),
                root_fields,
                cache,
            })
        } else {
            // Legacy path: prepend a synthetic root struct column and shift
            // every requested column index up by one to match
            let mut columns = Vec::with_capacity(column_infos.len() + 1);
            columns.push(Arc::new(root_column(num_rows)));
            columns.extend(column_infos.iter().cloned());
            let adjusted_column_indices = [0_u32]
                .into_iter()
                .chain(column_indices.iter().map(|i| i.saturating_add(1)))
                .collect::<Vec<_>>();
            let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
            let strategy = CoreFieldDecoderStrategy::from_decoder_config(decoder_config);
            let root_scheduler =
                strategy.create_legacy_field_scheduler(&root_field, &mut column_iter, buffers)?;
            let context = SchedulerContext::new(io, cache.clone());
            root_scheduler.initialize(filter, &context).await?;
            Ok(Self {
                root_scheduler: RootScheduler::Legacy(root_scheduler.into()),
                root_fields,
                cache,
            })
        }
    }
    /// Creates a scheduler directly from an already-built legacy root scheduler.
    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn from_scheduler(
        root_scheduler: Arc<dyn crate::previous::decoder::FieldScheduler>,
        root_fields: Fields,
        cache: Arc<LanceCache>,
    ) -> Self {
        Self {
            root_scheduler: RootScheduler::Legacy(root_scheduler),
            root_fields,
            cache,
        }
    }
    /// Schedules the given row ranges via the structural root scheduler.
    ///
    /// Scan lines are forwarded to `schedule_action` as they are produced;
    /// the callback returns `false` to abort early.  Errors are delivered
    /// through the callback rather than returned.
    fn do_schedule_ranges_structural(
        &mut self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
        io: Arc<dyn EncodingsIo>,
        mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
    ) {
        let root_scheduler = self.root_scheduler.as_structural();
        let mut context = SchedulerContext::new(io, self.cache.clone());
        let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
        if let Err(schedule_ranges_err) = maybe_root_job {
            schedule_action(Err(schedule_ranges_err));
            return;
        }
        let mut root_job = maybe_root_job.unwrap();
        let mut num_rows_scheduled = 0;
        loop {
            let maybe_next_scan_lines = root_job.schedule_next(&mut context);
            if let Err(err) = maybe_next_scan_lines {
                schedule_action(Err(err));
                return;
            }
            let next_scan_lines = maybe_next_scan_lines.unwrap();
            // An empty batch of scan lines signals the job is exhausted
            if next_scan_lines.is_empty() {
                return;
            }
            for next_scan_line in next_scan_lines {
                trace!(
                    "Scheduled scan line of {} rows and {} decoders",
                    next_scan_line.rows_scheduled,
                    next_scan_line.decoders.len()
                );
                num_rows_scheduled += next_scan_line.rows_scheduled;
                if !schedule_action(Ok(DecoderMessage {
                    scheduled_so_far: num_rows_scheduled,
                    decoders: next_scan_line.decoders,
                })) {
                    // Consumer requested early abort
                    return;
                }
            }
        }
    }
fn do_schedule_ranges_legacy(
&mut self,
ranges: &[Range<u64>],
filter: &FilterExpression,
io: Arc<dyn EncodingsIo>,
mut schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
priority: Option<Box<dyn PriorityRange>>,
) {
let root_scheduler = self.root_scheduler.as_legacy();
let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
trace!(
"Scheduling {} ranges across {}..{} ({} rows){}",
ranges.len(),
ranges.first().unwrap().start,
ranges.last().unwrap().end,
rows_requested,
priority
.as_ref()
.map(|p| format!(" (priority={:?})", p))
.unwrap_or_default()
);
let mut context = SchedulerContext::new(io, self.cache.clone());
let maybe_root_job = root_scheduler.schedule_ranges(ranges, filter);
if let Err(schedule_ranges_err) = maybe_root_job {
schedule_action(Err(schedule_ranges_err));
return;
}
let mut root_job = maybe_root_job.unwrap();
let mut num_rows_scheduled = 0;
let mut rows_to_schedule = root_job.num_rows();
let mut priority = priority.unwrap_or(Box::new(SimplePriorityRange::new(0)));
trace!("Scheduled ranges refined to {} rows", rows_to_schedule);
while rows_to_schedule > 0 {
let maybe_next_scan_line = root_job.schedule_next(&mut context, priority.as_ref());
if let Err(schedule_next_err) = maybe_next_scan_line {
schedule_action(Err(schedule_next_err));
return;
}
let next_scan_line = maybe_next_scan_line.unwrap();
priority.advance(next_scan_line.rows_scheduled);
num_rows_scheduled += next_scan_line.rows_scheduled;
rows_to_schedule -= next_scan_line.rows_scheduled;
trace!(
"Scheduled scan line of {} rows and {} decoders",
next_scan_line.rows_scheduled,
next_scan_line.decoders.len()
);
if !schedule_action(Ok(DecoderMessage {
scheduled_so_far: num_rows_scheduled,
decoders: next_scan_line.decoders,
})) {
return;
}
trace!("Finished scheduling {} ranges", ranges.len());
}
}
fn do_schedule_ranges(
&mut self,
ranges: &[Range<u64>],
filter: &FilterExpression,
io: Arc<dyn EncodingsIo>,
schedule_action: impl FnMut(Result<DecoderMessage>) -> bool,
priority: Option<Box<dyn PriorityRange>>,
) {
match &self.root_scheduler {
RootScheduler::Legacy(_) => {
self.do_schedule_ranges_legacy(ranges, filter, io, schedule_action, priority)
}
RootScheduler::Structural(_) => {
self.do_schedule_ranges_structural(ranges, filter, io, schedule_action)
}
}
}
pub fn schedule_ranges_to_vec(
&mut self,
ranges: &[Range<u64>],
filter: &FilterExpression,
io: Arc<dyn EncodingsIo>,
priority: Option<Box<dyn PriorityRange>>,
) -> Result<Vec<DecoderMessage>> {
let mut decode_messages = Vec::new();
self.do_schedule_ranges(
ranges,
filter,
io,
|msg| {
decode_messages.push(msg);
true
},
priority,
);
decode_messages.into_iter().collect::<Result<Vec<_>>>()
}
#[instrument(skip_all)]
pub fn schedule_ranges(
&mut self,
ranges: &[Range<u64>],
filter: &FilterExpression,
sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
scheduler: Arc<dyn EncodingsIo>,
) {
self.do_schedule_ranges(
ranges,
filter,
scheduler,
|msg| {
match sink.send(msg) {
Ok(_) => true,
Err(SendError { .. }) => {
debug!(
"schedule_ranges aborting early since decoder appears to have been dropped"
);
false
}
}
},
None,
)
}
#[instrument(skip_all)]
pub fn schedule_range(
&mut self,
range: Range<u64>,
filter: &FilterExpression,
sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
scheduler: Arc<dyn EncodingsIo>,
) {
self.schedule_ranges(&[range], filter, sink, scheduler)
}
pub fn schedule_take(
&mut self,
indices: &[u64],
filter: &FilterExpression,
sink: mpsc::UnboundedSender<Result<DecoderMessage>>,
scheduler: Arc<dyn EncodingsIo>,
) {
debug_assert!(indices.windows(2).all(|w| w[0] < w[1]));
if indices.is_empty() {
return;
}
trace!("Scheduling take of {} rows", indices.len());
let ranges = Self::indices_to_ranges(indices);
self.schedule_ranges(&ranges, filter, sink, scheduler)
}
fn indices_to_ranges(indices: &[u64]) -> Vec<Range<u64>> {
let mut ranges = Vec::new();
let mut start = indices[0];
for window in indices.windows(2) {
if window[1] != window[0] + 1 {
ranges.push(start..window[0] + 1);
start = window[1];
}
}
ranges.push(start..*indices.last().unwrap() + 1);
ranges
}
}
/// A task that resolves to one decoded record batch.
pub struct ReadBatchTask {
    /// Future producing the decoded batch
    pub task: BoxFuture<'static, Result<RecordBatch>>,
    /// The number of rows the batch will contain
    pub num_rows: u32,
}
/// Converts the stream of scheduled decoder messages (legacy path) into a
/// stream of decoded record batches of `rows_per_batch` rows.
pub struct BatchDecodeStream {
    // Receives DecoderMessages from the scheduler
    context: DecoderContext,
    // The legacy root decoder that children are attached to and drained from
    root_decoder: SimpleStructDecoder,
    // Rows not yet handed out as batch tasks
    rows_remaining: u64,
    // Target rows per emitted batch (last batch may be smaller)
    rows_per_batch: u32,
    // Rows scheduled so far (from the scheduler side)
    rows_scheduled: u64,
    // Rows drained from the root decoder so far
    rows_drained: u64,
    // Set once the scheduler channel is closed
    scheduler_exhausted: bool,
    // Ensures the large-batch warning is emitted at most once
    emitted_batch_size_warning: Arc<Once>,
}
impl BatchDecodeStream {
    /// Creates a new instance.
    ///
    /// # Arguments
    /// * `scheduled` - receiver for decoder messages from the scheduler
    /// * `rows_per_batch` - desired rows per batch (last batch may be smaller)
    /// * `num_rows` - total rows expected from this decode job
    /// * `root_decoder` - the legacy root struct decoder
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: SimpleStructDecoder,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
        }
    }
fn accept_decoder(&mut self, decoder: crate::previous::decoder::DecoderReady) -> Result<()> {
if decoder.path.is_empty() {
Ok(())
} else {
self.root_decoder.accept_child(decoder)
}
}
    /// Waits until at least `scheduled_need` rows have been scheduled,
    /// accepting decoders from the scheduler channel as messages arrive.
    ///
    /// Returns the number of rows actually scheduled, which may be less than
    /// requested if the scheduler side shut down early.
    #[instrument(level = "debug", skip_all)]
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        self.accept_decoder(message.into_legacy())?;
                    }
                }
                None => {
                    // Channel closed: remember so later calls return immediately
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }
    /// Prepares the next batch decode task, waiting for scheduling and I/O as
    /// needed.  Returns `None` when all rows have been handed out (or the
    /// scheduler stopped early).
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }
        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;
        // How many more rows must be scheduled before we can drain this batch
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            // The scheduler may have stopped early; shrink the batch to match
            if actually_scheduled < desired_scheduled {
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }
        if to_take == 0 {
            return Ok(None);
        }
        // Rows are 0-indexed, so the last row needed is drained + to_take - 1
        let loaded_need = self.rows_drained + to_take - 1;
        trace!(
            "Waiting for I/O (desire at least {} fully loaded rows)",
            loaded_need
        );
        self.root_decoder.wait_for_loaded(loaded_need).await?;
        let next_task = self.root_decoder.drain(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }
    /// Converts this into a stream of `ReadBatchTask`s.
    ///
    /// Each batch is materialized on a spawned tokio task so decode work can
    /// overlap with scheduling and I/O.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            // Turn Result<Option<..>> into Option<(task, num_rows)>, ending
            // the stream on Ok(None) but surfacing Err as a failing task
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let task = async move {
                    let next_task = next_task?;
                    tokio::spawn(async move { next_task.into_batch(emitted_batch_size_warning) })
                        .await
                        .map_err(|err| Error::wrapped(err.into()))?
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
/// A unit of work delivered to a root decoder: a loaded structural page or a
/// ready legacy decoder.
enum RootDecoderMessage {
    LoadedPage(LoadedPageShard),
    LegacyPage(crate::previous::decoder::DecoderReady),
}
/// Abstraction over the two root decoder implementations so the synchronous
/// batch iterator can drive either one.
trait RootDecoderType {
    /// Accepts a message of the kind this decoder expects (panics otherwise)
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()>;
    /// Drains the next `num_rows` rows into a decode task
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
    /// Blocks (on `runtime`) until `loaded_need` rows are loaded, if needed
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()>;
}
impl RootDecoderType for StructuralStructDecoder {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
        match message {
            RootDecoderMessage::LoadedPage(loaded_page) => self.accept_page(loaded_page),
            // The structural path only ever sends loaded pages
            RootDecoderMessage::LegacyPage(_) => unreachable!(),
        }
    }
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.drain_batch_task(num_rows)
    }
    fn wait(&mut self, _: u64, _: &tokio::runtime::Runtime) -> Result<()> {
        // No waiting needed on the structural path
        Ok(())
    }
}
impl RootDecoderType for SimpleStructDecoder {
    fn accept_message(&mut self, message: RootDecoderMessage) -> Result<()> {
        match message {
            RootDecoderMessage::LegacyPage(legacy_page) => self.accept_child(legacy_page),
            // The legacy path only ever sends ready legacy decoders
            RootDecoderMessage::LoadedPage(_) => unreachable!(),
        }
    }
    fn drain_batch(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.drain(num_rows)
    }
    fn wait(&mut self, loaded_need: u64, runtime: &tokio::runtime::Runtime) -> Result<()> {
        // Legacy decoders load asynchronously; block the runtime until ready
        runtime.block_on(self.wait_for_loaded(loaded_need))
    }
}
/// A blocking iterator that turns pre-collected scheduler messages into
/// record batches.
///
/// Unlike the stream-based decoders, all scheduler messages are gathered up
/// front (see `schedule_and_decode_blocking`) and any I/O still in flight is
/// waited on using a private current-thread runtime.
struct BatchDecodeIterator<T: RootDecoderType> {
    /// Scheduler messages, in scheduling order
    messages: VecDeque<Result<DecoderMessage>>,
    root_decoder: T,
    /// Rows not yet claimed by an emitted batch
    rows_remaining: u64,
    rows_per_batch: u32,
    /// Rows reported scheduled by messages consumed so far
    rows_scheduled: u64,
    /// Rows already drained into emitted batches
    rows_drained: u64,
    /// Ensures the large-batch warning is logged at most once
    emitted_batch_size_warning: Arc<Once>,
    /// Current-thread runtime used to block on unfinished I/O futures
    wait_for_io_runtime: tokio::runtime::Runtime,
    schema: Arc<ArrowSchema>,
}
impl<T: RootDecoderType> BatchDecodeIterator<T> {
    /// Creates an iterator over messages that have already been scheduled.
    ///
    /// `messages` must contain the scheduler output for all `num_rows` rows
    /// (the caller collects the whole channel before constructing this).
    pub fn new(
        messages: VecDeque<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: T,
        schema: Arc<ArrowSchema>,
    ) -> Self {
        Self {
            messages,
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            // Private single-threaded runtime used to block on any I/O
            // futures that have not yet completed
            wait_for_io_runtime: tokio::runtime::Builder::new_current_thread()
                .build()
                .unwrap(),
            emitted_batch_size_warning: Arc::new(Once::new()),
            schema,
        }
    }

    /// Resolves an unloaded page, blocking on the embedded runtime only if
    /// the page's I/O has not already completed.
    fn wait_for_page(&self, unloaded_page: UnloadedPageShard) -> Result<LoadedPageShard> {
        match maybe_done(unloaded_page.0) {
            // I/O already finished; no blocking needed
            MaybeDone::Done(loaded_page) => loaded_page,
            // I/O still pending; block this thread until it resolves
            MaybeDone::Future(fut) => self.wait_for_io_runtime.block_on(fut),
            MaybeDone::Gone => unreachable!(),
        }
    }

    /// Feeds scheduler messages to the root decoder until at least
    /// `scheduled_need` rows are scheduled (or messages run out), then waits
    /// until enough rows are loaded to drain the next batch.
    ///
    /// Returns the number of rows actually scheduled, which may be less than
    /// `scheduled_need` if the messages were exhausted.
    #[instrument(skip_all)]
    fn wait_for_io(&mut self, scheduled_need: u64, to_take: u64) -> Result<u64> {
        while self.rows_scheduled < scheduled_need && !self.messages.is_empty() {
            let message = self.messages.pop_front().unwrap()?;
            self.rows_scheduled = message.scheduled_so_far;
            for decoder_message in message.decoders {
                match decoder_message {
                    MessageType::UnloadedPage(unloaded_page) => {
                        let loaded_page = self.wait_for_page(unloaded_page)?;
                        self.root_decoder
                            .accept_message(RootDecoderMessage::LoadedPage(loaded_page))?;
                    }
                    MessageType::DecoderReady(decoder_ready) => {
                        // An empty path refers to the root decoder itself,
                        // which is not a child we can accept; skip it
                        if !decoder_ready.path.is_empty() {
                            self.root_decoder
                                .accept_message(RootDecoderMessage::LegacyPage(decoder_ready))?;
                        }
                    }
                }
            }
        }
        // Index of the last row the next batch needs loaded (to_take >= 1
        // because the caller only calls this when rows remain)
        let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
        self.root_decoder
            .wait(loaded_need, &self.wait_for_io_runtime)?;
        Ok(self.rows_scheduled)
    }

    /// Drains up to `rows_per_batch` rows and decodes them into a batch.
    ///
    /// Returns `Ok(None)` when all rows have been produced.
    #[instrument(level = "debug", skip_all)]
    fn next_batch_task(&mut self) -> Result<Option<RecordBatch>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }
        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;
        // Rows that still need scheduling before `to_take` can be drained
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_io(desired_scheduled, to_take)?;
            if actually_scheduled < desired_scheduled {
                // Messages ran out; shrink this (final) batch accordingly
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }
        if to_take == 0 {
            return Ok(None);
        }
        let next_task = self.root_decoder.drain_batch(to_take)?;
        self.rows_drained += to_take;
        let batch = next_task.into_batch(self.emitted_batch_size_warning.clone())?;
        Ok(Some(batch))
    }
}
impl<T: RootDecoderType> Iterator for BatchDecodeIterator<T> {
    type Item = ArrowResult<RecordBatch>;

    /// Yields the next batch, converting internal errors to `ArrowError`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_batch_task() {
            Ok(Some(batch)) => Some(Ok(batch)),
            Ok(None) => None,
            Err(e) => Some(Err(ArrowError::from(e))),
        }
    }
}
impl<T: RootDecoderType> RecordBatchReader for BatchDecodeIterator<T> {
fn schema(&self) -> Arc<ArrowSchema> {
self.schema.clone()
}
}
/// Async stream adapter for the structural (2.1+) decode path: pulls
/// scheduler messages from a channel and produces one decode task per batch.
pub struct StructuralBatchDecodeStream {
    /// Receiving end of the scheduler's message channel
    context: DecoderContext,
    root_decoder: StructuralStructDecoder,
    /// Rows not yet claimed by an emitted batch
    rows_remaining: u64,
    rows_per_batch: u32,
    /// Rows the scheduler has reported as scheduled so far
    rows_scheduled: u64,
    /// Rows already handed out in decode tasks
    rows_drained: u64,
    /// Set once the scheduler channel closes
    scheduler_exhausted: bool,
    /// Ensures the large-batch warning is logged at most once per stream
    emitted_batch_size_warning: Arc<Once>,
    /// When true, batch decode work runs on spawned tokio tasks
    spawn_batch_decode_tasks: bool,
}
impl StructuralBatchDecodeStream {
    /// Creates a stream fed by scheduler messages arriving on `scheduled`.
    pub fn new(
        scheduled: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
        rows_per_batch: u32,
        num_rows: u64,
        root_decoder: StructuralStructDecoder,
        spawn_batch_decode_tasks: bool,
    ) -> Self {
        Self {
            context: DecoderContext::new(scheduled),
            root_decoder,
            rows_remaining: num_rows,
            rows_per_batch,
            rows_scheduled: 0,
            rows_drained: 0,
            scheduler_exhausted: false,
            emitted_batch_size_warning: Arc::new(Once::new()),
            spawn_batch_decode_tasks,
        }
    }

    /// Receives scheduler messages (awaiting each page's I/O as it arrives)
    /// until at least `scheduled_need` rows are scheduled or the channel
    /// closes.
    ///
    /// Returns the number of rows known to be scheduled, which may be fewer
    /// than requested if the scheduler is exhausted.
    #[instrument(level = "debug", skip_all)]
    async fn wait_for_scheduled(&mut self, scheduled_need: u64) -> Result<u64> {
        if self.scheduler_exhausted {
            return Ok(self.rows_scheduled);
        }
        while self.rows_scheduled < scheduled_need {
            let next_message = self.context.source.recv().await;
            match next_message {
                Some(scan_line) => {
                    let scan_line = scan_line?;
                    self.rows_scheduled = scan_line.scheduled_so_far;
                    for message in scan_line.decoders {
                        // Structural path: every message is an unloaded page
                        let unloaded_page = message.into_structural();
                        // Await the page's I/O, then hand it to the root
                        let loaded_page = unloaded_page.0.await?;
                        self.root_decoder.accept_page(loaded_page)?;
                    }
                }
                None => {
                    // Channel closed: no more rows will ever be scheduled
                    self.scheduler_exhausted = true;
                    return Ok(self.rows_scheduled);
                }
            }
        }
        Ok(scheduled_need)
    }

    /// Prepares the next decode task covering up to `rows_per_batch` rows.
    ///
    /// Returns `Ok(None)` when all requested rows have been drained.
    #[instrument(level = "debug", skip_all)]
    async fn next_batch_task(&mut self) -> Result<Option<NextDecodeTask>> {
        trace!(
            "Draining batch task (rows_remaining={} rows_drained={} rows_scheduled={})",
            self.rows_remaining, self.rows_drained, self.rows_scheduled,
        );
        if self.rows_remaining == 0 {
            return Ok(None);
        }
        let mut to_take = self.rows_remaining.min(self.rows_per_batch as u64);
        self.rows_remaining -= to_take;
        // Rows that still need scheduling before `to_take` can be drained
        let scheduled_need = (self.rows_drained + to_take).saturating_sub(self.rows_scheduled);
        trace!(
            "scheduled_need = {} because rows_drained = {} and to_take = {} and rows_scheduled = {}",
            scheduled_need, self.rows_drained, to_take, self.rows_scheduled
        );
        if scheduled_need > 0 {
            let desired_scheduled = scheduled_need + self.rows_scheduled;
            trace!(
                "Draining from scheduler (desire at least {} scheduled rows)",
                desired_scheduled
            );
            let actually_scheduled = self.wait_for_scheduled(desired_scheduled).await?;
            if actually_scheduled < desired_scheduled {
                // Scheduler ran dry; shrink this (final) batch accordingly
                let under_scheduled = desired_scheduled - actually_scheduled;
                to_take -= under_scheduled;
            }
        }
        if to_take == 0 {
            return Ok(None);
        }
        let next_task = self.root_decoder.drain_batch_task(to_take)?;
        self.rows_drained += to_take;
        Ok(Some(next_task))
    }

    /// Converts this into a stream of [`ReadBatchTask`]s.  Decoding runs on a
    /// spawned tokio task when `spawn_batch_decode_tasks` is set; otherwise
    /// it runs inline when the task future is awaited.
    pub fn into_stream(self) -> BoxStream<'static, ReadBatchTask> {
        let stream = futures::stream::unfold(self, |mut slf| async move {
            let next_task = slf.next_batch_task().await;
            // transpose: Ok(None) ends the stream; Err rides inside the task
            let next_task = next_task.transpose().map(|next_task| {
                let num_rows = next_task.as_ref().map(|t| t.num_rows).unwrap_or(0);
                let emitted_batch_size_warning = slf.emitted_batch_size_warning.clone();
                let spawn_batch_decode_tasks = slf.spawn_batch_decode_tasks;
                let task = async move {
                    let next_task = next_task?;
                    if spawn_batch_decode_tasks {
                        tokio::spawn(
                            async move { next_task.into_batch(emitted_batch_size_warning) },
                        )
                        .await
                        .map_err(|err| Error::wrapped(err.into()))?
                    } else {
                        next_task.into_batch(emitted_batch_size_warning)
                    }
                };
                (task, num_rows)
            });
            next_task.map(|(task, num_rows)| {
                debug_assert!(num_rows <= u32::MAX as u64);
                let next_task = ReadBatchTask {
                    task: task.boxed(),
                    num_rows: num_rows as u32,
                };
                (next_task, slf)
            })
        });
        stream.boxed()
    }
}
/// The rows a caller asked for: either half-open ranges or explicit indices.
#[derive(Debug)]
pub enum RequestedRows {
    Ranges(Vec<Range<u64>>),
    Indices(Vec<u64>),
}

impl RequestedRows {
    /// Total number of rows requested.
    pub fn num_rows(&self) -> u64 {
        match self {
            Self::Ranges(ranges) => ranges.iter().fold(0, |acc, r| acc + (r.end - r.start)),
            Self::Indices(indices) => indices.len() as u64,
        }
    }

    /// Drops zero-length ranges; an indices request is returned unchanged.
    pub fn trim_empty_ranges(mut self) -> Self {
        match &mut self {
            Self::Ranges(ranges) => ranges.retain(|r| r.end > r.start),
            Self::Indices(_) => {}
        }
        self
    }
}
/// Options controlling decoder behavior.
#[derive(Debug, Clone)]
pub struct DecoderConfig {
    /// Whether to cache the repetition index.  The default comes from
    /// `default_cache_repetition_index()` (defined elsewhere in this file).
    pub cache_repetition_index: bool,
    /// Whether decoded data should be validated after decode (off by default)
    pub validate_on_decode: bool,
}

impl Default for DecoderConfig {
    fn default() -> Self {
        Self {
            cache_repetition_index: default_cache_repetition_index(),
            validate_on_decode: false,
        }
    }
}
/// Everything needed to schedule and decode a read.
#[derive(Debug, Clone)]
pub struct SchedulerDecoderConfig {
    /// Extension points for custom decoders
    pub decoder_plugins: Arc<DecoderPlugins>,
    /// Maximum number of rows per emitted batch
    pub batch_size: u32,
    /// Handle used to issue I/O requests
    pub io: Arc<dyn EncodingsIo>,
    /// Cache shared with the scheduler (e.g. for repetition indices)
    pub cache: Arc<LanceCache>,
    /// Decoder behavior flags (validation, repetition-index caching)
    pub decoder_config: DecoderConfig,
}
/// Chains a sentinel onto `stream` that joins the scheduler task once the
/// stream has been fully consumed.
///
/// The chained unfold never yields an item; it exists solely so that, after
/// the last batch, the scheduler's `JoinHandle` is awaited and any panic in
/// the scheduler task is propagated (via `unwrap`) instead of being lost.
/// If the stream is dropped before completion the handle is simply dropped,
/// detaching the scheduler task.
fn check_scheduler_on_drop(
    stream: BoxStream<'static, ReadBatchTask>,
    scheduler_handle: tokio::task::JoinHandle<()>,
) -> BoxStream<'static, ReadBatchTask> {
    let mut scheduler_handle = Some(scheduler_handle);
    let check_scheduler = stream::unfold((), move |_| {
        // take() so the handle is awaited at most once
        let handle = scheduler_handle.take();
        async move {
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
            None
        }
    });
    stream.chain(check_scheduler).boxed()
}
/// Builds the stream that consumes scheduler messages from `rx` and yields
/// one [`ReadBatchTask`] per batch.
///
/// Picks the structural (2.1+) or legacy decode pipeline based on
/// `is_structural`; both decode into the Arrow schema derived from `schema`.
pub fn create_decode_stream(
    schema: &Schema,
    num_rows: u64,
    batch_size: u32,
    is_structural: bool,
    should_validate: bool,
    spawn_structural_batch_decode_tasks: bool,
    rx: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    // Both paths decode into the same Arrow-level fields
    let root_fields = ArrowSchema::from(schema).fields;
    if is_structural {
        let root_decoder = StructuralStructDecoder::new(root_fields, should_validate, true)?;
        let stream = StructuralBatchDecodeStream::new(
            rx,
            batch_size,
            num_rows,
            root_decoder,
            spawn_structural_batch_decode_tasks,
        );
        Ok(stream.into_stream())
    } else {
        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
        Ok(BatchDecodeStream::new(rx, batch_size, num_rows, root_decoder).into_stream())
    }
}
/// Builds a blocking [`RecordBatchReader`] over pre-collected scheduler
/// messages (the synchronous counterpart of [`create_decode_stream`]).
pub fn create_decode_iterator(
    schema: &Schema,
    num_rows: u64,
    batch_size: u32,
    should_validate: bool,
    is_structural: bool,
    messages: VecDeque<Result<DecoderMessage>>,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
    let arrow_schema = Arc::new(ArrowSchema::from(schema));
    let root_fields = arrow_schema.fields.clone();
    // Select the root decoder matching the file's encoding generation
    let reader: Box<dyn RecordBatchReader + Send + 'static> = if is_structural {
        let root_decoder = StructuralStructDecoder::new(root_fields, should_validate, true)?;
        Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            root_decoder,
            arrow_schema,
        ))
    } else {
        let root_decoder = SimpleStructDecoder::new(root_fields, num_rows);
        Box::new(BatchDecodeIterator::new(
            messages,
            batch_size,
            num_rows,
            root_decoder,
            arrow_schema,
        ))
    };
    Ok(reader)
}
/// Builds the decode stream and spawns the scheduling task that feeds it.
///
/// Scheduling runs on its own tokio task, connected to the returned stream
/// by an unbounded channel; a failure to create the scheduler is forwarded
/// through that same channel.
fn create_scheduler_decoder(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
    let num_rows = requested_rows.num_rows();
    let is_structural = column_infos[0].is_structural();
    // Env var can force spawning ("always"/"never"); otherwise decode tasks
    // are spawned only for range reads
    let mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
    let spawn_structural_batch_decode_tasks = match mode.ok().as_deref() {
        Some("always") => true,
        Some("never") => false,
        _ => matches!(requested_rows, RequestedRows::Ranges(_)),
    };
    let (tx, rx) = mpsc::unbounded_channel();
    let decode_stream = create_decode_stream(
        &target_schema,
        num_rows,
        config.batch_size,
        is_structural,
        config.decoder_config.validate_on_decode,
        spawn_structural_batch_decode_tasks,
        rx,
    )?;
    let scheduler_handle = tokio::task::spawn(async move {
        let mut decode_scheduler = match DecodeBatchScheduler::try_new(
            target_schema.as_ref(),
            &column_indices,
            &column_infos,
            &vec![],
            num_rows,
            config.decoder_plugins,
            config.io.clone(),
            config.cache,
            &filter,
            &config.decoder_config,
        )
        .await
        {
            Ok(scheduler) => scheduler,
            Err(e) => {
                // Surface the failure through the channel; the decode stream
                // will deliver it to the caller
                let _ = tx.send(Err(e));
                return;
            }
        };
        match requested_rows {
            RequestedRows::Ranges(ranges) => {
                decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
            }
            RequestedRows::Indices(indices) => {
                decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
            }
        }
    });
    // Propagate scheduler panics once the stream is fully consumed
    Ok(check_scheduler_on_drop(decode_stream, scheduler_handle))
}
/// Schedules and decodes the requested rows, returning a stream of batch
/// tasks.
///
/// A zero-row request yields an empty stream.  If setting up the scheduler
/// fails, the stream contains a single zero-row task carrying the error.
pub fn schedule_and_decode(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> BoxStream<'static, ReadBatchTask> {
    if requested_rows.num_rows() == 0 {
        return stream::empty().boxed();
    }
    // Empty ranges contribute no rows; drop them before scheduling
    let requested_rows = requested_rows.trim_empty_ranges();
    // Hold a clone of the I/O handle until the stream is fully consumed,
    // then release it in the `finally` below
    let io = config.io.clone();
    match create_scheduler_decoder(
        column_infos,
        requested_rows,
        filter,
        column_indices,
        target_schema,
        config,
    ) {
        Ok(stream) => stream.finally(move || drop(io)).boxed(),
        Err(e) => stream::once(std::future::ready(ReadBatchTask {
            num_rows: 0,
            task: std::future::ready(Err(e)).boxed(),
        }))
        .boxed(),
    }
}
/// Shared current-thread runtime used to block on async scheduling work from
/// synchronous code (see [`schedule_and_decode_blocking`]).
pub static WAITER_RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
});
/// Blocking counterpart of [`schedule_and_decode`].
///
/// Runs all scheduling to completion up front (blocking on [`WAITER_RT`]),
/// collects every scheduler message, and returns an iterator that performs
/// any remaining I/O waiting synchronously.  Useful where no ambient tokio
/// runtime is available.
pub fn schedule_and_decode_blocking(
    column_infos: Vec<Arc<ColumnInfo>>,
    requested_rows: RequestedRows,
    filter: FilterExpression,
    column_indices: Vec<u32>,
    target_schema: Arc<Schema>,
    config: SchedulerDecoderConfig,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
    if requested_rows.num_rows() == 0 {
        // Nothing requested: empty reader with the correct schema
        let arrow_schema = Arc::new(ArrowSchema::from(target_schema.as_ref()));
        return Ok(Box::new(RecordBatchIterator::new(vec![], arrow_schema)));
    }
    let num_rows = requested_rows.num_rows();
    let is_structural = column_infos[0].is_structural();
    let (tx, mut rx) = mpsc::unbounded_channel();
    // Create the scheduler, blocking on the shared current-thread runtime
    let mut decode_scheduler = WAITER_RT.block_on(DecodeBatchScheduler::try_new(
        target_schema.as_ref(),
        &column_indices,
        &column_infos,
        &vec![],
        num_rows,
        config.decoder_plugins,
        config.io.clone(),
        config.cache,
        &filter,
        &config.decoder_config,
    ))?;
    // Scheduling runs to completion here, filling the channel; `tx` is moved
    // into the call and dropped when it returns
    match requested_rows {
        RequestedRows::Ranges(ranges) => {
            decode_scheduler.schedule_ranges(&ranges, &filter, tx, config.io)
        }
        RequestedRows::Indices(indices) => {
            decode_scheduler.schedule_take(&indices, &filter, tx, config.io)
        }
    }
    // Because the sender is gone, every message is already buffered, so
    // recv_many resolves immediately (now_or_never().unwrap() cannot fail);
    // a return of 0 means the channel is fully drained
    let mut messages = Vec::new();
    while rx
        .recv_many(&mut messages, usize::MAX)
        .now_or_never()
        .unwrap()
        != 0
    {}
    let decode_iterator = create_decode_iterator(
        &target_schema,
        num_rows,
        config.batch_size,
        config.decoder_config.validate_on_decode,
        is_structural,
        messages.into(),
    )?;
    Ok(decode_iterator)
}
/// Decodes already-fetched page data into a [`DataBlock`] on demand.
pub trait PrimitivePageDecoder: Send + Sync {
    /// Decodes `num_rows` rows after skipping the first `rows_to_skip`.
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock>;
}
/// Legacy (pre-2.1) page scheduler: issues I/O for row ranges within a page
/// and returns a future that resolves to the page's decoder.
pub trait PageScheduler: Send + Sync + std::fmt::Debug {
    /// Schedules I/O for the given row `ranges`.
    ///
    /// `top_level_row` is presumably used to prioritize the I/O requests —
    /// TODO(review): confirm against the scheduler implementations.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>;
}
/// Tracks a numeric priority that advances as rows are scheduled; used to
/// order I/O across the decode tree.
pub trait PriorityRange: std::fmt::Debug + Send + Sync {
    /// Moves the priority forward by `num_rows`.
    fn advance(&mut self, num_rows: u64);
    /// The current priority value.
    fn current_priority(&self) -> u64;
    /// A boxed deep copy of this priority range.
    fn box_clone(&self) -> Box<dyn PriorityRange>;
}

/// The simplest priority: advances one unit per row.
#[derive(Debug)]
pub struct SimplePriorityRange {
    priority: u64,
}

impl SimplePriorityRange {
    fn new(priority: u64) -> Self {
        Self { priority }
    }
}

impl PriorityRange for SimplePriorityRange {
    fn advance(&mut self, num_rows: u64) {
        self.priority += num_rows;
    }

    fn current_priority(&self) -> u64 {
        self.priority
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self::new(self.priority))
    }
}
/// Priority range for list fields: advances measured in items are
/// translated into list-row advances on the wrapped `base` via `offsets`.
pub struct ListPriorityRange {
    /// The outer (list-row level) priority being wrapped
    base: Box<dyn PriorityRange>,
    /// Cumulative item offsets; offsets[i + 1] <= position means list row i
    /// is fully covered (see `advance` below)
    offsets: Arc<[u64]>,
    /// Index of the last offset boundary already crossed
    cur_index_into_offsets: usize,
    /// Current position, in items
    cur_position: u64,
}

impl ListPriorityRange {
    pub(crate) fn new(base: Box<dyn PriorityRange>, offsets: Arc<[u64]>) -> Self {
        Self {
            base,
            offsets,
            cur_index_into_offsets: 0,
            cur_position: 0,
        }
    }
}
impl std::fmt::Debug for ListPriorityRange {
    // Manual impl so we print only the length of `offsets` (the buffer
    // itself may be large) rather than its full contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ListPriorityRange")
            .field("base", &self.base)
            .field("offsets.len()", &self.offsets.len())
            .field("cur_index_into_offsets", &self.cur_index_into_offsets)
            .field("cur_position", &self.cur_position)
            .finish()
    }
}
impl PriorityRange for ListPriorityRange {
    fn advance(&mut self, num_rows: u64) {
        // Advance our position in item space
        self.cur_position += num_rows;
        // Walk forward through the offsets to find how many complete list
        // rows the new position covers
        let mut idx_into_offsets = self.cur_index_into_offsets;
        while idx_into_offsets + 1 < self.offsets.len()
            && self.offsets[idx_into_offsets + 1] <= self.cur_position
        {
            idx_into_offsets += 1;
        }
        let base_rows_advanced = idx_into_offsets - self.cur_index_into_offsets;
        self.cur_index_into_offsets = idx_into_offsets;
        // Only whole list rows advance the outer priority
        self.base.advance(base_rows_advanced as u64);
    }

    fn current_priority(&self) -> u64 {
        self.base.current_priority()
    }

    fn box_clone(&self) -> Box<dyn PriorityRange> {
        Box::new(Self {
            base: self.base.box_clone(),
            offsets: self.offsets.clone(),
            cur_index_into_offsets: self.cur_index_into_offsets,
            cur_position: self.cur_position,
        })
    }
}
/// State threaded through scheduling: the I/O handle, the metadata cache,
/// and the current position (path) within the field tree.
pub struct SchedulerContext {
    // NOTE(review): only consulted by `path_name` here; presumably set when
    // a temporary context is created elsewhere — verify against callers.
    recv: Option<mpsc::UnboundedReceiver<DecoderMessage>>,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    /// Name used in `path_name` output for TEMP contexts
    name: String,
    /// Child indices from the root to the current field
    path: Vec<u32>,
    /// Field names, parallel to `path`
    path_names: Vec<String>,
}
/// Borrowed view of a [`SchedulerContext`] while descended into a child
/// field (created by `SchedulerContext::push`).
pub struct ScopedSchedulerContext<'a> {
    pub context: &'a mut SchedulerContext,
}

impl<'a> ScopedSchedulerContext<'a> {
    /// Ascends back to the parent level and returns the underlying context.
    pub fn pop(self) -> &'a mut SchedulerContext {
        self.context.pop();
        self.context
    }
}
impl SchedulerContext {
    /// Creates a fresh root context with an empty path.
    pub fn new(io: Arc<dyn EncodingsIo>, cache: Arc<LanceCache>) -> Self {
        Self {
            recv: None,
            io,
            cache,
            name: String::new(),
            path: Vec::new(),
            path_names: Vec::new(),
        }
    }

    /// The I/O handle used to schedule reads.
    pub fn io(&self) -> &Arc<dyn EncodingsIo> {
        &self.io
    }

    /// The shared metadata cache.
    pub fn cache(&self) -> &Arc<LanceCache> {
        &self.cache
    }

    /// Descends into child `index` named `name`; call `pop` on the returned
    /// scope (or on self) to ascend again.
    pub fn push(&'_ mut self, name: &str, index: u32) -> ScopedSchedulerContext<'_> {
        self.path.push(index);
        self.path_names.push(name.to_string());
        ScopedSchedulerContext { context: self }
    }

    /// Ascends one level in the field tree.
    pub fn pop(&mut self) {
        self.path.pop();
        self.path_names.pop();
    }

    /// Human-readable path of the current position, for logging.
    pub fn path_name(&self) -> String {
        let path = self.path_names.join("/");
        match self.recv {
            Some(_) => format!("TEMP({}){}", self.name, path),
            None => format!("ROOT{}", path),
        }
    }

    /// The current path as child indices from the root.
    pub fn current_path(&self) -> VecDeque<u32> {
        self.path.iter().copied().collect()
    }

    #[deprecated(since = "0.29.1", note = "This is for legacy 2.0 paths")]
    pub fn locate_decoder(
        &mut self,
        decoder: Box<dyn crate::previous::decoder::LogicalPageDecoder>,
    ) -> crate::previous::decoder::DecoderReady {
        trace!(
            "Scheduling decoder of type {:?} for {:?}",
            decoder.data_type(),
            self.path,
        );
        crate::previous::decoder::DecoderReady {
            decoder,
            path: self.current_path(),
        }
    }
}
/// A scheduled page whose I/O (and decoder construction) may still be in
/// flight; awaiting the inner future yields the [`LoadedPageShard`].
pub struct UnloadedPageShard(pub BoxFuture<'static, Result<LoadedPageShard>>);

impl std::fmt::Debug for UnloadedPageShard {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Manual impl: the boxed future is opaque, so only the type name is
        // printed.  Fixed to report the actual type name (previously printed
        // the stale name "UnloadedPage").
        f.debug_struct("UnloadedPageShard").finish()
    }
}
/// One step of scheduling output: how many rows it covered and the
/// decoders/pages it made available.
#[derive(Debug)]
pub struct ScheduledScanLine {
    pub rows_scheduled: u64,
    pub decoders: Vec<MessageType>,
}

/// A stateful job that schedules a field's pages one scan line at a time.
pub trait StructuralSchedulingJob: std::fmt::Debug {
    /// Produces the next batch of scheduled scan lines, using `context` for
    /// I/O, caching, and the current field path.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>>;
}
/// An opaque, serialized filter expression that may be pushed down into
/// scheduling.
pub struct FilterExpression(pub Bytes);

impl FilterExpression {
    /// A filter that excludes nothing (empty payload).
    pub fn no_filter() -> Self {
        Self(Bytes::new())
    }

    /// True when the filter carries no payload and therefore has no effect.
    pub fn is_noop(&self) -> bool {
        self.0.is_empty()
    }
}
/// Schedules I/O for a single field in the structural (2.1+) decode path.
pub trait StructuralFieldScheduler: Send + std::fmt::Debug {
    /// One-time setup before any scheduling happens.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Creates a job that will schedule the given row ranges.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>>;
}
/// A CPU-only task that materializes a decoded Arrow array.
pub trait DecodeArrayTask: Send {
    /// Consumes the task and produces the decoded array.
    fn decode(self: Box<Self>) -> Result<ArrayRef>;
}
// Adapter: a structural decode task is usable wherever a plain array decode
// task is expected (the repdef information is simply discarded).
impl DecodeArrayTask for Box<dyn StructuralDecodeArrayTask> {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let decoded = StructuralDecodeArrayTask::decode(*self)?;
        Ok(decoded.array)
    }
}
/// A fully-scheduled batch of rows, ready to be decoded.
pub struct NextDecodeTask {
    /// The task that will decode the batch's data
    pub task: Box<dyn DecodeArrayTask>,
    /// The number of rows the task will produce
    pub num_rows: u64,
}
impl NextDecodeTask {
#[instrument(name = "task_to_batch", level = "debug", skip_all)]
fn into_batch(self, emitted_batch_size_warning: Arc<Once>) -> Result<RecordBatch> {
let struct_arr = self.task.decode();
match struct_arr {
Ok(struct_arr) => {
let batch = RecordBatch::from(struct_arr.as_struct());
let size_bytes = batch.get_array_memory_size() as u64;
if size_bytes > BATCH_SIZE_BYTES_WARNING {
emitted_batch_size_warning.call_once(|| {
let size_mb = size_bytes / 1024 / 1024;
debug!("Lance read in a single batch that contained more than {}MiB of data. You may want to consider reducing the batch size.", size_mb);
});
}
Ok(batch)
}
Err(e) => {
let e = Error::internal(format!("Error decoding batch: {}", e));
Err(e)
}
}
}
}
/// A scheduler-to-decoder message, covering both decode generations.
#[derive(Debug)]
pub enum MessageType {
    /// Legacy (pre-2.1): a child decoder whose I/O has been scheduled
    DecoderReady(crate::previous::decoder::DecoderReady),
    /// Structural (2.1+): a page whose I/O may still be in flight
    UnloadedPage(UnloadedPageShard),
}
impl MessageType {
    /// Unwraps the legacy decoder; panics if this is a structural page.
    pub fn into_legacy(self) -> crate::previous::decoder::DecoderReady {
        if let Self::DecoderReady(decoder) = self {
            decoder
        } else {
            panic!("Expected DecoderReady but got UnloadedPage")
        }
    }

    /// Unwraps the structural page; panics if this is a legacy decoder.
    pub fn into_structural(self) -> UnloadedPageShard {
        if let Self::UnloadedPage(unloaded) = self {
            unloaded
        } else {
            panic!("Expected UnloadedPage but got DecoderReady")
        }
    }
}
/// A unit of scheduling progress delivered to the decoder.
pub struct DecoderMessage {
    /// Total number of rows scheduled once this message is processed
    pub scheduled_so_far: u64,
    /// Decoders / pages made available by this scheduling step
    pub decoders: Vec<MessageType>,
}
/// Decoder-side wrapper around the scheduler's message channel.
pub struct DecoderContext {
    /// Receives scheduling progress (or errors) from the scheduler task
    source: mpsc::UnboundedReceiver<Result<DecoderMessage>>,
}

impl DecoderContext {
    pub fn new(source: mpsc::UnboundedReceiver<Result<DecoderMessage>>) -> Self {
        Self { source }
    }
}
/// A decoded page: raw values plus the repetition/definition unraveler used
/// to reconstruct validity and nesting.
pub struct DecodedPage {
    pub data: DataBlock,
    pub repdef: RepDefUnraveler,
}

/// A CPU-only task that decodes one page's data.
pub trait DecodePageTask: Send + std::fmt::Debug {
    /// Consumes the task and produces the decoded page
    fn decode(self: Box<Self>) -> Result<DecodedPage>;
}

/// A decoder for a page whose I/O has completed; rows are pulled out of it
/// in order via `drain`.
pub trait StructuralPageDecoder: std::fmt::Debug + Send {
    /// Creates a task that decodes the next `num_rows` rows
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>>;
    /// Number of rows covered by this page decoder
    fn num_rows(&self) -> u64;
}

/// A page whose data has arrived, addressed by its path in the field tree.
#[derive(Debug)]
pub struct LoadedPageShard {
    pub decoder: Box<dyn StructuralPageDecoder>,
    /// Child indices from the root to the field this page belongs to
    pub path: VecDeque<u32>,
}

/// An array decoded together with its composite repetition/definition info.
pub struct DecodedArray {
    pub array: ArrayRef,
    pub repdef: CompositeRepDefUnraveler,
}

/// A CPU-only task that decodes drained page data into an array.
pub trait StructuralDecodeArrayTask: std::fmt::Debug + Send {
    fn decode(self: Box<Self>) -> Result<DecodedArray>;
}

/// A decoder for a single field in the structural (2.1+) decode path.
pub trait StructuralFieldDecoder: std::fmt::Debug + Send {
    /// Accepts a loaded page belonging to this field (or one of its children)
    fn accept_page(&mut self, _child: LoadedPageShard) -> Result<()>;
    /// Drains `num_rows` rows into a decode task
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>>;
    /// The Arrow data type this decoder produces
    fn data_type(&self) -> &DataType;
}

/// Placeholder for user-provided decoder extensions (currently empty).
#[derive(Debug, Default)]
pub struct DecoderPlugins {}
/// Decodes a fully in-memory [`EncodedBatch`] back into a [`RecordBatch`].
///
/// All I/O is served from the batch's own buffer, and the whole batch is
/// decoded as a single read (batch size == num_rows).
pub async fn decode_batch(
    batch: &EncodedBatch,
    filter: &FilterExpression,
    decoder_plugins: Arc<DecoderPlugins>,
    should_validate: bool,
    version: LanceFileVersion,
    cache: Option<Arc<LanceCache>>,
) -> Result<RecordBatch> {
    // Serve reads straight out of the in-memory encoded buffer
    let io_scheduler = Arc::new(BufferScheduler::new(batch.data.clone())) as Arc<dyn EncodingsIo>;
    let cache = cache.unwrap_or_else(|| {
        Arc::new(lance_core::cache::LanceCache::with_capacity(
            128 * 1024 * 1024,
        ))
    });
    let mut decode_scheduler = DecodeBatchScheduler::try_new(
        batch.schema.as_ref(),
        &batch.top_level_columns,
        &batch.page_table,
        &vec![],
        batch.num_rows,
        decoder_plugins,
        io_scheduler.clone(),
        cache,
        filter,
        &DecoderConfig::default(),
    )
    .await?;
    let (tx, rx) = unbounded_channel();
    decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
    // File versions 2.1+ use the structural decode path
    let is_structural = version >= LanceFileVersion::V2_1;
    let spawn_mode = std::env::var(ENV_LANCE_STRUCTURAL_BATCH_DECODE_SPAWN_MODE);
    let spawn_structural_batch_decode_tasks = !matches!(spawn_mode.ok().as_deref(), Some("never"));
    let mut decode_stream = create_decode_stream(
        &batch.schema,
        batch.num_rows,
        batch.num_rows as u32,
        is_structural,
        should_validate,
        spawn_structural_batch_decode_tasks,
        rx,
    )?;
    // Exactly one batch is produced since batch_size covers all rows
    decode_stream.next().await.unwrap().task.await
}
#[cfg(test)]
mod tests {
    use super::*;

    // A lone index maps to a single one-row range.
    #[test]
    fn test_coalesce_indices_to_ranges_with_single_index() {
        let indices = vec![1];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..2]);
    }

    // Consecutive indices coalesce into one contiguous range.
    #[test]
    fn test_coalesce_indices_to_ranges() {
        let indices = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..10]);
    }

    // Gaps between indices split the output into multiple ranges.
    #[test]
    fn test_coalesce_indices_to_ranges_with_gaps() {
        let indices = vec![1, 2, 3, 5, 6, 7, 9];
        let ranges = DecodeBatchScheduler::indices_to_ranges(&indices);
        assert_eq!(ranges, vec![1..4, 5..8, 9..10]);
    }
}