use std::{collections::BTreeSet, io::Cursor, ops::Range, pin::Pin, sync::Arc};
use arrow_schema::Schema as ArrowSchema;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use bytes::{Bytes, BytesMut};
use futures::{stream::BoxStream, FutureExt, Stream, StreamExt};
use lance_arrow::DataTypeExt;
use lance_encoding::{
decoder::{
BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, DecoderMiddlewareChain,
FilterExpression, PageInfo, ReadBatchTask,
},
encoder::EncodedBatch,
EncodingsIo,
};
use log::debug;
use prost::{Message, Name};
use snafu::{location, Location};
use lance_core::{
datatypes::{Field, Schema},
Error, Result,
};
use lance_encoding::format::pb as pbenc;
use lance_io::{
scheduler::FileScheduler,
stream::{RecordBatchStream, RecordBatchStreamAdapter},
ReadBatchParams,
};
use tokio::sync::mpsc;
use crate::{
datatypes::{Fields, FieldsWithMeta},
format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION_NEXT},
};
use super::io::LanceEncodingsIo;
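/// Describes the byte offset and size of a buffer stored in a Lance file.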
#[derive(Debug)]
pub struct BufferDescriptor {
pub position: u64,
pub size: u64,
}
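/// All of the file metadata that is read and cached when a file is opened:
/// the decoded schema, the raw and decoded column metadata, and size
/// statistics for the various sections of the file.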
#[derive(Debug)]
pub struct CachedFileMetadata {
pub file_schema: Arc<Schema>,
pub column_metadatas: Vec<pbfile::ColumnMetadata>,
pub column_infos: Vec<Arc<ColumnInfo>>,
pub num_rows: u64,
pub file_buffers: Vec<BufferDescriptor>,
pub num_data_bytes: u64,
pub num_column_metadata_bytes: u64,
pub num_global_buffer_bytes: u64,
pub num_footer_bytes: u64,
pub major_version: u16,
pub minor_version: u16,
}
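/// Selects which columns to read from a file.
///
/// `column_indices` holds, for each top-level field of `schema`, the index of
/// the file column where that field's data begins (see
/// `FileReader::default_projection` for how indices are assigned by default).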
#[derive(Debug, Clone)]
pub struct ReaderProjection {
pub schema: Arc<Schema>,
pub column_indices: Vec<u32>,
}
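/// A reader for the Lance v2 file format.
///
/// Holds the I/O scheduler, the cached file metadata, the projection used when
/// no explicit projection is supplied, and the decoder middleware chain used
/// to build decoders.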
#[derive(Debug)]
pub struct FileReader {
scheduler: Arc<LanceEncodingsIo>,
base_projection: ReaderProjection,
num_rows: u64,
metadata: Arc<CachedFileMetadata>,
decoder_strategy: DecoderMiddlewareChain,
}
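/// The fixed-size footer stored at the very end of every Lance v2 file.
///
/// Laid out (little-endian) as:
///   * u64: column_meta_start
///   * u64: column_meta_offsets_start
///   * u64: global_buff_offsets_start
///   * u32: num_global_buffers
///   * u32: num_columns
///   * u16: major_version
///   * u16: minor_version
///   * 4 bytes: magic
///
/// for a total of `FOOTER_LEN` (40) bytes.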
#[derive(Debug)]
struct Footer {
#[allow(dead_code)]
column_meta_start: u64,
#[allow(dead_code)]
column_meta_offsets_start: u64,
global_buff_offsets_start: u64,
num_global_buffers: u32,
num_columns: u32,
major_version: u16,
minor_version: u16,
}
const FOOTER_LEN: usize = 40;
impl FileReader {
pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
&self.metadata
}
pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
        let buffer_desc = self
            .metadata
            .file_buffers
            .get(index as usize)
            .ok_or_else(|| {
                Error::invalid_input(
                    format!(
                        "request for global buffer at index {} but there were only {} global buffers in the file",
                        index,
                        self.metadata.file_buffers.len()
                    ),
                    location!(),
                )
            })?;
self.scheduler
.submit_single(
buffer_desc.position..buffer_desc.position + buffer_desc.size,
0,
)
.await
}
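    /// Reads the tail of the file: the last `block_size` bytes (or the whole
    /// file if it is smaller than one block), returning the bytes read and the
    /// total file size.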
async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> {
let file_size = scheduler.reader().size().await? as u64;
let begin = if file_size < scheduler.reader().block_size() as u64 {
0
} else {
file_size - scheduler.reader().block_size() as u64
};
let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?;
Ok((tail_bytes, file_size))
}
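    /// Decodes the fixed-size `Footer` from the end of `footer_bytes`,
    /// verifying the file version and the magic number.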
fn decode_footer(footer_bytes: &Bytes) -> Result<Footer> {
let len = footer_bytes.len();
if len < FOOTER_LEN {
return Err(Error::io(
format!(
"does not have sufficient data, len: {}, bytes: {:?}",
len, footer_bytes
),
location!(),
));
}
let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));
let column_meta_start = cursor.read_u64::<LittleEndian>()?;
let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
let num_columns = cursor.read_u32::<LittleEndian>()?;
let major_version = cursor.read_u16::<LittleEndian>()?;
let minor_version = cursor.read_u16::<LittleEndian>()?;
if major_version != MAJOR_VERSION as u16 || minor_version != MINOR_VERSION_NEXT {
return Err(Error::io(
format!(
"Attempt to use the lance v0.2 reader to read a file with version {}.{}",
major_version, minor_version
),
location!(),
));
}
let magic_bytes = footer_bytes.slice(len - 4..);
if magic_bytes.as_ref() != MAGIC {
return Err(Error::io(
                format!(
                    "file does not appear to be a Lance file (invalid magic: {:?}, expected {:?})",
                    magic_bytes, MAGIC
                ),
location!(),
));
}
Ok(Footer {
column_meta_start,
column_meta_offsets_start,
global_buff_offsets_start,
num_global_buffers,
num_columns,
major_version,
minor_version,
})
}
fn read_all_column_metadata(
column_metadata_bytes: Bytes,
footer: &Footer,
) -> Result<Vec<pbfile::ColumnMetadata>> {
let column_metadata_start = footer.column_meta_start;
let cmo_table_size = 16 * footer.num_columns as usize;
let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
(0..footer.num_columns)
.map(|col_idx| {
let offset = (col_idx * 16) as usize;
let position = LittleEndian::read_u64(&cmo_table[offset..offset + 8]);
let length = LittleEndian::read_u64(&cmo_table[offset + 8..offset + 16]);
let normalized_position = (position - column_metadata_start) as usize;
let normalized_end = normalized_position + (length as usize);
Ok(pbfile::ColumnMetadata::decode(
&column_metadata_bytes[normalized_position..normalized_end],
)?)
})
.collect::<Result<Vec<_>>>()
}
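    /// Returns the bytes from `start_pos` to the end of the file.
    ///
    /// The tail read performed at open time may already contain everything that
    /// is needed; if not, the missing prefix is fetched and stitched in front of
    /// the bytes already in hand.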
async fn optimistic_tail_read(
data: &Bytes,
start_pos: u64,
scheduler: &FileScheduler,
file_len: u64,
) -> Result<Bytes> {
let num_bytes_needed = (file_len - start_pos) as usize;
if data.len() >= num_bytes_needed {
Ok(data.slice((data.len() - num_bytes_needed)..))
} else {
let num_bytes_missing = (num_bytes_needed - data.len()) as u64;
let start = file_len - num_bytes_needed as u64;
let missing_bytes = scheduler
.submit_single(start..start + num_bytes_missing, 0)
.await?;
let mut combined = BytesMut::with_capacity(data.len() + num_bytes_missing as usize);
combined.extend(missing_bytes);
combined.extend(data);
Ok(combined.freeze())
}
}
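    /// Decodes the global buffer offsets table: `num_global_buffers`
    /// little-endian `(position, size)` pairs of u64 values.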
fn do_decode_gbo_table(gbo_bytes: &Bytes, footer: &Footer) -> Result<Vec<BufferDescriptor>> {
let mut global_bufs_cursor = Cursor::new(gbo_bytes);
let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize);
for _ in 0..footer.num_global_buffers {
let buf_pos = global_bufs_cursor.read_u64::<LittleEndian>()?;
let buf_size = global_bufs_cursor.read_u64::<LittleEndian>()?;
global_buffers.push(BufferDescriptor {
position: buf_pos,
size: buf_size,
});
}
Ok(global_buffers)
}
async fn decode_gbo_table(
tail_bytes: &Bytes,
file_len: u64,
scheduler: &FileScheduler,
footer: &Footer,
) -> Result<Vec<BufferDescriptor>> {
let gbo_bytes = Self::optimistic_tail_read(
tail_bytes,
footer.global_buff_offsets_start,
scheduler,
file_len,
)
.await?;
Self::do_decode_gbo_table(&gbo_bytes, footer)
}
fn decode_schema(schema_bytes: Bytes) -> Result<(u64, lance_core::datatypes::Schema)> {
let file_descriptor = pb::FileDescriptor::decode(schema_bytes)?;
let pb_schema = file_descriptor.schema.unwrap();
let num_rows = file_descriptor.length;
let fields_with_meta = FieldsWithMeta {
fields: Fields(pb_schema.fields),
metadata: pb_schema.metadata,
};
let schema = lance_core::datatypes::Schema::from(fields_with_meta);
Ok((num_rows, schema))
}
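    /// Reads and decodes all of the file metadata: the footer, the global buffer
    /// table, the schema (stored in the first global buffer), and the per-column
    /// metadata, computing size statistics along the way.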
async fn read_all_metadata(scheduler: &FileScheduler) -> Result<CachedFileMetadata> {
let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
let footer = Self::decode_footer(&tail_bytes)?;
let gbo_table = Self::decode_gbo_table(&tail_bytes, file_len, scheduler, &footer).await?;
if gbo_table.is_empty() {
return Err(Error::Internal {
message: "File did not contain any global buffers, schema expected".to_string(),
location: location!(),
});
}
let schema_start = gbo_table[0].position;
let schema_size = gbo_table[0].size;
let num_footer_bytes = file_len - schema_start;
let all_metadata_bytes =
Self::optimistic_tail_read(&tail_bytes, schema_start, scheduler, file_len).await?;
let schema_bytes = all_metadata_bytes.slice(0..schema_size as usize);
let (num_rows, schema) = Self::decode_schema(schema_bytes)?;
let column_metadata_start = (footer.column_meta_start - schema_start) as usize;
let column_metadata_end = (footer.global_buff_offsets_start - schema_start) as usize;
let column_metadata_bytes =
all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
let column_metadatas = Self::read_all_column_metadata(column_metadata_bytes, &footer)?;
let footer_start = file_len - FOOTER_LEN as u64;
let num_data_bytes = footer.column_meta_start;
let num_global_buffer_bytes = gbo_table.iter().map(|buf| buf.size).sum::<u64>()
+ (footer_start - footer.global_buff_offsets_start);
let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;
let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice());
Ok(CachedFileMetadata {
file_schema: Arc::new(schema),
column_metadatas,
column_infos,
num_rows,
num_data_bytes,
num_column_metadata_bytes,
num_global_buffer_bytes,
num_footer_bytes,
file_buffers: gbo_table,
major_version: footer.major_version,
minor_version: footer.minor_version,
})
}
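    /// Decodes an encoding description that is stored directly (as a protobuf
    /// `Any`) in the column metadata.  Indirectly stored encodings are not yet
    /// supported (`todo!`) and a missing description is treated as unexpected
    /// (`panic!`).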
fn fetch_encoding<M: Default + Name + Sized>(encoding: &pbfile::Encoding) -> M {
match &encoding.location {
Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
Some(pbfile::encoding::Location::Direct(encoding)) => {
let encoding_buf = Bytes::from(encoding.encoding.clone());
let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
encoding_any.to_msg::<M>().unwrap()
}
Some(pbfile::encoding::Location::None(_)) => panic!(),
None => panic!(),
}
}
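    /// Converts the protobuf column metadata into the `ColumnInfo` / `PageInfo`
    /// structures expected by the decode scheduler.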
fn meta_to_col_infos(column_metadatas: &[pbfile::ColumnMetadata]) -> Vec<Arc<ColumnInfo>> {
column_metadatas
.iter()
.enumerate()
.map(|(col_idx, col_meta)| {
let page_infos = col_meta
.pages
.iter()
.map(|page| {
let num_rows = page.length;
let encoding = Self::fetch_encoding(page.encoding.as_ref().unwrap());
let buffer_offsets_and_sizes = Arc::from(
page.buffer_offsets
.iter()
.zip(page.buffer_sizes.iter())
.map(|(offset, size)| (*offset, *size))
.collect::<Vec<_>>(),
);
PageInfo {
buffer_offsets_and_sizes,
encoding,
num_rows,
}
})
.collect::<Vec<_>>();
let buffer_offsets_and_sizes = Arc::from(
col_meta
.buffer_offsets
.iter()
.zip(col_meta.buffer_sizes.iter())
.map(|(offset, size)| (*offset, *size))
.collect::<Vec<_>>(),
);
Arc::new(ColumnInfo {
index: col_idx as u32,
page_infos: Arc::from(page_infos),
buffer_offsets_and_sizes,
encoding: Self::fetch_encoding(col_meta.encoding.as_ref().unwrap()),
})
})
.collect::<Vec<_>>()
}
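    /// Checks that a projection can be used with this file: it must select at
    /// least one column, supply exactly one column index per top-level field,
    /// and never reference a column index twice or out of range.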
fn validate_projection(
projection: &ReaderProjection,
metadata: &CachedFileMetadata,
) -> Result<()> {
if projection.schema.fields.is_empty() {
return Err(Error::invalid_input(
"Attempt to read zero columns from the file, at least one column must be specified"
.to_string(),
location!(),
));
}
if projection.schema.fields.len() != projection.column_indices.len() {
            return Err(Error::invalid_input(
                format!(
                    "The projection schema has {} top level fields but only {} column indices were provided",
                    projection.schema.fields.len(),
                    projection.column_indices.len()
                ),
                location!(),
            ));
}
let mut column_indices_seen = BTreeSet::new();
for column_index in &projection.column_indices {
if !column_indices_seen.insert(*column_index) {
return Err(Error::invalid_input(
format!(
"The projection specified the column index {} more than once",
column_index
),
location!(),
));
}
if *column_index >= metadata.column_infos.len() as u32 {
                return Err(Error::invalid_input(
                    format!(
                        "The projection specified the column index {} but there are only {} columns in the file",
                        column_index,
                        metadata.column_infos.len()
                    ),
                    location!(),
                ));
}
}
Ok(())
}
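    /// The number of file columns a field occupies by default: binary-like
    /// fields use two columns, every other field uses one column plus the
    /// columns of its children.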
fn default_column_count(field: &Field) -> u32 {
if field.data_type().is_binary_like() {
2
} else {
1 + field
.children
.iter()
.map(Self::default_column_count)
.sum::<u32>()
}
}
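    /// Builds a projection covering every top-level field of the schema,
    /// assigning column indices by walking the fields in order and skipping the
    /// number of columns each field occupies.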
fn default_projection(lance_schema: &Schema) -> ReaderProjection {
let schema = Arc::new(lance_schema.clone());
let mut column_indices = Vec::with_capacity(lance_schema.fields.len());
let mut column_index = 0;
for field in &lance_schema.fields {
column_indices.push(column_index);
column_index += Self::default_column_count(field);
}
ReaderProjection {
schema,
column_indices,
}
}
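    /// Opens a file, reading and caching all of its metadata.
    ///
    /// If `base_projection` is `None`, a default projection covering every
    /// top-level field is used.
    ///
    /// A minimal usage sketch (assumes `file_scheduler` is an already-opened
    /// `FileScheduler` pointing at a Lance v2 file and that `futures::StreamExt`
    /// is in scope; error handling elided):
    ///
    /// ```ignore
    /// let reader = FileReader::try_open(
    ///     file_scheduler,
    ///     None,
    ///     DecoderMiddlewareChain::default(),
    /// )
    /// .await?;
    /// let mut stream = reader.read_stream(
    ///     lance_io::ReadBatchParams::RangeFull,
    ///     1024, // batch_size
    ///     16,   // batch_readahead
    ///     FilterExpression::no_filter(),
    /// )?;
    /// while let Some(batch) = stream.next().await {
    ///     let _batch = batch?; // each item is a decoded RecordBatch
    /// }
    /// ```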
pub async fn try_open(
scheduler: FileScheduler,
base_projection: Option<ReaderProjection>,
decoder_strategy: DecoderMiddlewareChain,
) -> Result<Self> {
let file_metadata = Arc::new(Self::read_all_metadata(&scheduler).await?);
if let Some(base_projection) = base_projection.as_ref() {
Self::validate_projection(base_projection, &file_metadata)?;
}
let num_rows = file_metadata.num_rows;
Ok(Self {
scheduler: Arc::new(LanceEncodingsIo(scheduler)),
            base_projection: base_projection
                .unwrap_or_else(|| Self::default_projection(file_metadata.file_schema.as_ref())),
num_rows,
metadata: file_metadata,
decoder_strategy,
})
}
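    /// Collects the `ColumnInfo`s for a field (and, recursively, its children)
    /// starting at `column_idx`; binary-like fields consume two columns.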
fn collect_columns(
&self,
field: &Field,
column_idx: &mut usize,
column_infos: &mut Vec<Arc<ColumnInfo>>,
) -> Result<()> {
column_infos.push(self.metadata.column_infos[*column_idx].clone());
*column_idx += 1;
if field.data_type().is_binary_like() {
column_infos.push(self.metadata.column_infos[*column_idx].clone());
*column_idx += 1;
}
for child in &field.children {
self.collect_columns(child, column_idx, column_infos)?;
}
Ok(())
}
fn collect_columns_from_projection(
&self,
projection: &ReaderProjection,
) -> Result<Vec<Arc<ColumnInfo>>> {
let mut column_infos = Vec::with_capacity(projection.column_indices.len());
for (field, starting_column) in projection
.schema
.fields
.iter()
.zip(projection.column_indices.iter())
{
let mut starting_column = *starting_column as usize;
self.collect_columns(field, &mut starting_column, &mut column_infos)?;
}
Ok(column_infos)
}
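    /// Schedules and decodes a contiguous range of rows: builds a
    /// `DecodeBatchScheduler`, spawns the scheduling work on a tokio task, and
    /// returns a stream of `ReadBatchTask`s yielding `batch_size` rows each.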
#[allow(clippy::too_many_arguments)]
async fn do_read_range(
column_infos: Vec<Arc<ColumnInfo>>,
scheduler: Arc<dyn EncodingsIo>,
num_rows: u64,
decoder_strategy: DecoderMiddlewareChain,
range: Range<u64>,
batch_size: u32,
projection: &ReaderProjection,
filter: FilterExpression,
) -> Result<BoxStream<'static, ReadBatchTask>> {
debug!(
"Reading range {:?} with batch_size {} from columns {:?}",
range,
batch_size,
column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
);
let mut decode_scheduler = DecodeBatchScheduler::try_new(
&projection.schema,
&column_infos,
&vec![],
num_rows,
&decoder_strategy,
&scheduler,
)
.await?;
let root_decoder = decode_scheduler.new_root_decoder_ranges(&[range.clone()]);
let (tx, rx) = mpsc::unbounded_channel();
let num_rows_to_read = range.end - range.start;
tokio::task::spawn(async move {
decode_scheduler.schedule_range(range, &filter, tx, scheduler)
});
Ok(BatchDecodeStream::new(rx, batch_size, num_rows_to_read, root_decoder).into_stream())
}
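    /// Wraps `Self::do_read_range`, cloning the state it needs; if setup fails,
    /// the returned stream contains a single task that yields the error.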
fn read_range(
&self,
range: Range<u64>,
batch_size: u32,
projection: &ReaderProjection,
filter: FilterExpression,
) -> Result<BoxStream<'static, ReadBatchTask>> {
let range = range.clone();
let projection = projection.clone();
let column_infos = self.collect_columns_from_projection(&projection)?;
let scheduler = self.scheduler.clone();
let num_rows = self.num_rows;
let decoder_strategy = self.decoder_strategy.clone();
let stream_fut = async move {
let maybe_stream = Self::do_read_range(
column_infos,
scheduler,
num_rows,
decoder_strategy,
range,
batch_size,
&projection,
filter,
)
.await;
match maybe_stream {
Ok(stream) => stream,
Err(err) => futures::stream::once(std::future::ready(ReadBatchTask {
num_rows: 0,
task: std::future::ready(Err(err)).boxed(),
}))
.boxed(),
}
};
Ok(futures::stream::once(stream_fut).flatten().boxed())
}
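    /// Schedules and decodes an explicit set of row indices.
    ///
    /// The indices are assumed to be non-empty and in ascending order (the
    /// debug log reports `indices[0]..indices[last]` as the spanned range).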
async fn do_take_rows(
column_infos: Vec<Arc<ColumnInfo>>,
scheduler: Arc<dyn EncodingsIo>,
num_rows: u64,
decoder_strategy: DecoderMiddlewareChain,
indices: Vec<u64>,
batch_size: u32,
projection: &ReaderProjection,
) -> Result<BoxStream<'static, ReadBatchTask>> {
debug!(
"Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
indices.len(),
indices[0],
indices[indices.len() - 1],
batch_size,
column_infos.iter().map(|ci| ci.index).collect::<Vec<_>>()
);
let mut decode_scheduler = DecodeBatchScheduler::try_new(
&projection.schema,
&column_infos,
&vec![],
num_rows,
&decoder_strategy,
&scheduler,
)
.await?;
let root_decoder = decode_scheduler.new_root_decoder_indices(&indices);
let (tx, rx) = mpsc::unbounded_channel();
let num_rows_to_read = indices.len() as u64;
tokio::task::spawn(async move {
decode_scheduler.schedule_take(&indices, &FilterExpression::no_filter(), tx, scheduler)
});
Ok(BatchDecodeStream::new(rx, batch_size, num_rows_to_read, root_decoder).into_stream())
}
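    /// Wraps `Self::do_take_rows` the same way `Self::read_range` wraps
    /// `Self::do_read_range`, turning a setup error into a single failing task.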
fn take_rows(
&self,
indices: Vec<u64>,
batch_size: u32,
projection: &ReaderProjection,
) -> Result<BoxStream<'static, ReadBatchTask>> {
let projection = projection.clone();
let column_infos = self.collect_columns_from_projection(&projection)?;
let scheduler = self.scheduler.clone();
let num_rows = self.num_rows;
let decoder_strategy = self.decoder_strategy.clone();
let stream_fut = async move {
let maybe_stream = Self::do_take_rows(
column_infos,
scheduler,
num_rows,
decoder_strategy,
indices,
batch_size,
&projection,
)
.await;
match maybe_stream {
Ok(stream) => stream,
Err(err) => futures::stream::once(std::future::ready(ReadBatchTask {
num_rows: 0,
task: std::future::ready(Err(err)).boxed(),
}))
.boxed(),
}
};
Ok(futures::stream::once(stream_fut).flatten().boxed())
}
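    /// Creates a stream of decode tasks for the requested rows.
    ///
    /// Validates the projection, verifies that the requested indices or ranges
    /// fall within `self.num_rows`, and then dispatches to `Self::take_rows` or
    /// `Self::read_range`.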
pub fn read_tasks(
&self,
params: ReadBatchParams,
batch_size: u32,
projection: &ReaderProjection,
filter: FilterExpression,
) -> Result<Pin<Box<dyn Stream<Item = ReadBatchTask> + Send>>> {
Self::validate_projection(projection, &self.metadata)?;
let verify_bound = |params: &ReadBatchParams, bound: u64, inclusive: bool| {
if bound > self.num_rows || bound == self.num_rows && inclusive {
Err(Error::invalid_input(
format!(
"cannot read {:?} from file with {} rows",
params, self.num_rows
),
location!(),
))
} else {
Ok(())
}
};
        match &params {
ReadBatchParams::Indices(indices) => {
for idx in indices {
match idx {
None => {
return Err(Error::invalid_input(
"Null value in indices array",
location!(),
));
}
Some(idx) => {
                            verify_bound(&params, idx as u64, true)?;
}
}
}
let indices = indices.iter().map(|idx| idx.unwrap() as u64).collect();
self.take_rows(indices, batch_size, projection)
}
ReadBatchParams::Range(range) => {
                verify_bound(&params, range.end as u64, false)?;
self.read_range(
range.start as u64..range.end as u64,
batch_size,
projection,
filter,
)
}
ReadBatchParams::RangeFrom(range) => {
                verify_bound(&params, range.start as u64, true)?;
self.read_range(
range.start as u64..self.num_rows,
batch_size,
projection,
filter,
)
}
ReadBatchParams::RangeTo(range) => {
                verify_bound(&params, range.end as u64, false)?;
self.read_range(0..range.end as u64, batch_size, projection, filter)
}
ReadBatchParams::RangeFull => {
self.read_range(0..self.num_rows, batch_size, projection, filter)
}
}
}
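    /// Reads the requested rows as a `RecordBatchStream`, decoding up to
    /// `batch_readahead` batches concurrently and projecting to the given
    /// schema.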
pub fn read_stream_projected(
&self,
params: ReadBatchParams,
batch_size: u32,
batch_readahead: u32,
projection: &ReaderProjection,
filter: FilterExpression,
) -> Result<Pin<Box<dyn RecordBatchStream>>> {
let tasks_stream = self.read_tasks(params, batch_size, projection, filter)?;
let batch_stream = tasks_stream
.map(|task| task.task)
.buffered(batch_readahead as usize)
.boxed();
let arrow_schema = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
Ok(Box::pin(RecordBatchStreamAdapter::new(
arrow_schema,
batch_stream,
)))
}
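    /// Same as `Self::read_stream_projected` but uses the projection supplied
    /// (or defaulted) when the file was opened.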
pub fn read_stream(
&self,
params: ReadBatchParams,
batch_size: u32,
batch_readahead: u32,
filter: FilterExpression,
) -> Result<Pin<Box<dyn RecordBatchStream>>> {
self.read_stream_projected(
params,
batch_size,
batch_readahead,
&self.base_projection,
filter,
)
}
pub fn schema(&self) -> &Arc<Schema> {
&self.metadata.file_schema
}
}
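/// Renders a human-readable description of a page's encoding, primarily for
/// debugging and file inspection.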
pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
if let Some(encoding) = &page.encoding {
if let Some(style) = &encoding.location {
match style {
pbfile::encoding::Location::Indirect(indirect) => {
format!(
"IndirectEncoding(pos={},size={})",
indirect.buffer_location, indirect.buffer_length
)
}
pbfile::encoding::Location::Direct(direct) => {
let encoding_any =
prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
.expect("failed to deserialize encoding as protobuf");
if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
match encoding {
Ok(encoding) => {
format!("{:#?}", encoding)
}
Err(err) => {
format!("Unsupported(decode_err={})", err)
}
}
} else {
format!("Unrecognized(type_url={})", encoding_any.type_url)
}
}
pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
}
} else {
"MISSING STYLE".to_string()
}
} else {
"MISSING".to_string()
}
}
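/// Parses an `EncodedBatch` back out of serialized bytes: either a "mini lance"
/// buffer (footer plus column metadata, with the schema supplied by the caller)
/// or a self-described buffer whose schema is stored in its first global buffer.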
pub trait EncodedBatchReaderExt {
fn try_from_mini_lance(bytes: Bytes, schema: &Schema) -> Result<Self>
where
Self: Sized;
fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
where
Self: Sized;
}
impl EncodedBatchReaderExt for EncodedBatch {
fn try_from_mini_lance(bytes: Bytes, schema: &Schema) -> Result<Self>
where
Self: Sized,
{
let footer = FileReader::decode_footer(&bytes)?;
let column_metadata_start = footer.column_meta_start as usize;
let column_metadata_end = footer.global_buff_offsets_start as usize;
let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
let column_metadatas =
FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
let page_table = FileReader::meta_to_col_infos(&column_metadatas);
Ok(Self {
data: bytes,
num_rows: page_table
.first()
.map(|col| {
col.page_infos
.iter()
.map(|page| page.num_rows as u64)
.sum::<u64>()
})
.unwrap_or(0),
page_table,
schema: Arc::new(schema.clone()),
})
}
fn try_from_self_described_lance(bytes: Bytes) -> Result<Self>
where
Self: Sized,
{
let footer = FileReader::decode_footer(&bytes)?;
let gbo_table = FileReader::do_decode_gbo_table(
&bytes.slice(footer.global_buff_offsets_start as usize..),
&footer,
)?;
if gbo_table.is_empty() {
return Err(Error::Internal {
message: "File did not contain any global buffers, schema expected".to_string(),
location: location!(),
});
}
let schema_start = gbo_table[0].position as usize;
let schema_size = gbo_table[0].size as usize;
let schema_bytes = bytes.slice(schema_start..(schema_start + schema_size));
let (_, schema) = FileReader::decode_schema(schema_bytes)?;
let column_metadata_start = footer.column_meta_start as usize;
let column_metadata_end = footer.global_buff_offsets_start as usize;
let column_metadata_bytes = bytes.slice(column_metadata_start..column_metadata_end);
let column_metadatas =
FileReader::read_all_column_metadata(column_metadata_bytes, &footer)?;
let page_table = FileReader::meta_to_col_infos(&column_metadatas);
Ok(Self {
data: bytes,
num_rows: page_table
.first()
.map(|col| {
col.page_infos
.iter()
.map(|page| page.num_rows as u64)
.sum::<u64>()
})
.unwrap_or(0),
page_table,
schema: Arc::new(schema.clone()),
})
}
}
#[cfg(test)]
pub mod tests {
use std::{pin::Pin, sync::Arc};
use arrow_array::{
types::{Float64Type, Int32Type},
RecordBatch,
};
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema};
use bytes::Bytes;
use futures::{prelude::stream::TryStreamExt, StreamExt};
use lance_arrow::RecordBatchExt;
use lance_core::datatypes::Schema;
use lance_datagen::{array, gen, BatchCount, ByteCount, RowCount};
use lance_encoding::{
decoder::{decode_batch, DecoderMiddlewareChain, FilterExpression},
encoder::{encode_batch, CoreFieldEncodingStrategy, EncodedBatch},
};
use lance_io::stream::RecordBatchStream;
use log::debug;
use crate::v2::{
reader::{EncodedBatchReaderExt, FileReader, ReaderProjection},
testing::{write_lance_file, FsFixture},
writer::{EncodedBatchWriteExt, FileWriter, FileWriterOptions},
};
async fn create_some_file(fs: &FsFixture) -> (Arc<Schema>, Vec<RecordBatch>) {
let location_type = DataType::Struct(Fields::from(vec![
Field::new("x", DataType::Float64, true),
Field::new("y", DataType::Float64, true),
]));
let categories_type = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)));
let reader = gen()
.col("score", array::rand::<Float64Type>())
.col("location", array::rand_type(&location_type))
.col("categories", array::rand_type(&categories_type))
.into_reader_rows(RowCount::from(1000), BatchCount::from(100));
write_lance_file(reader, fs, FileWriterOptions::default()).await
}
type Transformer = Box<dyn Fn(&RecordBatch) -> RecordBatch>;
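    /// Drains `actual` and compares it against `expected`, slicing batches as
    /// needed so that differing batch boundaries on the two sides do not matter.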
async fn verify_expected(
expected: &[RecordBatch],
mut actual: Pin<Box<dyn RecordBatchStream>>,
read_size: u32,
transform: Option<Transformer>,
) {
let mut remaining = expected.iter().map(|batch| batch.num_rows()).sum::<usize>() as u32;
let mut expected_iter = expected.iter().map(|batch| {
if let Some(transform) = &transform {
transform(batch)
} else {
batch.clone()
}
});
let mut next_expected = expected_iter.next().unwrap().clone();
while let Some(actual) = actual.next().await {
let mut actual = actual.unwrap();
let mut rows_to_verify = actual.num_rows() as u32;
let expected_length = remaining.min(read_size);
assert_eq!(expected_length, rows_to_verify);
while rows_to_verify > 0 {
let next_slice_len = (next_expected.num_rows() as u32).min(rows_to_verify);
assert_eq!(
next_expected.slice(0, next_slice_len as usize),
actual.slice(0, next_slice_len as usize)
);
remaining -= next_slice_len;
rows_to_verify -= next_slice_len;
if remaining > 0 {
if next_slice_len == next_expected.num_rows() as u32 {
next_expected = expected_iter.next().unwrap().clone();
} else {
next_expected = next_expected.slice(
next_slice_len as usize,
next_expected.num_rows() - next_slice_len as usize,
);
}
}
if rows_to_verify > 0 {
actual = actual.slice(
next_slice_len as usize,
actual.num_rows() - next_slice_len as usize,
);
}
}
}
assert_eq!(remaining, 0);
}
#[tokio::test]
async fn test_round_trip() {
let fs = FsFixture::default();
let (_, data) = create_some_file(&fs).await;
for read_size in [32, 1024, 1024 * 1024] {
let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
let file_reader =
FileReader::try_open(file_scheduler, None, DecoderMiddlewareChain::default())
.await
.unwrap();
let schema = file_reader.schema();
assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
let batch_stream = file_reader
.read_stream(
lance_io::ReadBatchParams::RangeFull,
read_size,
16,
FilterExpression::no_filter(),
)
.unwrap();
verify_expected(&data, batch_stream, read_size, None).await;
}
}
#[test_log::test(tokio::test)]
async fn test_encoded_batch_round_trip() {
let data = gen()
.col("x", array::rand::<Int32Type>())
.col("y", array::rand_utf8(ByteCount::from(16), false))
.into_batch_rows(RowCount::from(10000))
.unwrap();
let lance_schema = Arc::new(Schema::try_from(data.schema().as_ref()).unwrap());
let encoded_batch = encode_batch(
&data,
lance_schema.clone(),
&CoreFieldEncodingStrategy::default(),
4096,
)
.await
.unwrap();
let bytes = encoded_batch.try_to_self_described_lance().unwrap();
let decoded_batch = EncodedBatch::try_from_self_described_lance(bytes).unwrap();
let decoded = decode_batch(
&decoded_batch,
&FilterExpression::no_filter(),
&DecoderMiddlewareChain::default(),
)
.await
.unwrap();
assert_eq!(data, decoded);
let bytes = encoded_batch.try_to_mini_lance().unwrap();
let decoded_batch =
EncodedBatch::try_from_mini_lance(bytes, lance_schema.as_ref()).unwrap();
let decoded = decode_batch(
&decoded_batch,
&FilterExpression::no_filter(),
&DecoderMiddlewareChain::default(),
)
.await
.unwrap();
assert_eq!(data, decoded);
}
#[test_log::test(tokio::test)]
async fn test_projection() {
let fs = FsFixture::default();
let (schema, data) = create_some_file(&fs).await;
let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
for columns in [
vec!["score"],
vec!["location"],
vec!["categories"],
vec!["score.x"],
vec!["score", "categories"],
vec!["score", "location"],
vec!["location", "categories"],
vec!["score.y", "location", "categories"],
] {
debug!("Testing round trip with projection {:?}", columns);
let file_reader = FileReader::try_open(
file_scheduler.clone(),
None,
DecoderMiddlewareChain::default(),
)
.await
.unwrap();
let projection = Arc::new(schema.project(&columns).unwrap());
let projection = ReaderProjection {
column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
schema: projection,
};
let batch_stream = file_reader
.read_stream_projected(
lance_io::ReadBatchParams::RangeFull,
1024,
16,
&projection,
FilterExpression::no_filter(),
)
.unwrap();
let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
verify_expected(
&data,
batch_stream,
1024,
Some(Box::new(move |batch: &RecordBatch| {
batch.project_by_schema(&projection_arrow).unwrap()
})),
)
.await;
let file_reader = FileReader::try_open(
file_scheduler.clone(),
Some(projection.clone()),
DecoderMiddlewareChain::default(),
)
.await
.unwrap();
let batch_stream = file_reader
.read_stream(
lance_io::ReadBatchParams::RangeFull,
1024,
16,
FilterExpression::no_filter(),
)
.unwrap();
let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
verify_expected(
&data,
batch_stream,
1024,
Some(Box::new(move |batch: &RecordBatch| {
batch.project_by_schema(&projection_arrow).unwrap()
})),
)
.await;
}
let empty_projection = ReaderProjection {
column_indices: Vec::default(),
schema: Arc::new(Schema::default()),
};
assert!(FileReader::try_open(
file_scheduler.clone(),
Some(empty_projection),
DecoderMiddlewareChain::default()
)
.await
.is_err());
let arrow_schema = ArrowSchema::new(vec![
Field::new("x", DataType::Int32, true),
Field::new("y", DataType::Int32, true),
]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let projection_with_dupes = ReaderProjection {
column_indices: vec![0, 0],
schema: Arc::new(schema),
};
assert!(FileReader::try_open(
file_scheduler.clone(),
Some(projection_with_dupes),
DecoderMiddlewareChain::default()
)
.await
.is_err());
}
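    /// RAII guard that sets an environment variable for the duration of a test
    /// and restores (or removes) the previous value on drop.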
struct EnvVarGuard {
key: String,
original_value: Option<String>,
}
impl EnvVarGuard {
fn new(key: &str, new_value: &str) -> Self {
let original_value = std::env::var(key).ok();
std::env::set_var(key, new_value);
Self {
key: key.to_string(),
original_value,
}
}
}
impl Drop for EnvVarGuard {
fn drop(&mut self) {
if let Some(ref value) = self.original_value {
std::env::set_var(&self.key, value);
} else {
std::env::remove_var(&self.key);
}
}
}
#[test_log::test(tokio::test)]
async fn test_compressing_buffer() {
let fs = FsFixture::default();
let _env_guard = EnvVarGuard::new("LANCE_PAGE_COMPRESSION", "zstd");
let (schema, data) = create_some_file(&fs).await;
let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
let file_reader = FileReader::try_open(
file_scheduler.clone(),
None,
DecoderMiddlewareChain::default(),
)
.await
.unwrap();
let projection = schema.project(&["score"]).unwrap();
let projection = ReaderProjection {
column_indices: projection.fields.iter().map(|f| f.id as u32).collect(),
schema: Arc::new(projection),
};
let batch_stream = file_reader
.read_stream_projected(
lance_io::ReadBatchParams::RangeFull,
1024,
16,
&projection,
FilterExpression::no_filter(),
)
.unwrap();
let projection_arrow = Arc::new(ArrowSchema::from(projection.schema.as_ref()));
verify_expected(
&data,
batch_stream,
1024,
Some(Box::new(move |batch: &RecordBatch| {
batch.project_by_schema(&projection_arrow).unwrap()
})),
)
.await;
}
#[tokio::test]
async fn test_read_all() {
let fs = FsFixture::default();
let (_, data) = create_some_file(&fs).await;
let total_rows = data.iter().map(|batch| batch.num_rows()).sum::<usize>();
let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
let file_reader = FileReader::try_open(
file_scheduler.clone(),
None,
DecoderMiddlewareChain::default(),
)
.await
.unwrap();
let batches = file_reader
.read_stream(
lance_io::ReadBatchParams::RangeFull,
total_rows as u32,
16,
FilterExpression::no_filter(),
)
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), total_rows);
}
#[tokio::test]
async fn test_global_buffers() {
let fs = FsFixture::default();
let lance_schema =
lance_core::datatypes::Schema::try_from(&ArrowSchema::new(vec![Field::new(
"foo",
DataType::Int32,
true,
)]))
.unwrap();
let mut file_writer = FileWriter::try_new(
fs.object_store.create(&fs.tmp_path).await.unwrap(),
fs.tmp_path.to_string(),
lance_schema.clone(),
FileWriterOptions::default(),
)
.unwrap();
let test_bytes = Bytes::from_static(b"hello");
let buf_index = file_writer
.add_global_buffer(test_bytes.clone())
.await
.unwrap();
assert_eq!(buf_index, 1);
file_writer.finish().await.unwrap();
let file_scheduler = fs.scheduler.open_file(&fs.tmp_path).await.unwrap();
let file_reader = FileReader::try_open(
file_scheduler.clone(),
None,
DecoderMiddlewareChain::default(),
)
.await
.unwrap();
let buf = file_reader.read_global_buffer(1).await.unwrap();
assert_eq!(buf, test_bytes);
}
}