pub mod session;
pub mod write;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::Range;
use std::sync::Arc;
use arrow::compute::concat_batches;
use arrow_array::cast::as_primitive_array;
use arrow_array::{
RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array, new_null_array,
};
use arrow_schema::Schema as ArrowSchema;
use datafusion::logical_expr::Expr;
use datafusion::scalar::ScalarValue;
use futures::future::try_join_all;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, join, stream};
use lance_arrow::{RecordBatchExt, SchemaExt};
use lance_core::datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions};
use lance_core::utils::deletion::DeletionVector;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::{Error, Result, cache::CacheKey, datatypes::Schema};
use lance_core::{
ROW_ADDR, ROW_ADDR_FIELD, ROW_CREATED_AT_VERSION_FIELD, ROW_ID, ROW_ID_FIELD,
ROW_LAST_UPDATED_AT_VERSION_FIELD,
};
use lance_datafusion::utils::StreamingWriteSource;
use lance_encoding::decoder::DecoderPlugins;
use lance_file::previous::reader::{
FileReader as PreviousFileReader, read_batch as previous_read_batch,
};
use lance_file::reader::{CachedFileMetadata, FileReaderOptions, ReaderProjection};
use lance_file::version::LanceFileVersion;
use lance_file::{LanceEncodingsIo, determine_file_version};
use lance_io::ReadBatchParams;
use lance_io::scheduler::{FileScheduler, ScanScheduler, SchedulerConfig};
use lance_io::utils::CachedFileSize;
use lance_table::format::{DataFile, DeletionFile, Fragment};
use lance_table::io::deletion::{deletion_file_path, write_deletion_file};
use lance_table::rowids::RowIdSequence;
use lance_table::utils::stream::{
ReadBatchFutStream, ReadBatchTask, ReadBatchTaskStream, RowIdAndDeletesConfig,
wrap_with_row_id_and_delete,
};
use self::write::FragmentCreateBuilder;
use super::hash_joiner::HashJoiner;
use super::rowids::load_row_id_sequence;
use super::scanner::Scanner;
use super::updater::Updater;
use super::{NewColumnTransform, WriteParams, schema_evolution};
use crate::dataset::Dataset;
use crate::dataset::fragment::session::FragmentSession;
use crate::io::deletion::read_dataset_deletion_file;
/// A reference to one fragment (a horizontal slice of rows) of a [`Dataset`],
/// pairing the owning dataset handle with the fragment's manifest metadata.
#[derive(Debug, Clone)]
pub struct FileFragment {
    /// The dataset this fragment belongs to.
    dataset: Arc<Dataset>,
    /// Manifest metadata for this fragment: data files, deletion file,
    /// physical row count, etc.
    pub(super) metadata: Fragment,
}
const DEFAULT_BATCH_READ_SIZE: u32 = 1024;
/// Abstraction over the legacy (v1) and v2 Lance file readers, plus the
/// all-null reader used for fields missing from a fragment's data files.
///
/// All read methods produce a stream of [`ReadBatchTask`]s; each task resolves
/// to a single `RecordBatch` of at most `batch_size` rows.
#[allow(clippy::len_without_is_empty)]
pub trait GenericFileReader: std::fmt::Debug + Send + Sync {
    /// Read the rows in `range` (file-relative offsets), chunked by `batch_size`.
    fn read_range_tasks(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: Arc<lance_core::datatypes::Schema>,
    ) -> Result<ReadBatchTaskStream>;
    /// Read several row ranges in one pass. Not supported by the v1 reader
    /// (its implementation returns an internal error).
    fn read_ranges_tasks(
        &self,
        ranges: Arc<[Range<u64>]>,
        batch_size: u32,
        projection: Arc<lance_core::datatypes::Schema>,
    ) -> Result<ReadBatchTaskStream>;
    /// Read every row in the file, chunked by `batch_size`.
    fn read_all_tasks(
        &self,
        batch_size: u32,
        projection: Arc<lance_core::datatypes::Schema>,
    ) -> Result<ReadBatchTaskStream>;
    /// Take specific rows by file-relative index. `take_priority`, when given,
    /// is folded into the I/O scheduler priority (v2 reader only).
    fn take_all_tasks(
        &self,
        indices: &[u32],
        batch_size: u32,
        projection: Arc<lance_core::datatypes::Schema>,
        take_priority: Option<u32>,
    ) -> Result<ReadBatchTaskStream>;
    /// Number of physical rows in the file (before deletions are applied).
    fn len(&self) -> u32;
    /// The projection this reader was opened with.
    fn projection(&self) -> &Arc<Schema>;
    /// Per-field storage sizes as `(field_id, size_bytes)` pairs.
    /// The v1 and null readers report nothing.
    fn storage_stats(&self) -> Vec<(u32, u64)>;
    /// Clone into a boxed trait object.
    fn clone_box(&self) -> Box<dyn GenericFileReader>;
    /// True if this wraps a legacy (v1) file reader.
    fn is_legacy(&self) -> bool;
    /// Access the underlying v1 reader.
    ///
    /// # Panics
    /// Panics if this is not a legacy reader (see [`Self::as_legacy_opt`]).
    fn as_legacy(&self) -> &PreviousFileReader {
        self.as_legacy_opt()
            .expect("legacy function called on v2 file")
    }
    /// The underlying v1 reader, if this is a legacy reader.
    fn as_legacy_opt(&self) -> Option<&PreviousFileReader>;
    /// Mutable access to the underlying v1 reader, if legacy.
    fn as_legacy_opt_mut(&mut self) -> Option<&mut PreviousFileReader>;
}
/// Turn a list of `(batch index, in-batch row range)` pairs into a stream of
/// read tasks against a legacy (v1) file reader.
///
/// Each task is spawned on the tokio runtime as the stream is polled, so work
/// is only scheduled for chunks the consumer actually pulls.
fn ranges_to_tasks(
    reader: &PreviousFileReader,
    ranges: Vec<(i32, Range<usize>)>,
    projection: Arc<Schema>,
) -> ReadBatchTaskStream {
    let reader = reader.clone();
    stream::iter(ranges)
        .map(move |(batch_idx, range)| {
            let row_count = (range.end - range.start) as u32;
            let task_reader = reader.clone();
            let task_projection = projection.clone();
            let fut = tokio::task::spawn(async move {
                previous_read_batch(
                    &task_reader,
                    &ReadBatchParams::Range(range.clone()),
                    &task_projection,
                    batch_idx,
                )
                .await
            })
            // Propagate panics from the spawned task; read errors stay in
            // the inner Result.
            .map(|join_result| join_result.unwrap())
            .boxed();
            ReadBatchTask {
                task: fut,
                num_rows: row_count,
            }
        })
        .boxed()
}
/// [`GenericFileReader`] implementation backed by the legacy (v1) Lance
/// file reader.
#[derive(Clone, Debug)]
struct V1Reader {
    reader: PreviousFileReader,
    /// The projection this reader serves.
    projection: Arc<Schema>,
}
impl V1Reader {
    /// Wrap a legacy reader together with the projection it serves.
    fn new(reader: PreviousFileReader, projection: Arc<Schema>) -> Self {
        Self { reader, projection }
    }
}
impl GenericFileReader for V1Reader {
    fn read_range_tasks(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: Arc<Schema>,
    ) -> Result<ReadBatchTaskStream> {
        // v1 files can only be read one stored batch at a time, so translate
        // the file-relative row range into (batch index, in-batch range)
        // chunks of at most `batch_size` rows each.
        let mut to_skip = range.start as u32;
        let mut remaining = range.end as u32 - to_skip;
        let mut ranges = Vec::new();
        let mut batch_idx = 0;
        while remaining > 0 {
            let next_batch_len = self.reader.num_rows_in_batch(batch_idx) as u32;
            let next_batch_idx = batch_idx;
            batch_idx += 1;
            // Skip whole batches that come before the requested range.
            if to_skip >= next_batch_len {
                to_skip -= next_batch_len;
                continue;
            }
            // Start partway into the first overlapping batch, then from the
            // beginning of each subsequent batch.
            let batch_start = to_skip;
            to_skip = 0;
            let batch_end = next_batch_len.min(batch_start + remaining);
            remaining -= batch_end - batch_start;
            // Split the in-batch range into `batch_size`-row chunks.
            for chunk_start in (batch_start..batch_end).step_by(batch_size as usize) {
                let chunk_end = (chunk_start + batch_size).min(batch_end);
                ranges.push((next_batch_idx, (chunk_start as usize..chunk_end as usize)));
            }
        }
        Ok(ranges_to_tasks(&self.reader, ranges, projection))
    }
    fn read_all_tasks(
        &self,
        batch_size: u32,
        projection: Arc<Schema>,
    ) -> Result<ReadBatchTaskStream> {
        // Enumerate every stored batch and split each into
        // `batch_size`-row chunks.
        let ranges = (0..self.reader.num_batches())
            .flat_map(move |batch_idx| {
                let rows_in_batch = self.reader.num_rows_in_batch(batch_idx as i32);
                (0..rows_in_batch)
                    .step_by(batch_size as usize)
                    .map(move |start| {
                        let end = (start + batch_size as usize).min(rows_in_batch);
                        (batch_idx as i32, start..end)
                    })
            })
            .collect::<Vec<_>>();
        Ok(ranges_to_tasks(&self.reader, ranges, projection))
    }
    fn read_ranges_tasks(
        &self,
        _ranges: Arc<[Range<u64>]>,
        _batch_size: u32,
        _projection: Arc<Schema>,
    ) -> Result<ReadBatchTaskStream> {
        // Multi-range (filtered) reads are only supported on v2 files.
        Err(Error::internal(
            "Attempt to perform FilteredRead on v1 files".to_string(),
        ))
    }
    fn take_all_tasks(
        &self,
        indices: &[u32],
        _batch_size: u32,
        projection: Arc<Schema>,
        _take_priority: Option<u32>,
    ) -> Result<ReadBatchTaskStream> {
        // The v1 take returns all requested rows as a single batch, so the
        // stream contains exactly one task (batch_size is ignored).
        let indices_vec = indices.to_vec();
        let reader = self.reader.clone();
        let task_fut = async move { reader.take(&indices_vec, projection.as_ref()).await }.boxed();
        let task = std::future::ready(ReadBatchTask {
            task: task_fut,
            num_rows: indices.len() as u32,
        })
        .boxed();
        Ok(futures::stream::once(task).boxed())
    }
    fn projection(&self) -> &Arc<Schema> {
        &self.projection
    }
    fn len(&self) -> u32 {
        self.reader.len() as u32
    }
    fn storage_stats(&self) -> Vec<(u32, u64)> {
        // Per-field storage statistics are not tracked for v1 files.
        Vec::new()
    }
    fn clone_box(&self) -> Box<dyn GenericFileReader> {
        Box::new(self.clone())
    }
    fn is_legacy(&self) -> bool {
        true
    }
    fn as_legacy_opt(&self) -> Option<&PreviousFileReader> {
        Some(&self.reader)
    }
    fn as_legacy_opt_mut(&mut self) -> Option<&mut PreviousFileReader> {
        Some(&mut self.reader)
    }
}
mod v2_adapter {
use lance_encoding::decoder::FilterExpression;
use super::*;
/// Adapts the v2 [`lance_file::reader::FileReader`] to [`GenericFileReader`].
#[derive(Debug, Clone)]
pub struct Reader {
    reader: Arc<lance_file::reader::FileReader>,
    /// The projection this reader was opened with.
    projection: Arc<Schema>,
    /// Maps dataset field ids to column indices within this file.
    field_id_to_column_idx: Arc<BTreeMap<u32, u32>>,
    /// Priority placed in the low 32 bits of the scheduler op priority
    /// when a take priority is supplied.
    default_priority: u32,
    file_scheduler: FileScheduler,
}
impl Reader {
    /// Create a new adapter over an opened v2 file reader.
    pub fn new(
        reader: Arc<lance_file::reader::FileReader>,
        projection: Arc<Schema>,
        field_id_to_column_idx: Arc<BTreeMap<u32, u32>>,
        default_priority: u32,
        file_scheduler: FileScheduler,
    ) -> Self {
        Self {
            reader,
            projection,
            field_id_to_column_idx,
            default_priority,
            file_scheduler,
        }
    }
}
impl GenericFileReader for Reader {
fn read_range_tasks(
&self,
range: Range<u64>,
batch_size: u32,
projection: Arc<Schema>,
) -> Result<ReadBatchTaskStream> {
let projection = ReaderProjection::from_field_ids(
self.reader.metadata().version(),
projection.as_ref(),
self.field_id_to_column_idx.as_ref(),
)?;
Ok(self
.reader
.read_tasks(
ReadBatchParams::Range(range.start as usize..range.end as usize),
batch_size,
Some(projection),
FilterExpression::no_filter(),
)?
.map(|v2_task| ReadBatchTask {
task: v2_task.task.map_err(Error::from).boxed(),
num_rows: v2_task.num_rows,
})
.boxed())
}
fn read_ranges_tasks(
&self,
ranges: Arc<[Range<u64>]>,
batch_size: u32,
projection: Arc<Schema>,
) -> Result<ReadBatchTaskStream> {
let projection = ReaderProjection::from_field_ids(
self.reader.metadata().version(),
projection.as_ref(),
self.field_id_to_column_idx.as_ref(),
)?;
Ok(self
.reader
.read_tasks(
ReadBatchParams::Ranges(ranges),
batch_size,
Some(projection),
FilterExpression::no_filter(),
)?
.map(|v2_task| ReadBatchTask {
task: v2_task.task.map_err(Error::from).boxed(),
num_rows: v2_task.num_rows,
})
.boxed())
}
fn read_all_tasks(
&self,
batch_size: u32,
projection: Arc<Schema>,
) -> Result<ReadBatchTaskStream> {
let projection = ReaderProjection::from_field_ids(
self.reader.metadata().version(),
projection.as_ref(),
self.field_id_to_column_idx.as_ref(),
)?;
Ok(self
.reader
.read_tasks(
ReadBatchParams::RangeFull,
batch_size,
Some(projection),
FilterExpression::no_filter(),
)?
.map(|v2_task| ReadBatchTask {
task: v2_task.task.map_err(Error::from).boxed(),
num_rows: v2_task.num_rows,
})
.boxed())
}
fn take_all_tasks(
&self,
indices: &[u32],
batch_size: u32,
projection: Arc<Schema>,
take_priority: Option<u32>,
) -> Result<ReadBatchTaskStream> {
let indices = UInt32Array::from(indices.to_vec());
let projection = ReaderProjection::from_field_ids(
self.reader.metadata().version(),
projection.as_ref(),
self.field_id_to_column_idx.as_ref(),
)?;
let reader = if let Some(take_priority) = take_priority {
let op_priority = ((take_priority as u64) << 32) | self.default_priority as u64;
let scheduler = self.file_scheduler.with_priority(op_priority);
Arc::new(
self.reader
.with_scheduler(Arc::new(LanceEncodingsIo::new(scheduler))),
)
} else {
self.reader.clone()
};
Ok(reader
.read_tasks(
ReadBatchParams::Indices(indices),
batch_size,
Some(projection),
FilterExpression::no_filter(),
)?
.map(|v2_task| ReadBatchTask {
task: v2_task.task.map_err(Error::from).boxed(),
num_rows: v2_task.num_rows,
})
.boxed())
}
fn storage_stats(&self) -> Vec<(u32, u64)> {
let file_statistics = self.reader.file_statistics();
let column_idx_to_field_id = self
.field_id_to_column_idx
.iter()
.map(|(field_id, column_idx)| (*column_idx, *field_id))
.collect::<HashMap<_, _>>();
let mut stats = Vec::new();
let mut current_field_id = 0;
for (column_idx, col_stats) in file_statistics.columns.iter().enumerate() {
if let Some(field_id) = column_idx_to_field_id.get(&(column_idx as u32)) {
current_field_id = *field_id;
}
stats.push((current_field_id, col_stats.size_bytes));
}
stats
}
fn projection(&self) -> &Arc<Schema> {
&self.projection
}
fn len(&self) -> u32 {
self.reader.metadata().num_rows as u32
}
fn clone_box(&self) -> Box<dyn GenericFileReader> {
Box::new(self.clone())
}
fn is_legacy(&self) -> bool {
false
}
fn as_legacy_opt(&self) -> Option<&PreviousFileReader> {
None
}
fn as_legacy_opt_mut(&mut self) -> Option<&mut PreviousFileReader> {
None
}
}
}
/// Reader that materializes all-null columns for fields that exist in the
/// dataset schema but are not stored in any of the fragment's data files.
#[derive(Debug, Clone)]
struct NullReader {
    schema: Arc<Schema>,
    /// Number of physical rows the fragment has; all reads stay within it.
    num_rows: u32,
}
impl NullReader {
    /// Create a null reader for `num_rows` rows of `schema`.
    fn new(schema: Arc<Schema>, num_rows: u32) -> Self {
        Self { schema, num_rows }
    }
    /// Build a single batch of `num_rows` rows where every column of
    /// `projection` is entirely null.
    fn batch(projection: Arc<ArrowSchema>, num_rows: usize) -> RecordBatch {
        let mut columns = Vec::with_capacity(projection.fields().len());
        for field in projection.fields() {
            columns.push(new_null_array(field.data_type(), num_rows));
        }
        RecordBatch::try_new(projection, columns).unwrap()
    }
}
impl GenericFileReader for NullReader {
    fn read_range_tasks(
        &self,
        range: Range<u64>,
        batch_size: u32,
        projection: Arc<Schema>,
    ) -> Result<ReadBatchTaskStream> {
        // A single range is just the multi-range case with one entry.
        self.read_ranges_tasks(vec![range].into(), batch_size, projection)
    }
    fn read_ranges_tasks(
        &self,
        ranges: Arc<[Range<u64>]>,
        batch_size: u32,
        projection: Arc<Schema>,
    ) -> Result<ReadBatchTaskStream> {
        // Only the total row count matters; every produced batch is all-null.
        let mut rows_left: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        let arrow_schema: Arc<ArrowSchema> = Arc::new(projection.as_ref().into());
        let batches = std::iter::from_fn(move || {
            if rows_left == 0 {
                None
            } else {
                let chunk = rows_left.min(batch_size as u64) as usize;
                rows_left -= chunk as u64;
                let batch = Self::batch(arrow_schema.clone(), chunk);
                Some(ReadBatchTask {
                    task: futures::future::ready(Ok(batch)).boxed(),
                    num_rows: chunk as u32,
                })
            }
        });
        Ok(futures::stream::iter(batches).boxed())
    }
    fn read_all_tasks(
        &self,
        batch_size: u32,
        projection: Arc<Schema>,
    ) -> Result<ReadBatchTaskStream> {
        self.read_ranges_tasks(vec![0..self.num_rows as u64].into(), batch_size, projection)
    }
    fn take_all_tasks(
        &self,
        indices: &[u32],
        batch_size: u32,
        projection: Arc<Schema>,
        _take_priority: Option<u32>,
    ) -> Result<ReadBatchTaskStream> {
        // Taking N rows of nulls is identical to reading any N rows of nulls.
        let total = indices.len() as u64;
        self.read_ranges_tasks(vec![0..total].into(), batch_size, projection)
    }
    fn storage_stats(&self) -> Vec<(u32, u64)> {
        // Null columns occupy no storage.
        Vec::new()
    }
    fn projection(&self) -> &Arc<Schema> {
        &self.schema
    }
    fn len(&self) -> u32 {
        self.num_rows
    }
    fn clone_box(&self) -> Box<dyn GenericFileReader> {
        Box::new(self.clone())
    }
    fn is_legacy(&self) -> bool {
        false
    }
    fn as_legacy_opt(&self) -> Option<&PreviousFileReader> {
        None
    }
    fn as_legacy_opt_mut(&mut self) -> Option<&mut PreviousFileReader> {
        None
    }
}
/// Options controlling how a [`FileFragment`] is opened for reading.
#[derive(Debug, Default)]
pub struct FragReadConfig {
    /// Include the row id column in the output.
    pub with_row_id: bool,
    /// Include the row address column in the output.
    pub with_row_address: bool,
    /// Include the last-updated-at-version column in the output.
    pub with_row_last_updated_at_version: bool,
    /// Include the created-at-version column in the output.
    pub with_row_created_at_version: bool,
    /// Scheduler to use for I/O; a dedicated one is created if absent.
    pub scan_scheduler: Option<Arc<ScanScheduler>>,
    /// Priority for reads issued through the provided scheduler.
    pub reader_priority: Option<u32>,
    /// Overrides the dataset's file reader options when set.
    pub file_reader_options: Option<FileReaderOptions>,
}
impl FragReadConfig {
    /// Toggle inclusion of the row id column.
    pub fn with_row_id(mut self, enabled: bool) -> Self {
        self.with_row_id = enabled;
        self
    }
    /// Toggle inclusion of the row address column.
    pub fn with_row_address(mut self, enabled: bool) -> Self {
        self.with_row_address = enabled;
        self
    }
    /// Toggle inclusion of the last-updated-at-version column.
    pub fn with_row_last_updated_at_version(mut self, enabled: bool) -> Self {
        self.with_row_last_updated_at_version = enabled;
        self
    }
    /// Toggle inclusion of the created-at-version column.
    pub fn with_row_created_at_version(mut self, enabled: bool) -> Self {
        self.with_row_created_at_version = enabled;
        self
    }
    /// True when any system-generated column has been requested.
    pub fn has_system_cols(&self) -> bool {
        [
            self.with_row_id,
            self.with_row_address,
            self.with_row_last_updated_at_version,
            self.with_row_created_at_version,
        ]
        .into_iter()
        .any(|flag| flag)
    }
    /// Use the given scheduler for all I/O issued by this read.
    pub fn with_scan_scheduler(mut self, scheduler: Arc<ScanScheduler>) -> Self {
        self.scan_scheduler = Some(scheduler);
        self
    }
    /// Set the priority used for reads through the scheduler.
    pub fn with_reader_priority(mut self, priority: u32) -> Self {
        self.reader_priority = Some(priority);
        self
    }
    /// Override the dataset-level file reader options.
    pub fn with_file_reader_options(mut self, options: FileReaderOptions) -> Self {
        self.file_reader_options = Some(options);
        self
    }
}
impl FileFragment {
/// Create a fragment handle from a dataset and its manifest metadata.
pub fn new(dataset: Arc<Dataset>, metadata: Fragment) -> Self {
    Self { dataset, metadata }
}
/// Write `source` as a single new fragment with the given `id` under
/// `dataset_uri`, returning the resulting fragment metadata.
pub async fn create(
    dataset_uri: &str,
    id: usize,
    source: impl StreamingWriteSource,
    params: Option<WriteParams>,
) -> Result<Fragment> {
    let builder = match params.as_ref() {
        Some(write_params) => FragmentCreateBuilder::new(dataset_uri).write_params(write_params),
        None => FragmentCreateBuilder::new(dataset_uri),
    };
    builder.write(source, Some(id as u64)).await
}
/// Write `source` as one or more new fragments under `dataset_uri`,
/// returning the metadata for each fragment produced.
pub async fn create_fragments(
    dataset_uri: &str,
    source: impl StreamingWriteSource,
    params: Option<WriteParams>,
) -> Result<Vec<Fragment>> {
    let builder = match params.as_ref() {
        Some(write_params) => FragmentCreateBuilder::new(dataset_uri).write_params(write_params),
        None => FragmentCreateBuilder::new(dataset_uri),
    };
    builder.write_fragments(source).await
}
/// Create fragment metadata for an already-written data file in the
/// dataset's data directory.
///
/// The file's format version must match the dataset's storage format and
/// its schema must be compatible with the dataset schema. For non-legacy
/// files the file is opened to read its row count and column layout;
/// `physical_rows` is only used on the legacy path.
pub async fn create_from_file(
    filename: &str,
    dataset: &Dataset,
    fragment_id: usize,
    physical_rows: Option<usize>,
) -> Result<Fragment> {
    let filepath = dataset.data_dir().child(filename);
    let file_version =
        determine_file_version(dataset.object_store.as_ref(), &filepath, None).await?;
    // Mixing file versions within one dataset is not allowed.
    if file_version != dataset.manifest.data_storage_format.lance_file_version()? {
        return Err(Error::invalid_input(format!(
            "File version mismatch. Dataset version: {:?} Fragment version: {:?}",
            dataset.manifest.data_storage_format.lance_file_version()?,
            file_version
        )));
    }
    if file_version == LanceFileVersion::Legacy {
        // Legacy path: metadata can be built without opening the file.
        let fragment = Fragment::with_file_legacy(
            fragment_id as u64,
            filename,
            dataset.schema(),
            physical_rows,
        );
        Ok(fragment)
    } else {
        let mut frag = Fragment::new(fragment_id as u64);
        // Open the file to discover its row count and column layout.
        let scheduler = ScanScheduler::new(
            dataset.object_store.clone(),
            SchedulerConfig::max_bandwidth(&dataset.object_store),
        );
        let file_scheduler = scheduler
            .open_file(&filepath, &CachedFileSize::unknown())
            .await?;
        let reader = lance_file::reader::FileReader::try_open(
            file_scheduler,
            None,
            Arc::<DecoderPlugins>::default(),
            &dataset.metadata_cache.file_metadata_cache(&filepath),
            dataset.file_reader_options.clone().unwrap_or_default(),
        )
        .await?;
        reader
            .schema()
            .check_compatible(dataset.schema(), &SchemaCompareOptions::default())?;
        let projection = lance_file::reader::ReaderProjection::from_whole_schema(
            dataset.schema(),
            reader.metadata().version(),
        );
        // The actual row count comes from the file itself; the caller's
        // `physical_rows` hint is ignored on this path.
        let physical_rows = reader.metadata().num_rows as usize;
        frag.physical_rows = Some(physical_rows);
        frag.id = fragment_id as u64;
        let column_indices = projection
            .column_indices
            .into_iter()
            .map(|c| c as i32)
            .collect();
        frag.add_file(
            filename,
            dataset.schema().field_ids(),
            column_indices,
            &file_version,
            None,
        );
        Ok(frag)
    }
}
/// Collect `(field_id, size_bytes)` storage statistics from every data
/// file in this fragment (v1 files contribute nothing).
pub(crate) async fn storage_stats(
    &self,
    dataset_schema: &Schema,
    scan_scheduler: Arc<ScanScheduler>,
) -> Result<Vec<(u32, u64)>> {
    let read_config = FragReadConfig::default().with_scan_scheduler(scan_scheduler);
    let readers = self.open_readers(dataset_schema, &read_config).await?;
    Ok(readers
        .iter()
        .flat_map(|reader| reader.storage_stats())
        .collect())
}
/// The dataset this fragment belongs to.
pub fn dataset(&self) -> &Dataset {
    self.dataset.as_ref()
}
/// The full schema of the owning dataset.
pub fn schema(&self) -> &Schema {
    self.dataset.schema()
}
/// The fragment's manifest metadata.
pub fn metadata(&self) -> &Fragment {
    &self.metadata
}
/// The fragment id within the dataset.
pub fn id(&self) -> usize {
    self.metadata.id as usize
}
/// Number of data files backing this fragment.
pub fn num_data_files(&self) -> usize {
    self.metadata.files.len()
}
/// Find the data file that stores the column for `field_id`, if any.
pub fn data_file_for_field(&self, field_id: u32) -> Option<&DataFile> {
    let target = field_id as i32;
    self.metadata
        .files
        .iter()
        .find(|data_file| data_file.fields.contains(&target))
}
/// Open this fragment for reading with the given projection and options.
///
/// Concurrently opens the data file readers, loads the deletion vector,
/// and (when the dataset uses stable row ids) loads the row id sequence.
/// Fails with `not_found` if no data file covers the projection and no
/// system columns were requested.
pub async fn open(
    &self,
    projection: &Schema,
    read_config: FragReadConfig,
) -> Result<FragmentReader> {
    let open_files = self.open_readers(projection, &read_config);
    let deletion_vec_load = self.get_deletion_vector();
    // Row id sequence only exists for stable-row-id datasets; otherwise
    // resolve immediately to None (Either keeps the two future types unified).
    let row_id_load = if self.dataset.manifest.uses_stable_row_ids() {
        futures::future::Either::Left(
            load_row_id_sequence(&self.dataset, &self.metadata).map_ok(Some),
        )
    } else {
        futures::future::Either::Right(futures::future::ready(Ok(None)))
    };
    // Run all three loads concurrently.
    let (opened_files, deletion_vec, row_id_sequence) =
        join!(open_files, deletion_vec_load, row_id_load);
    let opened_files = opened_files?;
    let deletion_vec = deletion_vec?;
    let row_id_sequence = row_id_sequence?;
    if opened_files.is_empty() && !read_config.has_system_cols() {
        return Err(Error::not_found(format!(
            "No data files found for schema: {}, fragment_id={}",
            projection,
            self.id()
        )));
    }
    let num_physical_rows = self.physical_rows().await?;
    let mut reader = FragmentReader::try_new(
        self.id(),
        deletion_vec,
        row_id_sequence,
        opened_files,
        ArrowSchema::from(projection),
        self.count_rows(None).await?,
        num_physical_rows,
        Arc::new(self.metadata.clone()),
    )?;
    // Propagate requested system columns onto the reader.
    if read_config.with_row_id {
        reader.with_row_id();
    }
    if read_config.with_row_address {
        reader.with_row_address();
    }
    if read_config.with_row_last_updated_at_version {
        reader.with_row_last_updated_at_version();
    }
    if read_config.with_row_created_at_version {
        reader.with_row_created_at_version();
    }
    Ok(reader)
}
/// The first field id stored in `data_file`, or 0 when it stores none.
fn get_field_id_offset(data_file: &DataFile) -> u32 {
    match data_file.fields.first() {
        Some(&first_field_id) => first_field_id as u32,
        None => 0,
    }
}
/// Open a reader for one data file, returning `None` when the file stores
/// no fields from `projection`.
///
/// Dispatches to the legacy (v1) or v2 reader based on the file format.
/// For v2 files the scheduler is chosen in priority order: a per-base
/// store for files with a `base_id`, then the scheduler supplied in
/// `read_config`, then a freshly created one.
async fn open_reader(
    &self,
    data_file: &DataFile,
    projection: Option<&Schema>,
    read_config: &FragReadConfig,
) -> Result<Option<Box<dyn GenericFileReader>>> {
    let full_schema = self.dataset.schema();
    let data_file_schema = data_file.schema(full_schema);
    // No projection means "everything in the dataset schema".
    let projection = projection.unwrap_or(full_schema);
    // Restrict the projection to fields actually stored in this file.
    let schema_per_file = Arc::new(projection.intersection_ignore_types(&data_file_schema)?);
    if data_file.is_legacy_file() {
        let max_field_id = data_file.fields.iter().max().unwrap();
        if !schema_per_file.fields.is_empty() {
            let path = self
                .dataset
                .data_file_dir(data_file)?
                .child(data_file.path.as_str());
            let field_id_offset = Self::get_field_id_offset(data_file);
            let reader = PreviousFileReader::try_new_with_fragment_id(
                &self.dataset.object_store,
                &path,
                self.schema().clone(),
                self.id() as u32,
                field_id_offset as i32,
                *max_field_id,
                Some(&self.dataset.metadata_cache.file_metadata_cache(&path)),
            )
            .await?;
            // Re-project through the reader's own schema so the projected
            // fields carry the file's view of each field.
            let initialized_schema = reader.schema().project_by_schema(
                schema_per_file.as_ref(),
                OnMissing::Error,
                OnTypeMismatch::Error,
            )?;
            let reader = V1Reader::new(reader, Arc::new(initialized_schema));
            Ok(Some(Box::new(reader)))
        } else {
            Ok(None)
        }
    } else if schema_per_file.fields.is_empty() {
        Ok(None)
    } else {
        let path = self
            .dataset
            .data_file_dir(data_file)?
            .child(data_file.path.as_str());
        // Choose the scheduler and reader priority for this file.
        let (store_scheduler, reader_priority) = if let Some(base_id) = data_file.base_id {
            // File lives in a secondary base store: needs its own scheduler.
            let object_store = self.dataset.object_store_for_base(base_id).await?;
            let config = SchedulerConfig::max_bandwidth(&object_store);
            (
                ScanScheduler::new(object_store, config),
                read_config.reader_priority.unwrap_or(0),
            )
        } else if let Some(scan_scheduler) = read_config.scan_scheduler.as_ref() {
            (
                scan_scheduler.clone(),
                read_config.reader_priority.unwrap_or(0),
            )
        } else {
            (
                ScanScheduler::new(
                    self.dataset.object_store.clone(),
                    SchedulerConfig::max_bandwidth(&self.dataset.object_store),
                ),
                0,
            )
        };
        let file_scheduler = store_scheduler
            .open_file_with_priority(&path, reader_priority as u64, &data_file.file_size_bytes)
            .await?;
        // File metadata is cached so repeated opens skip the footer read.
        let file_metadata = self.get_file_metadata(&file_scheduler).await?;
        let path = file_scheduler.reader().path().clone();
        let metadata_cache = self.dataset.metadata_cache.file_metadata_cache(&path);
        let reader = Arc::new(
            lance_file::reader::FileReader::try_open_with_file_metadata(
                Arc::new(LanceEncodingsIo::new(file_scheduler.clone())),
                path,
                None,
                Arc::<DecoderPlugins>::default(),
                file_metadata,
                &metadata_cache,
                // Per-read options override the dataset-level options.
                read_config
                    .file_reader_options
                    .clone()
                    .or_else(|| self.dataset.file_reader_options.clone())
                    .unwrap_or_default(),
            )
            .await?,
        );
        // Build the field-id -> column-index map, dropping negative
        // (unassigned) column indices.
        let field_id_to_column_idx = Arc::new(BTreeMap::from_iter(
            data_file
                .fields
                .iter()
                .copied()
                .zip(data_file.column_indices.iter().copied())
                .filter_map(|(field_id, column_index)| {
                    if column_index < 0 {
                        None
                    } else {
                        Some((field_id as u32, column_index as u32))
                    }
                }),
        ));
        let reader = v2_adapter::Reader::new(
            reader,
            schema_per_file,
            field_id_to_column_idx,
            reader_priority,
            file_scheduler,
        );
        Ok(Some(Box::new(reader)))
    }
}
/// Open readers for every data file relevant to `projection`.
///
/// If some projected fields are not stored in any data file, a
/// [`NullReader`] is appended so those columns materialize as all-null.
async fn open_readers(
    &self,
    projection: &Schema,
    read_config: &FragReadConfig,
) -> Result<Vec<Box<dyn GenericFileReader>>> {
    let mut opened_files = vec![];
    for data_file in &self.metadata.files {
        if let Some(reader) = self
            .open_reader(data_file, Some(projection), read_config)
            .await?
        {
            opened_files.push(reader);
        }
    }
    let num_rows = self.physical_rows().await?;
    // Collect the (non-negative) field ids actually covered by the
    // opened readers.
    let field_ids_in_files = opened_files
        .iter()
        .flat_map(|r| r.projection().fields_pre_order().map(|f| f.id))
        .filter(|id| *id >= 0)
        .collect::<HashSet<_>>();
    let mut missing_fields = projection.field_ids();
    missing_fields.retain(|f| !field_ids_in_files.contains(f) && *f >= 0);
    if !missing_fields.is_empty() {
        // Serve the uncovered fields as all-null columns.
        let missing_projection = projection.project_by_ids(&missing_fields, true);
        let null_reader = NullReader::new(Arc::new(missing_projection), num_rows as u32);
        opened_files.push(Box::new(null_reader));
    }
    Ok(opened_files)
}
/// Count the logical (non-deleted) rows in this fragment, optionally
/// restricted by a filter expression.
pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
    match filter {
        // With a filter we must actually scan; project no data columns,
        // only row ids, to keep the scan cheap.
        Some(expr) => self
            .scan()
            .project(&Vec::<String>::default())
            .unwrap()
            .with_row_id()
            .filter(&expr)?
            .count_rows()
            .await
            .map(|v| v as usize),
        // Without a filter the count is physical rows minus deletions.
        None => {
            let total_rows = self.physical_rows();
            let deletion_count = self.count_deletions();
            let (total_rows, deletion_count) =
                futures::future::try_join(total_rows, deletion_count).await?;
            Ok(total_rows - deletion_count)
        }
    }
}
/// Count the deleted rows in this fragment.
///
/// Uses the count recorded in the deletion file metadata when available;
/// otherwise loads the deletion vector and counts its entries.
pub async fn count_deletions(&self) -> Result<usize> {
    if let Some(DeletionFile {
        num_deleted_rows: Some(num_deleted),
        ..
    }) = &self.metadata().deletion_file
    {
        return Ok(*num_deleted);
    }
    // Fall back to loading the deletion vector itself.
    match self.get_deletion_vector().await? {
        Some(deletion_vector) => Ok(deletion_vector.len()),
        None => Ok(0),
    }
}
pub fn fast_physical_rows(&self) -> Result<usize> {
if self.dataset.manifest.writer_version.is_some() {
let Some(physical_rows) = self.metadata.physical_rows else {
return Err(Error::internal(format!(
"The method fast_physical_rows was called on a fragment that does not have the physical row count in the metadata. Fragment id: {}",
self.id()
)));
};
Ok(physical_rows)
} else {
Err(Error::internal(format!(
"The method fast_physical_rows was called on a fragment that does not have the physical row count in the metadata. Fragment id: {}",
self.id()
)))
}
}
pub fn fast_num_deletions(&self) -> Result<usize> {
match &self.metadata().deletion_file {
Some(DeletionFile {
num_deleted_rows: Some(num_deleted),
..
}) => Ok(*num_deleted),
None => Ok(0),
_ => Err(Error::internal(format!(
"The method fast_num_deletions was called on a fragment that does not have the deletion count in the metadata. Fragment id: {}",
self.id()
))),
}
}
/// Logical (non-deleted) row count from metadata alone, without I/O.
/// Fails if either underlying count is unavailable in the metadata.
pub fn fast_logical_rows(&self) -> Result<usize> {
    Ok(self.fast_physical_rows()? - self.fast_num_deletions()?)
}
/// The physical (pre-deletion) row count of this fragment.
///
/// Uses the metadata count when a writer version is recorded; otherwise
/// opens the first data file and asks it for its length.
pub async fn physical_rows(&self) -> Result<usize> {
    if self.metadata.files.is_empty() {
        return Err(Error::not_found(format!(
            "Fragment {} does not contain any data",
            self.id()
        )));
    };
    // Fast path: trust the metadata when written by a known writer version.
    if self.dataset.manifest.writer_version.is_some()
        && let Some(physical_rows) = self.metadata.physical_rows
    {
        return Ok(physical_rows);
    }
    // Slow path: open one data file (all files share the same length).
    let some_file = &self.metadata.files[0];
    let reader = self
        .open_reader(some_file, None, &FragReadConfig::default())
        .await?
        .ok_or_else(|| {
            Error::internal(format!(
                "The data file {} did not have any fields contained in the dataset schema",
                some_file.path
            ))
        })?;
    Ok(reader.len() as usize)
}
/// Run consistency checks on this fragment's metadata and data files.
///
/// Checks performed:
/// * field ids are strictly increasing within each data file and not
///   duplicated across the fragment's files
/// * the fragment does not mix v1 (legacy) and v2 data files
/// * each data file's own metadata validates
/// * all data files report the same row count, which matches
///   `physical_rows` in the fragment metadata when present
/// * the deletion vector (if any) matches its recorded length and only
///   contains in-range offsets
pub async fn validate(&self) -> Result<()> {
    let mut seen_fields = HashSet::new();
    for data_file in &self.metadata.files {
        // Track the previous field id so the strictly-increasing check can
        // actually compare adjacent ids. (Previously `last` was never
        // updated, so out-of-order ids were never detected.)
        let mut last = -1;
        for field_id in &data_file.fields {
            if *field_id <= last {
                return Err(Error::corrupt_file(
                    self.dataset
                        .data_file_dir(data_file)?
                        .child(data_file.path.as_str()),
                    format!(
                        "Field id {} is not in increasing order in fragment {:#?}",
                        field_id, self
                    ),
                ));
            }
            last = *field_id;
            if !seen_fields.insert(field_id) {
                return Err(Error::corrupt_file(
                    self.dataset
                        .data_file_dir(data_file)?
                        .child(data_file.path.as_str()),
                    format!(
                        "Field id {} is duplicated in fragment {:#?}",
                        field_id, self
                    ),
                ));
            }
        }
    }
    // `any != all` is true exactly when both v1 and v2 files are present.
    if self.metadata.files.iter().any(|f| f.is_legacy_file())
        != self.metadata.files.iter().all(|f| f.is_legacy_file())
    {
        return Err(Error::corrupt_file(
            self.dataset
                .data_file_dir(&self.metadata.files[0])?
                .child(self.metadata.files[0].path.as_str()),
            "Fragment contains a mix of v1 and v2 data files".to_string(),
        ));
    }
    for data_file in &self.metadata.files {
        // NOTE(review): every file is validated against the directory of
        // files[0]; if data files can live under different base paths this
        // may need data_file_dir(data_file) instead — confirm intent.
        data_file.validate(&self.dataset.data_file_dir(&self.metadata.files[0])?)?;
    }
    // Read each file's length concurrently, alongside the deletion vector.
    let get_lengths = self.metadata.files.iter().map(|data_file| async move {
        let data_file_dir = self.dataset.data_file_dir(data_file)?;
        let reader = self
            .open_reader(data_file, None, &FragReadConfig::default())
            .await?
            .ok_or_else(|| {
                Error::corrupt_file(
                    data_file_dir.child(data_file.path.as_str()),
                    "did not have any fields in common with the dataset schema",
                )
            })?;
        Result::Ok(reader.len() as usize)
    });
    let get_lengths = try_join_all(get_lengths);
    let deletion_vector = self.get_deletion_vector();
    let (get_lengths, deletion_vector) = join!(get_lengths, deletion_vector);
    let get_lengths = get_lengths?;
    // All files must agree on the row count; the first one sets the target.
    let expected_length = get_lengths.first().unwrap_or(&0);
    for (length, data_file) in get_lengths.iter().zip(self.metadata.files.iter()) {
        if length != expected_length {
            let path = self
                .dataset
                .data_file_dir(data_file)?
                .child(data_file.path.as_str());
            return Err(Error::corrupt_file(
                path,
                format!(
                    "data file has incorrect length. Expected: {} Got: {}",
                    expected_length, length
                ),
            ));
        }
    }
    if let Some(physical_rows) = self.metadata.physical_rows
        && physical_rows != *expected_length
    {
        return Err(Error::corrupt_file(
            self.dataset
                .data_file_dir(&self.metadata.files[0])?
                .child(self.metadata.files[0].path.as_str()),
            format!(
                "Fragment metadata has incorrect physical_rows. Actual: {} Metadata: {}",
                expected_length, physical_rows
            ),
        ));
    }
    if let Some(deletion_vector) = deletion_vector? {
        // A deletion vector implies deletion_file is Some, hence the unwraps.
        if let Some(num_deletions) = self
            .metadata
            .deletion_file
            .as_ref()
            .unwrap()
            .num_deleted_rows
            && num_deletions != deletion_vector.len()
        {
            return Err(Error::corrupt_file(
                deletion_file_path(
                    &self.dataset.base,
                    self.metadata.id,
                    self.metadata.deletion_file.as_ref().unwrap(),
                ),
                format!(
                    "deletion vector length does not match metadata. Metadata: {} Deletion vector: {}",
                    num_deletions,
                    deletion_vector.len()
                ),
            ));
        }
        // Every deleted offset must fall within the fragment.
        for offset in deletion_vector.iter() {
            if offset >= *expected_length as u32 {
                let deletion_file_meta = self.metadata.deletion_file.as_ref().unwrap();
                return Err(Error::corrupt_file(
                    deletion_file_path(
                        &self.dataset.base,
                        self.metadata.id,
                        deletion_file_meta,
                    ),
                    format!(
                        "deletion vector contains an offset that is out of range. Offset: {} Fragment length: {}",
                        offset, expected_length
                    ),
                ));
            }
        }
    }
    Ok(())
}
/// Open a [`FragmentSession`] over this fragment with the given projection.
pub async fn open_session(
    &self,
    projection: &Schema,
    with_row_address: bool,
) -> Result<FragmentSession> {
    FragmentSession::open(Arc::new(self.clone()), projection, with_row_address).await
}
/// Take rows by *logical* index (i.e. indices over the non-deleted rows).
///
/// When a deletion vector exists, the logical indices are first mapped to
/// physical offsets so deleted rows are skipped.
pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
    let deletion_vector = self.get_deletion_vector().await?;
    let row_ids = if let Some(deletion_vector) = deletion_vector {
        // Sort the deleted offsets so the logical->physical mapping can
        // walk them in order.
        let mut sorted_deleted_ids = deletion_vector
            .as_ref()
            .clone()
            .into_iter()
            .collect::<Vec<_>>();
        sorted_deleted_ids.sort();
        Cow::Owned(resolve_actual_row_ids(indices, &sorted_deleted_ids))
    } else {
        // No deletions: logical and physical indices coincide.
        Cow::Borrowed(indices)
    };
    self.take_rows(&row_ids, projection, false, false, false, false)
        .await
}
/// Load this fragment's deletion vector, or `None` when the fragment has
/// no deletion file.
pub async fn get_deletion_vector(&self) -> Result<Option<Arc<DeletionVector>>> {
    match self.metadata.deletion_file.as_ref() {
        None => Ok(None),
        Some(deletion_file) => {
            let deletion_vector =
                read_dataset_deletion_file(&self.dataset, self.id() as u64, deletion_file)
                    .await?;
            Ok(Some(deletion_vector))
        }
    }
}
/// Fetch (or load and cache) the v2 file metadata for the file behind
/// `file_scheduler`, keyed by the file's path in the metadata cache.
async fn get_file_metadata(
    &self,
    file_scheduler: &FileScheduler,
) -> Result<Arc<CachedFileMetadata>> {
    let path = file_scheduler.reader().path();
    let cache = self.dataset.metadata_cache.file_metadata_cache(path);
    // Only reads the file's metadata on a cache miss.
    let file_metadata = cache
        .get_or_insert_with_key(FileMetadataCacheKey, || async {
            let file_metadata: CachedFileMetadata =
                lance_file::reader::FileReader::read_all_metadata(file_scheduler).await?;
            Ok(file_metadata)
        })
        .await?;
    Ok(file_metadata)
}
/// Take rows by *physical* offset within the fragment, optionally adding
/// system columns to the result.
pub(crate) async fn take_rows(
    &self,
    row_offsets: &[u32],
    projection: &Schema,
    with_row_id: bool,
    with_row_address: bool,
    with_row_created_at_version: bool,
    with_row_last_updated_at_version: bool,
) -> Result<RecordBatch> {
    let reader = self
        .open(
            projection,
            FragReadConfig::default()
                .with_row_id(with_row_id)
                .with_row_address(with_row_address)
                .with_row_created_at_version(with_row_created_at_version)
                .with_row_last_updated_at_version(with_row_last_updated_at_version),
        )
        .await?;
    // A contiguous run of offsets can be served by a cheaper range read.
    if row_offsets.len() > 1 && Self::row_ids_contiguous(row_offsets) {
        let range =
            (row_offsets[0] as usize)..(row_offsets[row_offsets.len() - 1] as usize + 1);
        reader.legacy_read_range_as_batch(range).await
    } else {
        reader.take_as_batch(row_offsets, None).await
    }
}
/// Returns true when `row_ids` is a non-empty run of consecutive,
/// strictly increasing ids (e.g. `[3, 4, 5]`). An empty slice is not
/// considered contiguous.
///
/// Uses a checked increment so a run ending at `u32::MAX` cannot
/// overflow (previously `last_id + 1` would panic in debug builds and
/// wrap in release builds, falsely matching a following `0`).
fn row_ids_contiguous(row_ids: &[u32]) -> bool {
    if row_ids.is_empty() {
        return false;
    }
    row_ids
        .windows(2)
        .all(|pair| pair[0].checked_add(1) == Some(pair[1]))
}
/// Start building a scan restricted to this fragment.
pub fn scan(&self) -> Scanner {
    Scanner::from_fragment(self.dataset.clone(), self.metadata.clone())
}
/// Build an [`Updater`] over this fragment.
///
/// `columns` selects which columns to read while updating; the special
/// row-id / row-address column names are recognized and turned into the
/// corresponding read flags rather than projected fields. When `columns`
/// is `None` the full dataset schema is read.
pub(crate) async fn updater<T: AsRef<str>>(
    &self,
    columns: Option<&[T]>,
    schemas: Option<(Schema, Schema)>,
    batch_size: Option<u32>,
) -> Result<Updater> {
    let mut schema = self.dataset.schema().clone();
    let mut with_row_addr = false;
    let mut with_row_id = false;
    if let Some(columns) = columns {
        let mut projection = Vec::new();
        for column in columns {
            // System column names become flags, not projected fields.
            if column.as_ref() == ROW_ADDR {
                with_row_addr = true;
            } else if column.as_ref() == ROW_ID {
                with_row_id = true;
            } else {
                projection.push(column.as_ref());
            }
        }
        schema = schema.project(&projection)?;
    }
    // The reader needs at least one column; if nothing was projected and
    // row ids were not requested, fall back to reading row addresses.
    with_row_addr |= !with_row_id && schema.fields.is_empty();
    // Open the reader and load the deletion vector concurrently.
    let reader = self.open(
        &schema,
        FragReadConfig::default()
            .with_row_address(with_row_addr)
            .with_row_id(with_row_id),
    );
    let deletion_vector = self.get_deletion_vector();
    let (reader, deletion_vector) = join!(reader, deletion_vector);
    let reader = reader?;
    let deletion_vector = deletion_vector?.unwrap_or_default().as_ref().clone();
    Updater::try_new(self.clone(), reader, deletion_vector, schemas, batch_size)
}
/// Merges new columns from `stream` into this fragment via a hash join on
/// `left_on` (this fragment) and `right_on` (the stream).
///
/// Every non-key column on the right must be new to the fragment. Returns the
/// rewritten fragment metadata and the merged schema (with field ids assigned
/// starting after `max_field_id`).
pub async fn merge_columns(
    &mut self,
    stream: impl RecordBatchReader + Send + 'static,
    left_on: &str,
    right_on: &str,
    max_field_id: i32,
) -> Result<(Fragment, Schema)> {
    let stream = Box::new(stream);
    // The join key may also be one of the reserved metadata columns.
    if self.schema().field(left_on).is_none() && left_on != ROW_ID && left_on != ROW_ADDR {
        return Err(Error::invalid_input(format!(
            "Column {} does not exist in the left side fragment",
            left_on
        )));
    };
    let right_schema = stream.schema();
    if right_schema.field_with_name(right_on).is_err() {
        return Err(Error::invalid_input(format!(
            "Column {} does not exist in the right side fragment",
            right_on
        )));
    };
    // Reject collisions: merge adds columns, it does not overwrite them.
    for field in right_schema.fields() {
        if field.name() == right_on {
            continue;
        }
        if self.schema().field(field.name()).is_some() {
            return Err(Error::invalid_input(format!(
                "Column {} exists in left side fragment and right side dataset",
                field.name()
            )));
        }
    }
    let joiner = Arc::new(HashJoiner::try_new(stream, right_on).await?);
    let mut new_schema: Schema = self.schema().merge(joiner.out_schema().as_ref())?;
    new_schema.set_field_id(Some(max_field_id));
    let new_fragment = self
        .clone()
        .merge(left_on, &joiner)
        .await
        .map(|f| f.metadata)?;
    Ok((new_fragment, new_schema))
}
/// Drives the updater over this fragment, joining each batch against `joiner`
/// on `join_column` and writing the joined columns back. Consumes and returns
/// the fragment with its metadata updated to include the new data file.
pub(crate) async fn merge(mut self, join_column: &str, joiner: &HashJoiner) -> Result<Self> {
    let mut updater = self.updater(Some(&[join_column]), None, None).await?;
    while let Some(batch) = updater.next().await? {
        let batch = joiner
            .collect(&self.dataset, batch[join_column].clone())
            .await?;
        updater.update(batch).await?;
    }
    self.metadata = updater.finish().await?;
    Ok(self)
}
/// Rewrites values of existing columns in this fragment using rows joined in
/// from `right_stream`, matching on `left_on` (left) / `right_on` (right).
/// Rows with no match on the right keep their existing values.
///
/// Returns the rewritten fragment metadata plus the ids of the fields that
/// were rewritten (so the commit knows which columns changed).
pub async fn update_columns(
    &mut self,
    right_stream: impl RecordBatchReader + Send + 'static,
    left_on: &str,
    right_on: &str,
) -> Result<(Fragment, Vec<u32>)> {
    // The join key may be a data column or one of the reserved metadata columns.
    if self.schema().field(left_on).is_none() && left_on != ROW_ID && left_on != ROW_ADDR {
        return Err(Error::invalid_input(format!(
            "Column {} does not exist in the left side fragment",
            left_on
        )));
    };
    let right_stream = Box::new(right_stream);
    let right_schema = right_stream.schema();
    if right_schema.field_with_name(right_on).is_err() {
        return Err(Error::invalid_input(format!(
            "Column {} does not exist in the right side fragment",
            right_on
        )));
    };
    // Every non-key column on the right must be an updatable, pre-existing
    // column on the left.
    let write_schema = right_schema.as_ref().without_column(right_on);
    for field in write_schema.fields() {
        if ROW_ID.eq(field.name()) || ROW_ADDR.eq(field.name()) {
            return Err(Error::invalid_input(format!(
                "Column {} is a reserved metadata column and cannot be updated",
                field.name()
            )));
        }
        if self.schema().field(field.name()).is_none() {
            return Err(Error::invalid_input(format!(
                "Column {} in right side fragment does not exist in left side fragment",
                field.name()
            )));
        }
    }
    // Re-project through the dataset schema so field ids / types line up.
    let write_schema = self.schema().project_by_schema(
        &write_schema,
        OnMissing::Error,
        OnTypeMismatch::Error,
    )?;
    // Read the columns being rewritten plus the join key.
    let mut read_columns: Vec<String> =
        write_schema.fields.iter().map(|f| f.name.clone()).collect();
    read_columns.push(left_on.to_string());
    let mut updater = self
        .updater(
            Some(&read_columns),
            Some((write_schema.clone(), self.schema().clone())),
            None,
        )
        .await?;
    let joiner = Arc::new(HashJoiner::try_new(right_stream, right_on).await?);
    while let Some(batch) = updater.next().await? {
        // Fall back to the existing values for rows absent from the right side.
        let updated_batch = joiner
            .collect_with_fallback(batch, batch[left_on].clone(), self.dataset())
            .await?;
        updater.update(updated_batch).await?;
    }
    let mut updated_fragment = updater.finish().await?;
    // The updater appends a new data file holding the rewritten columns; older
    // data files that still list those field ids must stop serving them. Mark
    // the superseded ids with the -2 tombstone in every earlier file.
    let updated_fields = updated_fragment.files.last().unwrap().fields.clone();
    for data_file in updated_fragment.files.iter_mut().rev().skip(1) {
        for field in &mut data_file.fields {
            if updated_fields.contains(field) {
                *field = -2;
            }
        }
    }
    // Drop data files that no longer serve any field at all.
    updated_fragment
        .files
        .retain(|data_file| data_file.fields.iter().any(|&field| field != -2));
    let updated_fields = updated_fields
        .iter()
        .filter_map(|&i| u32::try_from(i).ok())
        .collect();
    Ok((updated_fragment, updated_fields))
}
/// Adds new columns to this single fragment by delegating to the schema
/// evolution machinery. Returns the rewritten fragment metadata and the
/// evolved schema.
pub async fn add_columns(
    &self,
    transforms: NewColumnTransform,
    read_columns: Option<Vec<String>>,
    batch_size: Option<u32>,
) -> Result<(Fragment, Schema)> {
    let (fragments, schema) = schema_evolution::add_columns_to_fragments(
        self.dataset.as_ref(),
        transforms,
        read_columns,
        std::slice::from_ref(self),
        batch_size,
    )
    .await?;
    // We passed exactly one fragment in, so exactly one must come back.
    assert_eq!(fragments.len(), 1);
    Ok((fragments.into_iter().next().unwrap(), schema))
}
/// Deletes all rows in this fragment matching `predicate`.
///
/// Returns `Ok(None)` when every row ends up deleted (the whole fragment can
/// be dropped), `Ok(Some(self))` otherwise. Trivial predicates short-circuit:
/// "true" deletes everything, "false" deletes nothing; the same check is
/// repeated against the simplified datafusion expression.
pub async fn delete(self, predicate: &str) -> Result<Option<Self>> {
    // Start from the current deletion vector so prior deletions are preserved.
    let mut deletion_vector = self
        .get_deletion_vector()
        .await?
        .unwrap_or_default()
        .as_ref()
        .clone();
    let starting_length = deletion_vector.len();
    let mut scanner = self.scan();
    // Cheap textual short-circuit before building a plan.
    let predicate_lower = predicate.trim().to_lowercase();
    if predicate_lower == "true" {
        return Ok(None);
    } else if predicate_lower == "false" {
        return Ok(Some(self));
    }
    // Only row addresses are needed to mark deletions.
    scanner
        .with_row_address()
        .filter(predicate)?
        .project::<&str>(&[])?;
    // The optimizer may have folded the filter to a boolean literal.
    if let Some(predicate) = &scanner.get_expr_filter()? {
        if matches!(
            predicate,
            Expr::Literal(ScalarValue::Boolean(Some(false)), _)
        ) {
            return Ok(Some(self));
        }
        if matches!(
            predicate,
            Expr::Literal(ScalarValue::Boolean(Some(true)), _)
        ) {
            return Ok(None);
        }
    }
    // Collect the local row offsets of matching rows into the deletion vector.
    scanner
        .try_into_stream()
        .await?
        .try_for_each(|batch| {
            let array = batch[ROW_ADDR].clone();
            let int_array: &UInt64Array = as_primitive_array(array.as_ref());
            // Low 32 bits of a row address are the offset within the fragment.
            let local_row_ids = int_array.values().iter().map(|v| *v as u32);
            deletion_vector.extend(local_row_ids);
            futures::future::ready(Ok(()))
        })
        .await?;
    // Nothing matched: avoid writing a new (identical) deletion file.
    if deletion_vector.len() == starting_length {
        return Ok(Some(self));
    }
    self.write_deletions(deletion_vector).await
}
/// Adds the given local row offsets to this fragment's deletion vector and
/// persists it. Returns `Ok(None)` when the fragment becomes fully deleted.
pub async fn extend_deletions(
    self,
    new_deletions: impl IntoIterator<Item = u32>,
) -> Result<Option<Self>> {
    let mut deletion_vector = self
        .get_deletion_vector()
        .await?
        .unwrap_or_default()
        .as_ref()
        .clone();
    deletion_vector.extend(new_deletions);
    self.write_deletions(deletion_vector).await
}
/// Persists `deletion_vector` as this fragment's deletion file.
///
/// Returns `Ok(None)` when every physical row is deleted (caller should drop
/// the fragment), and an internal error when the vector references rows beyond
/// the fragment's physical row count.
async fn write_deletions(mut self, deletion_vector: DeletionVector) -> Result<Option<Self>> {
    let physical_rows = self.physical_rows().await?;
    if deletion_vector.len() == physical_rows
        && deletion_vector.contains_range(0..physical_rows as u32)
    {
        // Fully deleted fragment: no deletion file needed, signal removal.
        return Ok(None);
    } else if deletion_vector.len() >= physical_rows {
        // More deletions than rows: the vector must contain out-of-range ids.
        let dv_len = deletion_vector.len();
        let examples: Vec<u32> = deletion_vector
            .into_iter()
            .filter(|x| *x >= physical_rows as u32)
            .take(5)
            .collect();
        return Err(Error::internal(format!(
            "Deletion vector includes rows that aren't in the fragment. \
             Num physical rows {}; Deletion vector length: {}; \
             Examples: {:?}",
            physical_rows, dv_len, examples
        )));
    }
    self.metadata.deletion_file = write_deletion_file(
        &self.dataset.base,
        self.metadata.id,
        self.dataset.version().version,
        &deletion_vector,
        self.dataset.object_store(),
    )
    .await?;
    Ok(Some(self))
}
}
/// Maps logical row offsets (positions counted among surviving rows only) to
/// physical row offsets in the fragment, skipping over the rows listed in
/// `sorted_deleted_ids`.
///
/// `sorted_deleted_ids` must be sorted ascending.
pub(crate) fn resolve_actual_row_ids(row_ids: &[u32], sorted_deleted_ids: &[u32]) -> Vec<u32> {
    row_ids
        .iter()
        .map(|&logical| {
            let mut physical = logical;
            // Deletions at or before the starting position each push the
            // physical offset forward by one slot.
            let shift = sorted_deleted_ids.partition_point(|v| *v <= physical);
            let mut next_deleted = shift;
            for _ in 0..shift {
                physical += 1;
                // Landing on further deleted rows pushes us forward again.
                while next_deleted < sorted_deleted_ids.len()
                    && sorted_deleted_ids[next_deleted] == physical
                {
                    next_deleted += 1;
                    physical += 1;
                }
            }
            physical
        })
        .collect()
}
/// Cache key for per-file metadata. The cache handle is already scoped to the
/// file path (see `get_file_metadata`), so the key string can be empty.
#[derive(Debug, Clone)]
struct FileMetadataCacheKey;
impl CacheKey for FileMetadataCacheKey {
    type ValueType = CachedFileMetadata;
    fn key(&self) -> std::borrow::Cow<'_, str> {
        "".into()
    }
}
/// A `FileFragment` degrades to its bare metadata, discarding the dataset handle.
impl From<FileFragment> for Fragment {
    fn from(fragment: FileFragment) -> Self {
        fragment.metadata
    }
}
/// Streaming reader over a single fragment. Batches are assembled by zipping
/// one reader per data file column-wise, then optionally appending system
/// columns (`_rowid`, `_rowaddr`, version columns) and applying the deletion
/// vector.
#[derive(Debug)]
pub struct FragmentReader {
    /// One reader per data file; their columns are merged row-wise.
    readers: Vec<Box<dyn GenericFileReader>>,
    /// Arrow schema of the emitted batches (data columns + enabled system columns).
    output_schema: ArrowSchema,
    /// Deleted rows of this fragment, if any.
    deletion_vec: Option<Arc<DeletionVector>>,
    /// Stable row-id sequence, when the dataset uses movable row ids.
    row_id_sequence: Option<Arc<RowIdSequence>>,
    /// Id of the fragment being read (encoded into row addresses).
    fragment_id: usize,
    with_row_id: bool,
    with_row_addr: bool,
    with_row_last_updated_at_version: bool,
    with_row_created_at_version: bool,
    /// When true, deleted rows are emitted (with nulled ids) rather than filtered.
    make_deletions_null: bool,
    fragment: Arc<Fragment>,
    /// Lazily-loaded per-row version sequences backing the version system columns
    /// (populated by `with_row_last_updated_at_version` / `with_row_created_at_version`).
    last_updated_at_sequence: Option<Arc<lance_table::rowids::version::RowDatasetVersionSequence>>,
    created_at_sequence: Option<Arc<lance_table::rowids::version::RowDatasetVersionSequence>>,
    // NOTE(review): num_physical_rows is used as the total addressable row count
    // (see new_read_impl); num_rows is presumably the post-deletion count — confirm.
    num_rows: usize,
    num_physical_rows: usize,
}
// Manual Clone: `Box<dyn GenericFileReader>` cannot derive Clone, so each
// reader is duplicated through its `clone_box` object-safe hook.
impl Clone for FragmentReader {
    fn clone(&self) -> Self {
        Self {
            readers: self
                .readers
                .iter()
                .map(|reader| reader.clone_box())
                .collect::<Vec<_>>(),
            output_schema: self.output_schema.clone(),
            deletion_vec: self.deletion_vec.clone(),
            row_id_sequence: self.row_id_sequence.clone(),
            fragment_id: self.fragment_id,
            with_row_id: self.with_row_id,
            with_row_addr: self.with_row_addr,
            with_row_last_updated_at_version: self.with_row_last_updated_at_version,
            with_row_created_at_version: self.with_row_created_at_version,
            make_deletions_null: self.make_deletions_null,
            fragment: self.fragment.clone(),
            last_updated_at_sequence: self.last_updated_at_sequence.clone(),
            created_at_sequence: self.created_at_sequence.clone(),
            num_rows: self.num_rows,
            num_physical_rows: self.num_physical_rows,
        }
    }
}
/// Human-readable identifier used in logs/errors; shows only the fragment id.
impl std::fmt::Display for FragmentReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "FragmentReader(id={})", self.fragment_id)
    }
}
/// Zips a set of batches (one per data file, same row count) into a single
/// batch containing the union of their columns.
///
/// Returns an error when `batches` is empty.
fn merge_batches(batches: &[RecordBatch]) -> Result<RecordBatch> {
    let Some((first, rest)) = batches.split_first() else {
        return Err(Error::invalid_input(
            "Cannot merge empty batches".to_string(),
        ));
    };
    rest.iter()
        .try_fold(first.clone(), |merged, batch| merged.merge(batch))
}
impl FragmentReader {
#[allow(clippy::too_many_arguments)]
/// Constructs a reader over the given per-data-file readers.
///
/// For legacy (v1) files, every data file must expose the same number of
/// batches, and legacy readers cannot be mixed with v2 readers. System-column
/// flags all start disabled; enable them via the `with_*` builders.
#[allow(clippy::too_many_arguments)]
fn try_new(
    fragment_id: usize,
    deletion_vec: Option<Arc<DeletionVector>>,
    row_id_sequence: Option<Arc<RowIdSequence>>,
    readers: Vec<Box<dyn GenericFileReader>>,
    output_schema: ArrowSchema,
    num_rows: usize,
    num_physical_rows: usize,
    fragment: Arc<Fragment>,
) -> Result<Self> {
    // Legacy files are read batch-by-batch, so batch boundaries must align
    // across all data files of the fragment.
    if let Some(legacy_reader) = readers.first().and_then(|reader| reader.as_legacy_opt()) {
        let num_batches = legacy_reader.num_batches();
        for reader in readers.iter().skip(1) {
            if let Some(other_legacy) = reader.as_legacy_opt() {
                if other_legacy.num_batches() != num_batches {
                    return Err(Error::invalid_input("Cannot create FragmentReader from data files with different number of batches"
                        .to_string()));
                }
            } else {
                return Err(Error::invalid_input(
                    "Cannot mix legacy and non-legacy readers".to_string(),
                ));
            }
        }
    }
    Ok(Self {
        readers,
        output_schema,
        deletion_vec,
        row_id_sequence,
        fragment_id,
        with_row_id: false,
        with_row_addr: false,
        with_row_last_updated_at_version: false,
        with_row_created_at_version: false,
        make_deletions_null: false,
        fragment,
        last_updated_at_sequence: None,
        created_at_sequence: None,
        num_rows,
        num_physical_rows,
    })
}
/// Enables the `_rowid` system column and appends it to the output schema.
pub(crate) fn with_row_id(&mut self) -> &mut Self {
    self.with_row_id = true;
    self.output_schema = self
        .output_schema
        .try_with_column(ROW_ID_FIELD.clone())
        .expect("Table already has a column named _rowid");
    self
}
/// Enables the `_rowaddr` system column and appends it to the output schema.
pub(crate) fn with_row_address(&mut self) -> &mut Self {
    self.with_row_addr = true;
    self.output_schema = self
        .output_schema
        .try_with_column(ROW_ADDR_FIELD.clone())
        .expect("Table already has a column named _rowaddr")
    ;
    self
}
/// Emits deleted rows with null row ids instead of filtering them out.
pub(crate) fn with_make_deletions_null(&mut self) -> &mut Self {
    self.make_deletions_null = true;
    self
}
/// Enables the `_row_last_updated_at_version` system column, lazily loading the
/// backing per-row version sequence from the fragment metadata when present.
/// NOTE(review): a failed `load_sequence()` is silently ignored here — the
/// column is still added to the schema; confirm downstream handles a missing
/// sequence.
pub(crate) fn with_row_last_updated_at_version(&mut self) -> &mut Self {
    self.with_row_last_updated_at_version = true;
    if self.last_updated_at_sequence.is_none()
        && let Some(meta) = &self.fragment.last_updated_at_version_meta
        && let Ok(sequence) = meta.load_sequence()
    {
        self.last_updated_at_sequence = Some(Arc::new(sequence));
    }
    self.output_schema = self
        .output_schema
        .try_with_column(ROW_LAST_UPDATED_AT_VERSION_FIELD.clone())
        .expect("Table already has a column named _row_last_updated_at_version");
    self
}
/// Enables the `_row_created_at_version` system column, lazily loading the
/// backing per-row version sequence from the fragment metadata when present.
/// NOTE(review): mirrors `with_row_last_updated_at_version` — load failures
/// are silently ignored.
pub(crate) fn with_row_created_at_version(&mut self) -> &mut Self {
    self.with_row_created_at_version = true;
    if self.created_at_sequence.is_none()
        && let Some(meta) = &self.fragment.created_at_version_meta
        && let Ok(sequence) = meta.load_sequence()
    {
        self.created_at_sequence = Some(Arc::new(sequence));
    }
    self.output_schema = self
        .output_schema
        .try_with_column(ROW_CREATED_AT_VERSION_FIELD.clone())
        .expect("Table already has a column named _row_created_at_version");
    self
}
/// Number of batches in this (legacy/v1) fragment.
///
/// Panics if any reader is not legacy or if batch counts differ across data
/// files (`try_new` should have rejected both cases already).
pub(crate) fn legacy_num_batches(&self) -> usize {
    let legacy_reader = self.readers[0].as_legacy();
    let num_batches = legacy_reader.num_batches();
    assert!(
        self.readers
            .iter()
            .all(|r| r.as_legacy().num_batches() == num_batches),
        "Data files have varying number of batches, which is not yet supported."
    );
    num_batches
}
/// Number of rows in legacy (v1) batch `batch_id`, or `None` when there are no
/// readers, the first data file is not a legacy file, or the id is out of range.
pub(crate) fn legacy_num_rows_in_batch(&self, batch_id: u32) -> Option<u32> {
    let legacy_reader = self.readers.first()?.as_legacy_opt()?;
    if batch_id >= legacy_reader.num_batches() as u32 {
        return None;
    }
    Some(legacy_reader.num_rows_in_batch(batch_id as i32) as u32)
}
/// Reads page-level statistics from each (legacy) data file, restricted to
/// `projection` when given, and merges them column-wise into one batch.
/// Returns `Ok(None)` when no data file had statistics.
pub(crate) async fn legacy_read_page_stats(
    &self,
    projection: Option<&Schema>,
) -> Result<Option<RecordBatch>> {
    let mut stats_batches = vec![];
    for reader in self.readers.iter() {
        // Restrict each file's stats to the fields it actually stores.
        let schema = match projection {
            Some(projection) => Arc::new(reader.projection().intersection(projection)?),
            None => reader.projection().clone(),
        };
        let reader = reader.as_legacy();
        if let Some(stats_batch) = reader.read_page_stats(&schema.field_ids()).await? {
            stats_batches.push(stats_batch);
        }
    }
    if stats_batches.is_empty() {
        Ok(None)
    } else {
        Ok(Some(merge_batches(&stats_batches)?))
    }
}
/// Reads (part of) legacy batch `batch_id` with the given `projection`,
/// merging columns from every data file that stores projected fields, then
/// applying row-id/row-addr synthesis and deletion handling.
///
/// NOTE(review): `batch_offset` is computed as `batch_id * num_rows_in_batch(0)`,
/// which assumes all batches (except possibly the last) have the same row
/// count — confirm that holds for v1 files.
pub(crate) async fn legacy_read_batch_projected(
    &self,
    batch_id: usize,
    params: impl Into<ReadBatchParams> + Clone,
    projection: &Schema,
) -> Result<RecordBatch> {
    let first_reader = self.readers[0].as_legacy();
    let batch_offset = batch_id * first_reader.num_rows_in_batch(0);
    let rows_in_batch = first_reader.num_rows_in_batch(batch_id as i32);
    let batches = if !projection.fields.is_empty() {
        // Fan out one read task per data file that stores any projected field.
        let read_tasks = self.readers.iter().map(|reader| {
            let projection = reader.projection().intersection(projection);
            let params = params.clone();
            let reader = reader.as_legacy();
            async move {
                let projection = projection?;
                if projection.fields.is_empty() {
                    // This data file stores none of the requested fields.
                    Result::Ok(None)
                } else {
                    Ok(Some(
                        reader
                            .read_batch(batch_id as i32, params, &projection)
                            .await?,
                    ))
                }
            }
        });
        let results = try_join_all(read_tasks).await?;
        results.into_iter().flatten().collect::<Vec<RecordBatch>>()
    } else {
        // Empty projection (system columns only): synthesize a zero-column
        // batch with the right number of rows.
        let expected_rows = params
            .clone()
            .into()
            .slice(0, rows_in_batch)
            .unwrap()
            .to_offsets()?
            .len();
        vec![RecordBatch::from(StructArray::new_empty_fields(
            expected_rows,
            None,
        ))]
    };
    let params = params.into();
    let result = merge_batches(&batches)?;
    // Rebase batch-local params onto file-wide offsets so row ids/addresses
    // come out correct.
    let file_params = match params {
        ReadBatchParams::Indices(indices) => ReadBatchParams::Indices(
            indices
                .values()
                .iter()
                .map(|i| *i + batch_offset as u32)
                .collect(),
        ),
        ReadBatchParams::Ranges(_) => {
            return Err(Error::internal(
                "ReadBatchParams::Ranges should not be used in v1 files".to_string(),
            ));
        }
        ReadBatchParams::RangeFull => {
            ReadBatchParams::Range(batch_offset..(batch_offset + rows_in_batch))
        }
        ReadBatchParams::RangeFrom(start) => {
            ReadBatchParams::Range((start.start + batch_offset)..(batch_offset + rows_in_batch))
        }
        ReadBatchParams::RangeTo(end) => {
            ReadBatchParams::Range(batch_offset..(end.end + batch_offset))
        }
        ReadBatchParams::Range(range) => {
            ReadBatchParams::Range((range.start + batch_offset)..(range.end + batch_offset))
        }
    };
    let result = lance_table::utils::stream::apply_row_id_and_deletes(
        result,
        0,
        self.fragment_id as u32,
        &RowIdAndDeletesConfig {
            params: file_params,
            deletion_vector: self.deletion_vec.clone(),
            row_id_sequence: self.row_id_sequence.clone(),
            with_row_id: self.with_row_id,
            with_row_addr: self.with_row_addr,
            with_row_last_updated_at_version: self.with_row_last_updated_at_version,
            with_row_created_at_version: self.with_row_created_at_version,
            last_updated_at_sequence: self.last_updated_at_sequence.clone(),
            created_at_sequence: self.created_at_sequence.clone(),
            make_deletions_null: self.make_deletions_null,
            total_num_rows: first_reader.len() as u32,
        },
    )?;
    // NOTE(review): unlike `self.output_schema`, this locally-built schema only
    // appends _rowid/_rowaddr — the version columns are not included even when
    // requested; confirm v1 paths never request them.
    let output_schema = {
        let mut output_schema = ArrowSchema::from(projection);
        if self.with_row_id {
            output_schema = output_schema.try_with_column(ROW_ID_FIELD.clone())?;
        }
        if self.with_row_addr {
            output_schema = output_schema.try_with_column(ROW_ADDR_FIELD.clone())?;
        }
        output_schema
    };
    Ok(result.project_by_schema(&output_schema)?)
}
/// Shared scaffolding for the v2 read paths (`read_range`, `read_all`, `take`).
///
/// Validates `params` against the physical row count, builds the raw batch
/// stream (either via `read_fn` per data file, or synthesized zero-column
/// batches when only system columns are requested), then wraps it with
/// row-id/row-addr synthesis, deletion handling, and a final projection onto
/// the output schema.
fn new_read_impl(
    &self,
    params: ReadBatchParams,
    batch_size: u32,
    read_fn: impl Fn(&dyn GenericFileReader) -> Result<ReadBatchTaskStream>,
) -> Result<ReadBatchFutStream> {
    let total_num_rows = self.num_physical_rows as u32;
    if !params.valid_given_len(total_num_rows as usize) {
        return Err(Error::invalid_input(format!(
            "Invalid read params {} for fragment with {} addressable rows",
            params, total_num_rows
        )));
    }
    // If every output column is a system column there is nothing to read from
    // disk; emit empty-field batches of the right row counts instead.
    let merged = if self.num_system_cols() == self.output_schema.fields.len() {
        let selected_rows = params.to_offsets_total(total_num_rows).len();
        let tasks = (0..selected_rows)
            .step_by(batch_size as usize)
            .map(move |offset| {
                let num_rows = (batch_size as usize).min(selected_rows - offset);
                let batch = RecordBatch::from(StructArray::new_empty_fields(num_rows, None));
                ReadBatchTask {
                    task: std::future::ready(Ok(batch)).boxed(),
                    num_rows: num_rows as u32,
                }
            });
        stream::iter(tasks).boxed()
    } else {
        // One stream per data file that stores projected fields, zipped
        // column-wise into whole batches.
        let read_streams = self
            .readers
            .iter()
            .filter_map(|reader| {
                if reader.projection().fields.is_empty() {
                    None
                } else {
                    Some(read_fn(reader.as_ref()))
                }
            })
            .collect::<Result<Vec<_>>>()?;
        lance_table::utils::stream::merge_streams(read_streams)
    };
    let config = RowIdAndDeletesConfig {
        deletion_vector: self.deletion_vec.clone(),
        row_id_sequence: self.row_id_sequence.clone(),
        make_deletions_null: self.make_deletions_null,
        with_row_id: self.with_row_id,
        with_row_addr: self.with_row_addr,
        with_row_last_updated_at_version: self.with_row_last_updated_at_version,
        with_row_created_at_version: self.with_row_created_at_version,
        last_updated_at_sequence: self.last_updated_at_sequence.clone(),
        created_at_sequence: self.created_at_sequence.clone(),
        params,
        total_num_rows,
    };
    let output_schema = Arc::new(self.output_schema.clone());
    // Project each produced batch onto the output schema (column order/subset).
    Ok(
        wrap_with_row_id_and_delete(merged, self.fragment_id as u32, config)
            .map(move |batch_fut| {
                let output_schema = output_schema.clone();
                batch_fut
                    .map(move |batch| {
                        batch?
                            .project_by_schema(&output_schema)
                            .map_err(Error::from)
                    })
                    .boxed()
            })
            .boxed(),
    )
}
/// Adjusts a logical row range into a physical one so that, after deleted rows
/// are skipped, it still covers the requested number of live rows.
///
/// Deleted ids at or before the current start slide the whole range forward;
/// deleted ids inside the range extend its end by one each. Iteration stops at
/// the first deleted id beyond the (current) end.
fn patch_range_for_deletions(&self, range: Range<u32>, dv: &DeletionVector) -> Range<u32> {
    let mut patched = range;
    for deleted in dv.to_sorted_iter() {
        if deleted <= patched.start {
            patched.start += 1;
            patched.end += 1;
        } else if deleted < patched.end {
            patched.end += 1;
        } else {
            break;
        }
    }
    patched
}
/// Streams `range` from the fragment. When `skip_deleted_rows` is set, the
/// range is first re-mapped through the deletion vector so it addresses the
/// requested number of *live* rows (see `patch_range_for_deletions`).
fn do_read_range(
    &self,
    mut range: Range<u32>,
    batch_size: u32,
    skip_deleted_rows: bool,
) -> Result<ReadBatchFutStream> {
    if skip_deleted_rows && let Some(deletion_vector) = self.deletion_vec.as_ref() {
        range = self.patch_range_for_deletions(range, deletion_vector.as_ref());
    }
    self.new_read_impl(
        ReadBatchParams::Range(range.start as usize..range.end as usize),
        batch_size,
        move |reader| {
            reader.read_range_tasks(
                range.start as u64..range.end as u64,
                batch_size,
                reader.projection().clone(),
            )
        },
    )
}
/// Number of synthesized "system" columns (`_rowid`, `_rowaddr`, version
/// columns) currently enabled on this reader.
fn num_system_cols(&self) -> usize {
    [
        self.with_row_id,
        self.with_row_addr,
        self.with_row_created_at_version,
        self.with_row_last_updated_at_version,
    ]
    .into_iter()
    .filter(|enabled| *enabled)
    .count()
}
/// Reads a range of *logical* (post-deletion) rows; deleted rows are skipped.
pub fn read_range(&self, range: Range<u32>, batch_size: u32) -> Result<ReadBatchFutStream> {
    self.do_read_range(range, batch_size, true)
}
/// Reads a range of *physical* rows, addressing deleted rows as well.
pub fn take_range(&self, range: Range<u32>, batch_size: u32) -> Result<ReadBatchFutStream> {
    self.do_read_range(range, batch_size, false)
}
/// Streams every row of the fragment in batches of `batch_size`.
pub fn read_all(&self, batch_size: u32) -> Result<ReadBatchFutStream> {
    self.new_read_impl(ReadBatchParams::RangeFull, batch_size, move |reader| {
        reader.read_all_tasks(batch_size, reader.projection().clone())
    })
}
/// Streams multiple physical row ranges from the fragment.
///
/// Structurally parallels `new_read_impl` (which only supports a single
/// params value) but validates each range against the physical row count and
/// forwards the whole range list to each data-file reader.
pub fn read_ranges(
    &self,
    ranges: Arc<[Range<u64>]>,
    batch_size: u32,
) -> Result<ReadBatchFutStream> {
    let total_num_rows = self.num_physical_rows as u32;
    let mut num_requested_rows = 0;
    for range in ranges.as_ref() {
        if range.end > total_num_rows as u64 {
            return Err(Error::internal(format!(
                "Invalid read of range {:?} for fragment {} with {} addressable rows",
                range, self.fragment_id, total_num_rows
            )));
        }
        num_requested_rows += range.end - range.start;
    }
    // System-columns-only: synthesize zero-column batches instead of reading.
    let merged_stream = if self.num_system_cols() == self.output_schema.fields.len() {
        let tasks = (0..num_requested_rows)
            .step_by(batch_size as usize)
            .map(move |offset| {
                let num_rows = (batch_size as u64).min(num_requested_rows - offset);
                let batch =
                    RecordBatch::from(StructArray::new_empty_fields(num_rows as usize, None));
                ReadBatchTask {
                    task: std::future::ready(Ok(batch)).boxed(),
                    num_rows: num_rows as u32,
                }
            });
        stream::iter(tasks).boxed()
    } else {
        let read_streams = self
            .readers
            .iter()
            .map(|reader| {
                reader.read_ranges_tasks(
                    ranges.clone(),
                    batch_size,
                    reader.projection().clone(),
                )
            })
            .collect::<Result<Vec<_>>>()?;
        lance_table::utils::stream::merge_streams(read_streams)
    };
    let config = RowIdAndDeletesConfig {
        deletion_vector: self.deletion_vec.clone(),
        row_id_sequence: self.row_id_sequence.clone(),
        make_deletions_null: self.make_deletions_null,
        with_row_id: self.with_row_id,
        with_row_addr: self.with_row_addr,
        with_row_last_updated_at_version: self.with_row_last_updated_at_version,
        with_row_created_at_version: self.with_row_created_at_version,
        last_updated_at_sequence: self.last_updated_at_sequence.clone(),
        created_at_sequence: self.created_at_sequence.clone(),
        params: ReadBatchParams::Ranges(ranges),
        total_num_rows,
    };
    let output_schema = Arc::new(self.output_schema.clone());
    // Project each produced batch onto the output schema.
    Ok(
        wrap_with_row_id_and_delete(merged_stream, self.fragment_id as u32, config)
            .map(move |batch_fut| {
                let output_schema = output_schema.clone();
                batch_fut
                    .map(move |batch| {
                        batch?
                            .project_by_schema(&output_schema)
                            .map_err(Error::from)
                    })
                    .boxed()
            })
            .boxed(),
    )
}
/// Reads a physical row range (deleted rows included) and concatenates the
/// resulting stream into one batch.
pub async fn legacy_read_range_as_batch(&self, range: Range<usize>) -> Result<RecordBatch> {
    let batches = self
        .take_range(
            range.start as u32..range.end as u32,
            DEFAULT_BATCH_READ_SIZE,
        )?
        .buffered(get_num_compute_intensive_cpus())
        .try_collect::<Vec<_>>()
        .await?;
    concat_batches(&Arc::new(self.output_schema.clone()), batches.iter()).map_err(Error::from)
}
/// Takes rows at the given physical `indices`, streaming the results.
///
/// NOTE(review): declared `async` but contains no awaits — presumably kept
/// async for API symmetry with other read entry points; confirm before
/// changing the signature.
pub async fn take(
    &self,
    indices: &[u32],
    batch_size: u32,
    take_priority: Option<u32>,
) -> Result<ReadBatchFutStream> {
    let indices_arr = UInt32Array::from(indices.to_vec());
    self.new_read_impl(
        ReadBatchParams::Indices(indices_arr),
        batch_size,
        move |reader| {
            reader.take_all_tasks(
                indices,
                batch_size,
                reader.projection().clone(),
                take_priority,
            )
        },
    )
}
/// Takes rows at `indices` and returns them as a single batch, preserving the
/// requested order including duplicates.
///
/// Duplicates are collapsed before the read and re-expanded afterwards with a
/// `take` on the result. NOTE(review): duplicate detection only looks at
/// adjacent pairs (`windows(2)`), so it assumes duplicate indices are adjacent
/// (e.g. the input is sorted) — confirm callers uphold this.
pub async fn take_as_batch(
    &self,
    indices: &[u32],
    take_priority: Option<u32>,
) -> Result<RecordBatch> {
    let has_duplicates = indices.windows(2).any(|w| w[0] == w[1]);
    let (unique_indices, expand_map) = if has_duplicates {
        // Deduplicate adjacent repeats; `mapping[i]` is the position in
        // `unique` that original index i should be copied from.
        let mut unique: Vec<u32> = Vec::with_capacity(indices.len());
        let mut mapping: Vec<u32> = Vec::with_capacity(indices.len());
        for &idx in indices {
            if unique.last() != Some(&idx) {
                unique.push(idx);
            }
            mapping.push((unique.len() - 1) as u32);
        }
        (Cow::Owned(unique), Some(UInt32Array::from(mapping)))
    } else {
        (Cow::Borrowed(indices), None)
    };
    let batches = self
        .take(&unique_indices, u32::MAX, take_priority)
        .await?
        .buffered(get_num_compute_intensive_cpus())
        .try_collect::<Vec<_>>()
        .await?;
    let mut batch = concat_batches(&Arc::new(self.output_schema.clone()), batches.iter())?;
    if let Some(expand_map) = expand_map {
        // Re-expand the deduplicated rows back to the original ordering.
        batch = arrow_select::take::take_record_batch(&batch, &expand_map)?;
    }
    Ok(batch)
}
}
#[cfg(test)]
mod tests {
use arrow_arith::numeric::mul;
use arrow_array::{
ArrayRef, BooleanArray, Int32Array, Int64Array, RecordBatchIterator, StringArray,
};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use lance_core::ROW_ID;
use lance_core::utils::tempfile::TempStrDir;
use lance_datagen::{RowCount, array, gen_batch};
use lance_file::version::LanceFileVersion;
use lance_file::writer::FileWriterOptions;
use lance_io::{assert_io_eq, assert_io_lt, object_store::ObjectStore};
use pretty_assertions::assert_eq;
use rstest::rstest;
use super::*;
use crate::{
dataset::{
InsertBuilder,
transaction::{Operation, UpdateMode},
},
session::Session,
utils::test::TestDatasetGenerator,
};
/// Test helper: writes a 200-row (i: Int32, s: Utf8) dataset split into 5
/// fragments of 40 rows (groups of 10) at `test_uri`, then reopens it.
async fn create_dataset(test_uri: &str, data_storage_version: LanceFileVersion) -> Dataset {
    let schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("i", DataType::Int32, true),
        ArrowField::new("s", DataType::Utf8, true),
    ]));
    let batches: Vec<RecordBatch> = (0..10)
        .map(|i| {
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)),
                    Arc::new(StringArray::from_iter_values(
                        (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
                    )),
                ],
            )
            .unwrap()
        })
        .collect();
    let write_params = WriteParams {
        max_rows_per_file: 40,
        max_rows_per_group: 10,
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    Dataset::write(batches, test_uri, Some(write_params))
        .await
        .unwrap();
    Dataset::open(test_uri).await.unwrap()
}
/// Test helper: like `create_dataset` but single-column (i: Int32) and always
/// written with the stable (v2) file format.
async fn create_dataset_v2(test_uri: &str) -> Dataset {
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "i",
        DataType::Int32,
        true,
    )]));
    let batches: Vec<RecordBatch> = (0..10)
        .map(|i| {
            RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20))],
            )
            .unwrap()
        })
        .collect();
    let write_params = WriteParams {
        max_rows_per_file: 40,
        max_rows_per_group: 10,
        data_storage_version: Some(LanceFileVersion::Stable),
        ..Default::default()
    };
    let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    Dataset::write(batches, test_uri, Some(write_params))
        .await
        .unwrap();
    Dataset::open(test_uri).await.unwrap()
}
// Scans fragment 2 (rows 80..120) with a filter; legacy files surface one
// batch per row group while v2 files coalesce into a single batch.
#[rstest]
#[tokio::test]
async fn test_fragment_scan(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let dataset = create_dataset(test_uri, data_storage_version).await;
    let fragment = &dataset.get_fragments()[2];
    let mut scanner = fragment.scan();
    let batches = scanner
        .with_row_id()
        .filter(" i < 105")
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    if data_storage_version == LanceFileVersion::Legacy {
        // Legacy: one batch per 10-row group, truncated by the filter.
        assert_eq!(batches.len(), 3);
        assert_eq!(
            batches[0].column_by_name("i").unwrap().as_ref(),
            &Int32Array::from_iter_values(80..90)
        );
        assert_eq!(
            batches[1].column_by_name("i").unwrap().as_ref(),
            &Int32Array::from_iter_values(90..100)
        );
        assert_eq!(
            batches[2].column_by_name("i").unwrap().as_ref(),
            &Int32Array::from_iter_values(100..105)
        );
    } else {
        assert_eq!(batches.len(), 1);
        assert_eq!(
            batches[0].column_by_name("i").unwrap().as_ref(),
            &Int32Array::from_iter_values(80..105)
        )
    }
}
// v2 scan of fragment 2 (rows 80..120): a full scan yields one batch, and an
// explicit batch size of 20 splits it into two.
#[tokio::test]
async fn test_fragment_scan_v2() {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let dataset = create_dataset_v2(test_uri).await;
    let fragment = &dataset.get_fragments()[2];
    let mut scanner = fragment.scan();
    let batches = scanner
        .with_row_id()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    assert_eq!(batches.len(), 1);
    assert_eq!(
        batches[0].column_by_name("i").unwrap().as_ref(),
        &Int32Array::from_iter_values(80..120)
    );
    let mut scanner = fragment.scan();
    let batches = scanner
        .with_row_id()
        .batch_size(20)
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    assert_eq!(batches.len(), 2);
    assert_eq!(
        batches[0].column_by_name("i").unwrap().as_ref(),
        &Int32Array::from_iter_values(80..100)
    );
    assert_eq!(
        batches[1].column_by_name("i").unwrap().as_ref(),
        &Int32Array::from_iter_values(100..120)
    );
}
// Exercises `update_columns` twice on fragment 0: first joined on _rowid to
// rewrite one added column, then joined on a data column ("i") to rewrite two
// columns at once. Rows 0 and 3 are left out of the join and must keep their
// original values after each commit.
#[tokio::test]
async fn test_fragment_update() {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let mut dataset = create_dataset_v2(test_uri).await;
    // Add col1 = -1 everywhere so we have something to update.
    let _ = dataset
        .add_columns(
            NewColumnTransform::SqlExpressions(vec![("col1".into(), "-1".into())]),
            None,
            None,
        )
        .await;
    let mut fragment1 = dataset.get_fragment(0).unwrap();
    let schema1 = Arc::new(ArrowSchema::new(vec![
        ArrowField::new(ROW_ID, DataType::UInt64, false),
        ArrowField::new("col1", DataType::Int64, true),
    ]));
    // Update col1 to 2 for every row except 0 and 3.
    let update_batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(UInt64Array::from(
                (0..40).filter(|&v| v != 0 && v != 3).collect::<Vec<_>>(),
            )),
            Arc::new(Int64Array::from(vec![2; 38])),
        ],
    )
    .unwrap();
    let right_stream1: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
        vec![Ok(update_batch1)].into_iter(),
        schema1,
    ));
    let (updated_fragment1, fields_modified1) = fragment1
        .update_columns(right_stream1, ROW_ID, ROW_ID)
        .await
        .unwrap();
    let op1 = Operation::Update {
        removed_fragment_ids: vec![],
        updated_fragments: vec![updated_fragment1],
        new_fragments: vec![],
        fields_modified: fields_modified1,
        merged_generations: Vec::new(),
        fields_for_preserving_frag_bitmap: vec![],
        update_mode: Some(UpdateMode::RewriteColumns),
        inserted_rows_filter: None,
    };
    let mut dataset1 = Dataset::commit(
        test_uri,
        op1,
        Some(dataset.version().version),
        None,
        None,
        Default::default(),
        true,
    )
    .await
    .unwrap();
    assert_eq!(dataset1.get_fragments().len(), 5);
    let scanner1 = dataset1.get_fragment(0).unwrap().scan();
    let batches1 = scanner1
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    assert_eq!(batches1.len(), 1);
    // Rows 0 and 3 were not joined, so they keep the original -1.
    let mut expected_col1 = vec![2; 40];
    expected_col1[0] = -1;
    expected_col1[3] = -1;
    assert_eq!(
        batches1[0].column_by_name("col1").unwrap().as_ref(),
        &Int64Array::from(expected_col1)
    );
    // Second round: add col2 = false, then update col1 and col2 together,
    // joining on the data column "i" instead of _rowid.
    let _ = dataset1
        .add_columns(
            NewColumnTransform::SqlExpressions(vec![("col2".into(), "false".into())]),
            None,
            None,
        )
        .await;
    let mut fragment2 = dataset1.get_fragment(0).unwrap();
    let schema2 = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("i1", DataType::Int32, true),
        ArrowField::new("col2", DataType::Boolean, true),
        ArrowField::new("col1", DataType::Int64, true),
    ]));
    let update_batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(Int32Array::from(
                (0..40).filter(|&v| v != 0 && v != 3).collect::<Vec<_>>(),
            )),
            Arc::new(BooleanArray::from(vec![true; 38])),
            Arc::new(Int64Array::from(vec![3; 38])),
        ],
    )
    .unwrap();
    let right_stream2: Box<dyn RecordBatchReader + Send> = Box::new(RecordBatchIterator::new(
        vec![Ok(update_batch2)].into_iter(),
        schema2,
    ));
    let (updated_fragment2, fields_modified2) = fragment2
        .update_columns(right_stream2, "i", "i1")
        .await
        .unwrap();
    let op = Operation::Update {
        removed_fragment_ids: vec![],
        updated_fragments: vec![updated_fragment2],
        new_fragments: vec![],
        fields_modified: fields_modified2,
        merged_generations: Vec::new(),
        fields_for_preserving_frag_bitmap: vec![],
        update_mode: Some(UpdateMode::RewriteColumns),
        inserted_rows_filter: None,
    };
    let dataset2 = Dataset::commit(
        test_uri,
        op,
        Some(dataset1.version().version),
        None,
        None,
        Default::default(),
        true,
    )
    .await
    .unwrap();
    assert_eq!(dataset2.get_fragments().len(), 5);
    let scanner2 = dataset2.get_fragment(0).unwrap().scan();
    let batches2 = scanner2
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    assert_eq!(batches2.len(), 1);
    // Again, un-joined rows 0 and 3 keep their pre-update values.
    expected_col1 = vec![3; 40];
    expected_col1[0] = -1;
    expected_col1[3] = -1;
    assert_eq!(
        batches2[0].column_by_name("col1").unwrap().as_ref(),
        &Int64Array::from(expected_col1)
    );
    let mut expected_col2 = vec![true; 40];
    expected_col2[0] = false;
    expected_col2[3] = false;
    assert_eq!(
        batches2[0].column_by_name("col2").unwrap().as_ref(),
        &BooleanArray::from(expected_col2)
    );
}
// Verifies the bounds semantics of the two range read paths after deletions:
// take_range addresses all 40 physical rows while read_range only the 20
// surviving logical rows; anything past the respective bound must error.
#[tokio::test]
async fn test_out_of_range() {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let mut dataset = create_dataset(test_uri, LanceFileVersion::Legacy).await;
    // Delete the second half of fragment 0 (rows with i >= 20).
    dataset.delete("i >= 20").await.unwrap();
    let fragment = &dataset.get_fragments()[0];
    assert_eq!(fragment.metadata.num_rows().unwrap(), 20);
    for with_row_id in [false, true] {
        let reader = fragment
            .open(
                fragment.schema(),
                FragReadConfig::default().with_row_id(with_row_id),
            )
            .await
            .unwrap();
        // take_range is physical: the full 0..40 is still addressable.
        for valid_range in [0..40, 20..40] {
            reader
                .take_range(valid_range, 100)
                .unwrap()
                .buffered(1)
                .try_collect::<Vec<_>>()
                .await
                .unwrap();
        }
        for invalid_range in [0..41, 41..42] {
            assert!(reader.take_range(invalid_range, 100).is_err());
        }
    }
    for with_row_id in [false, true] {
        let reader = fragment
            .open(
                fragment.schema(),
                FragReadConfig::default().with_row_id(with_row_id),
            )
            .await
            .unwrap();
        // read_range is logical: only the 20 live rows are addressable.
        for valid_range in [0..20, 0..10, 10..20] {
            reader
                .read_range(valid_range, 100)
                .unwrap()
                .buffered(1)
                .try_collect::<Vec<_>>()
                .await
                .unwrap();
        }
        for invalid_range in [0..21, 21..22] {
            assert!(reader.read_range(invalid_range, 100).is_err());
        }
    }
}
// Opening a fragment with an empty projection works as long as row ids
// and/or row addresses are requested; range reads must still bounds-check.
#[tokio::test]
async fn test_rowid_rowaddr_only() {
let tmp_dir = TempStrDir::default();
let uri = &tmp_dir;
let mut dataset = create_dataset(uri, LanceFileVersion::Legacy).await;
dataset.delete("i >= 20").await.unwrap();
let fragment = &dataset.get_fragments()[0];
assert_eq!(fragment.metadata.num_rows().unwrap(), 20);
let combos = [(false, true), (true, false), (true, true)];
// take_range: all 40 physical rows remain addressable.
for (want_row_id, want_row_addr) in combos {
let meta_reader = fragment
.open(
&fragment.schema().project::<&str>(&[]).unwrap(),
FragReadConfig::default()
.with_row_id(want_row_id)
.with_row_address(want_row_addr),
)
.await
.unwrap();
for ok_range in [0..40, 20..40] {
meta_reader
.take_range(ok_range, 100)
.unwrap()
.buffered(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
}
for bad_range in [0..41, 41..42] {
assert!(meta_reader.take_range(bad_range, 100).is_err());
}
}
// read_range: only the 20 surviving rows are addressable.
for (want_row_id, want_row_addr) in combos {
let meta_reader = fragment
.open(
&fragment.schema().project::<&str>(&[]).unwrap(),
FragReadConfig::default()
.with_row_id(want_row_id)
.with_row_address(want_row_addr),
)
.await
.unwrap();
for ok_range in [0..20, 0..10, 10..20] {
meta_reader
.read_range(ok_range, 100)
.unwrap()
.buffered(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
}
for bad_range in [0..21, 21..22] {
assert!(meta_reader.read_range(bad_range, 100).is_err());
}
}
}
// take_range with make_deletions_null: deleted rows are still returned but
// their _rowid values are null, rather than being filtered out of the batch.
#[rstest]
#[tokio::test]
async fn test_fragment_take_range_deletions(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let mut dataset = create_dataset(test_uri, data_storage_version).await;
// Delete the first 15 rows of the fragment.
dataset.delete("i >= 0 and i < 15").await.unwrap();
let fragment = &dataset.get_fragments()[0];
let mut reader = fragment
.open(
dataset.schema(),
FragReadConfig::default().with_row_id(true),
)
.await
.unwrap();
// Ask for deleted rows to surface as null row ids instead of being dropped.
reader.with_make_deletions_null();
if data_storage_version == LanceFileVersion::Legacy {
// Legacy files are read batch-by-batch (the batches below are 10 rows each).
// Batch 0: rows 0..10, all deleted -> all-null row ids.
let batch1 = reader
.legacy_read_batch_projected(0, .., dataset.schema())
.await
.unwrap();
assert_eq!(
batch1.column_by_name(ROW_ID).unwrap().as_ref(),
&UInt64Array::from_iter(std::iter::repeat_n(None, 10))
);
// Batch 1: rows 10..20, only the first half (10..15) was deleted.
let batch2 = reader
.legacy_read_batch_projected(1, .., dataset.schema())
.await
.unwrap();
assert_eq!(
batch2.column_by_name(ROW_ID).unwrap().as_ref(),
&UInt64Array::from_iter((10..20).map(|v| if v < 15 { None } else { Some(v) }))
);
// Batch 2: rows 20..30, untouched by the delete.
let batch3 = reader
.legacy_read_batch_projected(2, .., dataset.schema())
.await
.unwrap();
assert_eq!(
batch3.column_by_name(ROW_ID).unwrap().as_ref(),
&UInt64Array::from_iter_values(20..30)
);
} else {
// Newer formats go through take_range; same expectations, 10 rows per call.
let to_batches = |range: Range<u32>| {
let batch_size = range.len() as u32;
reader
.take_range(range, batch_size)
.unwrap()
.buffered(1)
.try_collect::<Vec<_>>()
};
let batches = to_batches(0..10).await.unwrap();
assert_eq!(batches.len(), 1);
let batch = batches.into_iter().next().unwrap();
assert_eq!(
batch.column_by_name(ROW_ID).unwrap().as_ref(),
&UInt64Array::from_iter(std::iter::repeat_n(None, 10))
);
let batches = to_batches(10..20).await.unwrap();
assert_eq!(batches.len(), 1);
let batch = batches.into_iter().next().unwrap();
assert_eq!(
batch.column_by_name(ROW_ID).unwrap().as_ref(),
&UInt64Array::from_iter((10..20).map(|v| if v < 15 { None } else { Some(v) }))
);
let batches = to_batches(20..30).await.unwrap();
assert_eq!(batches.len(), 1);
let batch = batches.into_iter().next().unwrap();
assert_eq!(
batch.column_by_name(ROW_ID).unwrap().as_ref(),
&UInt64Array::from_iter_values(20..30)
);
}
}
// read_range over a fragment with deletions: the range addresses positions
// among the surviving (non-deleted) rows, as the expectations below show.
#[rstest]
#[tokio::test]
async fn test_range_scan_deletions(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let dataset = create_dataset(test_uri, data_storage_version).await;
let version = dataset.version().version;
// For each case: reset to the pristine version, apply `cond` as a delete,
// then read `range` from the first fragment and compare the "i" column.
let check = |cond: &'static str, range: Range<u32>, expected: Vec<i32>| async {
let mut dataset = dataset.checkout_version(version).await.unwrap();
dataset.restore().await.unwrap();
dataset.delete(cond).await.unwrap();
let fragment = &dataset.get_fragments()[0];
let reader = fragment
.open(
dataset.schema(),
FragReadConfig::default().with_row_id(true),
)
.await
.unwrap();
let mut stream = reader.read_range(range, 20).unwrap();
let mut batches = Vec::new();
while let Some(next) = stream.next().await {
batches.push(next.await.unwrap());
}
let schema = Arc::new(dataset.schema().into());
let batch = arrow_select::concat::concat_batches(&schema, batches.iter()).unwrap();
assert_eq!(batch.num_rows(), expected.len());
assert_eq!(
batch.column_by_name("i").unwrap().as_ref(),
&Int32Array::from(expected)
);
};
// Deletion at the head of the fragment.
check("i < 5", 0..2, vec![5, 6]).await;
check("i < 5", 0..15, (5..20).collect()).await;
// Deletion in the middle: ranges before, spanning, and after the hole.
check("i >= 5 and i < 15", 7..9, vec![17, 18]).await;
check("i >= 5 and i < 15", 3..5, vec![3, 4]).await;
check("i >= 5 and i < 15", 3..6, vec![3, 4, 15]).await;
check("i >= 5 and i < 15", 5..6, vec![15]).await;
check("i >= 5 and i < 15", 5..10, vec![15, 16, 17, 18, 19]).await;
check(
"i >= 5 and i < 15",
0..10,
vec![0, 1, 2, 3, 4, 15, 16, 17, 18, 19],
)
.await;
// Deletion at the tail of the fragment.
check("i >= 15", 10..15, vec![10, 11, 12, 13, 14]).await;
check("i >= 15", 0..15, (0..15).collect()).await;
}
// take() addresses rows by logical (post-deletion) offset within a fragment.
#[rstest]
#[tokio::test]
async fn test_fragment_take_indices(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let tmp_dir = TempStrDir::default();
let uri = &tmp_dir;
let mut dataset = create_dataset(uri, data_storage_version).await;
// Fragment 3 holds the rows with i in [120, 160): offset k -> value 120 + k.
let frag = dataset
.get_fragments()
.into_iter()
.find(|f| f.id() == 3)
.unwrap();
let taken = frag
.take(&[1, 2, 4, 5, 5, 8], dataset.schema())
.await
.unwrap();
assert_eq!(
taken.column_by_name("i").unwrap().as_ref(),
&Int32Array::from(vec![121, 122, 124, 125, 125, 128])
);
// Delete a few rows; logical offsets now skip over the deleted rows.
dataset.delete("i in (122, 123, 125)").await.unwrap();
dataset.validate().await.unwrap();
let frag = dataset
.get_fragments()
.into_iter()
.find(|f| f.id() == 3)
.unwrap();
assert!(frag.metadata().deletion_file.is_some());
let taken = frag
.take(&[1, 2, 4, 5, 8], dataset.schema())
.await
.unwrap();
assert_eq!(
taken.column_by_name("i").unwrap().as_ref(),
&Int32Array::from(vec![121, 124, 127, 128, 131])
);
// An empty index list yields an empty batch.
let empty = frag.take(&[], dataset.schema()).await.unwrap();
assert_eq!(
empty.column_by_name("i").unwrap().as_ref(),
&Int32Array::from(Vec::<i32>::new())
);
}
// take_rows addresses rows by physical offset within the fragment; offsets
// pointing at deleted rows are skipped (not remapped), shrinking the result.
#[rstest]
#[tokio::test]
async fn test_fragment_take_rows(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let mut dataset = create_dataset(test_uri, data_storage_version).await;
// Fragment 3 covers i in [120, 160), so offset k maps to value 120 + k.
let fragment = dataset
.get_fragments()
.into_iter()
.find(|f| f.id() == 3)
.unwrap();
// NOTE(review): the four boolean flags are positional; the second one
// appears to request _rowaddr (see the ROW_ADDR assertion at the end) —
// confirm against the take_rows signature.
let batch = fragment
.take_rows(
&[1, 2, 4, 5, 5, 8],
dataset.schema(),
false,
false,
false,
false,
)
.await
.unwrap();
assert_eq!(
batch.column_by_name("i").unwrap().as_ref(),
&Int32Array::from(vec![121, 122, 124, 125, 125, 128])
);
// Delete two of the rows requested above (offsets 2 and 4).
dataset.delete("i in (122, 124)").await.unwrap();
dataset.validate().await.unwrap();
let fragment = dataset
.get_fragments()
.into_iter()
.find(|f| f.id() == 3)
.unwrap();
assert!(fragment.metadata().deletion_file.is_some());
// Deleted offsets drop out of the result: 5 requested, 3 returned.
let batch = fragment
.take_rows(
&[1, 2, 4, 5, 8],
dataset.schema(),
false,
false,
false,
false,
)
.await
.unwrap();
assert_eq!(
batch.column_by_name("i").unwrap().as_ref(),
&Int32Array::from(vec![121, 125, 128])
);
// An empty request yields an empty batch.
let batch = fragment
.take_rows(&[], dataset.schema(), false, false, false, false)
.await
.unwrap();
assert_eq!(
batch.column_by_name("i").unwrap().as_ref(),
&Int32Array::from(Vec::<i32>::new())
);
// With the second flag set, the result also carries _rowaddr values
// (fragment id 3 in the high 32 bits, physical row offset in the low 32).
let batch = fragment
.take_rows(
&[1, 2, 4, 5, 8],
dataset.schema(),
false,
true,
false,
false,
)
.await
.unwrap();
assert_eq!(
batch.column_by_name("i").unwrap().as_ref(),
&Int32Array::from(vec![121, 125, 128])
);
assert_eq!(
batch.column_by_name(ROW_ADDR).unwrap().as_ref(),
&UInt64Array::from(vec![(3 << 32) + 1, (3 << 32) + 5, (3 << 32) + 8])
);
}
// A dataset can be rebuilt by re-registering its existing data files as
// brand-new fragments via create_from_file and committing an Overwrite.
#[tokio::test]
async fn test_recommit_from_file() {
let tmp_dir = TempStrDir::default();
let uri = &tmp_dir;
let dataset = create_dataset(uri, LanceFileVersion::Legacy).await;
let schema = dataset.schema();
let dataset_rows = dataset.count_rows(None).await.unwrap();
// Collect the relative path of every data file in the dataset.
let paths: Vec<String> = dataset
.get_fragments()
.iter()
.flat_map(|f| Fragment::from(f.clone()).files)
.map(|file| file.path)
.collect();
// Wrap each file in a fresh fragment, ids assigned by position.
let mut fragments: Vec<Fragment> = Vec::with_capacity(paths.len());
for (idx, path) in paths.iter().enumerate() {
let frag = FileFragment::create_from_file(path, &dataset, idx, None)
.await
.unwrap();
fragments.push(frag);
}
let op = Operation::Overwrite {
schema: schema.clone(),
fragments,
config_upsert_values: None,
initial_bases: None,
};
let new_dataset =
Dataset::commit(uri, op, None, None, None, Default::default(), false)
.await
.unwrap();
// Row counts survive the round trip and no deletions are carried over.
assert_eq!(new_dataset.count_rows(None).await.unwrap(), dataset_rows);
let fragments = new_dataset.get_fragments();
assert_eq!(fragments.len(), 5);
for f in fragments {
assert_eq!(f.metadata.num_rows(), Some(40));
assert_eq!(f.count_rows(None).await.unwrap(), 40);
assert_eq!(f.metadata().deletion_file, None);
}
}
// Logical vs. physical row counts on a fragment, before and after a delete
// that introduces a deletion file.
#[rstest]
#[tokio::test]
async fn test_fragment_count(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let tmp_dir = TempStrDir::default();
let uri = &tmp_dir;
let ds = create_dataset(uri, data_storage_version).await;
let frag = ds.get_fragments().pop().unwrap();
// Fresh fragment: logical == physical and no deletion file yet.
assert_eq!(frag.count_rows(None).await.unwrap(), 40);
assert_eq!(frag.physical_rows().await.unwrap(), 40);
assert!(frag.metadata.deletion_file.is_none());
// This (last) fragment holds i in [160, 200), so "i < 170" matches 10 rows.
assert_eq!(
frag.count_rows(Some("i < 170".to_string()))
.await
.unwrap(),
10
);
// Delete 13 rows (160..=172): logical count drops, physical is unchanged.
let frag = frag
.delete("i >= 160 and i <= 172")
.await
.unwrap()
.unwrap();
frag.validate().await.unwrap();
assert_eq!(frag.count_rows(None).await.unwrap(), 27);
assert_eq!(frag.physical_rows().await.unwrap(), 40);
assert!(frag.metadata.deletion_file.is_some());
assert_eq!(
frag.metadata.deletion_file.unwrap().num_deleted_rows,
Some(13)
);
}
// End-to-end "add a derived column" flow: run an updater over column "i",
// write a second data file holding "double_i", then commit the rewritten
// fragment with an Overwrite. Exercised both with and without deletions.
#[rstest]
#[tokio::test]
async fn test_append_new_columns(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
for with_delete in [true, false] {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let mut dataset = create_dataset(test_uri, data_storage_version).await;
dataset.validate().await.unwrap();
assert_eq!(dataset.count_rows(None).await.unwrap(), 200);
if with_delete {
dataset.delete("i >= 15 and i < 20").await.unwrap();
dataset.validate().await.unwrap();
assert_eq!(dataset.count_rows(None).await.unwrap(), 195);
}
// Stream fragment 0 through an updater, emitting double_i = 2 * i.
let fragment = &mut dataset.get_fragment(0).unwrap();
let mut updater = fragment.updater(Some(&["i"]), None, None).await.unwrap();
let new_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"double_i",
DataType::Int32,
true,
)]));
while let Some(batch) = updater.next().await.unwrap() {
let input_col = batch.column_by_name("i").unwrap();
let result_col = mul(input_col, &Int32Array::new_scalar(2)).unwrap();
let batch = RecordBatch::try_new(
new_schema.clone(),
vec![Arc::new(result_col) as ArrayRef],
)
.unwrap();
updater.update(batch).await.unwrap();
}
// The updated fragment carries the original file plus the new column file.
let new_fragment = updater.finish().await.unwrap();
assert_eq!(new_fragment.files.len(), 2);
// Commit only this fragment, overwriting with the merged schema.
let mut full_schema = dataset.schema().merge(new_schema.as_ref()).unwrap();
full_schema.set_field_id(None);
let before_version = dataset.version().version;
let op = Operation::Overwrite {
fragments: vec![new_fragment],
schema: full_schema.clone(),
config_upsert_values: None,
initial_bases: None,
};
let dataset =
Dataset::commit(test_uri, op, None, None, None, Default::default(), false)
.await
.unwrap();
// Only fragment 0 survives the overwrite: 40 rows (35 with the delete).
assert_eq!(
dataset.count_rows(None).await.unwrap(),
if with_delete { 35 } else { 40 }
);
assert_eq!(dataset.version().version, before_version + 1);
dataset.validate().await.unwrap();
let new_projection = full_schema.project(&["i", "double_i"]).unwrap();
let stream = dataset
.scan()
.batch_size(10)
.project(&["i", "double_i"])
.unwrap()
.try_into_stream()
.await
.unwrap();
let batches = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(batches[1].schema().as_ref(), &(&new_projection).into());
// The contents of the second 10-row batch depend on how deleted rows are
// batched: legacy files produce a short batch where rows were deleted.
let expected_i = match (with_delete, data_storage_version) {
(true, LanceFileVersion::Legacy) => vec![10, 11, 12, 13, 14],
(true, _) => vec![10, 11, 12, 13, 14, 20, 21, 22, 23, 24],
(false, _) => vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
};
let expected_batch = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", DataType::Int32, true),
ArrowField::new("double_i", DataType::Int32, true),
])),
vec![
Arc::new(Int32Array::from_iter_values(expected_i.iter().copied())),
Arc::new(Int32Array::from_iter_values(
expected_i.iter().map(|i| 2 * i),
)),
],
)
.unwrap();
assert_eq!(batches[1], expected_batch);
}
}
// merge() joins a new column onto an existing dataset by key and leaves
// deleted rows deleted.
#[rstest]
#[tokio::test]
async fn test_merge_fragment(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let tmp_dir = TempStrDir::default();
let uri = &tmp_dir;
let mut dataset = create_dataset(uri, data_storage_version).await;
dataset.validate().await.unwrap();
assert_eq!(dataset.count_rows(None).await.unwrap(), 200);
let deleted_range = 15..20;
dataset.delete("i >= 15 and i < 20").await.unwrap();
dataset.validate().await.unwrap();
assert_eq!(dataset.count_rows(None).await.unwrap(), 195);
// Build the right-hand side: every key 0..200 plus a doubled column.
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", DataType::Int32, true),
ArrowField::new("double_i", DataType::Int32, true),
]));
let to_merge = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(0..200)),
Arc::new(Int32Array::from_iter_values((0..400).step_by(2))),
],
)
.unwrap();
let stream = RecordBatchIterator::new(vec![Ok(to_merge)], schema.clone());
dataset.merge(stream, "i", "i").await.unwrap();
dataset.validate().await.unwrap();
// Scan everything back and flatten into a single batch.
let batches = dataset
.scan()
.project(&["i", "double_i"])
.unwrap()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let merged = concat_batches(&schema, &batches).unwrap();
let col_i: &Int32Array = as_primitive_array(&merged["i"]);
let col_double_i: &Int32Array = as_primitive_array(&merged["double_i"]);
// Deleted keys must be absent; every other row pairs i with 2 * i.
let mut pos: usize = 0;
for key in 0..200 {
if deleted_range.contains(&key) {
continue;
}
assert_eq!(col_i.value(pos), key);
assert_eq!(col_double_i.value(pos), 2 * key);
pos += 1;
}
}
// The legacy writer splits incoming batches into groups of
// max_rows_per_group rows, with a smaller trailing group for the remainder.
#[tokio::test]
async fn test_write_batch_size() {
let tmp_dir = TempStrDir::default();
let uri = &tmp_dir;
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
true,
)]));
let rows_per_batch = 1024;
// Ten 1024-row input batches of consecutive integers.
let batches: Vec<RecordBatch> = (0..10)
.map(|b| {
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(
b * rows_per_batch..(b + 1) * rows_per_batch,
))],
)
.unwrap()
})
.collect();
let batch_iter = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let fragment = FileFragment::create(
uri,
10,
batch_iter,
Some(WriteParams {
max_rows_per_group: 100,
data_storage_version: Some(LanceFileVersion::Legacy),
..Default::default()
}),
)
.await
.unwrap();
let (object_store, base_path) = ObjectStore::from_uri(uri).await.unwrap();
let file_reader = PreviousFileReader::try_new_with_fragment_id(
&object_store,
&base_path
.child("data")
.child(fragment.files[0].path.as_str()),
schema.as_ref().try_into().unwrap(),
10,
0,
1,
None,
)
.await
.unwrap();
// Every group but the last holds exactly max_rows_per_group rows.
let num_batches = file_reader.num_batches();
for group in 0..num_batches - 1 {
assert_eq!(file_reader.num_rows_in_batch(group as i32), 100);
}
// The final group holds the remainder: 10 * 1024 % 100 = 40 rows.
assert_eq!(
file_reader.num_rows_in_batch(num_batches as i32 - 1) as i32,
rows_per_batch * 10 % 100
);
}
// Merging a new column can leave the committed schema in a different order
// ("s" before "i") than the physical write order; scans, takes, and range
// reads must still return columns matching the schema order.
#[tokio::test]
async fn test_shuffled_columns() -> Result<()> {
let batch_i = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
true,
)])),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)?;
let batch_s = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![ArrowField::new(
"s",
DataType::Utf8,
true,
)])),
vec![Arc::new(StringArray::from_iter_values(
(0..20).map(|v| format!("s-{}", v)),
))],
)?;
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
// Write "i" first, then attach "s" through an updater pass.
let dataset = Dataset::write(
RecordBatchIterator::new(vec![Ok(batch_i.clone())], batch_i.schema().clone()),
test_uri,
None,
)
.await?;
let fragment = dataset.get_fragments().pop().unwrap();
let mut updater = fragment.updater(Some(&["i"]), None, None).await?;
updater.next().await?;
updater.update(batch_s.clone()).await?;
let frag = updater.finish().await?;
// Commit with the schema deliberately reordered to ["s", "i"].
let schema = updater.schema().unwrap().clone().project(&["s", "i"])?;
let dataset = Dataset::commit(
test_uri,
Operation::Merge {
schema,
fragments: vec![frag],
},
Some(dataset.manifest.version),
None,
None,
Default::default(),
false,
)
.await?;
// A full scan returns columns in ["s", "i"] order.
let expected_data = batch_s.merge(&batch_i)?;
let actual_data = dataset.scan().try_into_batch().await?;
assert_eq!(expected_data, actual_data);
// Point takes and range reads honor the same column order.
let reader = dataset
.get_fragments()
.first()
.unwrap()
.open(dataset.schema(), FragReadConfig::default())
.await?;
let actual_data = reader.take_as_batch(&[0, 1, 2], None).await?;
assert_eq!(expected_data.slice(0, 3), actual_data);
let actual_data = reader
.read_range(0..3, 3)
.unwrap()
.next()
.await
.unwrap()
.await
.unwrap();
assert_eq!(expected_data.slice(0, 3), actual_data);
// Scanning with row ids appends the _rowid column after the data columns.
let expected_data = expected_data.try_with_column(
ROW_ID_FIELD.clone(),
Arc::new(UInt64Array::from_iter_values(0..20)),
)?;
let actual_data = dataset.scan().with_row_id().try_into_batch().await?;
assert_eq!(expected_data, actual_data);
Ok(())
}
// A reader opened with an empty projection plus row ids returns only the
// _rowid column; with no data and no meta columns the open must fail.
#[tokio::test]
async fn test_row_id_reader() -> Result<()> {
let source = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
true,
)])),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)?;
let tmp_dir = TempStrDir::default();
let uri = &tmp_dir;
let dataset = Dataset::write(
RecordBatchIterator::new(vec![Ok(source.clone())], source.schema().clone()),
uri,
None,
)
.await?;
let fragment = dataset.get_fragments().pop().unwrap();
let reader = fragment
.open(
&dataset.schema().project::<&str>(&[])?,
FragReadConfig::default().with_row_id(true),
)
.await?;
let row_ids = reader.legacy_read_range_as_batch(0..20).await?;
let expected = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![ROW_ID_FIELD.clone()])),
vec![Arc::new(UInt64Array::from_iter_values(0..20))],
)?;
assert_eq!(expected, row_ids);
// No data columns and no meta columns requested -> nothing to read.
let res = fragment
.open(
&dataset.schema().project::<&str>(&[])?,
FragReadConfig::default(),
)
.await;
assert!(matches!(res, Err(Error::NotFound { .. })));
Ok(())
}
// create_from_file on a v2 (stable) file written out-of-band: the resulting
// fragment should carry the stable file version and be committable via Append.
#[tokio::test]
async fn create_from_file_v2() {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;
let make_gen = || {
gen_batch()
.col("str", array::rand_type(&DataType::Utf8))
.col("int", array::rand_type(&DataType::Int32))
};
let batch = make_gen().into_batch_rows(RowCount::from(128)).unwrap();
let dataset = TestDatasetGenerator::new(vec![batch], LanceFileVersion::Stable)
.make_hostile(test_uri)
.await;
// Write a second 128-row file directly into the dataset's data dir,
// bypassing the dataset API entirely.
let new_data = make_gen().into_batch_rows(RowCount::from(128)).unwrap();
let store = ObjectStore::local();
let file_path = dataset.data_dir().child("some_file.lance");
let object_writer = store.create(&file_path).await.unwrap();
let mut file_writer =
lance_file::writer::FileWriter::new_lazy(object_writer, FileWriterOptions::default());
file_writer.write_batch(&new_data).await.unwrap();
file_writer.finish().await.unwrap();
// Adopt the raw file as a fragment with a known physical row count.
let frag = FileFragment::create_from_file("some_file.lance", &dataset, 0, Some(128))
.await
.unwrap();
// The fragment metadata must reflect the stable file version.
assert_eq!(
Fragment::try_infer_version(std::slice::from_ref(&frag))
.unwrap()
.unwrap(),
LanceFileVersion::Stable.resolve()
);
let op = Operation::Append {
fragments: vec![frag],
};
let dataset = Dataset::commit(
&dataset.uri,
op,
Some(dataset.version().version),
None,
None,
Default::default(),
false,
)
.await
.unwrap();
// Original 128 rows plus the appended 128 are all readable.
assert_eq!(
dataset
.count_rows(Some("int IS NOT NULL".to_string()))
.await
.unwrap(),
256
);
}
// IOPS accounting for a small read: writing a tiny dataset costs 3 write
// IOPS, and a projected read of it should need only a single small read.
#[tokio::test]
async fn test_iops_read_small() {
// One row across eight int32 columns.
let schema = Arc::new(ArrowSchema::new(
(0..8)
.map(|i| ArrowField::new(format!("col_{}", i), DataType::Int32, true))
.collect::<Vec<_>>(),
));
let batch = RecordBatch::try_new(
schema.clone(),
(0..8)
.map(|i| Arc::new(Int32Array::from(vec![i])) as ArrayRef)
.collect(),
)
.unwrap();
let session = Arc::new(Session::default());
let write_params = WriteParams {
session: Some(session.clone()),
..Default::default()
};
let dataset = InsertBuilder::new("memory://test")
.with_params(&write_params)
.execute(vec![batch])
.await
.unwrap();
let fragment = dataset.get_fragments().pop().unwrap();
{
let stats = dataset.object_store().io_stats_incremental();
assert_io_eq!(stats, write_iops, 3);
assert_io_lt!(stats, written_bytes, 4300);
}
// Project by field ids, skipping id 5 and including two nonexistent ids
// (8, 9); the result below shows 7 columns survive.
// NOTE(review): the trailing `true` flag's meaning isn't visible here —
// confirm against the project_by_ids signature.
let projection = Schema::try_from(schema.as_ref())
.unwrap()
.project_by_ids(&[0, 1, 2, 3, 4, 6, 7, 8, 9], true);
let reader = fragment
.open(&projection, Default::default())
.await
.unwrap();
let mut data = reader
.read_all(1024)
.unwrap()
.buffered(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(data.len(), 1);
let data = data.pop().unwrap();
assert_eq!(data.num_rows(), 1);
assert_eq!(data.num_columns(), 7);
// The whole projected read should cost one IOP of under 4 KiB.
let stats = dataset.object_store().io_stats_incremental();
assert_io_eq!(stats, read_iops, 1);
assert_io_lt!(stats, read_bytes, 4096);
}
}