use std::{
collections::{BTreeMap, BTreeSet},
io::Cursor,
ops::Range,
pin::Pin,
sync::Arc,
};
use arrow_array::RecordBatchReader;
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use deepsize::{Context, DeepSizeOf};
use futures::{Stream, StreamExt, stream::BoxStream};
use lance_encoding::{
EncodingsIo,
decoder::{
ColumnInfo, DecoderConfig, DecoderPlugins, FilterExpression, PageEncoding, PageInfo,
ReadBatchTask, RequestedRows, SchedulerDecoderConfig, schedule_and_decode,
schedule_and_decode_blocking,
},
encoder::EncodedBatch,
version::LanceFileVersion,
};
use log::debug;
use object_store::path::Path;
use prost::{Message, Name};
use lance_core::{
Error, Result,
cache::LanceCache,
datatypes::{Field, Schema},
};
use lance_encoding::format::pb as pbenc;
use lance_encoding::format::pb21 as pbenc21;
use lance_io::{
ReadBatchParams,
scheduler::FileScheduler,
stream::{RecordBatchStream, RecordBatchStreamAdapter},
};
use crate::{
datatypes::{Fields, FieldsWithMeta},
format::{MAGIC, MAJOR_VERSION, MINOR_VERSION, pb, pbfile},
io::LanceEncodingsIo,
writer::PAGE_BUFFER_ALIGNMENT,
};
/// Default size (8 MiB) used to chunk large read requests.
pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
/// Location of a buffer within the file: absolute byte offset plus length.
#[derive(Debug, DeepSizeOf)]
pub struct BufferDescriptor {
    /// Absolute byte offset of the buffer within the file
    pub position: u64,
    /// Size of the buffer in bytes
    pub size: u64,
}
/// Statistics summarizing the contents of a Lance file.
#[derive(Debug)]
pub struct FileStatistics {
    /// One entry per column in the file
    pub columns: Vec<ColumnStatistics>,
}
/// Statistics summarizing a single column of a Lance file.
#[derive(Debug)]
pub struct ColumnStatistics {
    /// Number of pages the column was written as
    pub num_pages: usize,
    /// Total size, in bytes, of all page buffers for the column
    pub size_bytes: u64,
}
/// All file metadata decoded from the footer region, cached so a file only
/// needs its metadata read once.
#[derive(Debug)]
pub struct CachedFileMetadata {
    /// The schema of the file
    pub file_schema: Arc<Schema>,
    /// Raw (protobuf) metadata for each column
    pub column_metadatas: Vec<pbfile::ColumnMetadata>,
    /// Decoded per-column info handed to the decoder
    pub column_infos: Vec<Arc<ColumnInfo>>,
    /// Number of rows in the file
    pub num_rows: u64,
    /// Descriptors of the file-level (global) buffers
    pub file_buffers: Vec<BufferDescriptor>,
    /// Size, in bytes, of the data pages section
    pub num_data_bytes: u64,
    /// Size, in bytes, of the column metadata section
    pub num_column_metadata_bytes: u64,
    /// Combined size, in bytes, of all global buffers
    pub num_global_buffer_bytes: u64,
    /// Size, in bytes, of everything from the first global buffer to EOF
    pub num_footer_bytes: u64,
    /// On-disk format version (major)
    pub major_version: u16,
    /// On-disk format version (minor)
    pub minor_version: u16,
}
impl DeepSizeOf for CachedFileMetadata {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        // Heap usage comes from the schema plus each buffer descriptor; the
        // remaining fields are plain integers.
        let mut total = self.file_schema.deep_size_of_children(context);
        for buffer in &self.file_buffers {
            total += buffer.deep_size_of_children(context);
        }
        total
    }
}
impl CachedFileMetadata {
    /// Maps the stored (major, minor) version pair onto a `LanceFileVersion`.
    ///
    /// # Panics
    ///
    /// Panics if the pair is not one this reader recognizes.
    pub fn version(&self) -> LanceFileVersion {
        match (self.major_version, self.minor_version) {
            // 0.3 is treated the same as 2.0
            (0, 3) | (2, 0) => LanceFileVersion::V2_0,
            (2, 1) => LanceFileVersion::V2_1,
            (2, 2) => LanceFileVersion::V2_2,
            (2, 3) => LanceFileVersion::V2_3,
            (major, minor) => panic!("Unsupported version: {}.{}", major, minor),
        }
    }
}
/// Selects a subset of a file to read: the projected schema plus the indices
/// of the file columns that back it.
#[derive(Debug, Clone)]
pub struct ReaderProjection {
    /// The schema of the projected data
    pub schema: Arc<Schema>,
    /// File column indices, one per projected column (which fields count as
    /// "columns" depends on the file version; see `from_whole_schema`)
    pub column_indices: Vec<u32>,
}
impl ReaderProjection {
    /// Recursively walks `fields`, pushing the column index of every field
    /// that maps to its own file column into `column_indices`.
    fn from_field_ids_helper<'a>(
        file_version: LanceFileVersion,
        fields: impl Iterator<Item = &'a Field>,
        field_id_to_column_index: &BTreeMap<u32, u32>,
        column_indices: &mut Vec<u32>,
    ) -> Result<()> {
        for field in fields {
            // 2.1+ files use structural encoding: only leaf, blob, and
            // packed-struct fields get their own column
            let is_structural = file_version >= LanceFileVersion::V2_1;
            if (!is_structural
                || field.children.is_empty()
                || field.is_blob()
                || field.is_packed_struct())
                && let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied()
            {
                column_indices.push(column_idx);
            }
            // Recurse into children, except under blob / packed-struct fields
            // in structural files (their children share the parent's column)
            if !is_structural || (!field.is_blob() && !field.is_packed_struct()) {
                Self::from_field_ids_helper(
                    file_version,
                    field.children.iter(),
                    field_id_to_column_index,
                    column_indices,
                )?;
            }
        }
        Ok(())
    }
    /// Builds a projection from a schema and a field-id -> column-index map.
    pub fn from_field_ids(
        file_version: LanceFileVersion,
        schema: &Schema,
        field_id_to_column_index: &BTreeMap<u32, u32>,
    ) -> Result<Self> {
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            schema.fields.iter(),
            field_id_to_column_index,
            &mut column_indices,
        )?;
        let projection = Self {
            schema: Arc::new(schema.clone()),
            column_indices,
        };
        Ok(projection)
    }
    /// Builds a projection covering the entire schema, assigning column
    /// indices in pre-order traversal order.
    pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self {
        let schema = Arc::new(schema.clone());
        let is_structural = version >= LanceFileVersion::V2_1;
        let mut column_indices = vec![];
        let mut curr_column_idx = 0;
        // Count of packed-struct children still to skip (they share the
        // parent's column).  NOTE(review): only direct children are counted;
        // confirm packed structs cannot have nested children of their own.
        let mut packed_struct_fields_num = 0;
        for field in schema.fields_pre_order() {
            if packed_struct_fields_num > 0 {
                packed_struct_fields_num -= 1;
                continue;
            }
            if field.is_packed_struct() {
                // Packed struct gets one column covering itself + children
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
                packed_struct_fields_num = field.children.len();
            } else if field.children.is_empty() || !is_structural {
                // Leaves always get a column; pre-2.1, every field does
                column_indices.push(curr_column_idx);
                curr_column_idx += 1;
            }
        }
        Self {
            schema,
            column_indices,
        }
    }
    /// Builds a projection selecting only the named columns.
    ///
    /// Column indices are derived by numbering the column-bearing fields of
    /// the full schema in pre-order and then projecting.
    pub fn from_column_names(
        file_version: LanceFileVersion,
        schema: &Schema,
        column_names: &[&str],
    ) -> Result<Self> {
        let field_id_to_column_index = schema
            .fields_pre_order()
            .filter(|field| {
                file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct()
            })
            .enumerate()
            .map(|(idx, field)| (field.id as u32, idx as u32))
            .collect::<BTreeMap<_, _>>();
        let projected = schema.project(column_names)?;
        let mut column_indices = Vec::new();
        Self::from_field_ids_helper(
            file_version,
            projected.fields.iter(),
            &field_id_to_column_index,
            &mut column_indices,
        )?;
        Ok(Self {
            schema: Arc::new(projected),
            column_indices,
        })
    }
}
/// Options controlling how a `FileReader` decodes and fetches data.
#[derive(Clone, Debug)]
pub struct FileReaderOptions {
    /// Configuration forwarded to the decoder
    pub decoder_config: DecoderConfig,
    /// Maximum size of a single read request (see `DEFAULT_READ_CHUNK_SIZE`)
    pub read_chunk_size: u64,
}
impl Default for FileReaderOptions {
    fn default() -> Self {
        // Default decoder behavior with 8 MiB read chunks.
        let decoder_config = DecoderConfig::default();
        Self {
            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
            decoder_config,
        }
    }
}
/// Reader for Lance v2+ files.
#[derive(Debug)]
pub struct FileReader {
    /// I/O abstraction used to fetch byte ranges
    scheduler: Arc<dyn EncodingsIo>,
    /// Projection used when the caller does not supply one
    base_projection: ReaderProjection,
    /// Number of rows in the file (cached from `metadata`)
    num_rows: u64,
    /// Decoded file metadata
    metadata: Arc<CachedFileMetadata>,
    /// Custom decoders
    decoder_plugins: Arc<DecoderPlugins>,
    /// Metadata cache, namespaced by file path
    cache: Arc<LanceCache>,
    options: FileReaderOptions,
}
/// Decoded fixed-length footer of a Lance file.
#[derive(Debug)]
struct Footer {
    /// Byte offset where the column metadata section starts
    #[allow(dead_code)]
    column_meta_start: u64,
    /// Byte offset of the column-metadata offset table
    #[allow(dead_code)]
    column_meta_offsets_start: u64,
    /// Byte offset of the global buffer offset table
    global_buff_offsets_start: u64,
    num_global_buffers: u32,
    num_columns: u32,
    major_version: u16,
    minor_version: u16,
}
/// Footer layout: 3 x u64 offsets + 2 x u32 counts + 2 x u16 version fields
/// + 4-byte magic = 40 bytes.
const FOOTER_LEN: usize = 40;
impl FileReader {
pub fn with_scheduler(&self, scheduler: Arc<dyn EncodingsIo>) -> Self {
Self {
scheduler,
base_projection: self.base_projection.clone(),
cache: self.cache.clone(),
decoder_plugins: self.decoder_plugins.clone(),
metadata: self.metadata.clone(),
options: self.options.clone(),
num_rows: self.num_rows,
}
}
    /// Returns the number of rows in the file.
    pub fn num_rows(&self) -> u64 {
        self.num_rows
    }
    /// Returns the cached file metadata.
    pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
        &self.metadata
    }
pub fn file_statistics(&self) -> FileStatistics {
let column_metadatas = &self.metadata().column_metadatas;
let column_stats = column_metadatas
.iter()
.map(|col_metadata| {
let num_pages = col_metadata.pages.len();
let size_bytes = col_metadata
.pages
.iter()
.map(|page| page.buffer_sizes.iter().sum::<u64>())
.sum::<u64>();
ColumnStatistics {
num_pages,
size_bytes,
}
})
.collect();
FileStatistics {
columns: column_stats,
}
}
pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len())))?;
self.scheduler
.submit_single(
buffer_desc.position..buffer_desc.position + buffer_desc.size,
0,
)
.await
}
async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
let file_size = scheduler.reader().size().await? as u64;
let begin = if file_size < scheduler.reader().block_size() as u64 {
0
} else {
file_size - scheduler.reader().block_size() as u64
};
let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
Ok((tail_bytes, file_size))
}
fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
let len = footer_bytes.len();
if len < FOOTER_LEN {
return Err(Error::invalid_input(format!(
"does not have sufficient data, len: {}, bytes: {:?}",
len, footer_bytes
)));
}
let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
let column_meta_start = cursor.read_u64::<LittleEndian>()?;
let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
let num_columns = cursor.read_u32::<LittleEndian>()?;
let major_version = cursor.read_u16::<LittleEndian>()?;
let minor_version = cursor.read_u16::<LittleEndian>()?;
if major_version == MAJOR_VERSION as u16 && minor_version == MINOR_VERSION as u16 {
return Err(Error::version_conflict(
"Attempt to use the lance v2 reader to read a legacy file".to_string(),
major_version,
minor_version,
));
}
let magic_bytes = footer_bytes.slice(len - 4..);
if magic_bytes.as_ref() != MAGIC {
return Err(Error::invalid_input(format!(
"file does not appear to be a Lance file (invalid magic: {:?})",
MAGIC
)));
}
Ok(Footer {
column_meta_start,
column_meta_offsets_start,
global_buff_offsets_start,
num_global_buffers,
num_columns,
major_version,
minor_version,
})
}
    /// Decodes every column's protobuf metadata from the column metadata
    /// section.
    ///
    /// `column_metadata_bytes` must span from `footer.column_meta_start` up to
    /// (and including) the column-metadata-offset (CMO) table, which sits at
    /// the end of the slice as `num_columns` (position: u64, length: u64)
    /// pairs.
    fn read_all_column_metadata(
        column_metadata_bytes: Bytes,
        footer: &Footer,
    ) -> Result<Vec<pbfile::ColumnMetadata>> {
        let column_metadata_start = footer.column_meta_start;
        // Each CMO entry is 16 bytes: u64 position + u64 length
        let cmo_table_size = 16 * footer.num_columns as usize;
        let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
        (0..footer.num_columns)
            .map(|col_idx| {
                let offset = (col_idx * 16) as usize;
                let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
                let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
                // CMO positions are absolute file offsets; rebase them onto
                // the slice we were handed
                let normalized_position = (position - column_metadata_start) as usize;
                let normalized_end = normalized_position + (length as usize);
                Ok(pbfile::ColumnMetadata::decode(
                    &column_metadata_bytes[normalized_position..normalized_end],
                )?)
            })
            .collect::<Result<Vec<_>>>()
    }
    /// Returns the bytes from `start_pos` to the end of the file, reusing the
    /// already-fetched tail (`data`) when it covers the range and issuing one
    /// extra read for the missing prefix otherwise.
    ///
    /// `data` is assumed to be the final `data.len()` bytes of the file.
    async fn optimistic_tail_read(
        data: &Bytes,
        start_pos: u64,
        scheduler: &FileScheduler,
        file_len: u64,
    ) -> Result<Bytes> {
        let num_bytes_needed = (file_len - start_pos) as usize;
        if data.len() >= num_bytes_needed {
            // The tail read already covers [start_pos, file_len)
            Ok(data.slice((data.len() - num_bytes_needed)..))
        } else {
            // Fetch the missing prefix [start_pos, tail_start) and stitch it
            // in front of the tail we already have
            let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
            let start = file_len - num_bytes_needed as u64;
            let missing_bytes = scheduler
                .submit_single(start..start + num_bytes_missing, 0)
                .await?;
            let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
            combined.extend(missing_bytes);
            combined.extend(data);
            Ok(combined.freeze())
        }
    }
fn do_decode_gbo_table(
gbo_bytes: &Bytes,
footer: &Footer,
version: LanceFileVersion,
) -> Result<Vec<BufferDescriptor>> {
let mut global_bufs_cursor = Cursor::new(gbo_bytes);
let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
for _ in 0..footer.num_global_buffers {
let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
assert!(
version < LanceFileVersion::V2_1 || buf_pos % PAGE_BUFFER_ALIGNMENT as u64 == 0
);
let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
global_buffers.push(BufferDescriptor {
position: buf_pos,
size: buf_size,
});
}
Ok(global_buffers)
}
    /// Reads (reusing `tail_bytes` when possible) and decodes the global
    /// buffer offset table, which sits at `footer.global_buff_offsets_start`.
    async fn decode_gbo_table(
        tail_bytes: &Bytes,
        file_len: u64,
        scheduler: &FileScheduler,
        footer: &Footer,
        version: LanceFileVersion,
    ) -> Result<Vec<BufferDescriptor>> {
        let gbo_bytes = Self::optimistic_tail_read(
            tail_bytes,
            footer.global_buff_offsets_start,
            scheduler,
            file_len,
        )
        .await?;
        Self::do_decode_gbo_table(&gbo_bytes, footer, version)
    }
fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
let pb_schema = file_descriptor.schema.unwrap();
let num_rows = file_descriptor.length;
let fields_with_meta = FieldsWithMeta {
fields: Fields(pb_schema.fields),
metadata: pb_schema.metadata,
};
let schema = lance_core::datatypes::Schema::from(fields_with_meta);
Ok((num_rows, schema))
}
    /// Reads and decodes all metadata in the file (footer, global buffer
    /// table, schema, and column metadata), minimizing I/O by reusing the
    /// initial tail read where possible.
    pub async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
        // Read the last block of the file; with luck it contains everything
        let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
        let footer = Self::decode_footer(&tail_bytes)?;
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;
        let gbo_table =
            Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer, file_version).await?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        // The first global buffer holds the schema (file descriptor)
        let schema_start = gbo_table[0].position;
        let schema_size = gbo_table[0].size;
        let num_footer_bytes = file_len - schema_start;
        // Fetch everything from the schema to EOF in (at most) one more read
        let all_metadata_bytes =
            Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
        let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
        let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
        // Footer offsets are absolute; rebase onto `all_metadata_bytes`
        let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
        let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
        let column_metadata_bytes =
            all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
        let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>();
        // NOTE(review): assumes all global buffers lie before
        // column_meta_start -- confirm against the writer's layout
        let num_data_bytes = footer.column_meta_start - num_global_buffer_bytes;
        let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
        let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);
        Ok(CachedFileMetadata {
            file_schema: Arc::new(schema),
            column_metadatas,
            column_infos,
            num_rows,
            num_data_bytes,
            num_column_metadata_bytes,
            num_global_buffer_bytes,
            num_footer_bytes,
            file_buffers: gbo_table,
            major_version: footer.major_version,
            minor_version: footer.minor_version,
        })
    }
    /// Decodes an `Any`-wrapped protobuf message of type `M` from a direct
    /// encoding location.
    ///
    /// Indirect locations are not implemented yet (`todo!`); a missing or
    /// `None` location, or an undecodable message, is treated as a bug and
    /// panics.
    fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
        match &encoding.location {
            Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
            Some(pbfile::encoding::Location::Direct(encoding)) => {
                let encoding_buf = Bytes::from(encoding.encoding.clone());
                let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
                encoding_any.to_msg::<M>().unwrap()
            }
            Some(pbfile::encoding::Location::None(_)) => panic!(),
            None => panic!(),
        }
    }
    /// Converts raw protobuf column metadata into decoder `ColumnInfo`s,
    /// decoding each page's encoding description along the way.
    fn meta_to_col_infos(
        column_metadatas: &[pbfile::ColumnMetadata],
        file_version: LanceFileVersion,
    ) -> Vec<Arc<ColumnInfo>> {
        column_metadatas
            .iter()
            .enumerate()
            .map(|(col_idx, col_meta)| {
                let page_infos = col_meta
                    .pages
                    .iter()
                    .map(|page| {
                        let num_rows = page.length;
                        // 2.0 files store legacy array encodings; later
                        // versions store structural page layouts
                        let encoding = match file_version {
                            LanceFileVersion::V2_0 => {
                                PageEncoding::Legacy(Self::fetch_encoding::<pbenc::ArrayEncoding>(
                                    page.encoding.as_ref().unwrap(),
                                ))
                            }
                            _ => PageEncoding::Structural(Self::fetch_encoding::<
                                pbenc21::PageLayout,
                            >(
                                page.encoding.as_ref().unwrap()
                            )),
                        };
                        let buffer_offsets_and_sizes = Arc::from(
                            page.buffer_offsets
                                .iter()
                                .zip(page.buffer_sizes.iter())
                                .map(|(offset, size)| {
                                    // 2.1+ writers align every page buffer
                                    assert!(
                                        file_version < LanceFileVersion::V2_1
                                            || offset % PAGE_BUFFER_ALIGNMENT as u64 == 0
                                    );
                                    (*offset, *size)
                                })
                                .collect::<Vec<_>>(),
                        );
                        PageInfo {
                            buffer_offsets_and_sizes,
                            encoding,
                            num_rows,
                            priority: page.priority,
                        }
                    })
                    .collect::<Vec<_>>();
                // Column-level buffers (as opposed to per-page buffers)
                let buffer_offsets_and_sizes = Arc::from(
                    col_meta
                        .buffer_offsets
                        .iter()
                        .zip(col_meta.buffer_sizes.iter())
                        .map(|(offset, size)| (*offset, *size))
                        .collect::<Vec<_>>(),
                );
                Arc::new(ColumnInfo {
                    index: col_idx as u32,
                    page_infos: Arc::from(page_infos),
                    buffer_offsets_and_sizes,
                    encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
                })
            })
            .collect::<Vec<_>>()
    }
fn validate_projection(
projection: &ReaderProjection,
metadata: &CachedFileMetadata,
) -> Result<()> {
if projection.schema.fields.is_empty() {
return Err(Error::invalid_input(
"Attempt to read zero columns from the file, at least one column must be specified"
.to_string(),
));
}
let mut column_indices_seen = BTreeSet::new();
for column_index in &projection.column_indices {
if !column_indices_seen.insert(*column_index) {
return Err(Error::invalid_input(format!(
"The projection specified the column index {} more than once",
column_index
)));
}
if *column_index >= metadata.column_infos.len() as u32 {
return Err(Error::invalid_input(format!(
"The projection specified the column index {} but there are only {} columns in the file",
column_index,
metadata.column_infos.len()
)));
}
}
Ok(())
}
    /// Opens a file for reading: reads all metadata via `scheduler`, then
    /// wraps the scheduler in a `LanceEncodingsIo` with the configured read
    /// chunk size.
    ///
    /// If `base_projection` is `None`, the whole schema is used.
    pub async fn try_open(
        scheduler: FileScheduler,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
        let path = scheduler.reader().path().clone();
        let encodings_io =
            LanceEncodingsIo::new(scheduler).with_read_chunk_size(options.read_chunk_size);
        Self::try_open_with_file_metadata(
            Arc::new(encodings_io),
            path,
            base_projection,
            decoder_plugins,
            file_metadata,
            cache,
            options,
        )
        .await
    }
    /// Builds a reader from metadata that has already been loaded.
    ///
    /// The cache is namespaced by `path`.  The projection, if supplied, is
    /// validated against the file's columns; otherwise the whole schema is
    /// projected.
    pub async fn try_open_with_file_metadata(
        scheduler: Arc<dyn EncodingsIo>,
        path: Path,
        base_projection: Option<ReaderProjection>,
        decoder_plugins: Arc<DecoderPlugins>,
        file_metadata: Arc<CachedFileMetadata>,
        cache: &LanceCache,
        options: FileReaderOptions,
    ) -> Result<Self> {
        let cache = Arc::new(cache.with_key_prefix(path.as_ref()));
        if let Some(base_projection) = base_projection.as_ref() {
            Self::validate_projection(base_projection, &file_metadata)?;
        }
        let num_rows = file_metadata.num_rows;
        Ok(Self {
            scheduler,
            base_projection: base_projection.unwrap_or(ReaderProjection::from_whole_schema(
                file_metadata.file_schema.as_ref(),
                file_metadata.version(),
            )),
            num_rows,
            metadata: file_metadata,
            decoder_plugins,
            cache,
            options,
        })
    }
    /// Returns the column infos to hand to the scheduler.
    ///
    /// NOTE(review): the projection parameter is currently ignored and ALL
    /// columns in the file are returned -- presumably the scheduler prunes
    /// columns using the projection's indices; confirm before relying on
    /// this for I/O reduction.
    fn collect_columns_from_projection(
        &self,
        _projection: &ReaderProjection,
    ) -> Result<Vec<Arc<ColumnInfo>>> {
        Ok(self.metadata.column_infos.clone())
    }
    /// Schedules decoding of a single contiguous row range, returning a
    /// stream of read-batch tasks.
    #[allow(clippy::too_many_arguments)]
    fn do_read_range(
        column_infos: Vec<Arc<ColumnInfo>>,
        io: Arc<dyn EncodingsIo>,
        cache: Arc<LanceCache>,
        num_rows: u64,
        decoder_plugins: Arc<DecoderPlugins>,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
        decoder_config: DecoderConfig,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );
        let config = SchedulerDecoderConfig {
            batch_size,
            cache,
            decoder_plugins,
            io,
            decoder_config,
        };
        let requested_rows = RequestedRows::Ranges(vec![range]);
        Ok(schedule_and_decode(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        ))
    }
    /// Reads a contiguous range of rows as a stream of read-batch tasks
    /// (thin wrapper around `do_read_range` using this reader's state).
    fn read_range(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_range(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.num_rows,
            self.decoder_plugins.clone(),
            range,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
        )
    }
#[allow(clippy::too_many_arguments)]
fn do_take_rows(
column_infos: Vec<Arc<ColumnInfo>>,
io: Arc<dyn EncodingsIo>,
cache: Arc<LanceCache>,
decoder_plugins: Arc<DecoderPlugins>,
indices: Vec<u64>,
batch_size: u32,
projection: ReaderProjection,
filter: FilterExpression,
decoder_config: DecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
debug!(
"Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
indices.len(),
indices[0],
indices[indices.len() - 1],
batch_size,
column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
);
let config = SchedulerDecoderConfig {
batch_size,
cache,
decoder_plugins,
io,
decoder_config,
};
let requested_rows = RequestedRows::Indices(indices);
Ok(schedule_and_decode(
column_infos,
requested_rows,
filter,
projection.column_indices,
projection.schema,
config,
))
}
    /// Takes specific rows by index; no filter is applied (a no-op filter is
    /// passed to `do_take_rows`).
    fn take_rows(
        &self,
        indices: Vec<u64>,
        batch_size: u32,
        projection: ReaderProjection,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_take_rows(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            indices,
            batch_size,
            projection,
            FilterExpression::no_filter(),
            self.options.decoder_config.clone(),
        )
    }
#[allow(clippy::too_many_arguments)]
fn do_read_ranges(
column_infos: Vec<Arc<ColumnInfo>>,
io: Arc<dyn EncodingsIo>,
cache: Arc<LanceCache>,
decoder_plugins: Arc<DecoderPlugins>,
ranges: Vec<Range<u64>>,
batch_size: u32,
projection: ReaderProjection,
filter: FilterExpression,
decoder_config: DecoderConfig,
) -> Result<BoxStream<'static, ReadBatchTask>> {
let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
debug!(
"Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
ranges.len(),
num_rows,
ranges[0].start,
ranges[ranges.len() - 1].end,
batch_size,
column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
);
let config = SchedulerDecoderConfig {
batch_size,
cache,
decoder_plugins,
io,
decoder_config,
};
let requested_rows = RequestedRows::Ranges(ranges);
Ok(schedule_and_decode(
column_infos,
requested_rows,
filter,
projection.column_indices,
projection.schema,
config,
))
}
    /// Reads several row ranges as a stream of read-batch tasks (thin wrapper
    /// around `do_read_ranges` using this reader's state).
    fn read_ranges(
        &self,
        ranges: Vec<Range<u64>>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<BoxStream<'static, ReadBatchTask>> {
        Self::do_read_ranges(
            self.collect_columns_from_projection(&projection)?,
            self.scheduler.clone(),
            self.cache.clone(),
            self.decoder_plugins.clone(),
            ranges,
            batch_size,
            projection,
            filter,
            self.options.decoder_config.clone(),
        )
    }
pub fn read_tasks(
&self,
params: ReadBatchParams,
batch_size: u32,
projection: Option<ReaderProjection>,
filter: FilterExpression,
) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
let projection = projection.unwrap_or_else(|| self.base_projection.clone());
Self::validate_projection(&projection, &self.metadata)?;
let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
if bound > self.num_rows || bound == self.num_rows && inclusive {
Err(Error::invalid_input(format!(
"cannot read {:?} from file with {} rows",
params, self.num_rows
)))
} else {
Ok(())
}
};
match ¶ms {
ReadBatchParams::Indices(indices) => {
for idx in indices {
match idx {
None => {
return Err(Error::invalid_input("Null value in indices array"));
}
Some(idx) => {
verify_bound(¶ms, idx as u64, true)?;
}
}
}
let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
self.take_rows(indices, batch_size, projection)
}
ReadBatchParams::Range(range) => {
verify_bound(¶ms, range.end as u64, false)?;
self.read_range(
range.start as u64..range.end as u64,
batch_size,
projection,
filter,
)
}
ReadBatchParams::Ranges(ranges) => {
let mut ranges_u64 = Vec::with_capacity(ranges.len());
for range in ranges.as_ref() {
verify_bound(¶ms, range.end, false)?;
ranges_u64.push(range.start..range.end);
}
self.read_ranges(ranges_u64, batch_size, projection, filter)
}
ReadBatchParams::RangeFrom(range) => {
verify_bound(¶ms, range.start as u64, true)?;
self.read_range(
range.start as u64..self.num_rows,
batch_size,
projection,
filter,
)
}
ReadBatchParams::RangeTo(range) => {
verify_bound(¶ms, range.end as u64, false)?;
self.read_range(0..range.end as u64, batch_size, projection, filter)
}
ReadBatchParams::RangeFull => {
self.read_range(0..self.num_rows, batch_size, projection, filter)
}
}
}
pub fn read_stream_projected(
&self,
params: ReadBatchParams,
batch_size: u32,
batch_readahead: u32,
projection: ReaderProjection,
filter: FilterExpression,
) -> Result<Pin<Box<dyn RecordBatchStream>>> {
let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
let tasks_stream = self.read_tasks(params, batch_size, Some(projection), filter)?;
let batch_stream = tasks_stream
.map(|task| task.task)
.buffered(batch_readahead as usize)
.boxed();
Ok(Box::pin(RecordBatchStreamAdapter::new(
arrow_schema,
batch_stream,
)))
}
fn take_rows_blocking(
&self,
indices: Vec<u64>,
batch_size: u32,
projection: ReaderProjection,
filter: FilterExpression,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let column_infos = self.collect_columns_from_projection(&projection)?;
debug!(
"Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
indices.len(),
indices[0],
indices[indices.len() - 1],
batch_size,
column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
);
let config = SchedulerDecoderConfig {
batch_size,
cache: self.cache.clone(),
decoder_plugins: self.decoder_plugins.clone(),
io: self.scheduler.clone(),
decoder_config: self.options.decoder_config.clone(),
};
let requested_rows = RequestedRows::Indices(indices);
schedule_and_decode_blocking(
column_infos,
requested_rows,
filter,
projection.column_indices,
projection.schema,
config,
)
}
fn read_ranges_blocking(
&self,
ranges: Vec<Range<u64>>,
batch_size: u32,
projection: ReaderProjection,
filter: FilterExpression,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let column_infos = self.collect_columns_from_projection(&projection)?;
let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
debug!(
"Taking {} ranges ({} rows) spread across range {}..{} with batch_size {} from columns {:?}",
ranges.len(),
num_rows,
ranges[0].start,
ranges[ranges.len() - 1].end,
batch_size,
column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
);
let config = SchedulerDecoderConfig {
batch_size,
cache: self.cache.clone(),
decoder_plugins: self.decoder_plugins.clone(),
io: self.scheduler.clone(),
decoder_config: self.options.decoder_config.clone(),
};
let requested_rows = RequestedRows::Ranges(ranges);
schedule_and_decode_blocking(
column_infos,
requested_rows,
filter,
projection.column_indices,
projection.schema,
config,
)
}
    /// Blocking variant of `read_range`: schedules and decodes on the calling
    /// thread, returning a synchronous `RecordBatchReader`.
    fn read_range_blocking(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: ReaderProjection,
        filter: FilterExpression,
    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
        let column_infos = self.collect_columns_from_projection(&projection)?;
        let num_rows = self.num_rows;
        debug!(
            "Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
            range,
            batch_size,
            num_rows,
            column_infos.len(),
            projection.schema.fields.len(),
        );
        let config = SchedulerDecoderConfig {
            batch_size,
            cache: self.cache.clone(),
            decoder_plugins: self.decoder_plugins.clone(),
            io: self.scheduler.clone(),
            decoder_config: self.options.decoder_config.clone(),
        };
        let requested_rows = RequestedRows::Ranges(vec![range]);
        schedule_and_decode_blocking(
            column_infos,
            requested_rows,
            filter,
            projection.column_indices,
            projection.schema,
            config,
        )
    }
pub fn read_stream_projected_blocking(
&self,
params: ReadBatchParams,
batch_size: u32,
projection: Option<ReaderProjection>,
filter: FilterExpression,
) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let projection = projection.unwrap_or_else(|| self.base_projection.clone());
Self::validate_projection(&projection, &self.metadata)?;
let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
if bound > self.num_rows || bound == self.num_rows && inclusive {
Err(Error::invalid_input(format!(
"cannot read {:?} from file with {} rows",
params, self.num_rows
)))
} else {
Ok(())
}
};
match ¶ms {
ReadBatchParams::Indices(indices) => {
for idx in indices {
match idx {
None => {
return Err(Error::invalid_input("Null value in indices array"));
}
Some(idx) => {
verify_bound(¶ms, idx as u64, true)?;
}
}
}
let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
self.take_rows_blocking(indices, batch_size, projection, filter)
}
ReadBatchParams::Range(range) => {
verify_bound(¶ms, range.end as u64, false)?;
self.read_range_blocking(
range.start as u64..range.end as u64,
batch_size,
projection,
filter,
)
}
ReadBatchParams::Ranges(ranges) => {
let mut ranges_u64 = Vec::with_capacity(ranges.len());
for range in ranges.as_ref() {
verify_bound(¶ms, range.end, false)?;
ranges_u64.push(range.start..range.end);
}
self.read_ranges_blocking(ranges_u64, batch_size, projection, filter)
}
ReadBatchParams::RangeFrom(range) => {
verify_bound(¶ms, range.start as u64, true)?;
self.read_range_blocking(
range.start as u64..self.num_rows,
batch_size,
projection,
filter,
)
}
ReadBatchParams::RangeTo(range) => {
verify_bound(¶ms, range.end as u64, false)?;
self.read_range_blocking(0..range.end as u64, batch_size, projection, filter)
}
ReadBatchParams::RangeFull => {
self.read_range_blocking(0..self.num_rows, batch_size, projection, filter)
}
}
}
    /// Reads the requested rows using the reader's base projection (see
    /// `read_stream_projected`).
    pub fn read_stream(
        &self,
        params: ReadBatchParams,
        batch_size: u32,
        batch_readahead: u32,
        filter: FilterExpression,
    ) -> Result<Pin<Box<dyn RecordBatchStream>>> {
        self.read_stream_projected(
            params,
            batch_size,
            batch_readahead,
            self.base_projection.clone(),
            filter,
        )
    }
    /// Returns the file's full schema (not the base projection).
    pub fn schema(&self) -> &Arc<Schema> {
        &self.metadata.file_schema
    }
}
/// Renders a human-readable description of a page's encoding, for debugging.
pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
    // Early-out on missing encoding / missing location
    let Some(encoding) = &page.encoding else {
        return "MISSING".to_string();
    };
    let Some(style) = &encoding.location else {
        return "MISSING STYLE".to_string();
    };
    match style {
        pbfile::encoding::Location::Indirect(indirect) => format!(
            "IndirectEncoding(pos={},size={})",
            indirect.buffer_location, indirect.buffer_length
        ),
        pbfile::encoding::Location::Direct(direct) => {
            let encoding_any = prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
                .expect("failed to deserialize encoding as protobuf");
            // Dispatch on the Any's type_url to pick the right message type
            match encoding_any.type_url.as_str() {
                "/lance.encodings.ArrayEncoding" => {
                    match encoding_any.to_msg::<pbenc::ArrayEncoding>() {
                        Ok(encoding) => format!("{:#?}", encoding),
                        Err(err) => format!("Unsupported(decode_err={})", err),
                    }
                }
                "/lance.encodings21.PageLayout" => {
                    match encoding_any.to_msg::<pbenc21::PageLayout>() {
                        Ok(encoding) => format!("{:#?}", encoding),
                        Err(err) => format!("Unsupported(decode_err={})", err),
                    }
                }
                _ => format!("Unrecognized(type_url={})", encoding_any.type_url),
            }
        }
        pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
    }
}
/// Extension methods for reconstructing an `EncodedBatch` from serialized
/// Lance bytes.
pub trait EncodedBatchReaderExt {
    /// Builds a batch from "mini-lance" bytes, which lack an embedded schema;
    /// the caller supplies the schema and file version.
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized;
    /// Builds a batch from fully self-described Lance bytes (schema is read
    /// from the file's first global buffer).
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized;
}
impl EncodedBatchReaderExt for EncodedBatch {
    fn try_from_mini_lance(
        bytes: Bytes,
        schema: &Schema,
        file_version: LanceFileVersion,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // NOTE(review): the projection uses the caller-supplied
        // `file_version`, but the page table below uses the version decoded
        // from the footer (which shadows the parameter) -- confirm the two
        // are expected to agree.
        let projection = ReaderProjection::from_whole_schema(schema, file_version);
        let footer = FileReader::decode_footer(&bytes)?;
        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;
        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
        Ok(Self {
            data: bytes,
            // Row count is derived by summing the pages of the first column
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema.clone()),
        })
    }
    fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
    where
        Self: Sized,
    {
        let footer = FileReader::decode_footer(&bytes)?;
        let file_version = LanceFileVersion::try_from_major_minor(
            footer.major_version as u32,
            footer.minor_version as u32,
        )?;
        let gbo_table = FileReader::do_decode_gbo_table(
            &bytes.slice(footer.global_buff_offsets_start as usize..),
            &footer,
            file_version,
        )?;
        if gbo_table.is_empty() {
            return Err(Error::internal(
                "File did not contain any global buffers, schema expected".to_string(),
            ));
        }
        // The first global buffer holds the schema
        let schema_start = gbo_table[0].position as usize;
        let schema_size = gbo_table[0].size as usize;
        let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
        let (_, schema) = FileReader::decode_schema(schema_bytes)?;
        let projection = ReaderProjection::from_whole_schema(&schema, file_version);
        let column_metadata_start = footer.column_meta_start as usize;
        let column_metadata_end = footer.global_buff_offsets_start as usize;
        let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
        let column_metadatas =
            FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
        let page_table = FileReader::meta_to_col_infos(&column_metadatas, file_version);
        Ok(Self {
            data: bytes,
            // Row count is derived by summing the pages of the first column
            num_rows: page_table
                .first()
                .map(|col| col.page_infos.iter().map(|page| page.num_rows).sum::<u64>())
                .unwrap_or(0),
            page_table,
            top_level_columns: projection.column_indices,
            schema: Arc::new(schema),
        })
    }
}
#[cfg(test)]
pub mod tests {
    //! Integration tests for `FileReader` and the `EncodedBatch` read/write
    //! extensions: full round trips, projections, compressed buffers, blocking
    //! takes, empty ranges, dropping readers mid-stream, and global buffers.
    use std::{collections::BTreeMap, pin::Pin, sync::Arc};
    use arrow_array::{
        RecordBatch, UInt32Array,
        types::{Float64Type, Int32Type},
    };
    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
    use bytes::Bytes;
    use futures::{StreamExt, prelude::stream::TryStreamExt};
    use lance_arrow::RecordBatchExt;
    use lance_core::{ArrowResult, datatypes::Schema};
    use lance_datagen::{BatchCount, ByteCount, RowCount, array, gen_batch};
    use lance_encoding::{
        decoder::{DecodeBatchScheduler, DecoderPlugins, FilterExpression, decode_batch},
        encoder::{EncodedBatch, EncodingOptions, default_encoding_strategy, encode_batch},
        version::LanceFileVersion,
    };
    use lance_io::{stream::RecordBatchStream, utils::CachedFileSize};
    use log::debug;
    use rstest::rstest;
    use tokio::sync::mpsc;
    use crate::reader::{EncodedBatchReaderExt, FileReader, FileReaderOptions, ReaderProjection};
    use crate::testing::{FsFixture, WrittenFile, test_cache, write_lance_file};
    use crate::writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions};
    use lance_encoding::decoder::DecoderConfig;
    // Writes a test file (1000 rows x 100 batches) containing primitive,
    // struct, list, and binary columns. Versions <= V2_0 additionally get a
    // LargeBinary column.
    async fn create_some_file(fs: &FsFixture, version: LanceFileVersion) -> WrittenFile {
        let location_type = DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Float64, true),
            Field::new("y", DataType::Float64, true),
        ]));
        let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
        let mut reader = gen_batch()
            .col("score", array::rand::<Float64Type>())
            .col("location", array::rand_type(&location_type))
            .col("categories", array::rand_type(&categories_type))
            .col("binary", array::rand_type(&DataType::Binary));
        if version <= LanceFileVersion::V2_0 {
            reader = reader.col("large_bin", array::rand_type(&DataType::LargeBinary));
        }
        let reader = reader.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
        write_lance_file(
            reader,
            fs,
            FileWriterOptions {
                format_version: Some(version),
                ..Default::default()
            },
        )
        .await
    }
    // Optional per-batch transform applied to expected data before comparison
    // (used to project expected batches down to a reader's projection).
    type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
    // Verifies that `actual` yields exactly the rows of `expected`, in order,
    // in batches of `read_size` rows (the final batch may be smaller). Expected
    // and actual batch boundaries need not line up, so both sides are sliced.
    async fn verify_expected(
        expected: &[RecordBatch],
        mut actual: Pin<Box<dyn RecordBatchStream>>,
        read_size: u32,
        transform: Option<Transformer>,
    ) {
        let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
        let mut expected_iter = expected.iter().map(|batch| {
            if let Some(transform) = &transform {
                transform(batch)
            } else {
                batch.clone()
            }
        });
        let mut next_expected = expected_iter.next().unwrap().clone();
        while let Some(actual) = actual.next().await {
            let mut actual = actual.unwrap();
            let mut rows_to_verify = actual.num_rows() as u32;
            // Every batch must be exactly `read_size` rows until data runs out.
            let expected_length = remaining.min(read_size);
            assert_eq!(expected_length, rows_to_verify);
            while rows_to_verify > 0 {
                // Compare the overlap of the current expected/actual batches.
                let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
                assert_eq!(
                    next_expected.slice(0, next_slice_len as usize),
                    actual.slice(0, next_slice_len as usize)
                );
                remaining -= next_slice_len;
                rows_to_verify -= next_slice_len;
                if remaining > 0 {
                    // Advance the expected side: either move to the next
                    // expected batch or keep its unconsumed tail.
                    if next_slice_len == next_expected.num_rows() as u32 {
                        next_expected = expected_iter.next().unwrap().clone();
                    } else {
                        next_expected = next_expected.slice(
                            next_slice_len as usize,
                            next_expected.num_rows() - next_slice_len as usize,
                        );
                    }
                }
                if rows_to_verify > 0 {
                    // Keep the unconsumed tail of the actual batch.
                    actual = actual.slice(
                        next_slice_len as usize,
                        actual.num_rows() - next_slice_len as usize,
                    );
                }
            }
        }
        // All expected rows must have been produced.
        assert_eq!(remaining, 0);
    }
    // Full write-then-read round trip at several batch sizes, also checking
    // that schema metadata survives the trip.
    #[tokio::test]
    async fn test_round_trip() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        for read_size in [32, 1024, 1024 * 1024] {
            let file_scheduler = fs
                .scheduler
                .open_file(&fs.tmp_path, &CachedFileSize::unknown())
                .await
                .unwrap();
            let file_reader = FileReader::try_open(
                file_scheduler,
                None,
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .unwrap();
            let schema = file_reader.schema();
            // Schema metadata written by the test fixture must round trip.
            assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
            let batch_stream = file_reader
                .read_stream(
                    lance_io::ReadBatchParams::RangeFull,
                    read_size,
                    16,
                    FilterExpression::no_filter(),
                )
                .unwrap();
            verify_expected(&data, batch_stream, read_size, None).await;
        }
    }
    // Round-trips an in-memory EncodedBatch through both the self-described
    // and "mini-lance" (schema supplied by caller) byte formats.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_encoded_batch_round_trip(
        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
    ) {
        let data = gen_batch()
            .col("x", array::rand::<Int32Type>())
            .col("y", array::rand_utf8(ByteCount::from(16), false))
            .into_batch_rows(RowCount::from(10000))
            .unwrap();
        let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
        let encoding_options = EncodingOptions {
            cache_bytes_per_column: 4096,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
            version,
        };
        let encoding_strategy = default_encoding_strategy(version);
        let encoded_batch = encode_batch(
            &data,
            lance_schema.clone(),
            encoding_strategy.as_ref(),
            &encoding_options,
        )
        .await
        .unwrap();
        let bytes = encoded_batch.try_to_self_described_lance(version).unwrap();
        let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            version,
            None,
        )
        .await
        .unwrap();
        assert_eq!(data, decoded);
        let bytes = encoded_batch.try_to_mini_lance(version).unwrap();
        // NOTE(review): V2_0 is hard-coded here while the test is
        // parameterized by `version`; harmless today since V2_0 is the only
        // value, but consider passing `version` if more values are added.
        let decoded_batch =
            EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref(), LanceFileVersion::V2_0)
                .unwrap();
        let decoded = decode_batch(
            &decoded_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            version,
            None,
        )
        .await
        .unwrap();
        assert_eq!(data, decoded);
    }
    // Exercises projections (by field id and by column name), both via
    // read_stream_projected and via a reader opened with a base projection,
    // plus rejection of empty and duplicate-column projections.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_projection(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
        version: LanceFileVersion,
    ) {
        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, version).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let field_id_mapping = written_file
            .field_id_mapping
            .iter()
            .copied()
            .collect::<BTreeMap<_, _>>();
        let empty_projection = ReaderProjection {
            column_indices: Vec::default(),
            schema: Arc::new(Schema::default()),
        };
        for columns in [
            vec!["score"],
            vec!["location"],
            vec!["categories"],
            vec!["score.x"],
            vec!["score", "categories"],
            vec!["score", "location"],
            vec!["location", "categories"],
            vec!["score.y", "location", "categories"],
        ] {
            debug!("Testing round trip with projection {:?}", columns);
            for use_field_ids in [true, false] {
                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    None,
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();
                let projected_schema = written_file.schema.project(&columns).unwrap();
                // Build the projection two ways: from field ids and from names.
                let projection = if use_field_ids {
                    ReaderProjection::from_field_ids(
                        file_reader.metadata.version(),
                        &projected_schema,
                        &field_id_mapping,
                    )
                    .unwrap()
                } else {
                    ReaderProjection::from_column_names(
                        file_reader.metadata.version(),
                        &written_file.schema,
                        &columns,
                    )
                    .unwrap()
                };
                // Per-read projection.
                let batch_stream = file_reader
                    .read_stream_projected(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        projection.clone(),
                        FilterExpression::no_filter(),
                    )
                    .unwrap();
                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;
                // Reader-level (base) projection.
                let file_reader = FileReader::try_open(
                    file_scheduler.clone(),
                    Some(projection.clone()),
                    Arc::<DecoderPlugins>::default(),
                    &test_cache(),
                    FileReaderOptions::default(),
                )
                .await
                .unwrap();
                let batch_stream = file_reader
                    .read_stream(
                        lance_io::ReadBatchParams::RangeFull,
                        1024,
                        16,
                        FilterExpression::no_filter(),
                    )
                    .unwrap();
                let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
                verify_expected(
                    &written_file.data,
                    batch_stream,
                    1024,
                    Some(Box::new(move |batch: &RecordBatch| {
                        batch.project_by_schema(&projection_arrow).unwrap()
                    })),
                )
                .await;
                // An empty projection on a per-read basis must be rejected.
                assert!(
                    file_reader
                        .read_stream_projected(
                            lance_io::ReadBatchParams::RangeFull,
                            1024,
                            16,
                            empty_projection.clone(),
                            FilterExpression::no_filter(),
                        )
                        .is_err()
                );
            }
        }
        // An empty base projection must be rejected at open time.
        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(empty_projection),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );
        let arrow_schema = ArrowSchema::new(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("y", DataType::Int32, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        // A projection referencing the same column twice must also be rejected.
        let projection_with_dupes = ReaderProjection {
            column_indices: vec![0, 0],
            schema: Arc::new(schema),
        };
        assert!(
            FileReader::try_open(
                file_scheduler.clone(),
                Some(projection_with_dupes),
                Arc::<DecoderPlugins>::default(),
                &test_cache(),
                FileReaderOptions::default(),
            )
            .await
            .is_err()
        );
    }
    // Reading a column whose schema carries a "lance:compression" hint must
    // still round trip correctly.
    #[test_log::test(tokio::test)]
    async fn test_compressing_buffer() {
        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();
        // Attach a zstd compression hint to every projected field.
        let mut projection = written_file.schema.project(&["score"]).unwrap();
        for field in projection.fields.iter_mut() {
            field
                .metadata
                .insert("lance:compression".to_string(), "zstd".to_string());
        }
        let projection = ReaderProjection {
            column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
            schema: Arc::new(projection),
        };
        let batch_stream = file_reader
            .read_stream_projected(
                lance_io::ReadBatchParams::RangeFull,
                1024,
                16,
                projection.clone(),
                FilterExpression::no_filter(),
            )
            .unwrap();
        let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
        verify_expected(
            &written_file.data,
            batch_stream,
            1024,
            Some(Box::new(move |batch: &RecordBatch| {
                batch.project_by_schema(&projection_arrow).unwrap()
            })),
        )
        .await;
    }
    // Requesting the entire file as one batch must yield exactly one batch.
    #[tokio::test]
    async fn test_read_all() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();
        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                total_rows as u32,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), total_rows);
    }
    // Blocking (non-async) take by indices from a spawn_blocking context.
    #[rstest]
    #[tokio::test]
    async fn test_blocking_take(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1, LanceFileVersion::V2_2)]
        version: LanceFileVersion,
    ) {
        let fs = FsFixture::default();
        let WrittenFile { data, schema, .. } = create_some_file(&fs, version).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            Some(ReaderProjection::from_column_names(version, &schema, &["score"]).unwrap()),
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();
        // The blocking read API must not be called from an async context.
        let batches = tokio::task::spawn_blocking(move || {
            file_reader
                .read_stream_projected_blocking(
                    lance_io::ReadBatchParams::Indices(UInt32Array::from(vec![0, 1, 2, 3, 4])),
                    total_rows as u32,
                    None,
                    FilterExpression::no_filter(),
                )
                .unwrap()
                .collect::<ArrowResult<Vec<_>>>()
                .unwrap()
        })
        .await
        .unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 5);
        assert_eq!(batches[0].num_columns(), 1);
    }
    // Dropping the reader must not invalidate a stream already obtained from it.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_drop_in_progress() {
        let fs = FsFixture::default();
        let WrittenFile { data, .. } = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();
        let mut batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::RangeFull,
                (total_rows / 10) as u32,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap();
        // Reader dropped before the stream is consumed; reads must still work.
        drop(file_reader);
        let batch = batches.next().await.unwrap().unwrap();
        assert!(batch.num_rows() > 0);
        drop(batches);
    }
    // Scheduling into a channel whose receiver is already dropped must not
    // panic or hang.
    #[tokio::test]
    async fn drop_while_scheduling() {
        let fs = FsFixture::default();
        let written_file = create_some_file(&fs, LanceFileVersion::V2_0).await;
        let total_rows = written_file
            .data
            .iter()
            .map(|batch| batch.num_rows())
            .sum::<usize>();
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();
        let projection =
            ReaderProjection::from_whole_schema(&written_file.schema, LanceFileVersion::V2_0);
        let column_infos = file_reader
            .collect_columns_from_projection(&projection)
            .unwrap();
        let mut decode_scheduler = DecodeBatchScheduler::try_new(
            &projection.schema,
            &projection.column_indices,
            &column_infos,
            &vec![],
            total_rows as u64,
            Arc::<DecoderPlugins>::default(),
            file_reader.scheduler.clone(),
            test_cache(),
            &FilterExpression::no_filter(),
            &DecoderConfig::default(),
        )
        .await
        .unwrap();
        let range = 0..total_rows as u64;
        let (tx, rx) = mpsc::unbounded_channel();
        // Drop the receiver first so the scheduler sends into a closed channel.
        drop(rx);
        decode_scheduler.schedule_range(
            range,
            &FilterExpression::no_filter(),
            tx,
            file_reader.scheduler.clone(),
        )
    }
    // Empty ranges must yield no batches; mixed empty/non-empty ranges must
    // yield only the non-empty data.
    #[tokio::test]
    async fn test_read_empty_range() {
        let fs = FsFixture::default();
        create_some_file(&fs, LanceFileVersion::V2_0).await;
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();
        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::Range(0..0),
                1024,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 0);
        let batches = file_reader
            .read_stream(
                lance_io::ReadBatchParams::Ranges(Arc::new([0..1, 2..2])),
                1024,
                16,
                FilterExpression::no_filter(),
            )
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);
    }
    // A user-added global buffer must be readable back by its index (index 0
    // is reserved for the schema, so the first user buffer is index 1).
    #[tokio::test]
    async fn test_global_buffers() {
        let fs = FsFixture::default();
        let lance_schema =
            lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                true,
            )]))
            .unwrap();
        let mut file_writer = FileWriter::try_new(
            fs.object_store.create(&fs.tmp_path).await.unwrap(),
            lance_schema.clone(),
            FileWriterOptions::default(),
        )
        .unwrap();
        let test_bytes = Bytes::from_static(b"hello");
        let buf_index = file_writer
            .add_global_buffer(test_bytes.clone())
            .await
            .unwrap();
        assert_eq!(buf_index, 1);
        file_writer.finish().await.unwrap();
        let file_scheduler = fs
            .scheduler
            .open_file(&fs.tmp_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let file_reader = FileReader::try_open(
            file_scheduler.clone(),
            None,
            Arc::<DecoderPlugins>::default(),
            &test_cache(),
            FileReaderOptions::default(),
        )
        .await
        .unwrap();
        let buf = file_reader.read_global_buffer(1).await.unwrap();
        assert_eq!(buf, test_bytes);
    }
}