use std::collections::{BTreeMap, HashMap};
use std::default::Default;
use std::sync::Arc;
use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::{
cast::as_struct_array, RecordBatch, RecordBatchReader, StructArray, UInt64Array,
};
use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
use arrow_select::{concat::concat_batches, take::take};
use chrono::{prelude::*, Duration};
use futures::future::BoxFuture;
use futures::stream::{self, StreamExt, TryStreamExt};
use futures::FutureExt;
use log::warn;
use object_store::path::Path;
use tracing::instrument;
mod chunker;
pub mod cleanup;
mod feature_flags;
pub mod fragment;
mod hash_joiner;
pub mod optimize;
pub mod progress;
pub mod scanner;
pub mod transaction;
pub mod updater;
mod write;
use self::cleanup::RemovalStats;
use self::feature_flags::{apply_feature_flags, can_read_dataset, can_write_dataset};
use self::fragment::FileFragment;
use self::scanner::Scanner;
use self::transaction::{Operation, Transaction};
use self::write::{reader_to_stream, write_fragments};
use crate::datatypes::Schema;
use crate::error::box_error;
use crate::format::{Fragment, Index, Manifest};
use crate::index::vector::open_index;
use crate::io::reader::read_manifest_indexes;
use crate::io::{
commit::{commit_new_dataset, commit_transaction, CommitError},
object_reader::read_struct,
object_store::ObjectStoreParams,
read_manifest, read_metadata_offset, write_manifest, ObjectStore,
};
use crate::session::Session;
use crate::utils::temporal::{utc_now, SystemTime};
use crate::{Error, Result};
use hash_joiner::HashJoiner;
pub use scanner::ROW_ID;
pub use write::{WriteMode, WriteParams};
const INDICES_DIR: &str = "_indices";
pub(crate) const DELETION_DIRS: &str = "_deletions";
const DATA_DIR: &str = "data";
pub(crate) const DEFAULT_INDEX_CACHE_SIZE: usize = 256;
pub(crate) const DEFAULT_METADATA_CACHE_SIZE: usize = 256;
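/// A dataset in Lance format: a versioned set of data fragments plus the
/// manifest (schema, feature flags, indices) that describes them.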
#[derive(Debug, Clone)]
pub struct Dataset {
pub(crate) object_store: Arc<ObjectStore>,
pub(crate) base: Path,
pub(crate) manifest: Arc<Manifest>,
pub(crate) session: Arc<Session>,
}
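/// A snapshot of the dataset at a particular manifest version.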
pub struct Version {
pub version: u64,
pub timestamp: DateTime<Utc>,
pub metadata: BTreeMap<String, String>,
}
impl From<&Manifest> for Version {
fn from(m: &Manifest) -> Self {
Self {
version: m.version,
timestamp: m.timestamp(),
metadata: BTreeMap::default(),
}
}
}
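/// Parameters for opening a dataset: optional I/O block size, index and
/// metadata cache sizes, an optional shared [`Session`], and object store
/// options.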
pub struct ReadParams {
pub block_size: Option<usize>,
pub index_cache_size: usize,
pub metadata_cache_size: usize,
pub session: Option<Arc<Session>>,
pub store_options: Option<ObjectStoreParams>,
}
impl ReadParams {
pub fn index_cache_size(&mut self, cache_size: usize) -> &mut Self {
self.index_cache_size = cache_size;
self
}
pub fn metadata_cache_size(&mut self, cache_size: usize) -> &mut Self {
self.metadata_cache_size = cache_size;
self
}
pub fn session(&mut self, session: Arc<Session>) -> &mut Self {
self.session = Some(session);
self
}
}
impl Default for ReadParams {
fn default() -> Self {
Self {
block_size: None,
index_cache_size: DEFAULT_INDEX_CACHE_SIZE,
metadata_cache_size: DEFAULT_METADATA_CACHE_SIZE,
session: None,
store_options: None,
}
}
}
impl Dataset {
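/// Open the latest version of the dataset at `uri` with default
/// [`ReadParams`].
///
/// A minimal usage sketch; assumes the crate is consumed as `lance` and a
/// dataset already exists at the (illustrative) path:
///
/// ```no_run
/// # async fn example() -> lance::Result<()> {
/// use lance::dataset::Dataset;
///
/// let dataset = Dataset::open("/tmp/demo.lance").await?;
/// println!("rows: {}", dataset.count_rows().await?);
/// # Ok(())
/// # }
/// ```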
pub async fn open(uri: &str) -> Result<Self> {
let params = ReadParams::default();
Self::open_with_params(uri, &params).await
}
pub async fn open_with_params(uri: &str, params: &ReadParams) -> Result<Self> {
let (mut object_store, base_path) = match params.store_options.as_ref() {
Some(store_options) => ObjectStore::from_uri_and_params(uri, store_options).await?,
None => ObjectStore::from_uri(uri).await?,
};
if let Some(block_size) = params.block_size {
object_store.set_block_size(block_size);
}
let latest_manifest = object_store
.commit_handler
.resolve_latest_version(&base_path, &object_store)
.await
.map_err(|e| Error::DatasetNotFound {
path: base_path.to_string(),
source: Box::new(e),
})?;
let session = if let Some(session) = params.session.as_ref() {
session.clone()
} else {
Arc::new(Session::new(
params.index_cache_size,
params.metadata_cache_size,
))
};
Self::checkout_manifest(
Arc::new(object_store),
base_path.clone(),
&latest_manifest,
session,
)
.await
}
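/// Open a specific `version` of the dataset at `uri` (time travel); the
/// latest version is left untouched.
///
/// A minimal sketch; versions start at 1 when a dataset is first written:
///
/// ```no_run
/// # async fn example() -> lance::Result<()> {
/// use lance::dataset::Dataset;
///
/// let v1 = Dataset::checkout("/tmp/demo.lance", 1).await?;
/// assert_eq!(v1.version().version, 1);
/// # Ok(())
/// # }
/// ```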
pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
let params = ReadParams::default();
Self::checkout_with_params(uri, version, &params).await
}
pub async fn checkout_with_params(
uri: &str,
version: u64,
params: &ReadParams,
) -> Result<Self> {
let (mut object_store, base_path) = ObjectStore::from_uri(uri).await?;
if let Some(block_size) = params.block_size {
object_store.set_block_size(block_size);
}
let manifest_file = object_store
.commit_handler
.resolve_version(&base_path, version, &object_store)
.await?;
let session = if let Some(session) = params.session.as_ref() {
session.clone()
} else {
Arc::new(Session::new(
params.index_cache_size,
params.metadata_cache_size,
))
};
Self::checkout_manifest(Arc::new(object_store), base_path, &manifest_file, session).await
}
pub async fn checkout_version(&self, version: u64) -> Result<Self> {
let base_path = self.base.clone();
let manifest_file = self
.object_store
.commit_handler
.resolve_version(&base_path, version, &self.object_store)
.await?;
Self::checkout_manifest(
self.object_store.clone(),
base_path,
&manifest_file,
self.session.clone(),
)
.await
}
async fn checkout_manifest(
object_store: Arc<ObjectStore>,
base_path: Path,
manifest_path: &Path,
session: Arc<Session>,
) -> Result<Self> {
let object_reader = object_store
.open(manifest_path)
.await
.map_err(|e| match &e {
Error::NotFound { uri, .. } => Error::DatasetNotFound {
path: uri.clone(),
source: box_error(e),
},
_ => e,
})?;
let get_result = object_store
.inner
.get(manifest_path)
.await
.map_err(|e| match e {
object_store::Error::NotFound { path: _, source } => Error::DatasetNotFound {
path: base_path.to_string(),
source,
},
_ => e.into(),
})?;
let bytes = get_result.bytes().await?;
let offset = read_metadata_offset(&bytes)?;
let mut manifest: Manifest = read_struct(object_reader.as_ref(), offset).await?;
if !can_read_dataset(manifest.reader_feature_flags) {
let message = format!(
"This dataset cannot be read by this version of Lance. \
Please upgrade Lance to read this dataset.\n Flags: {}",
manifest.reader_feature_flags
);
return Err(Error::NotSupported {
source: message.into(),
});
}
manifest
.schema
.load_dictionary(object_reader.as_ref())
.await?;
Ok(Self {
object_store,
base: base_path,
manifest: Arc::new(manifest),
session,
})
}
#[instrument(skip(batches, params))]
async fn write_impl(
batches: Box<dyn RecordBatchReader + Send>,
uri: &str,
params: Option<WriteParams>,
) -> Result<Self> {
let mut params = params.unwrap_or_default();
let (object_store, base) =
ObjectStore::from_uri_and_params(uri, &params.store_params.clone().unwrap_or_default())
.await?;
let dataset_exists = match object_store
.commit_handler
.resolve_latest_version(&base, &object_store)
.await
{
Ok(_) => true,
Err(Error::NotFound { .. }) => false,
Err(e) => return Err(e),
};
let (stream, schema) = reader_to_stream(batches)?;
if dataset_exists && matches!(params.mode, WriteMode::Create) {
return Err(Error::DatasetAlreadyExists {
uri: uri.to_owned(),
});
}
if !dataset_exists
&& (matches!(params.mode, WriteMode::Append)
|| matches!(params.mode, WriteMode::Overwrite))
{
warn!("No existing dataset at {uri}, it will be created");
params = WriteParams {
mode: WriteMode::Create,
..params
};
}
let params = params;
let dataset = if matches!(params.mode, WriteMode::Create) {
None
} else {
Some(
Self::open_with_params(
uri,
&ReadParams {
store_options: params.store_params.clone(),
..Default::default()
},
)
.await?,
)
};
if matches!(params.mode, WriteMode::Append) {
if let Some(d) = dataset.as_ref() {
let m = d.manifest.as_ref();
if schema != m.schema {
return Err(Error::SchemaMismatch {
original: m.schema.clone(),
new: schema,
});
}
}
}
if let Some(d) = dataset.as_ref() {
if !can_write_dataset(d.manifest.writer_feature_flags) {
let message = format!(
"This dataset cannot be written by this version of Lance. \
Please upgrade Lance to write to this dataset.\n Flags: {}",
d.manifest.writer_feature_flags
);
return Err(Error::NotSupported {
source: message.into(),
});
}
}
let object_store = Arc::new(object_store);
let fragments =
write_fragments(object_store.clone(), &base, &schema, stream, params.clone()).await?;
let operation = match params.mode {
WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite { schema, fragments },
WriteMode::Append => Operation::Append { fragments },
};
let transaction = Transaction::new(
dataset.as_ref().map(|ds| ds.manifest.version).unwrap_or(0),
operation,
None,
);
let manifest = if let Some(dataset) = &dataset {
commit_transaction(
dataset,
&object_store,
&transaction,
&Default::default(),
&Default::default(),
)
.await?
} else {
commit_new_dataset(&object_store, &base, &transaction, &Default::default()).await?
};
Ok(Self {
object_store,
base,
manifest: Arc::new(manifest.clone()),
session: Arc::new(Session::default()),
})
}
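/// Write a [`RecordBatchReader`] to `uri`, creating a new dataset (or
/// appending to / overwriting an existing one, per [`WriteParams`]).
///
/// A minimal sketch creating a one-column dataset; schema, values, and path
/// are illustrative:
///
/// ```no_run
/// # async fn example() -> lance::Result<()> {
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
/// use arrow_schema::{DataType, Field, Schema as ArrowSchema};
/// use lance::dataset::Dataset;
///
/// let schema = Arc::new(ArrowSchema::new(vec![Field::new("i", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(
///     schema.clone(),
///     vec![Arc::new(Int32Array::from_iter_values(0..10))],
/// )
/// .unwrap();
/// let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
/// let dataset = Dataset::write(reader, "/tmp/demo.lance", None).await?;
/// assert_eq!(dataset.count_rows().await?, 10);
/// # Ok(())
/// # }
/// ```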
pub async fn write(
batches: impl RecordBatchReader + Send + 'static,
uri: &str,
params: Option<WriteParams>,
) -> Result<Self> {
let batches = Box::new(batches);
Self::write_impl(batches, uri, params).await
}
async fn append_impl(
&mut self,
batches: Box<dyn RecordBatchReader + Send>,
params: Option<WriteParams>,
) -> Result<()> {
let params = WriteParams {
mode: WriteMode::Append,
..params.unwrap_or_default()
};
let object_store = Arc::new(
self.object_store()
.with_params(&params.store_params.clone().unwrap_or_default()),
);
let (stream, schema) = reader_to_stream(batches)?;
if self.manifest.schema != schema {
return Err(Error::SchemaMismatch {
original: self.manifest.schema.clone(),
new: schema,
});
}
let fragments = write_fragments(
object_store.clone(),
&self.base,
&schema,
stream,
params.clone(),
)
.await?;
let transaction =
Transaction::new(self.manifest.version, Operation::Append { fragments }, None);
let new_manifest = commit_transaction(
self,
&object_store,
&transaction,
&Default::default(),
&Default::default(),
)
.await?;
self.manifest = Arc::new(new_manifest);
Ok(())
}
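/// Append new batches to this dataset in place. The incoming schema must
/// match the dataset schema, or `Error::SchemaMismatch` is returned.
///
/// A minimal sketch; `reader` is any reader built as in [`Self::write`]:
///
/// ```no_run
/// # async fn example(
/// #     mut dataset: lance::dataset::Dataset,
/// #     reader: impl arrow_array::RecordBatchReader + Send + 'static,
/// # ) -> lance::Result<()> {
/// dataset.append(reader, None).await?;
/// # Ok(())
/// # }
/// ```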
pub async fn append(
&mut self,
batches: impl RecordBatchReader + Send + 'static,
params: Option<WriteParams>,
) -> Result<()> {
let batches = Box::new(batches);
self.append_impl(batches, params).await
}
async fn latest_manifest(&self) -> Result<Manifest> {
read_manifest(
&self.object_store,
&self
.object_store
.commit_handler
.resolve_latest_version(&self.base, &self.object_store)
.await?,
)
.await
}
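/// Commit the currently checked-out version as the new latest version,
/// undoing any later changes. Typically used after
/// [`Self::checkout_version`].
///
/// A minimal sketch; assumes the dataset has more than one version:
///
/// ```no_run
/// # async fn example(dataset: lance::dataset::Dataset) -> lance::Result<()> {
/// let mut old = dataset.checkout_version(1).await?;
/// old.restore(None).await?; // the old state becomes a brand-new version
/// # Ok(())
/// # }
/// ```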
pub async fn restore(&mut self, write_params: Option<WriteParams>) -> Result<()> {
let latest_manifest = self.latest_manifest().await?;
let latest_version = latest_manifest.version;
let transaction = Transaction::new(
latest_version,
Operation::Restore {
version: self.manifest.version,
},
None,
);
let object_store =
if let Some(store_params) = write_params.and_then(|params| params.store_params) {
Arc::new(self.object_store.with_params(&store_params))
} else {
self.object_store.clone()
};
self.manifest = Arc::new(
commit_transaction(
self,
&object_store,
&transaction,
&Default::default(),
&Default::default(),
)
.await?,
);
Ok(())
}
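/// Delete data and manifests belonging to versions older than `older_than`,
/// returning statistics about the removed objects.
///
/// A minimal sketch with a seven-day retention window:
///
/// ```no_run
/// # async fn example(dataset: lance::dataset::Dataset) -> lance::Result<()> {
/// use chrono::Duration;
///
/// let _stats = dataset.cleanup_old_versions(Duration::days(7), None).await?;
/// # Ok(())
/// # }
/// ```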
pub fn cleanup_old_versions(
&self,
older_than: Duration,
delete_unverified: Option<bool>,
) -> BoxFuture<Result<RemovalStats>> {
let before = utc_now() - older_than;
cleanup::cleanup_old_versions(self, before, delete_unverified).boxed()
}
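/// Commit an [`Operation`] against the dataset at `base_uri` directly. This
/// is a low-level entry point; most callers should prefer [`Self::write`],
/// [`Self::append`], [`Self::delete`], or [`Self::merge`].
///
/// A minimal sketch committing an `Overwrite` with no fragments, which
/// yields an empty dataset; schema and path are illustrative:
///
/// ```no_run
/// # async fn example() -> lance::Result<()> {
/// use arrow_schema::{DataType, Field, Schema as ArrowSchema};
/// use lance::dataset::{transaction::Operation, Dataset};
/// use lance::datatypes::Schema;
///
/// let arrow_schema = ArrowSchema::new(vec![Field::new("i", DataType::Int32, false)]);
/// let schema = Schema::try_from(&arrow_schema)?;
/// let operation = Operation::Overwrite { schema, fragments: vec![] };
/// let dataset = Dataset::commit("/tmp/empty.lance", operation, None, None).await?;
/// assert_eq!(dataset.count_rows().await?, 0);
/// # Ok(())
/// # }
/// ```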
pub async fn commit(
base_uri: &str,
operation: Operation,
read_version: Option<u64>,
store_params: Option<ObjectStoreParams>,
) -> Result<Self> {
let read_version = read_version.map_or_else(
|| match operation {
Operation::Overwrite { .. } | Operation::Restore { .. } => Ok(0),
_ => Err(Error::invalid_input(
"read_version must be specified for this operation",
)),
},
Ok,
)?;
let (object_store, base) =
ObjectStore::from_uri_and_params(base_uri, &store_params.clone().unwrap_or_default())
.await?;
let dataset_exists = match object_store
.commit_handler
.resolve_latest_version(&base, &object_store)
.await
{
Ok(_) => true,
Err(Error::NotFound { .. }) => false,
Err(e) => return Err(e),
};
if !dataset_exists && !matches!(operation, Operation::Overwrite { .. }) {
return Err(Error::DatasetNotFound {
path: base.to_string(),
source: "The dataset must already exist unless the operation is Overwrite".into(),
});
}
let dataset = if dataset_exists {
Some(
Self::open_with_params(
base_uri,
&ReadParams {
store_options: store_params.clone(),
..Default::default()
},
)
.await?,
)
} else {
None
};
let transaction = Transaction::new(read_version, operation, None);
let manifest = if let Some(dataset) = &dataset {
commit_transaction(
dataset,
&object_store,
&transaction,
&Default::default(),
&Default::default(),
)
.await?
} else {
commit_new_dataset(&object_store, &base, &transaction, &Default::default()).await?
};
Ok(Self {
object_store: Arc::new(object_store),
base,
manifest: Arc::new(manifest.clone()),
session: Arc::new(Session::default()),
})
}
async fn merge_impl(
&mut self,
stream: Box<dyn RecordBatchReader + Send>,
left_on: &str,
right_on: &str,
) -> Result<()> {
if self.schema().field(left_on).is_none() {
return Err(Error::invalid_input(format!(
"Column {} does not exist in the left side dataset",
left_on
)));
}
let right_schema = stream.schema();
if right_schema.field_with_name(right_on).is_err() {
return Err(Error::invalid_input(format!(
"Column {} does not exist in the right side dataset",
right_on
)));
}
for field in right_schema.fields() {
if field.name() == right_on {
continue;
}
if self.schema().field(field.name()).is_some() {
return Err(Error::invalid_input(format!(
"Column {} exists in both sides of the dataset",
field.name()
)));
}
}
let joiner = Arc::new(HashJoiner::try_new(stream, right_on).await?);
let new_schema: Schema = self.schema().merge(joiner.out_schema().as_ref())?;
let updated_fragments: Vec<Fragment> = stream::iter(self.get_fragments())
.then(|f| {
let joiner = joiner.clone();
async move { f.merge(left_on, &joiner).await.map(|f| f.metadata) }
})
.try_collect::<Vec<_>>()
.await?;
let transaction = Transaction::new(
self.manifest.version,
Operation::Merge {
fragments: updated_fragments,
schema: new_schema,
},
None,
);
let manifest = commit_transaction(
self,
&self.object_store,
&transaction,
&Default::default(),
&Default::default(),
)
.await?;
self.manifest = Arc::new(manifest);
Ok(())
}
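/// Left-join another stream of batches onto this dataset: rows are matched
/// on `left_on` (a dataset column) and `right_on` (a stream column), and the
/// stream's remaining columns are added to the dataset schema.
///
/// A minimal sketch; `right` is assumed to carry a key column named `id`
/// plus the new columns:
///
/// ```no_run
/// # async fn example(
/// #     mut dataset: lance::dataset::Dataset,
/// #     right: impl arrow_array::RecordBatchReader + Send + 'static,
/// # ) -> lance::Result<()> {
/// dataset.merge(right, "id", "id").await?;
/// # Ok(())
/// # }
/// ```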
pub async fn merge(
&mut self,
stream: impl RecordBatchReader + Send + 'static,
left_on: &str,
right_on: &str,
) -> Result<()> {
let stream = Box::new(stream);
self.merge_impl(stream, left_on, right_on).await
}
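/// Create a [`Scanner`] for this dataset, from which projections, filters,
/// limits, and vector searches can be configured before streaming batches.
///
/// A minimal sketch collecting the whole dataset:
///
/// ```no_run
/// # async fn example(dataset: lance::dataset::Dataset) -> lance::Result<()> {
/// use futures::TryStreamExt;
///
/// let batches = dataset
///     .scan()
///     .try_into_stream()
///     .await?
///     .try_collect::<Vec<_>>()
///     .await?;
/// # Ok(())
/// # }
/// ```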
pub fn scan(&self) -> Scanner {
Scanner::new(Arc::new(self.clone()))
}
pub async fn count_rows(&self) -> Result<usize> {
let counts = stream::iter(self.get_fragments())
.map(|f| async move { f.count_rows().await })
.buffer_unordered(16)
.try_collect::<Vec<_>>()
.await?;
Ok(counts.iter().sum())
}
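/// Take rows by absolute offset into the dataset (offset 0 is the first row
/// of the first fragment), returned in the order requested.
///
/// A minimal sketch; assumes the dataset has at least three rows:
///
/// ```no_run
/// # async fn example(dataset: lance::dataset::Dataset) -> lance::Result<()> {
/// let projection = dataset.schema().clone();
/// let batch = dataset.take(&[2, 0, 1], &projection).await?;
/// assert_eq!(batch.num_rows(), 3);
/// # Ok(())
/// # }
/// ```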
pub async fn take(&self, row_indices: &[usize], projection: &Schema) -> Result<RecordBatch> {
let mut sorted_indices: Vec<u32> =
Vec::from_iter(row_indices.iter().map(|indice| *indice as u32));
sorted_indices.sort();
let mut row_count = 0;
let mut start = 0;
let schema = Arc::new(ArrowSchema::from(projection));
let mut batches = Vec::with_capacity(sorted_indices.len());
for fragment in self.get_fragments().iter() {
if start >= sorted_indices.len() {
break;
}
let max_row_indices = row_count + fragment.count_rows().await? as u32;
if sorted_indices[start] < max_row_indices {
let mut end = start;
sorted_indices[end] -= row_count;
while end + 1 < sorted_indices.len() && sorted_indices[end + 1] < max_row_indices {
end += 1;
sorted_indices[end] -= row_count;
}
batches.push(
fragment
.take(&sorted_indices[start..end + 1], projection)
.await?,
);
for indice in sorted_indices[start..end + 1].iter_mut() {
*indice += row_count;
}
start = end + 1;
}
row_count = max_row_indices;
}
let one_batch = concat_batches(&schema, &batches)?;
let remapping_index: UInt64Array = row_indices
.iter()
.map(|o| sorted_indices.binary_search(&(*o as u32)).unwrap() as u64)
.collect();
let struct_arr: StructArray = one_batch.into();
let reordered = take(&struct_arr, &remapping_index, None)?;
Ok(as_struct_array(&reordered).into())
}
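/// Take rows by stable row id rather than by offset: a row id packs the
/// fragment id into the upper 32 bits and the row's offset within that
/// fragment into the lower 32 bits, so `(3 << 32) + 7` addresses row 7 of
/// fragment 3.
///
/// A minimal sketch fetching the first row of fragments 0 and 1:
///
/// ```no_run
/// # async fn example(dataset: lance::dataset::Dataset) -> lance::Result<()> {
/// let projection = dataset.schema().clone();
/// let batch = dataset.take_rows(&[0, 1 << 32], &projection).await?;
/// # Ok(())
/// # }
/// ```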
pub async fn take_rows(&self, row_ids: &[u64], projection: &Schema) -> Result<RecordBatch> {
if row_ids.is_empty() {
return Ok(RecordBatch::new_empty(Arc::new(projection.into())));
}
let row_id_meta = check_row_ids(row_ids);
if row_id_meta.contiguous {
let start = row_ids.first().expect("empty range passed to take_rows");
let fragment_id = (start >> 32) as usize;
let range_start = *start as u32 as usize;
let range_end =
*row_ids.last().expect("empty range passed to take_rows") as u32 as usize;
let range = range_start..(range_end + 1);
let fragment = self.get_fragment(fragment_id).ok_or_else(|| {
Error::invalid_input(format!("row_id belongs to non-existent fragment: {start}"))
})?;
let reader = fragment.open(projection).await?;
reader.read_range(range).await
} else if row_id_meta.sorted {
let mut batches: Vec<RecordBatch> = Vec::new();
let mut current_fragment = row_ids[0] >> 32;
let mut current_start = 0;
let mut row_ids_iter = row_ids.iter().enumerate();
'outer: loop {
let (fragment_id, range) = loop {
if let Some((i, row_id)) = row_ids_iter.next() {
let fragment_id = row_id >> 32;
if fragment_id != current_fragment {
let next = (current_fragment, current_start..i);
current_fragment = fragment_id;
current_start = i;
break next;
}
} else if current_start != row_ids.len() {
let next = (current_fragment, current_start..row_ids.len());
current_start = row_ids.len();
break next;
} else {
break 'outer;
}
};
let fragment = self.get_fragment(fragment_id as usize).ok_or_else(|| {
Error::invalid_input(format!(
"row_id belongs to non-existant fragment: {}",
row_ids[current_start]
))
})?;
let row_ids: Vec<u32> = row_ids[range].iter().map(|x| *x as u32).collect();
let batch = fragment.take_rows(&row_ids, projection, false).await?;
batches.push(batch);
}
Ok(concat_batches(&batches[0].schema(), &batches)?)
} else {
let projection_with_row_id = Schema::merge(
projection,
&ArrowSchema::new(vec![ArrowField::new(
ROW_ID,
arrow::datatypes::DataType::UInt64,
false,
)]),
)?;
let schema_with_row_id = Arc::new(ArrowSchema::from(&projection_with_row_id));
let mut sorted_row_ids = Vec::from(row_ids);
sorted_row_ids.sort();
let mut row_ids_per_fragment: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
sorted_row_ids.iter().for_each(|row_id| {
let fragment_id = row_id >> 32;
let offset = (row_id - (fragment_id << 32)) as u32;
row_ids_per_fragment
.entry(fragment_id)
.and_modify(|v| v.push(offset))
.or_insert_with(|| vec![offset]);
});
let fragments = self.get_fragments();
let fragment_and_indices = fragments.iter().filter_map(|f| {
let local_row_ids = row_ids_per_fragment.get(&(f.id() as u64))?;
Some((f, local_row_ids))
});
let mut batches = stream::iter(fragment_and_indices)
.then(|(fragment, indices)| fragment.take_rows(indices, projection, true))
.try_collect::<Vec<_>>()
.await?;
let one_batch = if batches.len() > 1 {
concat_batches(&schema_with_row_id, &batches)?
} else {
batches.pop().unwrap()
};
let returned_row_ids = one_batch
.column_by_name(ROW_ID)
.ok_or_else(|| Error::Internal {
message: "ROW_ID column not found".into(),
})?
.as_primitive::<UInt64Type>()
.values();
let remapping_index: UInt64Array = row_ids
.iter()
.filter_map(|o| {
returned_row_ids
.iter()
.position(|id| id == o)
.map(|pos| pos as u64)
})
.collect();
debug_assert_eq!(remapping_index.len(), one_batch.num_rows());
let keep_indices = (0..one_batch.num_columns() - 1).collect::<Vec<_>>();
let one_batch = one_batch.project(&keep_indices)?;
let struct_arr: StructArray = one_batch.into();
let reordered = take(&struct_arr, &remapping_index, None)?;
Ok(as_struct_array(&reordered).into())
}
}
pub(crate) async fn sample(&self, n: usize, projection: &Schema) -> Result<RecordBatch> {
use rand::seq::IteratorRandom;
let num_rows = self.count_rows().await?;
let ids = (0..num_rows).choose_multiple(&mut rand::thread_rng(), n);
self.take(&ids[..], projection).await
}
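/// Delete all rows matching a SQL-like `predicate`, committing the result
/// as a new version. Fully deleted fragments are dropped from the manifest;
/// partially deleted ones gain a deletion file.
///
/// A minimal sketch; assumes an integer column named `i`:
///
/// ```no_run
/// # async fn example(mut dataset: lance::dataset::Dataset) -> lance::Result<()> {
/// dataset.delete("i < 10 OR i >= 90").await?;
/// # Ok(())
/// # }
/// ```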
pub async fn delete(&mut self, predicate: &str) -> Result<()> {
let mut updated_fragments: Vec<Fragment> = Vec::new();
let mut deleted_fragment_ids: Vec<u64> = Vec::new();
stream::iter(self.get_fragments())
.map(|f| async move {
let old_fragment = f.metadata.clone();
let new_fragment = f.delete(predicate).await?.map(|f| f.metadata);
Ok((old_fragment, new_fragment))
})
.buffer_unordered(num_cpus::get())
.try_for_each(|(old_fragment, new_fragment)| {
if let Some(new_fragment) = new_fragment {
if new_fragment != old_fragment {
updated_fragments.push(new_fragment);
}
} else {
deleted_fragment_ids.push(old_fragment.id);
}
futures::future::ready(Ok::<_, crate::Error>(()))
})
.await?;
let transaction = Transaction::new(
self.manifest.version,
Operation::Delete {
updated_fragments,
deleted_fragment_ids,
predicate: predicate.to_string(),
},
None,
);
let manifest = commit_transaction(
self,
&self.object_store,
&transaction,
&Default::default(),
&Default::default(),
)
.await?;
self.manifest = Arc::new(manifest);
Ok(())
}
pub(crate) fn object_store(&self) -> &ObjectStore {
&self.object_store
}
async fn manifest_file(&self, version: u64) -> Result<Path> {
self.object_store
.commit_handler
.resolve_version(&self.base, version, &self.object_store)
.await
}
pub(crate) fn data_dir(&self) -> Path {
self.base.child(DATA_DIR)
}
pub(crate) fn indices_dir(&self) -> Path {
self.base.child(INDICES_DIR)
}
pub fn version(&self) -> Version {
Version::from(self.manifest.as_ref())
}
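/// List all committed versions of the dataset, sorted by version number.
///
/// A minimal sketch:
///
/// ```no_run
/// # async fn example(dataset: lance::dataset::Dataset) -> lance::Result<()> {
/// for v in dataset.versions().await? {
///     println!("version {} at {}", v.version, v.timestamp);
/// }
/// # Ok(())
/// # }
/// ```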
pub async fn versions(&self) -> Result<Vec<Version>> {
let mut versions: Vec<Version> = self
.object_store
.commit_handler
.list_manifests(&self.base, &self.object_store)
.await?
.try_filter_map(|path| async move {
match read_manifest(&self.object_store, &path).await {
Ok(manifest) => Ok(Some(Version::from(&manifest))),
Err(e) => Err(e),
}
})
.try_collect()
.await?;
versions.sort_by_key(|v| v.version);
Ok(versions)
}
pub fn schema(&self) -> &Schema {
&self.manifest.schema
}
pub fn get_fragments(&self) -> Vec<FileFragment> {
let dataset = Arc::new(self.clone());
self.manifest
.fragments
.iter()
.map(|f| FileFragment::new(dataset.clone(), f.clone()))
.collect()
}
pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
let dataset = Arc::new(self.clone());
let fragment = self
.manifest
.fragments
.iter()
.find(|f| f.id == fragment_id as u64)?;
Some(FileFragment::new(dataset, fragment.clone()))
}
pub(crate) fn fragments(&self) -> &Arc<Vec<Fragment>> {
&self.manifest.fragments
}
pub async fn load_indices(&self) -> Result<Vec<Index>> {
let manifest_file = self.manifest_file(self.version().version).await?;
read_manifest_indexes(&self.object_store, &manifest_file, &self.manifest).await
}
pub async fn index_statistics(&self, index_name: &str) -> Result<Option<String>> {
let index_uuid = self
.load_indices()
.await
.unwrap()
.iter()
.find(|idx| idx.name.eq(index_name))
.map(|idx| idx.uuid.to_string());
if let Some(index_uuid) = index_uuid {
let index_statistics = open_index(Arc::new(self.clone()), "vector", &index_uuid)
.await?
.statistics()
.unwrap();
Ok(Some(serde_json::to_string(&index_statistics).unwrap()))
} else {
Ok(None)
}
}
pub async fn validate(&self) -> Result<()> {
let id_counts =
self.manifest
.fragments
.iter()
.map(|f| f.id)
.fold(HashMap::new(), |mut acc, id| {
*acc.entry(id).or_insert(0) += 1;
acc
});
for (id, count) in id_counts {
if count > 1 {
return Err(Error::corrupt_file(
self.base.clone(),
format!(
"Duplicate fragment id {} found in dataset {:?}",
id, self.base
),
));
}
}
futures::stream::iter(self.get_fragments())
.map(|f| async move { f.validate().await })
.buffer_unordered(num_cpus::get() * 4)
.try_collect::<Vec<()>>()
.await?;
Ok(())
}
}
#[derive(Debug)]
pub(crate) struct ManifestWriteConfig {
auto_set_feature_flags: bool,
timestamp: Option<SystemTime>,
}
impl Default for ManifestWriteConfig {
fn default() -> Self {
Self {
auto_set_feature_flags: true,
timestamp: None,
}
}
}
pub(crate) async fn write_manifest_file(
object_store: &ObjectStore,
base_path: &Path,
manifest: &mut Manifest,
indices: Option<Vec<Index>>,
config: &ManifestWriteConfig,
) -> std::result::Result<(), CommitError> {
if config.auto_set_feature_flags {
apply_feature_flags(manifest);
}
manifest.set_timestamp(config.timestamp);
manifest.update_max_fragment_id();
object_store
.commit_handler
.commit(
manifest,
indices,
base_path,
object_store,
write_manifest_file_to_path,
)
.await?;
Ok(())
}
fn write_manifest_file_to_path<'a>(
object_store: &'a ObjectStore,
manifest: &'a mut Manifest,
indices: Option<Vec<Index>>,
path: &'a Path,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async {
let mut object_writer = object_store.create(path).await?;
let pos = write_manifest(&mut object_writer, manifest, indices).await?;
object_writer.write_magics(pos).await?;
object_writer.shutdown().await?;
Ok(())
})
}
struct RowIdMeta {
sorted: bool,
contiguous: bool,
}
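/// Classify a slice of row ids: `sorted` means strictly increasing, and
/// `contiguous` means the ids form an unbroken run within a single fragment,
/// letting `take_rows` use a single range read instead of random access.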
fn check_row_ids(row_ids: &[u64]) -> RowIdMeta {
let mut sorted = true;
let mut contiguous = true;
if row_ids.is_empty() {
return RowIdMeta { sorted, contiguous };
}
let mut last_id = row_ids[0];
let first_fragment_id = row_ids[0] >> 32;
for id in row_ids.iter().skip(1) {
sorted &= *id > last_id;
contiguous &= *id == last_id + 1;
contiguous &= (*id >> 32) == first_fragment_id;
last_id = *id;
}
RowIdMeta { sorted, contiguous }
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::ops::Range;
use std::vec;
use super::*;
use crate::arrow::FixedSizeListArrayExt;
use crate::dataset::WriteMode::Overwrite;
use crate::datatypes::Schema;
use crate::index::{vector::VectorIndexParams, DatasetIndexExt, IndexType};
use crate::io::deletion::read_deletion_file;
use arrow_array::{
cast::{as_string_array, as_struct_array},
DictionaryArray, Float32Array, Int32Array, Int64Array, Int8Array, Int8DictionaryArray,
RecordBatch, RecordBatchIterator, StringArray, UInt16Array, UInt32Array,
};
use arrow_ord::sort::sort_to_indices;
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use arrow_select::take::take;
use futures::stream::TryStreamExt;
use lance_linalg::distance::MetricType;
use lance_testing::datagen::generate_random_array;
use tempfile::tempdir;
fn require_send<T: Send>(t: T) -> T {
t
}
async fn create_file(path: &std::path::Path, mode: WriteMode) {
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("i", DataType::Int32, false),
Field::new(
"dict",
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
false,
),
]));
let dict_values = StringArray::from_iter_values(["a", "b", "c", "d", "e"]);
let batches: Vec<RecordBatch> = (0..20)
.map(|i| {
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)),
Arc::new(
DictionaryArray::try_new(
UInt16Array::from_iter_values((0_u16..20_u16).map(|v| v % 5)),
Arc::new(dict_values.clone()),
)
.unwrap(),
),
],
)
.unwrap()
})
.collect();
let expected_batches = batches.clone();
let test_uri = path.to_str().unwrap();
let write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
mode,
..WriteParams::default()
};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let actual_ds = Dataset::open(test_uri).await.unwrap();
assert_eq!(actual_ds.version().version, 1);
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
for batch in &actual_batches {
assert_eq!(batch.num_rows(), 10);
}
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = take(&struct_arr, &sorted_indices, None).unwrap();
let expected_struct_arr: StructArray =
concat_batches(&schema, &expected_batches).unwrap().into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
assert_eq!(
actual_ds
.fragments()
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
(0..10).collect::<Vec<_>>()
)
}
#[lance_test_macros::test(tokio::test)]
async fn test_create_dataset() {
for mode in [WriteMode::Create, WriteMode::Append, Overwrite] {
let test_dir = tempdir().unwrap();
create_file(test_dir.path(), mode).await
}
}
#[lance_test_macros::test(tokio::test)]
async fn test_create_and_fill_empty_dataset() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int32,
false,
)]));
let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone());
assert_eq!(schema.as_ref(), reader.schema().as_ref());
let result = Dataset::write(reader, test_uri, None).await.unwrap();
assert_eq!(result.count_rows().await.unwrap(), 0);
assert_eq!(result.manifest.max_fragment_id(), None);
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let schema_with_meta = Arc::new(
schema
.as_ref()
.clone()
.with_metadata([("key".to_string(), "value".to_string())].into()),
);
let batches = vec![RecordBatch::try_new(
schema_with_meta,
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap()];
write_params.mode = WriteMode::Append;
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
let expected_batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap();
let actual_ds = Dataset::open(test_uri).await.unwrap();
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());
assert_eq!(actual_ds.count_rows().await.unwrap(), 10);
assert_eq!(actual_ds.manifest.max_fragment_id(), Some(0));
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = take(&struct_arr, &sorted_indices, None).unwrap();
let expected_struct_arr: StructArray = expected_batch.into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
}
#[tokio::test]
async fn test_write_params() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int32,
false,
)]));
let num_rows: usize = 1_000;
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..num_rows as i32))],
)
.unwrap()];
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let write_params = WriteParams {
max_rows_per_file: 100,
max_rows_per_group: 10,
..Default::default()
};
let dataset = Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
assert_eq!(dataset.count_rows().await.unwrap(), num_rows);
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 10);
for fragment in &fragments {
assert_eq!(fragment.count_rows().await.unwrap(), 100);
let reader = fragment.open(dataset.schema()).await.unwrap();
assert_eq!(reader.num_batches(), 10);
for i in 0..reader.num_batches() {
assert_eq!(reader.num_rows_in_batch(i), 10);
}
}
}
#[tokio::test]
async fn test_write_manifest() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int32,
false,
)]));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let write_fut = Dataset::write(batches, test_uri, None);
let write_fut = require_send(write_fut);
let mut dataset = write_fut.await.unwrap();
let manifest = read_manifest(
dataset.object_store(),
&dataset
.object_store()
.commit_handler
.resolve_latest_version(&dataset.base, dataset.object_store())
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(manifest.writer_feature_flags, 0);
assert_eq!(manifest.reader_feature_flags, 0);
dataset.delete("i < 10").await.unwrap();
dataset.validate().await.unwrap();
let mut manifest = read_manifest(
dataset.object_store(),
&dataset
.object_store()
.commit_handler
.resolve_latest_version(&dataset.base, dataset.object_store())
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
manifest.writer_feature_flags,
feature_flags::FLAG_DELETION_FILES
);
assert_eq!(
manifest.reader_feature_flags,
feature_flags::FLAG_DELETION_FILES
);
manifest.writer_feature_flags = 5;
manifest.reader_feature_flags = 5;
manifest.version += 1;
write_manifest_file(
dataset.object_store(),
&dataset.base,
&mut manifest,
None,
&ManifestWriteConfig {
auto_set_feature_flags: false,
timestamp: None,
},
)
.await
.unwrap();
let read_result = Dataset::open(test_uri).await;
assert!(matches!(read_result, Err(Error::NotSupported { .. })));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let write_result = Dataset::write(
batches,
test_uri,
Some(WriteParams {
mode: WriteMode::Append,
..Default::default()
}),
)
.await;
assert!(matches!(write_result, Err(Error::NotSupported { .. })));
}
#[tokio::test]
async fn append_dataset() {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int32,
false,
)]));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(20..40))],
)
.unwrap()];
write_params.mode = WriteMode::Append;
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let expected_batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..40))],
)
.unwrap();
let actual_ds = Dataset::open(test_uri).await.unwrap();
assert_eq!(actual_ds.version().version, 2);
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = take(&struct_arr, &sorted_indices, None).unwrap();
let expected_struct_arr: StructArray = expected_batch.into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
assert_eq!(
actual_ds
.fragments()
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
(0..2).collect::<Vec<_>>()
)
}
#[tokio::test]
async fn test_self_dataset_append() {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int32,
false,
)]));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let mut ds = Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(20..40))],
)
.unwrap()];
write_params.mode = WriteMode::Append;
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
ds.append(batches, Some(write_params.clone()))
.await
.unwrap();
let expected_batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..40))],
)
.unwrap();
let actual_ds = Dataset::open(test_uri).await.unwrap();
assert_eq!(actual_ds.version().version, 2);
assert_eq!(actual_ds.fragments().len(), 2);
assert_eq!(
actual_ds
.fragments()
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
(0..2).collect::<Vec<_>>()
);
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = take(&struct_arr, &sorted_indices, None).unwrap();
let expected_struct_arr: StructArray = expected_batch.into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
actual_ds.validate().await.unwrap();
}
#[tokio::test]
async fn test_self_dataset_append_schema_different() {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int32,
false,
)]));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let other_schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int64,
false,
)]));
let other_batches = vec![RecordBatch::try_new(
other_schema.clone(),
vec![Arc::new(Int64Array::from_iter_values(0..20))],
)
.unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let mut ds = Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
write_params.mode = WriteMode::Append;
let other_batches =
RecordBatchIterator::new(other_batches.into_iter().map(Ok), other_schema.clone());
let result = ds.append(other_batches, Some(write_params.clone())).await;
assert!(matches!(result, Err(Error::SchemaMismatch { .. })))
}
#[tokio::test]
async fn append_dictionary() {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"x",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
false,
)]));
let dictionary = Arc::new(StringArray::from(vec!["a", "b"]));
let indices = Int8Array::from(vec![0, 1, 0]);
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(
Int8DictionaryArray::try_new(indices, dictionary.clone()).unwrap(),
)],
)
.unwrap()];
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let indices = Int8Array::from(vec![1, 0, 1]);
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(
Int8DictionaryArray::try_new(indices, dictionary).unwrap(),
)],
)
.unwrap()];
write_params.mode = WriteMode::Append;
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let dictionary = Arc::new(StringArray::from(vec!["d", "c"]));
let indices = Int8Array::from(vec![1, 0, 1]);
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(
Int8DictionaryArray::try_new(indices, dictionary).unwrap(),
)],
)
.unwrap()];
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let result = Dataset::write(batches, test_uri, Some(write_params)).await;
assert!(result.is_err());
}
#[tokio::test]
async fn overwrite_dataset() {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int32,
false,
)]));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let dataset = Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert_eq!(dataset.manifest.max_fragment_id(), Some(0));
let new_schema = Arc::new(ArrowSchema::new(vec![Field::new(
"s",
DataType::Utf8,
false,
)]));
let new_batches = vec![RecordBatch::try_new(
new_schema.clone(),
vec![Arc::new(StringArray::from_iter_values(
(20..40).map(|v| v.to_string()),
))],
)
.unwrap()];
write_params.mode = WriteMode::Overwrite;
let new_batch_reader =
RecordBatchIterator::new(new_batches.into_iter().map(Ok), new_schema.clone());
let dataset = Dataset::write(new_batch_reader, test_uri, Some(write_params.clone()))
.await
.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert_eq!(fragments[0].id(), 0);
assert_eq!(dataset.manifest.max_fragment_id(), Some(0));
let actual_ds = Dataset::open(test_uri).await.unwrap();
assert_eq!(actual_ds.version().version, 2);
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, new_schema.as_ref());
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual_batch = concat_batches(&new_schema, &actual_batches).unwrap();
assert_eq!(new_schema.clone(), actual_batch.schema());
let arr = actual_batch.column_by_name("s").unwrap();
assert_eq!(
&StringArray::from_iter_values((20..40).map(|v| v.to_string())),
as_string_array(arr)
);
assert_eq!(actual_ds.version().version, 2);
let first_ver = Dataset::checkout(test_uri, 1).await.unwrap();
assert_eq!(first_ver.version().version, 1);
assert_eq!(&ArrowSchema::from(first_ver.schema()), schema.as_ref());
}
#[tokio::test]
async fn test_take() {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("i", DataType::Int32, false),
Field::new("s", DataType::Utf8, false),
]));
let batches: Vec<RecordBatch> = (0..20)
.map(|i| {
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)),
Arc::new(StringArray::from_iter_values(
(i * 20..(i + 1) * 20).map(|i| format!("str-{i}")),
)),
],
)
.unwrap()
})
.collect();
let test_uri = test_dir.path().to_str().unwrap();
let write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
let dataset = Dataset::open(test_uri).await.unwrap();
assert_eq!(dataset.count_rows().await.unwrap(), 400);
let projection = Schema::try_from(schema.as_ref()).unwrap();
let values = dataset
.take(
&[200, 199, 39, 40, 100],
&projection,
)
.await
.unwrap();
assert_eq!(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values([200, 199, 39, 40, 100])),
Arc::new(StringArray::from_iter_values(
[200, 199, 39, 40, 100].iter().map(|v| format!("str-{v}"))
)),
],
)
.unwrap(),
values
);
}
#[tokio::test]
async fn test_take_rows() {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("i", DataType::Int32, false),
Field::new("s", DataType::Utf8, false),
]));
let batches: Vec<RecordBatch> = (0..20)
.map(|i| {
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)),
Arc::new(StringArray::from_iter_values(
(i * 20..(i + 1) * 20).map(|i| format!("str-{i}")),
)),
],
)
.unwrap()
})
.collect();
let test_uri = test_dir.path().to_str().unwrap();
let write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let mut dataset = Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
assert_eq!(dataset.count_rows().await.unwrap(), 400);
let projection = Schema::try_from(schema.as_ref()).unwrap();
let indices = &[
5_u64 << 32,        // row 200 (fragment 5, offset 0; each fragment holds 40 rows)
(4_u64 << 32) + 39, // row 199
39,                 // row 39
1_u64 << 32,        // row 40
(2_u64 << 32) + 20, // row 100
];
let values = dataset.take_rows(indices, &projection).await.unwrap();
assert_eq!(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values([200, 199, 39, 40, 100])),
Arc::new(StringArray::from_iter_values(
[200, 199, 39, 40, 100].iter().map(|v| format!("str-{v}"))
)),
],
)
.unwrap(),
values
);
dataset.delete("i in (199, 100)").await.unwrap();
dataset.validate().await.unwrap();
let values = dataset.take_rows(indices, &projection).await.unwrap();
assert_eq!(
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values([200, 39, 40])),
Arc::new(StringArray::from_iter_values(
[200, 39, 40].iter().map(|v| format!("str-{v}"))
)),
],
)
.unwrap(),
values
);
let values = dataset.take_rows(&[], &projection).await.unwrap();
assert_eq!(RecordBatch::new_empty(schema.clone()), values);
}
#[tokio::test]
async fn test_fast_count_rows() {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::Int32,
false,
)]));
let batches: Vec<RecordBatch> = (0..20)
.map(|i| {
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20))],
)
.unwrap()
})
.collect();
let test_uri = test_dir.path().to_str().unwrap();
let write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
let dataset = Dataset::open(test_uri).await.unwrap();
assert_eq!(10, dataset.fragments().len());
assert_eq!(400, dataset.count_rows().await.unwrap());
dataset.validate().await.unwrap();
}
#[tokio::test]
async fn test_create_index() {
let test_dir = tempdir().unwrap();
let dimension = 16;
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"embeddings",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimension,
),
false,
)]));
let float_arr = generate_random_array(512 * dimension as usize);
let vectors = Arc::new(
<arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
float_arr, dimension,
)
.unwrap(),
);
let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
dataset.validate().await.unwrap();
let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 50);
dataset
.create_index(&["embeddings"], IndexType::Vector, None, ¶ms, true)
.await
.unwrap();
dataset.validate().await.unwrap();
let indices = dataset.load_indices().await.unwrap();
let actual = indices.first().unwrap().dataset_version;
let expected = dataset.manifest.version - 1;
assert_eq!(actual, expected);
let fragment_bitmap = indices.first().unwrap().fragment_bitmap.as_ref().unwrap();
assert_eq!(fragment_bitmap.len(), 1);
assert!(fragment_bitmap.contains(0));
let write_params = WriteParams {
mode: WriteMode::Append,
..Default::default()
};
let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()];
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let actual = indices.first().unwrap().dataset_version;
let expected = dataset.manifest.version - 2;
assert_eq!(actual, expected);
dataset.validate().await.unwrap();
let fragment_bitmap = indices.first().unwrap().fragment_bitmap.as_ref().unwrap();
assert_eq!(fragment_bitmap.len(), 1);
assert!(fragment_bitmap.contains(0));
let expected_statistics =
"{\"index_type\":\"IVF\",\"metric_type\":\"l2\",\"num_partitions\":10";
let actual_statistics = dataset
.index_statistics("embeddings_idx")
.await
.unwrap()
.unwrap();
assert!(actual_statistics.starts_with(expected_statistics));
assert_eq!(
dataset.index_statistics("non-existent_idx").await.unwrap(),
None
);
assert_eq!(dataset.index_statistics("").await.unwrap(), None);
let write_params = WriteParams {
mode: WriteMode::Overwrite,
..Default::default()
};
let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap()];
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
assert!(dataset.manifest.index_section.is_none());
assert!(dataset.load_indices().await.unwrap().is_empty());
dataset.validate().await.unwrap();
let fragment_bitmap = indices.first().unwrap().fragment_bitmap.as_ref().unwrap();
assert_eq!(fragment_bitmap.len(), 1);
assert!(fragment_bitmap.contains(0));
}
async fn create_bad_file() -> Result<Dataset> {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"a.b.c",
DataType::Int32,
false,
)]));
let batches: Vec<RecordBatch> = (0..20)
.map(|i| {
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20))],
)
.unwrap()
})
.collect();
let test_uri = test_dir.path().to_str().unwrap();
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(reader, test_uri, None).await
}
#[tokio::test]
async fn test_bad_field_name() {
assert!(create_bad_file().await.is_err());
}
#[tokio::test]
async fn test_open_dataset_not_found() {
let result = Dataset::open(".").await;
assert!(matches!(result.unwrap_err(), Error::DatasetNotFound { .. }));
}
#[tokio::test]
async fn test_merge() {
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("i", DataType::Int32, false),
Field::new("x", DataType::Float32, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(Float32Array::from(vec![1.0, 2.0])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 2])),
Arc::new(Float32Array::from(vec![3.0, 4.0])),
],
)
.unwrap();
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let write_params = WriteParams {
mode: WriteMode::Append,
..Default::default()
};
let batches = RecordBatchIterator::new(vec![batch1].into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let batches = RecordBatchIterator::new(vec![batch2].into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let dataset = Dataset::open(test_uri).await.unwrap();
assert_eq!(dataset.fragments().len(), 2);
assert_eq!(dataset.manifest.max_fragment_id(), Some(1));
let right_schema = Arc::new(ArrowSchema::new(vec![
Field::new("i2", DataType::Int32, false),
Field::new("y", DataType::Utf8, true),
]));
let right_batch1 = RecordBatch::try_new(
right_schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batches =
RecordBatchIterator::new(vec![right_batch1].into_iter().map(Ok), right_schema.clone());
let mut dataset = Dataset::open(test_uri).await.unwrap();
dataset.merge(batches, "i", "i2").await.unwrap();
dataset.validate().await.unwrap();
assert_eq!(dataset.version().version, 3);
assert_eq!(dataset.fragments().len(), 2);
assert_eq!(dataset.fragments()[0].files.len(), 2);
assert_eq!(dataset.fragments()[1].files.len(), 2);
assert_eq!(dataset.manifest.max_fragment_id(), Some(1));
let actual_batches = dataset
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual = concat_batches(&actual_batches[0].schema(), &actual_batches).unwrap();
let expected = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
Field::new("i", DataType::Int32, false),
Field::new("x", DataType::Float32, false),
Field::new("y", DataType::Utf8, true),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 2])),
Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0])),
Arc::new(StringArray::from(vec![
Some("a"),
Some("b"),
None,
Some("b"),
])),
],
)
.unwrap();
assert_eq!(actual, expected);
let dataset = Dataset::open(test_uri).await.unwrap();
let actual_batches = dataset
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual = concat_batches(&actual_batches[0].schema(), &actual_batches).unwrap();
assert_eq!(actual, expected);
}
#[tokio::test]
async fn test_delete() {
fn sequence_data(range: Range<u32>) -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::UInt32,
false,
)]));
RecordBatch::try_new(schema, vec![Arc::new(UInt32Array::from_iter_values(range))])
.unwrap()
}
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::UInt32,
false,
)]));
let data = sequence_data(0..100);
let batches = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema.clone());
let write_params = WriteParams {
max_rows_per_file: 50,
..Default::default()
};
let mut dataset = Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
dataset.delete("i < 0").await.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 2);
assert_eq!(dataset.manifest.max_fragment_id(), Some(1));
assert!(fragments[0].metadata.deletion_file.is_none());
assert!(fragments[1].metadata.deletion_file.is_none());
dataset.delete("i < 10 OR i >= 90").await.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 2);
assert!(fragments[0].metadata.deletion_file.is_some());
assert!(fragments[1].metadata.deletion_file.is_some());
let store = dataset.object_store().clone();
let path = Path::from_filesystem_path(test_uri).unwrap();
let deletion_vector = read_deletion_file(&path, &fragments[0].metadata, &store)
.await
.unwrap()
.unwrap();
assert_eq!(deletion_vector.len(), 10);
assert_eq!(
deletion_vector.into_iter().collect::<HashSet<_>>(),
(0..10).collect::<HashSet<_>>()
);
let deletion_vector = read_deletion_file(&path, &fragments[1].metadata, &store)
.await
.unwrap()
.unwrap();
assert_eq!(deletion_vector.len(), 10);
assert_eq!(
deletion_vector.into_iter().collect::<HashSet<_>>(),
(40..50).collect::<HashSet<_>>()
);
let second_deletion_file = fragments[1].metadata.deletion_file.clone().unwrap();
dataset.delete("i < 20").await.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 2);
assert!(fragments[0].metadata.deletion_file.is_some());
let deletion_vector = read_deletion_file(&path, &fragments[0].metadata, &store)
.await
.unwrap()
.unwrap();
assert_eq!(deletion_vector.len(), 20);
assert_eq!(
deletion_vector.into_iter().collect::<HashSet<_>>(),
(0..20).collect::<HashSet<_>>()
);
assert_eq!(
fragments[1].metadata.deletion_file.as_ref().unwrap(),
&second_deletion_file
);
dataset.delete("i >= 50").await.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert_eq!(fragments[0].id(), 0);
let data = sequence_data(0..100);
let batches = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema.clone());
let write_params = WriteParams {
mode: WriteMode::Append,
..Default::default()
};
let dataset = Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 2);
assert_eq!(fragments[0].id(), 0);
assert_eq!(fragments[1].id(), 2);
assert_eq!(dataset.manifest.max_fragment_id(), Some(2));
}
#[tokio::test]
async fn test_restore() {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"i",
DataType::UInt32,
false,
)]));
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(UInt32Array::from_iter_values(0..100))],
);
let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
assert_eq!(dataset.manifest.version, 1);
let original_manifest = dataset.manifest.clone();
dataset.delete("i > 50").await.unwrap();
assert_eq!(dataset.manifest.version, 2);
let mut dataset = dataset.checkout_version(1).await.unwrap();
assert_eq!(dataset.manifest.version, 1);
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert_eq!(fragments[0].metadata.deletion_file, None);
assert_eq!(dataset.manifest, original_manifest);
dataset.restore(None).await.unwrap();
assert_eq!(dataset.manifest.version, 3);
assert_eq!(dataset.manifest.fragments, original_manifest.fragments);
assert_eq!(dataset.manifest.schema, original_manifest.schema);
dataset.delete("i > 30").await.unwrap();
assert_eq!(dataset.manifest.version, 4);
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert!(fragments[0].metadata.deletion_file.is_some());
}
#[tokio::test]
async fn test_search_empty() {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"vec",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 128),
false,
)]));
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let vectors = Arc::new(
<arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
Float32Array::from_iter_values(vec![]),
128,
)
.unwrap(),
);
let data = RecordBatch::try_new(schema.clone(), vec![vectors]);
let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
let dataset = Dataset::write(reader, test_uri, None).await.unwrap();
let mut stream = dataset
.scan()
.nearest(
"vec",
&Float32Array::from_iter_values((0..128).map(|_| 0.1)),
1,
)
.unwrap()
.try_into_stream()
.await
.unwrap();
while let Some(batch) = stream.next().await {
let schema = batch.unwrap().schema();
assert_eq!(schema.fields.len(), 2);
assert_eq!(
schema.field_with_name("vec").unwrap(),
&Field::new(
"vec",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
128
),
false,
)
);
assert_eq!(
schema.field_with_name("_distance").unwrap(),
&Field::new("_distance", DataType::Float32, false)
);
}
}
#[tokio::test]
async fn test_search_empty_after_delete() {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"vec",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 128),
false,
)]));
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let vectors = Arc::new(
<arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
Float32Array::from_iter_values((0..128).map(|_| 0.1_f32)),
128,
)
.unwrap(),
);
let data = RecordBatch::try_new(schema.clone(), vec![vectors]);
let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
dataset.delete("vec IS NOT NULL").await.unwrap();
let dataset = Dataset::open(test_uri).await.unwrap();
let mut stream = dataset
.scan()
.nearest(
"vec",
&Float32Array::from_iter_values((0..128).map(|_| 0.1)),
1,
)
.unwrap()
.try_into_stream()
.await
.unwrap();
while let Some(batch) = stream.next().await {
let schema = batch.unwrap().schema();
assert_eq!(schema.fields.len(), 2);
assert_eq!(
schema.field_with_name("vec").unwrap(),
&Field::new(
"vec",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
128
),
false,
)
);
assert_eq!(
schema.field_with_name("_distance").unwrap(),
&Field::new("_distance", DataType::Float32, false)
);
}
}
}