use std::ops::{Range, RangeTo};
use std::sync::Arc;
use arrow::array::PrimitiveBuilder;
use arrow::datatypes::{Int32Type, Int64Type};
use arrow_arith::arithmetic::subtract_scalar;
use arrow_array::cast::as_primitive_array;
use arrow_array::{
ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray,
RecordBatch, StructArray, UInt32Array, UInt64Array,
};
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, Field as ArrowField, FieldRef, Schema as ArrowSchema};
use arrow_select::concat::{concat, concat_batches};
use arrow_select::filter::filter_record_batch;
use async_recursion::async_recursion;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use futures::stream::{self, TryStreamExt};
use futures::{Future, FutureExt, StreamExt};
use object_store::path::Path;
use prost::Message;
use super::deletion::{read_deletion_file, DeletionVector};
use super::object_reader::read_message;
use super::{deletion_file_path, ReadBatchParams};
use crate::arrow::*;
use crate::dataset::ROW_ID;
use crate::encodings::{dictionary::DictionaryDecoder, AsyncIndex};
use crate::error::{Error, Result};
use crate::format::{pb, Index, Metadata, PageTable};
use crate::format::{Fragment, Manifest};
use crate::io::object_reader::{read_fixed_stride_array, read_struct, ObjectReader};
use crate::io::{read_metadata_offset, read_struct_from_buf};
use crate::session::Session;
use crate::{
datatypes::{Field, Schema},
format::PageInfo,
};
use super::object_store::ObjectStore;
pub async fn read_manifest(object_store: &ObjectStore, path: &Path) -> Result<Manifest> {
let file_size = object_store.inner.head(path).await?.size;
const PREFETCH_SIZE: usize = 64 * 1024;
let initial_start = std::cmp::max(file_size as i64 - PREFETCH_SIZE as i64, 0) as usize;
let range = Range {
start: initial_start,
end: file_size,
};
let buf = object_store.inner.get_range(path, range).await?;
if buf.len() < 16 {
return Err(Error::IO {
message: "Invalid format: file size is smaller than 16 bytes".to_string(),
});
}
if !buf.ends_with(super::MAGIC) {
return Err(Error::IO {
message: "Invalid format: magic number does not match".to_string(),
});
}
let manifest_pos = LittleEndian::read_i64(&buf[buf.len() - 16..buf.len() - 8]) as usize;
let manifest_len = file_size - manifest_pos;
let buf: Bytes = if manifest_len <= buf.len() {
buf.slice(buf.len() - manifest_len..buf.len())
} else {
let mut buf2: BytesMut = object_store
.inner
.get_range(
path,
Range {
start: manifest_pos,
end: file_size - PREFETCH_SIZE,
},
)
.await?
.into_iter()
.collect();
buf2.extend_from_slice(&buf);
buf2.freeze()
};
let recorded_length = LittleEndian::read_u32(&buf[0..4]) as usize;
let buf = buf.slice(4..buf.len() - 16);
if buf.len() != recorded_length {
return Err(Error::IO {
message: format!(
"Invalid format: manifest length does not match. Expected {}, got {}",
recorded_length,
buf.len()
),
});
}
let proto = pb::Manifest::decode(buf)?;
Ok(Manifest::from(proto))
}
/// Load the index descriptions referenced by `manifest`.
///
/// When the manifest has no index section an empty vector is returned without
/// touching the object store. Otherwise the file at `path` is opened, the
/// `pb::IndexSection` message at the recorded position is read, and every
/// entry is converted into an [`Index`].
pub async fn read_manifest_indexes(
    object_store: &ObjectStore,
    path: &Path,
    manifest: &Manifest,
) -> Result<Vec<Index>> {
    let pos = match manifest.index_section.as_ref() {
        Some(pos) => *pos,
        None => return Ok(vec![]),
    };

    let reader = object_store.open(path).await?;
    let section: pb::IndexSection = read_message(reader.as_ref(), pos).await?;
    let indices = section
        .indices
        .iter()
        .map(Index::try_from)
        .collect::<Result<Vec<_>>>()?;
    Ok(indices)
}
/// Build a 64-bit row id from a fragment id and a row offset inside that fragment.
///
/// The fragment id is shifted into the upper 32 bits and the offset is added
/// on top of it.
fn compute_row_id(fragment_id: u64, offset: i32) -> u64 {
    let fragment_bits = fragment_id << 32;
    let offset_bits = offset as u64;
    fragment_bits + offset_bits
}
/// Reader for a single data file.
///
/// Holds the object reader together with the file metadata and page table
/// needed to locate the pages of each column in each batch.
pub struct FileReader {
/// Reader used for every read against the underlying object.
object_reader: Arc<dyn ObjectReader>,
/// File-level metadata (batch count, batch offsets, positions of the page
/// table and manifest).
metadata: Arc<Metadata>,
/// Lookup table from (field id, batch id) to the page holding that data.
page_table: Arc<PageTable>,
/// Schema returned by `schema()`; set to the manifest schema on construction.
projection: Option<Schema>,
/// Id of the fragment this file belongs to; used to compute row ids.
fragment_id: u64,
/// When true, batches returned by the reader carry a `_rowid` column.
with_row_id: bool,
/// Deleted rows of the fragment, if any; used to filter rows out of batches.
deletion_vector: Option<Arc<DeletionVector>>,
}
impl std::fmt::Debug for FileReader {
    /// Formats the reader as `FileReader(fragment=<id>, path=<object path>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let fragment = self.fragment_id;
        let path = self.object_reader.path();
        write!(f, "FileReader(fragment={}, path={})", fragment, path)
    }
}
impl FileReader {
pub(crate) async fn try_new_with_fragment(
object_store: &ObjectStore,
path: &Path,
fragment_id: u64,
manifest: Option<&Manifest>,
session: Option<&Session>,
) -> Result<Self> {
let object_reader = object_store.open(path).await?;
let is_dataset = manifest.is_some();
let metadata = Self::load_from_cache(session, path, |_| async {
let file_size = object_reader.size().await?;
let begin = if file_size < object_store.block_size() {
0
} else {
file_size - object_store.block_size()
};
let tail_bytes = object_reader.get_range(begin..file_size).await?;
let metadata_pos = read_metadata_offset(&tail_bytes)?;
let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() {
read_struct(object_reader.as_ref(), metadata_pos).await?
} else {
let offset = tail_bytes.len() - (file_size - metadata_pos);
read_struct_from_buf(&tail_bytes.slice(offset..))?
};
Ok(metadata)
})
.await?;
let mut m: Manifest;
let manifest = if let Some(m) = manifest {
m
} else {
m = read_struct(object_reader.as_ref(), metadata.manifest_position.unwrap()).await?;
m.schema.load_dictionary(object_reader.as_ref()).await?;
&m
};
let page_table = Self::load_from_cache(session, path, |_| {
let num_columns = manifest.schema.max_field_id().unwrap() + 1;
PageTable::load(
object_reader.as_ref(),
metadata.page_table_position,
num_columns,
metadata.num_batches() as i32,
)
})
.await?;
let projection = manifest.schema.clone();
let fragment = if is_dataset {
let fragment = manifest
.fragments
.iter()
.find(|frag| frag.id == fragment_id)
.map(|f| f.to_owned())
.ok_or(Error::IO {
message: format!("Fragment {} not found in manifest", fragment_id),
})?;
Some(fragment)
} else {
None
};
let deletion_vector = if let Some(fragment) = &fragment {
Self::load_deletion_vector(object_store, fragment, session).await?
} else {
None
};
Ok(Self {
object_reader: object_reader.into(),
metadata,
projection: Some(projection),
page_table,
fragment_id,
with_row_id: false,
deletion_vector,
})
}
/// Return the value of type `T` cached for `path`, loading it on a miss.
///
/// With a session, its file metadata cache is checked first; on a miss (or
/// without a session) `loader` is invoked with `path`, and the freshly loaded
/// value is inserted into the cache when a session is present.
async fn load_from_cache<T: Send + Sync + 'static, F, Fut>(
    session: Option<&Session>,
    path: &Path,
    loader: F,
) -> Result<Arc<T>>
where
    F: Fn(&Path) -> Fut,
    Fut: Future<Output = Result<T>>,
{
    if let Some(hit) = session.and_then(|s| s.file_metadata_cache.get::<T>(path)) {
        return Ok(hit);
    }

    let loaded = Arc::new(loader(path).await?);
    if let Some(s) = session {
        s.file_metadata_cache.insert(path.to_owned(), loaded.clone());
    }
    Ok(loaded)
}
async fn load_deletion_vector(
object_store: &ObjectStore,
fragment: &Fragment,
session: Option<&Session>,
) -> Result<Option<Arc<DeletionVector>>> {
if let Some(deletion_file) = &fragment.deletion_file {
let path = deletion_file_path(object_store.base_path(), fragment.id, deletion_file);
let deletion_vector = Self::load_from_cache(session, &path, |_| async {
read_deletion_file(object_store.base_path(), fragment, object_store)
.await?
.ok_or(Error::IO {
message: format!(
"Deletion file {:?} not found in fragment {}",
deletion_file, fragment.id
),
})
})
.await?;
Ok(Some(deletion_vector))
} else {
Ok(None)
}
}
/// Open a standalone data file: fragment id 0, no dataset manifest and no session.
pub(crate) async fn try_new(object_store: &ObjectStore, path: &Path) -> Result<Self> {
    let reader = Self::try_new_with_fragment(object_store, path, 0, None, None).await?;
    Ok(reader)
}
/// Toggle whether batches read by this reader include the row id column.
///
/// Returns `self` so calls can be chained.
pub(crate) fn with_row_id(&mut self, v: bool) -> &mut Self {
    let this = self;
    this.with_row_id = v;
    this
}
/// Schema of the file.
///
/// # Panics
///
/// Panics if the reader has no projection set.
pub fn schema(&self) -> &Schema {
    let projection: Option<&Schema> = self.projection.as_ref();
    projection.unwrap()
}
/// Number of batches in the file, as recorded in its metadata.
pub fn num_batches(&self) -> usize {
    let metadata = &self.metadata;
    metadata.num_batches()
}
/// Number of rows in batch `batch_id`.
///
/// Falls back to the default length (zero) when the metadata has no length
/// for that batch.
pub fn num_rows_in_batch(&self, batch_id: i32) -> usize {
    let length = self.metadata.get_batch_length(batch_id).unwrap_or_default();
    length as usize
}
/// Length of the file as reported by its metadata.
pub fn len(&self) -> usize {
    let metadata = &self.metadata;
    metadata.len()
}
/// Whether the file metadata reports the file as empty.
pub fn is_empty(&self) -> bool {
    let metadata = &self.metadata;
    metadata.is_empty()
}
/// Read the rows selected by `params` from batch `batch_id`.
///
/// * `params` - anything convertible into [`ReadBatchParams`] (a range, a
///   full range, a slice of indices, ...).
/// * `projection` - the columns to read.
///
/// The reader's `with_row_id` flag and deletion vector are applied to the
/// returned batch.
pub(crate) async fn read_batch(
    &self,
    batch_id: i32,
    params: impl Into<ReadBatchParams>,
    projection: &Schema,
) -> Result<RecordBatch> {
    // Convert once and lend the result to the free function of the same name.
    let params: ReadBatchParams = params.into();
    read_batch(
        self,
        &params,
        projection,
        batch_id,
        self.with_row_id,
        self.deletion_vector.clone(),
    )
    .await
}
pub(crate) async fn read_range(
&self,
range: Range<usize>,
projection: &Schema,
) -> Result<RecordBatch> {
let range_in_batches = self.metadata.range_to_batches(range)?;
let batches =
stream::iter(range_in_batches)
.map(|(batch_id, range)| async move {
self.read_batch(batch_id, range, projection).await
})
.buffered(num_cpus::get())
.try_collect::<Vec<_>>()
.await?;
if batches.len() == 1 {
return Ok(batches[0].clone());
}
let schema = batches[0].schema();
Ok(concat_batches(&schema, &batches)?)
}
/// Fetch the rows at `indices` (row offsets within the file) for `projection`.
///
/// Indices are grouped by batch, each group is read concurrently, and the
/// per-batch results are concatenated. The output schema is the projection,
/// extended with the row id column when row ids are enabled.
pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result<RecordBatch> {
    let grouped = self.metadata.group_indices_to_batches(indices);
    let parallelism = num_cpus::get() * 4;
    let batches = stream::iter(grouped)
        .map(|group| async move {
            self.read_batch(group.batch_id, group.offsets.as_slice(), projection)
                .await
        })
        .buffered(parallelism)
        .try_collect::<Vec<_>>()
        .await?;

    let projected = ArrowSchema::from(projection);
    let output_schema = if self.with_row_id {
        projected.try_with_column(ArrowField::new(ROW_ID, DataType::UInt64, false))?
    } else {
        projected
    };
    let output_schema = Arc::new(output_schema);

    let merged = concat_batches(&output_schema, &batches)?;
    Ok(merged)
}
}
/// Read the rows selected by `params` from batch `batch_id` of `reader`.
///
/// All fields of `schema` are read concurrently. Row ids are computed when
/// they were requested (`with_row_id`) or when a deletion vector that may
/// remove rows is present; they are appended as a `_rowid` column only when
/// requested. Rows marked as deleted are filtered out of the result.
async fn read_batch(
    reader: &FileReader,
    params: &ReadBatchParams,
    schema: &Schema,
    batch_id: i32,
    with_row_id: bool,
    deletion_vector: Option<Arc<DeletionVector>>,
) -> Result<RecordBatch> {
    let arrs = stream::iter(&schema.fields)
        .map(|f| read_array(reader, f, batch_id, params))
        .buffered(num_cpus::get() * 4)
        .try_collect::<Vec<_>>()
        .boxed();
    let arrs = arrs.await?;

    let should_fetch_row_id = with_row_id
        || !matches!(
            deletion_vector.as_deref(),
            None | Some(DeletionVector::NoDeletions)
        );

    let mut batch = RecordBatch::try_new(Arc::new(schema.into()), arrs)?;

    let row_ids = if should_fetch_row_id {
        // Offsets of the selected rows relative to the start of the batch.
        let ids_in_batch: Vec<i32> = match params {
            ReadBatchParams::Indices(indices) => {
                indices.values().iter().map(|v| *v as i32).collect()
            }
            ReadBatchParams::Range(r) => r.clone().map(|v| v as i32).collect(),
            ReadBatchParams::RangeFull => (0..batch.num_rows() as i32).collect(),
            ReadBatchParams::RangeTo(r) => (0..r.end).map(|v| v as i32).collect(),
            ReadBatchParams::RangeFrom(r) => (r.start..r.start + batch.num_rows())
                .map(|v| v as i32)
                .collect(),
        };
        let batch_offset = reader
            .metadata
            .get_offset(batch_id)
            .ok_or_else(|| Error::IO {
                message: format!("batch {batch_id} does not exist"),
            })?;
        let row_ids: Vec<u64> = ids_in_batch
            .iter()
            .map(|o| compute_row_id(reader.fragment_id, *o + batch_offset))
            .collect();
        Some(row_ids)
    } else {
        None
    };

    // Row ids are absent when nothing asked for them (e.g. a `NoDeletions`
    // vector without `with_row_id`); in that case there is nothing to filter,
    // so do not unwrap them unconditionally.
    let deletion_mask = match (deletion_vector, row_ids.as_ref()) {
        (Some(vector), Some(ids)) => vector.build_predicate(ids.iter()),
        _ => None,
    };

    if with_row_id {
        // `should_fetch_row_id` is true whenever `with_row_id` is, so the ids exist.
        let row_id_arr = Arc::new(UInt64Array::from(row_ids.unwrap()));
        batch = batch.try_with_column(
            ArrowField::new("_rowid", DataType::UInt64, false),
            row_id_arr,
        )?;
    }

    match deletion_mask {
        None => Ok(batch),
        Some(mask) => Ok(filter_record_batch(&batch, &mask)?),
    }
}
/// Read the array of `field` for the rows selected by `params` in batch `batch_id`.
///
/// Dispatches on the field's data type: fixed-stride types, null, (large)
/// utf8/binary, struct, dictionary and (large) list are supported. Nested
/// types recurse into this function for their children.
///
/// # Errors
///
/// Returns [`Error::IO`] for data types that are not supported, in addition
/// to any error raised while reading.
#[async_recursion]
async fn read_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    use DataType::*;

    let data_type = field.data_type();
    if data_type.is_fixed_stride() {
        return _read_fixed_stride_array(reader, field, batch_id, params).await;
    }

    match data_type {
        Null => read_null_array(reader, field, batch_id, params),
        Utf8 | LargeUtf8 | Binary | LargeBinary => {
            read_binary_array(reader, field, batch_id, params).await
        }
        Struct(_) => read_struct_array(reader, field, batch_id, params).await,
        Dictionary(_, _) => read_dictionary_array(reader, field, batch_id, params).await,
        List(_) => read_list_array::<Int32Type>(reader, field, batch_id, params).await,
        LargeList(_) => read_list_array::<Int64Type>(reader, field, batch_id, params).await,
        // Report unsupported types to the caller instead of aborting the task.
        _ => Err(Error::IO {
            message: format!("No support for {data_type} yet"),
        }),
    }
}
fn get_page_info<'a>(
page_table: &'a PageTable,
field: &'a Field,
batch_id: i32,
) -> Result<&'a PageInfo> {
page_table.get(field.id, batch_id).ok_or_else(|| Error::IO {
message: format!(
"No page info found for field: {}, field_id={} batch={}",
field.name, field.id, batch_id
),
})
}
/// Read a fixed-stride column: locate its page and delegate to the object
/// reader level `read_fixed_stride_array`.
async fn _read_fixed_stride_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let info = get_page_info(&reader.page_table, field, batch_id)?;
    let data_type = field.data_type();
    let object_reader = reader.object_reader.as_ref();
    read_fixed_stride_array(
        object_reader,
        &data_type,
        info.position,
        info.length,
        params.clone(),
    )
    .await
}
/// Build the [`NullArray`] for `field` covering the rows selected by `params`.
///
/// A null column stores no values, so only the page length is consulted to
/// validate the request and to size the result.
///
/// # Errors
///
/// Returns [`Error::IO`] when an index, or the end of a range, lies beyond
/// the page length, or when a range starts after it ends.
fn read_null_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let page_info = get_page_info(&reader.page_table, field, batch_id)?;
    let page_len = page_info.length;

    // Validates `start..end` against the page and returns its length. The
    // `start > end` check prevents the subtraction from underflowing, e.g.
    // for a `RangeFrom` that starts past the end of the page.
    let range_len = |idx_start: usize, idx_end: usize| -> Result<usize> {
        if idx_end > page_len || idx_start > idx_end {
            return Err(Error::IO {
                message: format!(
                    "NullArray Reader: request([{}..{}]) out of range: [0..{}]",
                    idx_start, idx_end, page_len
                ),
            });
        }
        Ok(idx_end - idx_start)
    };

    let length_output = match params {
        ReadBatchParams::Indices(indices) => {
            // Only the largest index needs checking; no indices means no rows.
            if let Some(idx_max) = indices.values().iter().copied().max() {
                if idx_max as usize >= page_len {
                    return Err(Error::IO {
                        message: format!(
                            "NullArray Reader: request([{}]) out of range: [0..{}]",
                            idx_max, page_len
                        ),
                    });
                }
            }
            indices.len()
        }
        ReadBatchParams::Range(r) => range_len(r.start, r.end)?,
        ReadBatchParams::RangeFull => range_len(0, page_len)?,
        ReadBatchParams::RangeTo(r) => range_len(0, r.end)?,
        ReadBatchParams::RangeFrom(r) => range_len(r.start, page_len)?,
    };

    Ok(Arc::new(NullArray::new(length_output)))
}
/// Read a (large) utf8/binary column: locate its page and delegate to the
/// object reader level `read_binary_array`.
async fn read_binary_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let info = get_page_info(&reader.page_table, field, batch_id)?;
    let data_type = field.data_type();
    // Fully qualified to distinguish it from this function of the same name.
    crate::io::object_reader::read_binary_array(
        reader.object_reader.as_ref(),
        &data_type,
        field.nullable,
        info.position,
        info.length,
        params,
    )
    .await
}
async fn read_dictionary_array(
reader: &FileReader,
field: &Field,
batch_id: i32,
params: &ReadBatchParams,
) -> Result<ArrayRef> {
let page_info = get_page_info(&reader.page_table, field, batch_id)?;
let data_type = field.data_type();
let decoder = DictionaryDecoder::new(
reader.object_reader.as_ref(),
page_info.position,
page_info.length,
&data_type,
field
.dictionary
.as_ref()
.unwrap()
.values
.as_ref()
.unwrap()
.clone(),
);
decoder.get(params.clone()).await
}
/// Read a struct column by reading each child field, one after another, with
/// the same `params` and assembling the results into a [`StructArray`].
async fn read_struct_array(
    reader: &FileReader,
    field: &Field,
    batch_id: i32,
    params: &ReadBatchParams,
) -> Result<ArrayRef> {
    let mut columns: Vec<(FieldRef, ArrayRef)> = Vec::with_capacity(field.children.len());
    for child in &field.children {
        let child_array = read_array(reader, child, batch_id, params).await?;
        let child_field: FieldRef = Arc::new(child.into());
        columns.push((child_field, child_array));
    }
    let struct_array = StructArray::from(columns);
    Ok(Arc::new(struct_array))
}
async fn take_list_array<T: ArrowNumericType>(
reader: &FileReader,
field: &Field,
batch_id: i32,
positions: &PrimitiveArray<T>,
indices: &UInt32Array,
) -> Result<ArrayRef>
where
T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
let first_idx = indices.value(0);
let ranges = indices
.values()
.iter()
.map(|i| (*i - first_idx).as_usize())
.map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize())
.collect::<Vec<_>>();
let field = field.clone();
let mut list_values: Vec<ArrayRef> = vec![];
for range in ranges.iter() {
list_values.push(
read_array(
reader,
&field.children[0],
batch_id,
&(range.clone()).into(),
)
.await?,
);
}
let value_refs = list_values
.iter()
.map(|arr| arr.as_ref())
.collect::<Vec<_>>();
let mut offsets_builder = PrimitiveBuilder::<T>::new();
offsets_builder.append_value(T::Native::usize_as(0));
let mut off = 0_usize;
for range in ranges {
off += range.len();
offsets_builder.append_value(T::Native::usize_as(off));
}
let all_values = concat(value_refs.as_slice())?;
let offset_arr = offsets_builder.finish();
let arr = try_new_generic_list_array(all_values, &offset_arr)?;
Ok(Arc::new(arr) as ArrayRef)
}
/// Read a list column (`T` selects 32-bit or 64-bit offsets) for the rows
/// selected by `params`.
///
/// The field's own page stores the list positions (offsets); the values live
/// in the page of the first child field. The positions covering the request
/// are read first, then translated into the range of child values to read.
/// Requests by indices are delegated to `take_list_array`.
async fn read_list_array<T: ArrowNumericType>(
reader: &FileReader,
field: &Field,
batch_id: i32,
params: &ReadBatchParams,
) -> Result<ArrayRef>
where
T::Native: ArrowNativeTypeOp + OffsetSizeTrait,
{
// N lists need N + 1 positions, so bounded requests read one extra position.
let positions_params = match params {
ReadBatchParams::Range(range) => ReadBatchParams::from(range.start..(range.end + 1)),
ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1),
ReadBatchParams::Indices(indices) => {
// NOTE(review): uses the first and last index as the bounds, so this appears
// to assume `indices` is non-empty and sorted ascending — confirm with callers.
(indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into()
}
p => p.clone(),
};
let page_info = get_page_info(&reader.page_table, field, batch_id)?;
let position_arr = read_fixed_stride_array(
reader.object_reader.as_ref(),
&T::DATA_TYPE,
page_info.position,
page_info.length,
positions_params,
)
.await?;
let positions: &PrimitiveArray<T> = as_primitive_array(position_arr.as_ref());
// Translate the requested rows into the range of child values to read.
let value_params = match params {
ReadBatchParams::Range(range) => ReadBatchParams::from(
positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(),
),
ReadBatchParams::RangeTo(RangeTo { end }) => {
ReadBatchParams::from(..positions.value(*end).as_usize())
}
ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..),
ReadBatchParams::RangeFull => ReadBatchParams::from(
positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(),
),
ReadBatchParams::Indices(indices) => {
// Scattered rows are gathered list by list.
return take_list_array(reader, field, batch_id, positions, indices).await;
}
};
// Rebase the positions so that the offsets of the returned array start at zero.
let start_position = positions.value(0);
let offset_arr = subtract_scalar(positions, start_position)?;
let value_arrs = read_array(reader, &field.children[0], batch_id, &value_params).await?;
let arr = try_new_generic_list_array(value_arrs, &offset_arr)?;
Ok(Arc::new(arr) as ArrayRef)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dataset::{Dataset, WriteParams};
use crate::format::Fragment;
use crate::io::deletion::write_deletion_file;
use crate::io::deletion::DeletionVector;
use crate::io::ObjectStore;
use arrow::array::LargeListBuilder;
use arrow_array::builder::StringDictionaryBuilder;
use arrow_array::types::Int32Type;
use arrow_array::RecordBatchIterator;
use arrow_array::{
builder::{Int32Builder, ListBuilder, StringBuilder},
cast::{as_primitive_array, as_string_array, as_struct_array},
types::UInt8Type,
Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, NullArray,
StringArray, StructArray, UInt32Array, UInt8Array,
};
use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
use rand::{distributions::Alphanumeric, Rng};
use roaring::RoaringBitmap;
use tempfile::tempdir;
use tokio::io::AsyncWriteExt;
use crate::io::{write_manifest, FileWriter};
/// Row ids of every batch must be `fragment << 32` plus the row's offset in the file.
#[tokio::test]
async fn read_with_row_id() {
    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("i", DataType::Int64, true),
        ArrowField::new("f", DataType::Float32, false),
    ]);
    let schema = Schema::try_from(&arrow_schema).unwrap();

    let store = ObjectStore::memory();
    let path = Path::from("/foo");

    // Write ten batches of ten consecutive values each.
    let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
    for batch_id in 0..10 {
        let value_range = batch_id * 10..batch_id * 10 + 10;
        let ints = Int64Array::from_iter(value_range.clone().collect::<Vec<_>>());
        let floats = Float32Array::from_iter(value_range.map(|n| n as f32).collect::<Vec<_>>());
        let columns: Vec<ArrayRef> = vec![Arc::new(ints), Arc::new(floats)];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap();
        file_writer.write(&[batch]).await.unwrap();
    }
    file_writer.finish().await.unwrap();

    let fragment = 123;
    let mut reader = FileReader::try_new_with_fragment(&store, &path, fragment, None, None)
        .await
        .unwrap();
    reader.with_row_id(true);

    for b in 0..10 {
        let batch = reader.read_batch(b, .., reader.schema()).await.unwrap();
        let start_pos = (fragment << 32) + 10 * b as u64;
        let expected = UInt64Array::from_iter_values(start_pos..start_pos + 10);
        let row_ids_col = &batch["_rowid"];
        assert_eq!(&expected, as_primitive_array(row_ids_col));
    }
}
/// `take` must return exactly the requested rows, across batches, for
/// primitive, string and dictionary columns.
#[tokio::test]
async fn test_take() {
    let dict_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("i", DataType::Int64, true),
        ArrowField::new("f", DataType::Float32, false),
        ArrowField::new("s", DataType::Utf8, false),
        ArrowField::new("d", dict_type, false),
    ]);
    let mut schema = Schema::try_from(&arrow_schema).unwrap();

    let store = ObjectStore::memory();
    let path = Path::from("/take_test");

    // Dictionary values shared by every batch.
    let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
    let values_ref = Arc::new(values);

    // Ten batches of ten rows; row `n` holds n, n as f32, "str-n" and key n % 7.
    let mut batches = vec![];
    for batch_id in 0..10 {
        let value_range: Range<i64> = batch_id * 10..batch_id * 10 + 10;
        let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8));
        let ints = Int64Array::from_iter(value_range.clone().collect::<Vec<_>>());
        let floats =
            Float32Array::from_iter(value_range.clone().map(|n| n as f32).collect::<Vec<_>>());
        let strings =
            StringArray::from_iter_values(value_range.clone().map(|n| format!("str-{}", n)));
        let dicts = DictionaryArray::<UInt8Type>::try_new(keys, values_ref.clone()).unwrap();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(ints),
            Arc::new(floats),
            Arc::new(strings),
            Arc::new(dicts),
        ];
        batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap());
    }
    schema.set_dictionary(&batches[0]).unwrap();

    let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
    for batch in batches.iter() {
        file_writer.write(&[batch.clone()]).await.unwrap();
    }
    file_writer.finish().await.unwrap();

    let reader = FileReader::try_new(&store, &path).await.unwrap();
    let batch = reader
        .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema())
        .await
        .unwrap();

    let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]);
    let expected = RecordBatch::try_new(
        batch.schema(),
        vec![
            Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])),
            Arc::new(Float32Array::from_iter_values([
                1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0,
            ])),
            Arc::new(StringArray::from_iter_values([
                "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90",
            ])),
            Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()),
        ],
    )
    .unwrap();
    assert_eq!(batch, expected);
}
/// With every even row deleted, each batch must only expose the odd row ids.
#[tokio::test]
async fn read_with_delete() {
    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("i", DataType::Int64, true),
        ArrowField::new("f", DataType::Float32, false),
    ]);
    let schema = Schema::try_from(&arrow_schema).unwrap();

    let (store, path) = ObjectStore::from_uri("memory:///foo").await.unwrap();

    // Ten batches of ten rows each.
    let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
        .await
        .unwrap();
    for batch_id in 0..10 {
        let value_range = batch_id * 10..batch_id * 10 + 10;
        let ints = Int64Array::from_iter(value_range.clone());
        let floats = Float32Array::from_iter(value_range.map(|n| n as f32));
        let columns: Vec<ArrayRef> = vec![Arc::new(ints), Arc::new(floats)];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap();
        file_writer.write(&[batch]).await.unwrap();
    }
    file_writer.finish().await.unwrap();

    // Mark all even rows of the fragment as deleted.
    let fragment = 123;
    let dv = DeletionVector::Bitmap(RoaringBitmap::from_iter((0..100).filter(|x| x % 2 == 0)));
    let deletion_file = write_deletion_file(&Path::from("/foo"), fragment, 0, &dv, &store)
        .await
        .unwrap();
    let mut frag_struct = Fragment::new(fragment);
    frag_struct.deletion_file = deletion_file;
    let manifest = Manifest::new(&schema, Arc::new(vec![frag_struct]));

    let mut reader =
        FileReader::try_new_with_fragment(&store, &path, fragment, Some(&manifest), None)
            .await
            .unwrap();
    reader.with_row_id(true);

    for b in 0..10 {
        let batch = reader.read_batch(b, .., reader.schema()).await.unwrap();
        let start_pos = (fragment << 32) + 10 * b as u64;
        let surviving =
            UInt64Array::from_iter_values((start_pos..start_pos + 10).filter(|i| i % 2 == 1));
        let row_ids_col = &batch["_rowid"];
        assert_eq!(&surviving, as_primitive_array(row_ids_col));
    }
}
/// Applying a deletion vector must not leak the `_rowid` column when row ids
/// were not requested.
#[tokio::test]
async fn read_with_delete_without_row_id() {
    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("i", DataType::Int64, true),
        ArrowField::new("f", DataType::Float32, false),
    ]);
    let schema = Schema::try_from(&arrow_schema).unwrap();

    let (store, path) = ObjectStore::from_uri("memory:///foo").await.unwrap();

    // Ten batches of ten rows each.
    let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
        .await
        .unwrap();
    for batch_id in 0..10 {
        let value_range = batch_id * 10..batch_id * 10 + 10;
        let ints = Int64Array::from_iter(value_range.clone());
        let floats = Float32Array::from_iter(value_range.map(|n| n as f32));
        let columns: Vec<ArrayRef> = vec![Arc::new(ints), Arc::new(floats)];
        let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap();
        file_writer.write(&[batch]).await.unwrap();
    }
    file_writer.finish().await.unwrap();

    // Mark all even rows of the fragment as deleted.
    let fragment = 123;
    let dv = DeletionVector::Bitmap(RoaringBitmap::from_iter((0..100).filter(|x| x % 2 == 0)));
    let deletion_file = write_deletion_file(&Path::from("/foo"), fragment, 0, &dv, &store)
        .await
        .unwrap();
    let mut frag_struct = Fragment::new(fragment);
    frag_struct.deletion_file = deletion_file;
    let manifest = Manifest::new(&schema, Arc::new(vec![frag_struct]));

    let mut reader =
        FileReader::try_new_with_fragment(&store, &path, fragment, Some(&manifest), None)
            .await
            .unwrap();
    reader.with_row_id(false);

    for b in 0..10 {
        let batch = reader.read_batch(b, .., reader.schema()).await.unwrap();
        let has_row_id = batch
            .schema()
            .fields()
            .iter()
            .any(|f| f.name().as_str() == "_rowid");
        assert!(!has_row_id);
    }
}
/// Round-trip a struct column holding a string child that contains an empty
/// string. With a nullable child the empty string is expected to read back as
/// null; otherwise the batch must read back unchanged.
async fn test_write_null_string_in_struct(field_nullable: bool) {
    let child_field = ArrowField::new("str", DataType::Utf8, field_nullable);
    let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "parent",
        DataType::Struct(ArrowFields::from(vec![child_field.clone()])),
        true,
    )]));
    let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

    let store = ObjectStore::memory();
    let path = Path::from("/null_strings");

    let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")]));
    let struct_arr = Arc::new(StructArray::from(vec![(
        Arc::new(child_field),
        string_arr.clone() as ArrayRef,
    )]));
    let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

    let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
    file_writer.write(&[batch.clone()]).await.unwrap();
    file_writer.finish().await.unwrap();

    let reader = FileReader::try_new(&store, &path).await.unwrap();
    let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

    if !field_nullable {
        assert_eq!(actual_batch, batch);
        return;
    }

    let expected = StringArray::from_iter(vec![Some("a"), None, Some("b")]);
    assert_eq!(
        &expected,
        as_string_array(
            as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref())
                .column_by_name("str")
                .unwrap()
                .as_ref()
        )
    );
}
/// Runs the struct/string round trip for a nullable and then a non-nullable child.
#[tokio::test]
async fn read_nullable_string_in_struct() {
    for field_nullable in [true, false] {
        test_write_null_string_in_struct(field_nullable).await;
    }
}
/// Three record batches written in one call must read back as a single batch
/// equal to their concatenation.
#[tokio::test]
async fn test_read_struct_of_list_arrays() {
    let store = ObjectStore::memory();
    let path = Path::from("/null_strings");

    let arrow_schema = make_schema_of_list_array();
    let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();

    let mut batches = Vec::new();
    for _ in 0..3 {
        let struct_array = make_struct_of_list_array(10, 10);
        batches.push(RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap());
    }
    let batches_ref = batches.iter().collect::<Vec<_>>();

    let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
    file_writer.write(&batches).await.unwrap();
    file_writer.finish().await.unwrap();

    let reader = FileReader::try_new(&store, &path).await.unwrap();
    let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();

    let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
    assert_eq!(expected, actual_batch);
}
/// Reading the range `1..2` of a struct-of-lists column must return exactly
/// the second row of every child list.
#[tokio::test]
async fn test_scan_struct_of_list_arrays() {
    let store = ObjectStore::memory();
    let path = Path::from("/null_strings");

    let arrow_schema = make_schema_of_list_array();
    let struct_array = make_struct_of_list_array(3, 10);
    let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
    let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

    let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
    file_writer.write(&[batch]).await.unwrap();
    file_writer.finish().await.unwrap();

    // Expected result: row 1 of every child column, wrapped back into the struct.
    let expected_columns: Vec<ArrayRef> = struct_array
        .columns()
        .iter()
        .map(|c| c.slice(1, 1))
        .collect();
    let subfields = match arrow_schema.fields[0].data_type() {
        DataType::Struct(subfields) => subfields,
        _ => panic!("unexpected field"),
    };
    let expected_struct = subfields
        .iter()
        .zip(expected_columns)
        .map(|(f, d)| (f.clone(), d))
        .collect::<Vec<_>>();
    let expected_struct_array = StructArray::from(expected_struct);
    let expected_batch = RecordBatch::from(&StructArray::from(vec![(
        Arc::new(arrow_schema.fields[0].as_ref().clone()),
        Arc::new(expected_struct_array) as ArrayRef,
    )]));

    let reader = FileReader::try_new(&store, &path).await.unwrap();
    let params = ReadBatchParams::Range(1..2);
    let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap();
    assert_eq!(expected_batch, slice_of_batch);
}
/// Schema with one nullable struct column `s` holding three list children:
/// `li` (list of Int32), `ls` (list of Utf8) and `ll` (large list of Int32,
/// not nullable).
fn make_schema_of_list_array() -> Arc<arrow_schema::Schema> {
    let li = ArrowField::new(
        "li",
        DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
        true,
    );
    let ls = ArrowField::new(
        "ls",
        DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
        true,
    );
    let ll = ArrowField::new(
        "ll",
        DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
        false,
    );
    let parent = ArrowField::new(
        "s",
        DataType::Struct(ArrowFields::from(vec![li, ls, ll])),
        true,
    );
    Arc::new(ArrowSchema::new(vec![parent]))
}
/// Build a struct array matching `make_schema_of_list_array` with `rows` rows.
///
/// Every row holds `num_items` entries per list; entry `j` of row `i` is
/// `i * 10 + j` (formatted as `str-<value>` for the string list).
fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
    let mut li_builder = ListBuilder::new(Int32Builder::new());
    let mut ls_builder = ListBuilder::new(StringBuilder::new());
    let mut large_list_builder = LargeListBuilder::new(Int32Builder::new());

    for i in 0..rows {
        for j in 0..num_items {
            let value = i * 10 + j;
            li_builder.values().append_value(value);
            ls_builder.values().append_value(format!("str-{}", value));
            large_list_builder.values().append_value(value);
        }
        // Close the current list in each builder.
        li_builder.append(true);
        ls_builder.append(true);
        large_list_builder.append(true);
    }

    let li_field = Arc::new(ArrowField::new(
        "li",
        DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
        true,
    ));
    let ls_field = Arc::new(ArrowField::new(
        "ls",
        DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))),
        true,
    ));
    let ll_field = Arc::new(ArrowField::new(
        "ll",
        DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
        false,
    ));

    Arc::new(StructArray::from(vec![
        (li_field, Arc::new(li_builder.finish()) as ArrayRef),
        (ls_field, Arc::new(ls_builder.finish()) as ArrayRef),
        (ll_field, Arc::new(large_list_builder.finish()) as ArrayRef),
    ]))
}
/// A struct column with a dictionary-encoded child must survive a dataset
/// write followed by a full scan.
#[tokio::test]
async fn test_read_struct_of_dictionary_arrays() {
    let test_dir = tempdir().unwrap();

    let dict_field = ArrowField::new(
        "d",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
    );
    let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "s",
        DataType::Struct(ArrowFields::from(vec![dict_field.clone()])),
        true,
    )]));

    // A single batch whose dictionary column holds "a".."d".
    let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
    for value in ["a", "b", "c", "d"] {
        dict_builder.append(value).unwrap();
    }
    let struct_array = Arc::new(StructArray::from(vec![(
        Arc::new(dict_field),
        Arc::new(dict_builder.finish()) as ArrayRef,
    )]));
    let batches: Vec<RecordBatch> =
        vec![RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap()];

    let test_uri = test_dir.path().to_str().unwrap();
    let batch_reader =
        RecordBatchIterator::new(batches.clone().into_iter().map(Ok), arrow_schema.clone());
    Dataset::write(batch_reader, test_uri, Some(WriteParams::default()))
        .await
        .unwrap();

    let scanned = scan_dataset(test_uri).await.unwrap();
    assert_eq!(batches, scanned);
}
/// Open the dataset at `uri` and collect every batch produced by a full scan.
async fn scan_dataset(uri: &str) -> Result<Vec<RecordBatch>> {
    let dataset = Dataset::open(uri).await?;
    let batches = dataset
        .scan()
        .try_into_stream()
        .await?
        .try_collect::<Vec<_>>()
        .await?;
    Ok(batches)
}
#[tokio::test]
async fn test_read_nullable_arrays() {
use arrow_array::Array;
let arrow_schema = ArrowSchema::new(vec![
ArrowField::new("i", DataType::Int64, false),
ArrowField::new("n", DataType::Null, true),
]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from_iter_values(0..100)),
Arc::new(NullArray::new(100)),
];
let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap();
let store = ObjectStore::memory();
let path = Path::from("/takes");
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
file_writer.write(&[batch]).await.unwrap();
file_writer.finish().await.unwrap();
let reader = FileReader::try_new(&store, &path).await.unwrap();
async fn read_array_w_params(
reader: &FileReader,
field: &Field,
params: ReadBatchParams,
) -> ArrayRef {
read_array(reader, field, 0, ¶ms)
.await
.expect("Error reading back the null array from file") as _
}
let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
assert_eq!(100, arr.len());
assert_eq!(100, arr.null_count());
let arr =
read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
assert_eq!(15, arr.len());
assert_eq!(15, arr.null_count());
let arr =
read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
assert_eq!(40, arr.len());
assert_eq!(40, arr.null_count());
let arr =
read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
assert_eq!(25, arr.len());
assert_eq!(25, arr.null_count());
let arr = read_array_w_params(
&reader,
&schema.fields[1],
ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])),
)
.await;
assert_eq!(4, arr.len());
assert_eq!(4, arr.null_count());
let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
let arr = read_array(&reader, &schema.fields[1], 0, ¶ms);
assert!(arr.await.is_err());
let params = ReadBatchParams::RangeTo(..107);
let arr = read_array(&reader, &schema.fields[1], 0, ¶ms);
assert!(arr.await.is_err());
}
/// Writes ten lists of ten consecutive `Int32` values each into a `List`
/// column "l" and a `LargeList` column "ll", takes rows 1, 3, 5 and 9 back
/// with `FileReader::take`, and compares both columns with arrays built for
/// just those rows.
#[tokio::test]
async fn test_take_lists() {
    let item_field = || Arc::new(ArrowField::new("item", DataType::Int32, true));
    let arrow_schema = ArrowSchema::new(vec![
        ArrowField::new("l", DataType::List(item_field()), false),
        ArrowField::new("ll", DataType::LargeList(item_field()), false),
    ]);

    // Builds a (list, large list) pair in which row `r` of `rows` holds the
    // values `r * 10 .. r * 10 + 10`.
    let build_lists = |rows: &[i32]| {
        let mut list_builder = ListBuilder::new(Int32Builder::new());
        let mut large_list_builder = LargeListBuilder::new(Int32Builder::new());
        for &row in rows {
            for offset in 0..10 {
                list_builder.values().append_value(row * 10 + offset);
                large_list_builder.values().append_value(row * 10 + offset);
            }
            list_builder.append(true);
            large_list_builder.append(true);
        }
        (list_builder.finish(), large_list_builder.finish())
    };

    // Source data: rows 0..10, i.e. the values 0..100 split into tens.
    let all_rows: Vec<i32> = (0..10).collect();
    let (list_arr, large_list_arr) = build_lists(&all_rows);
    let columns: Vec<ArrayRef> = vec![Arc::new(list_arr), Arc::new(large_list_arr)];
    let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap();

    let store = ObjectStore::memory();
    let path = Path::from("/take_list");
    let schema: Schema = (&arrow_schema).try_into().unwrap();
    let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
        .await
        .unwrap();
    file_writer.write(&[batch]).await.unwrap();
    file_writer.finish().await.unwrap();

    let reader = FileReader::try_new(&store, &path).await.unwrap();
    let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap();

    let (expected_list, expected_large_list) = build_lists(&[1, 3, 5, 9]);
    assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list);
    assert_eq!(
        actual.column_by_name("ll").unwrap().as_ref(),
        &expected_large_list
    );
}
#[tokio::test]
async fn test_list_array_with_offsets() {
let arrow_schema = ArrowSchema::new(vec![
ArrowField::new(
"l",
DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))),
false,
),
ArrowField::new(
"ll",
DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))),
false,
),
]);
let store = ObjectStore::memory();
let path = Path::from("/lists");
let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2)]),
Some(vec![Some(3), Some(4)]),
Some((0..2_000).map(Some).collect::<Vec<_>>()),
])
.slice(1, 1);
let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(10), Some(11)]),
Some(vec![Some(12), Some(13)]),
Some((0..2_000).map(Some).collect::<Vec<_>>()),
])
.slice(1, 1);
let batch = RecordBatch::try_new(
Arc::new(arrow_schema.clone()),
vec![Arc::new(list_array), Arc::new(large_list_array)],
)
.unwrap();
let schema: Schema = (&arrow_schema).try_into().unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
file_writer.write(&[batch.clone()]).await.unwrap();
file_writer.finish().await.unwrap();
let file_size_bytes = store.size(&path).await.unwrap();
assert!(file_size_bytes < 1_000);
let reader = FileReader::try_new(&store, &path).await.unwrap();
let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
assert_eq!(batch, actual_batch);
}
/// Writes an `Int64` column "i" holding 0..100 and checks that
/// `FileReader::read_range(7..25, ..)` returns exactly the values 7..25.
#[tokio::test]
async fn test_read_ranges() {
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]);
    let schema = Schema::try_from(&arrow_schema).unwrap();

    let values: ArrayRef = Arc::new(Int64Array::from_iter_values(0..100));
    let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![values]).unwrap();

    let store = ObjectStore::memory();
    let path = Path::from("/read_range");
    let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
    file_writer.write(&[batch]).await.unwrap();
    file_writer.finish().await.unwrap();

    let reader = FileReader::try_new(&store, &path).await.unwrap();
    let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap();

    let expected = Int64Array::from_iter_values(7..25);
    assert_eq!(
        actual_batch.column_by_name("i").unwrap().as_ref(),
        &expected
    );
}
/// Helper for manifest round-trip tests.
///
/// Writes `prefix_size` random alphanumeric bytes to an in-memory object,
/// followed by a manifest whose schema has a single `Int64` field named with
/// `manifest_min_size` random alphanumeric characters, and the trailing
/// magics. It then reads the manifest back with `read_manifest`, asserts it
/// equals the one written, and deletes the object.
async fn test_roundtrip_manifest(prefix_size: usize, manifest_min_size: usize) {
    // Random alphanumeric bytes of the requested length.
    let random_alphanumeric = |len: usize| -> Vec<u8> {
        rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(len)
            .collect()
    };

    let store = ObjectStore::memory();
    let path = Path::from("/read_large_manifest");
    let mut writer = store.create(&path).await.unwrap();

    // Leading bytes placed in front of the manifest.
    let prefix = random_alphanumeric(prefix_size);
    writer.write_all(&prefix).await.unwrap();

    // The field name length determines how large the manifest is.
    let long_name: String = random_alphanumeric(manifest_min_size)
        .into_iter()
        .map(char::from)
        .collect();
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]);
    let schema = Schema::try_from(&arrow_schema).unwrap();
    let mut manifest = Manifest::new(&schema, Arc::new(vec![]));

    let pos = write_manifest(&mut writer, &mut manifest, None)
        .await
        .unwrap();
    writer.write_magics(pos).await.unwrap();
    writer.shutdown().await.unwrap();

    let roundtripped_manifest = read_manifest(&store, &path).await.unwrap();
    assert_eq!(manifest, roundtripped_manifest);

    store.inner.delete(&path).await.unwrap();
}
/// Runs the manifest round-trip helper for three `(prefix_size,
/// manifest_min_size)` combinations, in order.
///
/// NOTE(review): the 100_000 cases presumably make the manifest larger than
/// the 64 KiB `PREFETCH_SIZE` used by `read_manifest`; confirm.
#[tokio::test]
async fn test_read_large_manifest() {
    let cases = [(0, 100_000), (1000, 100_000), (1000, 1000)];
    for (prefix_size, manifest_min_size) in cases {
        test_roundtrip_manifest(prefix_size, manifest_min_size).await;
    }
}
}