use arrow_array::{RecordBatch, RecordBatchReader};
use byteorder::{ByteOrder, LittleEndian};
use chrono::{prelude::*, Duration};
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use futures::stream::{self, StreamExt, TryStreamExt};
use futures::{FutureExt, Stream};
use itertools::Itertools;
use lance_core::datatypes::NullabilityComparison;
use lance_core::utils::address::RowAddress;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::{datatypes::SchemaCompareOptions, traits::DatasetTakeRows};
use lance_datafusion::projection::ProjectionPlan;
use lance_datafusion::utils::{peek_reader_schema, reader_to_stream};
use lance_file::datatypes::populate_schema_dictionary;
use lance_file::version::LanceFileVersion;
use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
use lance_io::object_writer::ObjectWriter;
use lance_io::traits::WriteExt;
use lance_io::utils::{read_last_block, read_metadata_offset, read_struct};
use lance_table::format::{
DataStorageFormat, Fragment, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION,
};
use lance_table::io::commit::{
commit_handler_from_url, migrate_scheme_to_v2, CommitError, CommitHandler, CommitLock,
ManifestLocation, ManifestNamingScheme,
};
use lance_table::io::manifest::{read_manifest, write_manifest};
use log::{info, warn};
use object_store::path::Path;
use prost::Message;
use rowids::get_row_id_index;
use snafu::{location, Location};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use tracing::instrument;
mod blob;
pub mod builder;
pub mod cleanup;
pub mod fragment;
mod hash_joiner;
pub mod index;
pub mod optimize;
pub mod progress;
pub mod refs;
pub(crate) mod rowids;
pub mod scanner;
mod schema_evolution;
mod take;
pub mod transaction;
pub mod updater;
mod utils;
mod write;
use self::builder::DatasetBuilder;
use self::cleanup::RemovalStats;
use self::fragment::FileFragment;
use self::refs::Tags;
use self::scanner::{DatasetRecordBatchStream, Scanner};
use self::transaction::{Operation, Transaction};
use self::write::write_fragments_internal;
use crate::datatypes::Schema;
use crate::error::box_error;
use crate::io::commit::{commit_detached_transaction, commit_new_dataset, commit_transaction};
use crate::session::Session;
use crate::utils::temporal::{timestamp_to_nanos, utc_now, SystemTime};
use crate::{Error, Result};
pub use blob::BlobFile;
use hash_joiner::HashJoiner;
pub use lance_core::ROW_ID;
use lance_table::feature_flags::{apply_feature_flags, can_read_dataset, can_write_dataset};
pub use schema_evolution::{
BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
};
pub use take::TakeBuilder;
pub use write::merge_insert::{
MergeInsertBuilder, MergeInsertJob, WhenMatched, WhenNotMatched, WhenNotMatchedBySource,
};
pub use write::update::{UpdateBuilder, UpdateJob};
pub use write::{write_fragments, WriteMode, WriteParams};
/// Directory name, relative to the dataset base path, returned by `Dataset::indices_dir`.
const INDICES_DIR: &str = "_indices";
/// Directory name, relative to the dataset base path, returned by `Dataset::data_dir`.
pub const DATA_DIR: &str = "data";
/// Directory name, relative to the dataset base path, under which the blobs dataset is opened.
pub const BLOB_DIR: &str = "_blobs";
/// Default value of `ReadParams::index_cache_size`.
// NOTE(review): the unit (entries vs. bytes) is not visible here — confirm against `Session`.
pub(crate) const DEFAULT_INDEX_CACHE_SIZE: usize = 256;
/// Default value of `ReadParams::metadata_cache_size`.
// NOTE(review): the unit (entries vs. bytes) is not visible here — confirm against `Session`.
pub(crate) const DEFAULT_METADATA_CACHE_SIZE: usize = 256;
/// A Lance dataset bound to one loaded manifest.
///
/// Cloning is cheap for the heavy members: the object store, manifest, session
/// and commit handler are all held behind `Arc`.
#[derive(Debug, Clone)]
pub struct Dataset {
/// Object store used to read and write every file of this dataset.
pub object_store: Arc<ObjectStore>,
/// Handler used to resolve manifest locations and to commit new manifests.
pub(crate) commit_handler: Arc<dyn CommitHandler>,
/// URI the dataset was opened or written with; exposed through `uri()`.
uri: String,
/// Base path of the dataset inside the object store; sub-directories are children of it.
pub(crate) base: Path,
/// Manifest this handle currently points at; replaced after successful commits.
pub(crate) manifest: Arc<Manifest>,
/// Session shared by this handle; exposes the index cache queried by the cache accessors.
pub(crate) session: Arc<Session>,
/// Tag accessor built from the same object store, commit handler and base path.
pub tags: Tags,
/// Naming scheme used for this dataset's manifest files.
pub manifest_naming_scheme: ManifestNamingScheme,
}
/// Summary of one dataset version, derived from its manifest.
pub struct Version {
/// Version number copied from the manifest.
pub version: u64,
/// Timestamp reported by the manifest.
pub timestamp: DateTime<Utc>,
/// Key/value metadata attached to the version (empty when built via `From<&Manifest>`).
pub metadata: BTreeMap<String, String>,
}
impl From<&Manifest> for Version {
    /// Builds a [`Version`] from a manifest's version number and timestamp.
    ///
    /// The `metadata` map is always left empty.
    fn from(m: &Manifest) -> Self {
        let version = m.version;
        let timestamp = m.timestamp();
        Self {
            version,
            timestamp,
            metadata: BTreeMap::new(),
        }
    }
}
/// Options that control how a dataset is opened for reading.
#[derive(Clone, Debug)]
pub struct ReadParams {
/// Size of the index cache (defaults to `DEFAULT_INDEX_CACHE_SIZE`).
pub index_cache_size: usize,
/// Size of the metadata cache (defaults to `DEFAULT_METADATA_CACHE_SIZE`).
pub metadata_cache_size: usize,
/// Optional pre-existing session to use.
// NOTE(review): presumably takes precedence over the two cache sizes — confirm in the builder.
pub session: Option<Arc<Session>>,
/// Optional object store parameters.
pub store_options: Option<ObjectStoreParams>,
/// Optional custom commit handler.
pub commit_handler: Option<Arc<dyn CommitHandler>>,
/// Registry handed to the object store constructor when `store_options` is used.
pub object_store_registry: Arc<ObjectStoreRegistry>,
}
impl ReadParams {
pub fn index_cache_size(&mut self, cache_size: usize) -> &mut Self {
self.index_cache_size = cache_size;
self
}
pub fn metadata_cache_size(&mut self, cache_size: usize) -> &mut Self {
self.metadata_cache_size = cache_size;
self
}
pub fn session(&mut self, session: Arc<Session>) -> &mut Self {
self.session = Some(session);
self
}
pub fn with_object_store_registry(
&mut self,
object_store_registry: Arc<ObjectStoreRegistry>,
) -> &mut Self {
self.object_store_registry = object_store_registry;
self
}
pub fn set_commit_lock<T: CommitLock + Send + Sync + 'static>(&mut self, lock: Arc<T>) {
self.commit_handler = Some(Arc::new(lock));
}
}
impl Default for ReadParams {
    /// Default cache sizes, a default object store registry, and nothing else set.
    fn default() -> Self {
        let object_store_registry = Arc::default();
        Self {
            session: None,
            store_options: None,
            commit_handler: None,
            object_store_registry,
            index_cache_size: DEFAULT_INDEX_CACHE_SIZE,
            metadata_cache_size: DEFAULT_METADATA_CACHE_SIZE,
        }
    }
}
/// Describes which columns a read should return.
#[derive(Debug, Clone)]
pub enum ProjectionRequest {
/// Project using an explicit schema.
Schema(Arc<Schema>),
/// Project using pairs of strings that are handed to `ProjectionPlan::try_new`.
// NOTE(review): presumably (output column name, SQL expression) — confirm against `ProjectionPlan`.
Sql(Vec<(String, String)>),
}
impl ProjectionRequest {
    /// Builds a schema projection from column names.
    ///
    /// # Panics
    ///
    /// Panics if `dataset_schema.project` fails for the given names.
    pub fn from_columns(
        columns: impl IntoIterator<Item = impl AsRef<str>>,
        dataset_schema: &Schema,
    ) -> Self {
        let names: Vec<String> = columns
            .into_iter()
            .map(|name| name.as_ref().to_owned())
            .collect();
        let projected = dataset_schema.project(&names).unwrap();
        Self::Schema(Arc::new(projected))
    }

    /// Wraps an owned schema as a schema projection.
    pub fn from_schema(schema: Schema) -> Self {
        Self::Schema(Arc::new(schema))
    }

    /// Builds a SQL projection from pairs of string-convertible values.
    pub fn from_sql(
        columns: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        let mut pairs: Vec<(String, String)> = Vec::new();
        pairs.extend(
            columns
                .into_iter()
                .map(|(first, second)| (first.into(), second.into())),
        );
        Self::Sql(pairs)
    }

    /// Converts the request into a [`ProjectionPlan`] against `dataset_schema`.
    ///
    /// # Errors
    ///
    /// Returns the error from `project_by_schema` (schema requests) or from
    /// `ProjectionPlan::try_new` (SQL requests).
    pub fn into_projection_plan(self, dataset_schema: &Schema) -> Result<ProjectionPlan> {
        match self {
            Self::Sql(columns) => ProjectionPlan::try_new(dataset_schema, &columns, false),
            Self::Schema(schema) => {
                let projected = dataset_schema.project_by_schema(schema.as_ref())?;
                Ok(ProjectionPlan::new_empty(Arc::new(projected), false))
            }
        }
    }
}
impl From<Arc<Schema>> for ProjectionRequest {
fn from(schema: Arc<Schema>) -> Self {
Self::Schema(schema)
}
}
impl From<Schema> for ProjectionRequest {
    /// Moves the schema behind an `Arc` and uses it as a schema projection.
    fn from(schema: Schema) -> Self {
        let shared = Arc::new(schema);
        Self::Schema(shared)
    }
}
impl Dataset {
/// Opens the dataset at `uri` using a default-configured [`DatasetBuilder`].
#[instrument]
pub async fn open(uri: &str) -> Result<Self> {
    let builder = DatasetBuilder::from_uri(uri);
    builder.load().await
}
async fn params_from_uri(
uri: &str,
commit_handler: &Option<Arc<dyn CommitHandler>>,
store_options: &Option<ObjectStoreParams>,
object_store_registry: Arc<ObjectStoreRegistry>,
) -> Result<(ObjectStore, Path, Arc<dyn CommitHandler>)> {
let (mut object_store, base_path) = match store_options.as_ref() {
Some(store_options) => {
ObjectStore::from_uri_and_params(object_store_registry, uri, store_options).await?
}
None => ObjectStore::from_uri(uri).await?,
};
if let Some(block_size) = store_options.as_ref().and_then(|opts| opts.block_size) {
object_store.set_block_size(block_size);
}
let commit_handler = match &commit_handler {
None => {
if store_options.is_some() && store_options.as_ref().unwrap().object_store.is_some()
{
return Err(Error::InvalidInput { source: "when creating a dataset with a custom object store the commit_handler must also be specified".into(), location: location!() });
}
commit_handler_from_url(uri, store_options).await?
}
Some(commit_handler) => {
if uri.starts_with("s3+ddb") {
return Err(Error::InvalidInput {
source:
"`s3+ddb://` scheme and custom commit handler are mutually exclusive"
.into(),
location: location!(),
});
} else {
commit_handler.clone()
}
}
};
Ok((object_store, base_path, commit_handler))
}
/// Returns a new handle checked out at the given version number or tag.
pub async fn checkout_version(&self, version: impl Into<refs::Ref>) -> Result<Self> {
    let target: refs::Ref = version.into();
    match target {
        refs::Ref::Tag(tag) => self.checkout_by_tag(tag.as_str()).await,
        refs::Ref::Version(number) => self.checkout_by_version_number(number).await,
    }
}
/// Replaces this handle's manifest with the latest one in storage.
pub async fn checkout_latest(&mut self) -> Result<()> {
    let latest = self.latest_manifest().await?;
    self.manifest = Arc::new(latest);
    Ok(())
}
async fn checkout_by_version_number(&self, version: u64) -> Result<Self> {
let base_path = self.base.clone();
let manifest_location = self
.commit_handler
.resolve_version_location(&base_path, version, &self.object_store.inner)
.await?;
let manifest = Self::load_manifest(self.object_store.as_ref(), &manifest_location).await?;
Self::checkout_manifest(
self.object_store.clone(),
base_path,
self.uri.clone(),
manifest,
self.session.clone(),
self.commit_handler.clone(),
manifest_location.naming_scheme,
)
.await
}
/// Looks up the version number behind `tag` and checks that version out.
async fn checkout_by_tag(&self, tag: &str) -> Result<Self> {
    let tagged_version = self.tags.get_version(tag).await?;
    self.checkout_by_version_number(tagged_version).await
}
async fn load_manifest(
object_store: &ObjectStore,
manifest_location: &ManifestLocation,
) -> Result<Manifest> {
let object_reader = if let Some(size) = manifest_location.size {
object_store
.open_with_size(&manifest_location.path, size as usize)
.await
} else {
object_store.open(&manifest_location.path).await
};
let object_reader = object_reader.map_err(|e| match &e {
Error::NotFound { uri, .. } => Error::DatasetNotFound {
path: uri.clone(),
source: box_error(e),
location: location!(),
},
_ => e,
})?;
let last_block =
read_last_block(object_reader.as_ref())
.await
.map_err(|err| match err {
object_store::Error::NotFound { path, source } => Error::DatasetNotFound {
path,
source,
location: location!(),
},
_ => Error::IO {
source: err.into(),
location: location!(),
},
})?;
let offset = read_metadata_offset(&last_block)?;
let manifest_size = object_reader.size().await?;
let mut manifest = if manifest_size - offset <= last_block.len() {
let manifest_len = manifest_size - offset;
let offset_in_block = last_block.len() - manifest_len;
let message_len =
LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)
} else {
read_struct(object_reader.as_ref(), offset).await
}?;
if !can_read_dataset(manifest.reader_feature_flags) {
let message = format!(
"This dataset cannot be read by this version of Lance. \
Please upgrade Lance to read this dataset.\n Flags: {}",
manifest.reader_feature_flags
);
return Err(Error::NotSupported {
source: message.into(),
location: location!(),
});
}
populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
Ok(manifest)
}
/// Assembles a [`Dataset`] handle from an already-loaded manifest.
///
/// The tag accessor is created from clones of the object store, commit
/// handler and base path; nothing is read from storage here.
async fn checkout_manifest(
    object_store: Arc<ObjectStore>,
    base_path: Path,
    uri: String,
    manifest: Manifest,
    session: Arc<Session>,
    commit_handler: Arc<dyn CommitHandler>,
    manifest_naming_scheme: ManifestNamingScheme,
) -> Result<Self> {
    let tags = Tags::new(
        Arc::clone(&object_store),
        Arc::clone(&commit_handler),
        base_path.clone(),
    );
    let manifest = Arc::new(manifest);
    let dataset = Self {
        object_store,
        commit_handler,
        uri,
        base: base_path,
        manifest,
        session,
        tags,
        manifest_naming_scheme,
    };
    Ok(dataset)
}
/// Writes `batches` to the dataset at `uri`, creating, appending to or
/// overwriting it according to `params.mode`.
///
/// Steps, in order:
/// 1. Resolve the object store / commit handler and detect whether a dataset exists.
/// 2. Reject `Create` on an existing dataset; downgrade `Append`/`Overwrite`
///    to `Create` (with a warning) when nothing exists yet.
/// 3. For non-create modes, load the existing dataset and derive the storage
///    version, row-id setting and manifest naming scheme from it.
/// 4. Write the fragments and commit a transaction (`Overwrite` for
///    create/overwrite, `Append` for append).
///
/// # Errors
///
/// * `DatasetAlreadyExists` when `mode` is `Create` and a dataset exists.
/// * `NotSupported` when the existing dataset needs unsupported writer features.
/// * Schema-compatibility errors when appending incompatible data.
/// * Any I/O or commit error from the underlying calls.
#[instrument(skip(batches, params))]
async fn write_impl(
    batches: Box<dyn RecordBatchReader + Send>,
    uri: &str,
    params: Option<WriteParams>,
) -> Result<Self> {
    let mut params = params.unwrap_or_default();
    let (object_store, base, commit_handler) = Self::params_from_uri(
        uri,
        &params.commit_handler,
        &params.store_params,
        params.object_store_registry.clone(),
    )
    .await?;
    // A `NotFound` while resolving the latest version means there is no dataset yet.
    let dataset_exists = match commit_handler
        .resolve_latest_version(&base, &object_store)
        .await
    {
        Ok(_) => true,
        Err(Error::NotFound { .. }) => false,
        Err(e) => return Err(e),
    };
    let (batches, schema) = peek_reader_schema(Box::new(batches)).await?;
    let stream = reader_to_stream(batches);
    if dataset_exists && matches!(params.mode, WriteMode::Create) {
        return Err(Error::DatasetAlreadyExists {
            uri: uri.to_owned(),
            location: location!(),
        });
    }
    if !dataset_exists
        && (matches!(params.mode, WriteMode::Append)
            || matches!(params.mode, WriteMode::Overwrite))
    {
        warn!("No existing dataset at {uri}, it will be created");
        params = WriteParams {
            mode: WriteMode::Create,
            ..params
        };
    }
    // Only non-create modes need the existing dataset.
    let dataset = if matches!(params.mode, WriteMode::Create) {
        None
    } else {
        Some(
            DatasetBuilder::from_uri(uri)
                .with_read_params(ReadParams {
                    store_options: params.store_params.clone(),
                    commit_handler: params.commit_handler.clone(),
                    object_store_registry: params.object_store_registry.clone(),
                    ..Default::default()
                })
                .load()
                .await?,
        )
    };
    // Append always keeps the dataset's storage version; overwrite keeps it
    // unless the caller asked for a specific one.
    let mut storage_version = match (params.mode, dataset.as_ref()) {
        (WriteMode::Append, Some(dataset)) => {
            let m = dataset.manifest.as_ref();
            m.data_storage_format.lance_file_version()?
        }
        (WriteMode::Overwrite, Some(dataset)) => {
            params.data_storage_version.map(Ok).unwrap_or_else(|| {
                let m = dataset.manifest.as_ref();
                m.data_storage_format.lance_file_version()
            })?
        }
        _ => params.storage_version_or_default(),
    };
    if matches!(params.mode, WriteMode::Append) {
        if let Some(d) = dataset.as_ref() {
            // The dataset's existing row-id setting wins over the caller's.
            if params.enable_move_stable_row_ids != d.manifest.uses_move_stable_row_ids() {
                info!(
                    "Ignoring user provided move stable row ids setting of {}, dataset already has it set to {}",
                    params.enable_move_stable_row_ids,
                    d.manifest.uses_move_stable_row_ids()
                );
                params.enable_move_stable_row_ids = d.manifest.uses_move_stable_row_ids();
            }
            let m = d.manifest.as_ref();
            let mut schema_cmp_opts = SchemaCompareOptions {
                compare_dictionary: true,
                compare_nullability: NullabilityComparison::Ignore,
                ..Default::default()
            };
            // The comparison is relaxed only when there is no blob dataset.
            if m.blob_dataset_version.is_none() {
                schema_cmp_opts.ignore_field_order = true;
                schema_cmp_opts.allow_missing_if_nullable = true;
            }
            schema.check_compatible(&m.schema, &schema_cmp_opts)?;
            storage_version = m.data_storage_format.lance_file_version()?;
        }
    }
    if dataset.is_none()
        && !params.enable_move_stable_row_ids
        && schema.fields.iter().any(|f| !f.is_default_storage())
    {
        info!("Enabling move stable row ids because non-default storage is used");
        params.enable_move_stable_row_ids = true;
    }
    let manifest_naming_scheme = if let Some(d) = dataset.as_ref() {
        d.manifest_naming_scheme
    } else if params.enable_v2_manifest_paths {
        ManifestNamingScheme::V2
    } else {
        ManifestNamingScheme::V1
    };
    // Freeze the parameters: no further mutation below this point.
    let params = params;
    if let Some(d) = dataset.as_ref() {
        if !can_write_dataset(d.manifest.writer_feature_flags) {
            let message = format!(
                "This dataset cannot be written by this version of Lance. \
Please upgrade Lance to write to this dataset.\n Flags: {}",
                d.manifest.writer_feature_flags
            );
            return Err(Error::NotSupported {
                source: message.into(),
                location: location!(),
            });
        }
    }
    let object_store = Arc::new(object_store);
    let written_frags = write_fragments_internal(
        dataset.as_ref(),
        object_store.clone(),
        &base,
        schema.clone(),
        stream,
        params.clone(),
    )
    .await?;
    let operation = match params.mode {
        WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite {
            schema,
            fragments: written_frags.default.0,
            config_upsert_values: None,
        },
        WriteMode::Append => Operation::Append {
            fragments: written_frags.default.0,
        },
    };
    // Blob fragments, when present, get a parallel operation of the same kind.
    let blobs_op = written_frags.blob.map(|blob| match params.mode {
        WriteMode::Create | WriteMode::Overwrite => Operation::Overwrite {
            schema: blob.1,
            fragments: blob.0,
            config_upsert_values: None,
        },
        WriteMode::Append => Operation::Append { fragments: blob.0 },
    });
    let transaction = Transaction::new(
        dataset.as_ref().map(|ds| ds.manifest.version).unwrap_or(0),
        operation,
        blobs_op,
        None,
    );
    let manifest_config = ManifestWriteConfig {
        use_move_stable_row_ids: params.enable_move_stable_row_ids,
        storage_format: Some(DataStorageFormat::new(storage_version)),
        ..Default::default()
    };
    let manifest = if let Some(dataset) = &dataset {
        commit_transaction(
            dataset,
            &object_store,
            commit_handler.as_ref(),
            &transaction,
            &manifest_config,
            &Default::default(),
            manifest_naming_scheme,
        )
        .await?
    } else {
        commit_new_dataset(
            &object_store,
            commit_handler.as_ref(),
            &base,
            &transaction,
            &manifest_config,
            manifest_naming_scheme,
        )
        .await?
    };
    let tags = Tags::new(object_store.clone(), commit_handler.clone(), base.clone());
    Ok(Self {
        object_store,
        base,
        uri: uri.to_string(),
        manifest: Arc::new(manifest),
        session: Arc::new(Session::default()),
        commit_handler,
        tags,
        manifest_naming_scheme,
    })
}
/// Writes `batches` to the dataset at `uri`.
///
/// Boxes the reader and delegates to `write_impl`.
pub async fn write(
    batches: impl RecordBatchReader + Send + 'static,
    uri: &str,
    params: Option<WriteParams>,
) -> Result<Self> {
    Self::write_impl(Box::new(batches), uri, params).await
}
/// Appends `batches` to this dataset and moves the handle to the new manifest.
///
/// The write mode is forced to `Append`. The incoming schema must be
/// compatible with the dataset schema, and the dataset's existing
/// stable-row-id setting overrides the one in `params`.
///
/// # Errors
///
/// Returns `InvalidInput` when `params` carries a commit handler or store
/// parameters, plus any schema, write or commit error.
async fn append_impl(
    &mut self,
    batches: Box<dyn RecordBatchReader + Send>,
    params: Option<WriteParams>,
) -> Result<()> {
    let mut params = WriteParams {
        mode: WriteMode::Append,
        ..params.unwrap_or_default()
    };
    // The dataset already owns its store and commit handler.
    let overrides_storage = params.commit_handler.is_some() || params.store_params.is_some();
    if overrides_storage {
        return Err(Error::InvalidInput {
            source: "commit_handler / store_params should not be specified when calling append"
                .into(),
            location: location!(),
        });
    }

    let (batches, schema) = peek_reader_schema(Box::new(batches)).await?;
    let stream = reader_to_stream(batches);

    // Field order / missing nullable fields are tolerated only without a blob dataset.
    let relaxed = self.manifest.blob_dataset_version.is_none();
    let mut schema_cmp_opts = SchemaCompareOptions {
        compare_dictionary: true,
        compare_nullability: NullabilityComparison::Ignore,
        ..Default::default()
    };
    if relaxed {
        schema_cmp_opts.ignore_field_order = true;
        schema_cmp_opts.allow_missing_if_nullable = true;
    }
    schema.check_compatible(&self.manifest.schema, &schema_cmp_opts)?;

    let dataset_row_ids = self.manifest.uses_move_stable_row_ids();
    if params.enable_move_stable_row_ids != dataset_row_ids {
        info!(
            "Ignoring user provided move stable row ids setting of {}, dataset already has it set to {}",
            params.enable_move_stable_row_ids,
            self.manifest.uses_move_stable_row_ids()
        );
        params.enable_move_stable_row_ids = self.manifest.uses_move_stable_row_ids();
    }

    let written_frags = write_fragments_internal(
        Some(self),
        self.object_store.clone(),
        &self.base,
        schema,
        stream,
        params.clone(),
    )
    .await?;

    let append_op = Operation::Append {
        fragments: written_frags.default.0,
    };
    let blobs_op = written_frags
        .blob
        .map(|blobs| Operation::Append { fragments: blobs.0 });
    let transaction = Transaction::new(self.manifest.version, append_op, blobs_op, None);

    let new_manifest = commit_transaction(
        self,
        &self.object_store,
        self.commit_handler.as_ref(),
        &transaction,
        &Default::default(),
        &Default::default(),
        self.manifest_naming_scheme,
    )
    .await?;
    self.manifest = Arc::new(new_manifest);
    Ok(())
}
/// Appends `batches` to this dataset.
///
/// Boxes the reader and delegates to `append_impl`.
pub async fn append(
    &mut self,
    batches: impl RecordBatchReader + Send + 'static,
    params: Option<WriteParams>,
) -> Result<()> {
    self.append_impl(Box::new(batches), params).await
}
/// Returns the URI this handle was created with.
pub fn uri(&self) -> &str {
    self.uri.as_str()
}
/// Returns the manifest this handle is currently bound to.
pub fn manifest(&self) -> &Manifest {
    let manifest: &Manifest = &self.manifest;
    manifest
}
/// Opens the companion blobs dataset, if the manifest references one.
///
/// Returns `Ok(None)` when the manifest has no `blob_dataset_version`.
/// Otherwise the blob manifest is resolved under `<base>/_blobs` and wrapped
/// in a handle that shares this dataset's store, session and commit handler
/// and uses the V2 manifest naming scheme.
pub async fn blobs_dataset(&self) -> Result<Option<Arc<Self>>> {
    let blobs_version = match self.manifest.blob_dataset_version {
        Some(version) => version,
        None => return Ok(None),
    };

    let blobs_path = self.base.child(BLOB_DIR);
    let blob_manifest_location = self
        .commit_handler
        .resolve_version_location(&blobs_path, blobs_version, &self.object_store.inner)
        .await?;
    let manifest = read_manifest(&self.object_store, &blob_manifest_location.path).await?;
    let blobs_uri = format!("{}/{}", self.uri, BLOB_DIR);

    let blobs_dataset = Self::checkout_manifest(
        Arc::clone(&self.object_store),
        blobs_path,
        blobs_uri,
        manifest,
        Arc::clone(&self.session),
        Arc::clone(&self.commit_handler),
        ManifestNamingScheme::V2,
    )
    .await?;
    Ok(Some(Arc::new(blobs_dataset)))
}
/// Returns `true` when the manifest's data storage format is the legacy file version.
///
/// # Panics
///
/// Panics if the manifest's storage format cannot be resolved to a file version.
pub(crate) fn is_legacy_storage(&self) -> bool {
    let storage_format = &self.manifest.data_storage_format;
    let file_version = storage_format.lance_file_version().unwrap();
    file_version == LanceFileVersion::Legacy
}
/// Reads the manifest of the latest version found in storage.
///
/// This does not modify the handle; see `checkout_latest` for that.
pub async fn latest_manifest(&self) -> Result<Manifest> {
    let latest_path = self
        .commit_handler
        .resolve_latest_version(&self.base, &self.object_store)
        .await?;
    read_manifest(&self.object_store, &latest_path).await
}
/// Reads the transaction file referenced by the current manifest.
///
/// Returns `Ok(None)` when the manifest has no transaction file. Otherwise the
/// file is fetched from `<base>/_transactions/<name>` and decoded.
pub async fn read_transaction(&self) -> Result<Option<Transaction>> {
    let file_name = match self.manifest.transaction_file.as_ref() {
        None => return Ok(None),
        Some(name) => name,
    };
    let path = self.base.child("_transactions").child(file_name.as_str());
    let response = self.object_store.inner.get(&path).await?;
    let data = response.bytes().await?;
    let message = lance_table::format::pb::Transaction::decode(data)?;
    let transaction = Transaction::try_from(message)?;
    Ok(Some(transaction))
}
/// Commits a `Restore` operation that targets this handle's current version.
///
/// The transaction's read version is the latest version in storage. On
/// success the handle is moved to the newly committed manifest.
pub async fn restore(&mut self) -> Result<()> {
    let latest_version = self.latest_manifest().await?.version;
    let restore_op = Operation::Restore {
        version: self.manifest.version,
    };
    let transaction = Transaction::new(latest_version, restore_op, None, None);

    let restored = commit_transaction(
        self,
        &self.object_store,
        self.commit_handler.as_ref(),
        &transaction,
        &Default::default(),
        &Default::default(),
        self.manifest_naming_scheme,
    )
    .await?;
    self.manifest = Arc::new(restored);
    Ok(())
}
/// Starts cleanup of versions older than `older_than` relative to now.
///
/// The cutoff timestamp is computed eagerly as `utc_now() - older_than`; the
/// returned boxed future runs `cleanup::cleanup_old_versions`, with both
/// optional flags forwarded unchanged.
pub fn cleanup_old_versions(
    &self,
    older_than: Duration,
    delete_unverified: Option<bool>,
    error_if_tagged_old_versions: Option<bool>,
) -> BoxFuture<Result<RemovalStats>> {
    let cutoff = utc_now() - older_than;
    let cleanup = cleanup::cleanup_old_versions(
        self,
        cutoff,
        delete_unverified,
        error_if_tagged_old_versions,
    );
    cleanup.boxed()
}
#[allow(clippy::too_many_arguments)]
async fn do_commit(
base_uri: &str,
operation: Operation,
blobs_op: Option<Operation>,
read_version: Option<u64>,
store_params: Option<ObjectStoreParams>,
commit_handler: Option<Arc<dyn CommitHandler>>,
object_store_registry: Arc<ObjectStoreRegistry>,
enable_v2_manifest_paths: bool,
detached: bool,
) -> Result<Self> {
let read_version = read_version.map_or_else(
|| match operation {
Operation::Overwrite { .. } | Operation::Restore { .. } => Ok(0),
_ => Err(Error::invalid_input(
"read_version must be specified for this operation",
location!(),
)),
},
Ok,
)?;
let (object_store, base, commit_handler) = Self::params_from_uri(
base_uri,
&commit_handler,
&store_params,
object_store_registry.clone(),
)
.await?;
let dataset_exists = match commit_handler
.resolve_latest_version(&base, &object_store)
.await
{
Ok(_) => true,
Err(Error::NotFound { .. }) => false,
Err(e) => return Err(e),
};
if !dataset_exists && !matches!(operation, Operation::Overwrite { .. }) {
return Err(Error::DatasetNotFound {
path: base.to_string(),
source: "The dataset must already exist unless the operation is Overwrite".into(),
location: location!(),
});
}
let dataset = if dataset_exists {
let mut builder = DatasetBuilder::from_uri(base_uri).with_read_params(ReadParams {
store_options: store_params.clone(),
object_store_registry: object_store_registry.clone(),
..Default::default()
});
if detached {
builder = builder.with_version(read_version);
}
Some(builder.load().await?)
} else {
None
};
let manifest_naming_scheme = if let Some(ds) = &dataset {
ds.manifest_naming_scheme
} else if enable_v2_manifest_paths {
ManifestNamingScheme::V2
} else {
ManifestNamingScheme::V1
};
let transaction = Transaction::new(read_version, operation, blobs_op, None);
let manifest = if let Some(dataset) = &dataset {
if detached {
if matches!(manifest_naming_scheme, ManifestNamingScheme::V1) {
return Err(Error::NotSupported {
source: "detached commits cannot be used with v1 manifest paths".into(),
location: location!(),
});
}
commit_detached_transaction(
dataset,
&object_store,
commit_handler.as_ref(),
&transaction,
&Default::default(),
&Default::default(),
)
.await?
} else {
commit_transaction(
dataset,
&object_store,
commit_handler.as_ref(),
&transaction,
&Default::default(),
&Default::default(),
manifest_naming_scheme,
)
.await?
}
} else if !detached {
commit_new_dataset(
&object_store,
commit_handler.as_ref(),
&base,
&transaction,
&Default::default(),
manifest_naming_scheme,
)
.await?
} else {
return Err(Error::NotSupported {
source: "detached commits cannot currently be used to create new datasets".into(),
location: location!(),
});
};
let object_store = Arc::new(object_store);
let tags = Tags::new(object_store.clone(), commit_handler.clone(), base.clone());
Ok(Self {
object_store,
base,
uri: base_uri.to_string(),
manifest: Arc::new(manifest),
session: Arc::new(Session::default()),
commit_handler,
tags,
manifest_naming_scheme,
})
}
/// Commits `operation` to the dataset at `base_uri` as a regular (attached) commit.
///
/// Delegates to `do_commit` with no blob operation and `detached = false`.
pub async fn commit(
    base_uri: &str,
    operation: Operation,
    read_version: Option<u64>,
    store_params: Option<ObjectStoreParams>,
    commit_handler: Option<Arc<dyn CommitHandler>>,
    object_store_registry: Arc<ObjectStoreRegistry>,
    enable_v2_manifest_paths: bool,
) -> Result<Self> {
    let blobs_op = None;
    let detached = false;
    Self::do_commit(
        base_uri,
        operation,
        blobs_op,
        read_version,
        store_params,
        commit_handler,
        object_store_registry,
        enable_v2_manifest_paths,
        detached,
    )
    .await
}
/// Commits `operation` to the dataset at `base_uri` as a detached commit.
///
/// Delegates to `do_commit` with no blob operation and `detached = true`.
pub async fn commit_detached(
    base_uri: &str,
    operation: Operation,
    read_version: Option<u64>,
    store_params: Option<ObjectStoreParams>,
    commit_handler: Option<Arc<dyn CommitHandler>>,
    object_store_registry: Arc<ObjectStoreRegistry>,
    enable_v2_manifest_paths: bool,
) -> Result<Self> {
    let blobs_op = None;
    let detached = true;
    Self::do_commit(
        base_uri,
        operation,
        blobs_op,
        read_version,
        store_params,
        commit_handler,
        object_store_registry,
        enable_v2_manifest_paths,
        detached,
    )
    .await
}
/// Creates a [`Scanner`] over a clone of this dataset handle.
pub fn scan(&self) -> Scanner {
    let dataset = Arc::new(self.clone());
    Scanner::new(dataset)
}
/// Counts rows, optionally restricted by a filter expression.
///
/// Without a filter this sums the per-fragment row counts. With a filter, a
/// scan projecting no columns (row id only) is executed and counted.
#[instrument(skip_all)]
pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
    match filter {
        None => self.count_all_rows().await,
        Some(filter) => {
            let mut scanner = self.scan();
            scanner.filter(&filter)?;
            let matched = scanner
                .project::<String>(&[])?
                .with_row_id()
                .count_rows()
                .await?;
            Ok(matched as usize)
        }
    }
}
/// Sums the row counts of all fragments, counting up to 16 fragments concurrently.
pub(crate) async fn count_all_rows(&self) -> Result<usize> {
    let per_fragment = stream::iter(self.get_fragments())
        .map(|fragment| async move { fragment.count_rows().await })
        .buffer_unordered(16)
        .try_collect::<Vec<_>>()
        .await?;
    let total = per_fragment.iter().sum();
    Ok(total)
}
/// Fetches the rows at `row_indices`, returning the requested projection.
///
/// Delegates to `take::take`.
#[instrument(skip_all, fields(num_rows=row_indices.len()))]
pub async fn take(
    &self,
    row_indices: &[u64],
    projection: impl Into<ProjectionRequest>,
) -> Result<RecordBatch> {
    let request: ProjectionRequest = projection.into();
    take::take(self, row_indices, request).await
}
/// Fetches the rows with the given `row_ids`, returning the requested projection.
///
/// Builds a [`TakeBuilder`] over a clone of this handle and executes it.
pub async fn take_rows(
    &self,
    row_ids: &[u64],
    projection: impl Into<ProjectionRequest>,
) -> Result<RecordBatch> {
    let dataset = Arc::new(self.clone());
    let builder = dataset.take_builder(row_ids, projection)?;
    builder.execute().await
}
/// Creates a [`TakeBuilder`] for `row_ids` with the given projection.
///
/// The ids are copied into the builder.
pub fn take_builder(
    self: &Arc<Self>,
    row_ids: &[u64],
    projection: impl Into<ProjectionRequest>,
) -> Result<TakeBuilder> {
    let dataset = Arc::clone(self);
    let ids = row_ids.to_vec();
    TakeBuilder::try_new_from_ids(dataset, ids, projection.into())
}
/// Returns blob file handles for `column` at the given `row_ids`.
///
/// Delegates to `blob::take_blobs`.
pub async fn take_blobs(
    self: &Arc<Self>,
    row_ids: &[u64],
    column: impl AsRef<str>,
) -> Result<Vec<BlobFile>> {
    let column_name = column.as_ref();
    blob::take_blobs(self, row_ids, column_name).await
}
/// Returns a record batch stream over the rows in the streamed `row_ranges`.
///
/// Delegates to `take::take_scan` with all arguments forwarded unchanged.
pub fn take_scan(
    &self,
    row_ranges: Pin<Box<dyn Stream<Item = Result<Range<u64>>> + Send>>,
    projection: Arc<Schema>,
    batch_readahead: usize,
) -> DatasetRecordBatchStream {
    let dataset = self;
    take::take_scan(dataset, row_ranges, projection, batch_readahead)
}
/// Takes up to `n` randomly chosen rows, projected with `projection`.
///
/// Row indices are drawn without replacement from `0..count_rows(None)`.
pub(crate) async fn sample(&self, n: usize, projection: &Schema) -> Result<RecordBatch> {
    use rand::seq::IteratorRandom;

    let total_rows = self.count_rows(None).await? as u64;
    // The RNG is kept as a temporary so it is never held across an await point.
    let chosen = (0..total_rows).choose_multiple(&mut rand::thread_rng(), n);
    self.take(&chosen, projection.clone()).await
}
/// Deletes the rows matching `predicate` and commits the result.
///
/// Each fragment applies the predicate concurrently. Fragments whose metadata
/// changed are recorded as updated, fragments that disappear entirely are
/// recorded as deleted, and unchanged fragments are left out of the
/// transaction. On success the handle moves to the new manifest.
pub async fn delete(&mut self, predicate: &str) -> Result<()> {
    let mut updated_fragments: Vec<Fragment> = Vec::new();
    let mut deleted_fragment_ids: Vec<u64> = Vec::new();

    stream::iter(self.get_fragments())
        .map(|fragment| async move {
            let before = fragment.metadata.clone();
            let after = fragment
                .delete(predicate)
                .await?
                .map(|remaining| remaining.metadata);
            Ok((before, after))
        })
        .buffer_unordered(get_num_compute_intensive_cpus())
        .try_for_each(|(before, after)| {
            match after {
                // Nothing left of the fragment: drop it.
                None => deleted_fragment_ids.push(before.id),
                Some(after) if after != before => updated_fragments.push(after),
                // Fragment untouched by the predicate.
                Some(_) => {}
            }
            futures::future::ready(Ok::<_, crate::Error>(()))
        })
        .await?;

    let delete_op = Operation::Delete {
        updated_fragments,
        deleted_fragment_ids,
        predicate: predicate.to_string(),
    };
    let transaction = Transaction::new(self.manifest.version, delete_op, None, None);

    let manifest = commit_transaction(
        self,
        &self.object_store,
        self.commit_handler.as_ref(),
        &transaction,
        &Default::default(),
        &Default::default(),
        self.manifest_naming_scheme,
    )
    .await?;
    self.manifest = Arc::new(manifest);
    Ok(())
}
/// Sums the deletion counts reported by every fragment.
///
/// Fragments are queried concurrently, bounded by the object store's I/O parallelism.
pub async fn count_deleted_rows(&self) -> Result<usize> {
    let parallelism = self.object_store.io_parallelism();
    futures::stream::iter(self.get_fragments())
        .map(|fragment| async move { fragment.count_deletions().await })
        .buffer_unordered(parallelism)
        .try_fold(0, |total, deleted| futures::future::ready(Ok(total + deleted)))
        .await
}
/// Borrows the object store backing this dataset.
pub(crate) fn object_store(&self) -> &ObjectStore {
    self.object_store.as_ref()
}
/// Resolves the manifest path for `version` through the commit handler.
pub(crate) async fn manifest_file(&self, version: u64) -> Result<Path> {
    let store = &self.object_store.inner;
    self.commit_handler
        .resolve_version(&self.base, version, store)
        .await
}
/// Returns the `data` directory path under the dataset base.
pub(crate) fn data_dir(&self) -> Path {
    let base = &self.base;
    base.child(DATA_DIR)
}
/// Returns the `_indices` directory path under the dataset base.
pub(crate) fn indices_dir(&self) -> Path {
    let base = &self.base;
    base.child(INDICES_DIR)
}
/// Returns a new shared reference to this dataset's session.
pub fn session(&self) -> Arc<Session> {
    Arc::clone(&self.session)
}
pub fn version(&self) -> Version {
Version::from(self.manifest.as_ref())
}
/// Returns the size reported by the session's index cache.
pub fn index_cache_entry_count(&self) -> usize {
    let index_cache = &self.session.index_cache;
    index_cache.get_size()
}
/// Returns the hit rate reported by the session's index cache.
pub fn index_cache_hit_rate(&self) -> f32 {
    let index_cache = &self.session.index_cache;
    index_cache.hit_rate()
}
/// Returns the deep size of the session as reported by `DeepSizeOf`.
pub fn cache_size_bytes(&self) -> u64 {
    let deep_size = self.session.deep_size_of();
    deep_size as u64
}
/// Lists every version of the dataset, sorted by ascending version number.
///
/// Each manifest returned by the commit handler is read and summarised; the
/// first manifest that fails to read aborts the listing with that error.
pub async fn versions(&self) -> Result<Vec<Version>> {
    let manifest_paths = self
        .commit_handler
        .list_manifests(&self.base, &self.object_store.inner)
        .await?;
    let mut versions: Vec<Version> = manifest_paths
        .try_filter_map(|path| async move {
            read_manifest(&self.object_store, &path)
                .await
                .map(|manifest| Some(Version::from(&manifest)))
        })
        .try_collect()
        .await?;
    versions.sort_by(|left, right| left.version.cmp(&right.version));
    Ok(versions)
}
/// Asks the commit handler for the id of the latest version in storage.
pub async fn latest_version_id(&self) -> Result<u64> {
    let handler = &self.commit_handler;
    handler
        .resolve_latest_version_id(&self.base, &self.object_store)
        .await
}
/// Returns the number of fragments listed in the current manifest.
pub fn count_fragments(&self) -> usize {
    let fragments = &self.manifest.fragments;
    fragments.len()
}
/// Borrows the schema stored in the current manifest.
pub fn schema(&self) -> &Schema {
    let manifest = &self.manifest;
    &manifest.schema
}
/// Borrows the manifest's `local_schema`.
pub fn local_schema(&self) -> &Schema {
    let manifest = &self.manifest;
    &manifest.local_schema
}
/// Returns a [`FileFragment`] for every fragment in the current manifest.
///
/// All returned fragments share one `Arc` holding a clone of this handle.
pub fn get_fragments(&self) -> Vec<FileFragment> {
    let dataset = Arc::new(self.clone());
    let manifest_fragments = &self.manifest.fragments;
    let mut fragments = Vec::with_capacity(manifest_fragments.len());
    for metadata in manifest_fragments.iter() {
        fragments.push(FileFragment::new(Arc::clone(&dataset), metadata.clone()));
    }
    fragments
}
/// Looks up the fragment with id `fragment_id`, or `None` if the manifest has no such fragment.
///
/// This is a linear scan over the manifest's fragment list.
pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
    let wanted = fragment_id as u64;
    self.manifest
        .fragments
        .iter()
        .find(|candidate| candidate.id == wanted)
        .map(|metadata| FileFragment::new(Arc::new(self.clone()), metadata.clone()))
}
/// Borrows the shared fragment list of the current manifest.
pub(crate) fn fragments(&self) -> &Arc<Vec<Fragment>> {
    let manifest = &self.manifest;
    &manifest.fragments
}
fn get_frags_from_ordered_ids(&self, ordered_ids: &[u32]) -> Vec<Option<FileFragment>> {
let mut fragments = Vec::with_capacity(ordered_ids.len());
let mut id_iter = ordered_ids.iter();
let mut id = id_iter.next();
let mut last_id: i64 = -1;
for frag in self.manifest.fragments.iter() {
let mut the_id = if let Some(id) = id { *id } else { break };
assert!(the_id as i64 > last_id);
while the_id < frag.id as u32 {
fragments.push(None);
last_id = the_id as i64;
id = id_iter.next();
the_id = if let Some(id) = id { *id } else { break };
}
if the_id == frag.id as u32 {
fragments.push(Some(FileFragment::new(
Arc::new(self.clone()),
frag.clone(),
)));
last_id = the_id as i64;
id = id_iter.next();
}
}
fragments
}
/// Filters `addr_or_ids`, keeping only entries whose paired row address is still live.
///
/// `addr_or_ids[i]` is paired by position with `addrs[i]`. An entry is dropped
/// when its address falls in a fragment that could not be resolved, or when
/// its row offset appears in that fragment's deletion vector. The surviving
/// entries are returned in their original order.
///
/// Returns an empty vector when `addrs` is empty.
// NOTE(review): the positional pairing assumes both slices have the same length,
// and the final un-permute assumes one output slot was produced per address — confirm at call sites.
async fn filter_addr_or_ids(&self, addr_or_ids: &[u64], addrs: &[u64]) -> Result<Vec<u64>> {
if addrs.is_empty() {
return Ok(Vec::new());
}
// Sort the addresses, remembering the permutation so the result can be un-sorted later.
let mut perm = permutation::sort(addrs);
let sorted_addrs = perm.apply_slice(addrs);
// Sorted addresses group by fragment, so `dedup` yields each referenced fragment id once.
let referenced_frag_ids = sorted_addrs
.iter()
.map(|addr| RowAddress::from(*addr).fragment_id())
.dedup()
.collect::<Vec<_>>();
let frags = self.get_frags_from_ordered_ids(&referenced_frag_ids);
// One deletion-vector future per entry of `frags`; unresolved fragments yield `None` immediately.
let dv_futs = frags
.iter()
.map(|frag| {
if let Some(frag) = frag {
frag.get_deletion_vector().boxed()
} else {
std::future::ready(Ok(None)).boxed()
}
})
.collect::<Vec<_>>();
// `buffered` keeps the results in the same order as `frags`.
let dvs = stream::iter(dv_futs)
.buffered(self.object_store.io_parallelism())
.try_collect::<Vec<_>>()
.await?;
// One slot per sorted address: `Some(addr)` when the row is kept, `None` when it is dropped.
let mut filtered_sorted_ids = Vec::with_capacity(sorted_addrs.len());
let mut sorted_addr_iter = sorted_addrs.into_iter().map(RowAddress::from);
// Safe: `addrs` was checked to be non-empty above.
let mut next_addr = sorted_addr_iter.next().unwrap();
// Set once the address iterator runs dry while walking a deletion vector.
let mut exhausted = false;
for frag_dv in frags.iter().zip(dvs).zip(referenced_frag_ids.iter()) {
let ((frag, dv), frag_id) = frag_dv;
if frag.is_some() {
if let Some(dv) = dv.as_ref() {
// Merge-walk the sorted addresses against the deleted offsets.
// presumably `to_sorted_iter` yields offsets in ascending order — TODO confirm
for deleted in dv.to_sorted_iter() {
// Addresses in this fragment below the current deleted offset are live.
while next_addr.fragment_id() == *frag_id
&& next_addr.row_offset() < deleted
{
filtered_sorted_ids.push(Some(u64::from(next_addr)));
if let Some(next) = sorted_addr_iter.next() {
next_addr = next;
} else {
exhausted = true;
break;
}
}
if exhausted {
break;
}
// Moved on to another fragment: the rest of this deletion vector is irrelevant.
if next_addr.fragment_id() != *frag_id {
break;
}
// Exact hit on a deleted offset: mark this address as dropped.
if next_addr.row_offset() == deleted {
filtered_sorted_ids.push(None);
if let Some(next) = sorted_addr_iter.next() {
next_addr = next;
} else {
exhausted = true;
break;
}
}
}
}
if exhausted {
break;
}
// Remaining addresses in this fragment are past every deleted offset, so they are live.
while next_addr.fragment_id() == *frag_id {
filtered_sorted_ids.push(Some(u64::from(next_addr)));
if let Some(next) = sorted_addr_iter.next() {
next_addr = next;
} else {
break;
}
}
} else {
// The fragment could not be resolved: every address in it is dropped.
while next_addr.fragment_id() == *frag_id {
filtered_sorted_ids.push(None);
if let Some(next) = sorted_addr_iter.next() {
next_addr = next;
} else {
break;
}
}
}
}
// Undo the sort so slot `i` lines up with `addr_or_ids[i]` again.
perm.apply_inv_slice_in_place(&mut filtered_sorted_ids);
// Keep the caller's original value wherever the slot was marked live.
Ok(addr_or_ids
.iter()
.zip(filtered_sorted_ids)
.filter_map(|(addr_or_id, maybe_addr)| maybe_addr.map(|_| *addr_or_id))
.collect())
}
/// Return the subset of `addrs` that is kept by `filter_addr_or_ids`, i.e. the
/// addresses that do not point at a deleted row or at a missing fragment.
///
/// The addresses serve both as the values to return and as the lookup keys.
pub(crate) async fn filter_deleted_addresses(&self, addrs: &[u64]) -> Result<Vec<u64>> {
    let surviving = self.filter_addr_or_ids(addrs, addrs).await?;
    Ok(surviving)
}
/// Return the subset of row `ids` that still refer to live rows.
///
/// When the dataset has a row-id index, each id is first translated to a row
/// address. Ids that the index cannot resolve are dropped, and they are dropped
/// from the id list and the address list *together* so that the two slices
/// handed to `filter_addr_or_ids` stay positionally paired and of equal length.
/// Without an index the ids are used directly as addresses.
///
/// Returns an error if the row-id index or a deletion vector cannot be loaded.
pub(crate) async fn filter_deleted_ids(&self, ids: &[u64]) -> Result<Vec<u64>> {
    let (resolved_ids, addresses): (Cow<[u64]>, Cow<[u64]>) =
        if let Some(row_id_index) = get_row_id_index(self).await? {
            // Build both vectors in lockstep: entry `i` of each describes the same row.
            let (kept_ids, kept_addresses): (Vec<u64>, Vec<u64>) = ids
                .iter()
                .filter_map(|id| {
                    row_id_index.get(*id).map(|address| {
                        let address: u64 = address.into();
                        (*id, address)
                    })
                })
                .unzip();
            (Cow::Owned(kept_ids), Cow::Owned(kept_addresses))
        } else {
            (Cow::Borrowed(ids), Cow::Borrowed(ids))
        };
    self.filter_addr_or_ids(&resolved_ids, &addresses).await
}
/// Count the fragments whose physical row count is below `max_rows_per_group`.
///
/// Row counts are fetched concurrently, bounded by the object store's I/O
/// parallelism.
///
/// NOTE(review): the method cannot report errors; a failed row-count lookup is
/// not filtered by `try_filter` and therefore appears to be included in the
/// count — confirm this is intended.
pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
    let parallelism = self.object_store.io_parallelism();
    let row_counts = futures::stream::iter(self.get_fragments())
        .map(|fragment| async move { fragment.physical_rows().await })
        .buffered(parallelism);
    row_counts
        .try_filter(|rows| futures::future::ready(*rows < max_rows_per_group))
        .count()
        .await
}
/// Check the structural invariants of this dataset version.
///
/// Verifies, in order, that:
/// 1. no fragment id occurs more than once in the manifest,
/// 2. fragment ids never decrease along the manifest's fragment list,
/// 3. every fragment passes its own `validate` check.
///
/// # Errors
///
/// Returns a corrupt-file error for the first two violations, or whatever error
/// a fragment's validation produces.
pub async fn validate(&self) -> Result<()> {
    // Tally how often each fragment id appears.
    let mut id_counts = HashMap::new();
    for fragment in self.manifest.fragments.iter() {
        *id_counts.entry(fragment.id).or_insert(0) += 1;
    }
    if let Some((id, _)) = id_counts.into_iter().find(|(_, count)| *count > 1) {
        return Err(Error::corrupt_file(
            self.base.clone(),
            format!(
                "Duplicate fragment id {} found in dataset {:?}",
                id, self.base
            ),
            location!(),
        ));
    }

    // Walk the ids in manifest order and make sure they never go backwards.
    let mut prev: u64 = 0;
    for fragment in self.manifest.fragments.iter() {
        let id = fragment.id;
        if id < prev {
            return Err(Error::corrupt_file(
                self.base.clone(),
                format!(
                    "Fragment ids are not sorted in increasing fragment-id order. Found {} after {} in dataset {:?}",
                    id, prev, self.base
                ),
                location!(),
            ));
        }
        prev = id;
    }

    // Validate the fragments themselves, concurrently and in no particular order.
    let parallelism = self.object_store.io_parallelism();
    futures::stream::iter(self.get_fragments())
        .map(|fragment| async move { fragment.validate().await })
        .buffer_unordered(parallelism)
        .try_collect::<Vec<()>>()
        .await?;
    Ok(())
}
/// Migrate the dataset's manifest files to the V2 naming scheme and reload.
///
/// After `migrate_scheme_to_v2` has run, `self` is replaced by a fresh checkout
/// of the latest version.
///
/// # Errors
///
/// Propagates any failure from the migration, from resolving the latest
/// version id, or from the checkout.
pub async fn migrate_manifest_paths_v2(&mut self) -> Result<()> {
    migrate_scheme_to_v2(self.object_store(), &self.base).await?;
    let newest = self.latest_version_id().await?;
    let reloaded = self.checkout_version(newest).await?;
    *self = reloaded;
    Ok(())
}
}
impl Dataset {
pub async fn add_columns(
&mut self,
transforms: NewColumnTransform,
read_columns: Option<Vec<String>>,
batch_size: Option<u32>,
) -> Result<()> {
schema_evolution::add_columns(self, transforms, read_columns, batch_size).await
}
pub async fn alter_columns(&mut self, alterations: &[ColumnAlteration]) -> Result<()> {
schema_evolution::alter_columns(self, alterations).await
}
pub async fn drop_columns(&mut self, columns: &[&str]) -> Result<()> {
schema_evolution::drop_columns(self, columns).await
}
#[deprecated(since = "0.9.12", note = "Please use `drop_columns` instead.")]
pub async fn drop(&mut self, columns: &[&str]) -> Result<()> {
self.drop_columns(columns).await
}
async fn merge_impl(
&mut self,
stream: Box<dyn RecordBatchReader + Send>,
left_on: &str,
right_on: &str,
) -> Result<()> {
if self.schema().field(left_on).is_none() {
return Err(Error::invalid_input(
format!("Column {} does not exist in the left side dataset", left_on),
location!(),
));
};
let right_schema = stream.schema();
if right_schema.field_with_name(right_on).is_err() {
return Err(Error::invalid_input(
format!(
"Column {} does not exist in the right side dataset",
right_on
),
location!(),
));
};
for field in right_schema.fields() {
if field.name() == right_on {
continue;
}
if self.schema().field(field.name()).is_some() {
return Err(Error::invalid_input(
format!(
"Column {} exists in both sides of the dataset",
field.name()
),
location!(),
));
}
}
let joiner = Arc::new(HashJoiner::try_new(stream, right_on).await?);
let mut new_schema: Schema = self.schema().merge(joiner.out_schema().as_ref())?;
new_schema.set_field_id(Some(self.manifest.max_field_id()));
let updated_fragments: Vec<Fragment> = stream::iter(self.get_fragments())
.then(|f| {
let joiner = joiner.clone();
async move { f.merge(left_on, &joiner).await.map(|f| f.metadata) }
})
.try_collect::<Vec<_>>()
.await?;
let transaction = Transaction::new(
self.manifest.version,
Operation::Merge {
fragments: updated_fragments,
schema: new_schema,
},
None,
None,
);
let manifest = commit_transaction(
self,
&self.object_store,
self.commit_handler.as_ref(),
&transaction,
&Default::default(),
&Default::default(),
self.manifest_naming_scheme,
)
.await?;
self.manifest = Arc::new(manifest);
Ok(())
}
pub async fn merge(
&mut self,
stream: impl RecordBatchReader + Send + 'static,
left_on: &str,
right_on: &str,
) -> Result<()> {
let stream = Box::new(stream);
self.merge_impl(stream, left_on, right_on).await
}
pub async fn update_config(
&mut self,
upsert_values: impl IntoIterator<Item = (String, String)>,
) -> Result<()> {
let transaction = Transaction::new(
self.manifest.version,
Operation::UpdateConfig {
upsert_values: Some(HashMap::from_iter(upsert_values)),
delete_keys: None,
},
None,
None,
);
let manifest = commit_transaction(
self,
&self.object_store,
self.commit_handler.as_ref(),
&transaction,
&Default::default(),
&Default::default(),
self.manifest_naming_scheme,
)
.await?;
self.manifest = Arc::new(manifest);
Ok(())
}
pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
let transaction = Transaction::new(
self.manifest.version,
Operation::UpdateConfig {
upsert_values: None,
delete_keys: Some(Vec::from_iter(delete_keys.iter().map(ToString::to_string))),
},
None,
None,
);
let manifest = commit_transaction(
self,
&self.object_store,
self.commit_handler.as_ref(),
&transaction,
&Default::default(),
&Default::default(),
self.manifest_naming_scheme,
)
.await?;
self.manifest = Arc::new(manifest);
Ok(())
}
}
#[async_trait::async_trait]
impl DatasetTakeRows for Dataset {
fn schema(&self) -> &Schema {
Self::schema(self)
}
async fn take_rows(&self, row_ids: &[u64], projection: &Schema) -> Result<RecordBatch> {
Self::take_rows(self, row_ids, projection.clone()).await
}
}
/// Options that control how a manifest is prepared before it is committed
/// (see `write_manifest_file`).
#[derive(Debug)]
pub(crate) struct ManifestWriteConfig {
    /// When `true`, `write_manifest_file` calls `apply_feature_flags` on the
    /// manifest before committing it.
    auto_set_feature_flags: bool,
    /// Timestamp handed to `timestamp_to_nanos` to stamp the manifest; how
    /// `None` is interpreted is decided by that helper.
    timestamp: Option<SystemTime>,
    /// Forwarded to `apply_feature_flags` when feature flags are set automatically.
    use_move_stable_row_ids: bool,
    /// NOTE(review): not read in the code visible here — confirm its consumer.
    use_legacy_format: Option<bool>,
    /// NOTE(review): not read in the code visible here — confirm its consumer.
    storage_format: Option<DataStorageFormat>,
}
impl Default for ManifestWriteConfig {
fn default() -> Self {
Self {
auto_set_feature_flags: true,
timestamp: None,
use_move_stable_row_ids: false,
use_legacy_format: None,
storage_format: None,
}
}
}
/// Finalize `manifest` and commit it through `commit_handler`.
///
/// Before committing, the manifest is updated in place:
/// feature flags are applied when `config.auto_set_feature_flags` is set,
/// the timestamp is set from `config.timestamp`, and the maximum fragment id
/// is refreshed. The manifest bytes themselves are written by
/// `write_manifest_file_to_path`, which is handed to the commit handler.
///
/// # Errors
///
/// Returns a `CommitError` when applying feature flags fails or when the commit
/// handler reports a failure.
pub(crate) async fn write_manifest_file(
    object_store: &ObjectStore,
    commit_handler: &dyn CommitHandler,
    base_path: &Path,
    manifest: &mut Manifest,
    indices: Option<Vec<Index>>,
    config: &ManifestWriteConfig,
    naming_scheme: ManifestNamingScheme,
) -> std::result::Result<(), CommitError> {
    if config.auto_set_feature_flags {
        apply_feature_flags(manifest, config.use_move_stable_row_ids)?;
    }

    let timestamp_nanos = timestamp_to_nanos(config.timestamp);
    manifest.set_timestamp(timestamp_nanos);
    manifest.update_max_fragment_id();

    let commit_outcome = commit_handler
        .commit(
            manifest,
            indices,
            base_path,
            object_store,
            write_manifest_file_to_path,
            naming_scheme,
        )
        .await;
    commit_outcome?;
    Ok(())
}
fn write_manifest_file_to_path<'a>(
object_store: &'a ObjectStore,
manifest: &'a mut Manifest,
indices: Option<Vec<Index>>,
path: &'a Path,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async {
let mut object_writer = ObjectWriter::new(object_store, path).await?;
let pos = write_manifest(&mut object_writer, manifest, indices).await?;
object_writer
.write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
.await?;
object_writer.shutdown().await?;
Ok(())
})
}
#[cfg(test)]
mod tests {
use std::vec;
use super::*;
use crate::arrow::FixedSizeListArrayExt;
use crate::dataset::optimize::{compact_files, CompactionOptions};
use crate::dataset::WriteMode::Overwrite;
use crate::index::vector::VectorIndexParams;
use crate::utils::test::TestDatasetGenerator;
use arrow::array::as_struct_array;
use arrow::compute::concat_batches;
use arrow_array::{
builder::StringDictionaryBuilder,
cast::as_string_array,
types::{Float32Type, Int32Type},
ArrayRef, DictionaryArray, Float32Array, Int32Array, Int64Array, Int8Array,
Int8DictionaryArray, RecordBatchIterator, StringArray, UInt16Array, UInt32Array,
};
use arrow_array::{
Array, FixedSizeListArray, GenericStringArray, Int16Array, Int16DictionaryArray,
StructArray,
};
use arrow_ord::sort::sort_to_indices;
use arrow_schema::{
DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
};
use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME};
use lance_core::datatypes::LANCE_STORAGE_CLASS_SCHEMA_META_KEY;
use lance_datagen::{array, gen, BatchCount, Dimension, RowCount};
use lance_file::version::LanceFileVersion;
use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams};
use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, DatasetIndexExt, IndexType};
use lance_linalg::distance::MetricType;
use lance_table::feature_flags;
use lance_table::format::WriterVersion;
use lance_table::io::commit::RenameCommitHandler;
use lance_table::io::deletion::read_deletion_file;
use lance_testing::datagen::generate_random_array;
use pretty_assertions::assert_eq;
use rstest::rstest;
use tempfile::{tempdir, TempDir};
use url::Url;
/// Identity function that only compiles when `T` is `Send`.
///
/// Used to assert at compile time that a value (such as a future) can be sent
/// across threads.
fn require_send<T>(t: T) -> T
where
    T: Send,
{
    t
}
/// Test helper: write a 400-row dataset to `path` with the given write `mode` and
/// `data_storage_version`, then reopen it and verify version, schema, contents and
/// fragment ids.
async fn create_file(
path: &std::path::Path,
mode: WriteMode,
data_storage_version: LanceFileVersion,
) {
// Two columns: a plain Int32 column and a dictionary-encoded string column.
let fields = vec![
ArrowField::new("i", DataType::Int32, false),
ArrowField::new(
"dict",
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
false,
),
];
let schema = Arc::new(ArrowSchema::new(fields));
let dict_values = StringArray::from_iter_values(["a", "b", "c", "d", "e"]);
// 20 batches of 20 rows each; "i" runs from 0 to 399 across the batches.
let batches: Vec<RecordBatch> = (0..20)
.map(|i| {
let mut arrays =
vec![Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)) as ArrayRef];
// Dictionary keys cycle through the five dictionary values.
arrays.push(Arc::new(
DictionaryArray::try_new(
UInt16Array::from_iter_values((0_u16..20_u16).map(|v| v % 5)),
Arc::new(dict_values.clone()),
)
.unwrap(),
));
RecordBatch::try_new(schema.clone(), arrays).unwrap()
})
.collect();
// Keep a copy of the input to compare against what is read back.
let expected_batches = batches.clone();
let test_uri = path.to_str().unwrap();
let write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
mode,
data_storage_version: Some(data_storage_version),
..WriteParams::default()
};
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
// Reopen from storage so the checks exercise the persisted state.
let actual_ds = Dataset::open(test_uri).await.unwrap();
assert_eq!(actual_ds.version().version, 1);
assert_eq!(
actual_ds.manifest.writer_version,
Some(WriterVersion::default())
);
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
// For the legacy format, every scanned batch is expected to hold 10 rows
// (the configured `max_rows_per_group`).
if data_storage_version == LanceFileVersion::Legacy {
for batch in &actual_batches {
assert_eq!(batch.num_rows(), 10);
}
}
// Sort the scanned rows by "i" before comparing, so the check does not depend on scan order.
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = arrow_select::take::take(&struct_arr, &sorted_indices, None).unwrap();
let expected_struct_arr: StructArray =
concat_batches(&schema, &expected_batches).unwrap().into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
// 400 rows at 40 rows per file: fragments with ids 0 through 9 are expected.
assert_eq!(
actual_ds
.fragments()
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
(0..10).collect::<Vec<_>>()
)
}
/// Runs `create_file` once per write mode, each time against a fresh, empty
/// temporary directory.
#[rstest]
#[lance_test_macros::test(tokio::test)]
async fn test_create_dataset(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let modes = [WriteMode::Create, WriteMode::Append, Overwrite];
    for write_mode in modes {
        let scratch_dir = tempdir().unwrap();
        create_file(scratch_dir.path(), write_mode, data_storage_version).await;
    }
}
/// Creates a dataset from a single empty batch, then appends ten rows to it and
/// checks the row count, schema, max fragment id and contents.
#[rstest]
#[lance_test_macros::test(tokio::test)]
async fn test_create_and_fill_empty_dataset(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
// A batch with the right schema but zero rows.
let i32_array: ArrayRef = Arc::new(Int32Array::new(vec![].into(), None));
let batch = RecordBatch::try_from_iter(vec![("i", i32_array)]).unwrap();
let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
assert_eq!(schema.as_ref(), reader.schema().as_ref());
let result = Dataset::write(
reader,
test_uri,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
}),
)
.await
.unwrap();
// The empty write yields a dataset with no rows and no fragment ids.
assert_eq!(result.count_rows(None).await.unwrap(), 0);
assert_eq!(result.manifest.max_fragment_id(), None);
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
data_storage_version: Some(data_storage_version),
..Default::default()
};
// The appended batch carries schema metadata that the dataset schema does not have;
// the reader itself is declared with the original, metadata-free schema.
let schema_with_meta = Arc::new(
schema
.as_ref()
.clone()
.with_metadata([("key".to_string(), "value".to_string())].into()),
);
let batches = vec![RecordBatch::try_new(
schema_with_meta,
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap()];
write_params.mode = WriteMode::Append;
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
let expected_batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap();
// Reopen and verify that the append is visible and the schema is unchanged.
let actual_ds = Dataset::open(test_uri).await.unwrap();
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());
assert_eq!(actual_ds.count_rows(None).await.unwrap(), 10);
// The first non-empty write produces fragment id 0.
assert_eq!(actual_ds.manifest.max_fragment_id(), Some(0));
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
// Sort by "i" before comparing so the check does not depend on scan order.
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = arrow_select::take::take(&struct_arr, &sorted_indices, None).unwrap();
let expected_struct_arr: StructArray = expected_batch.into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
}
#[rstest]
#[lance_test_macros::test(tokio::test)]
async fn test_create_with_empty_iter(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
let reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone());
assert_eq!(schema.as_ref(), reader.schema().as_ref());
let write_params = Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
});
let result = Dataset::write(reader, test_uri, write_params)
.await
.unwrap();
assert_eq!(result.count_rows(None).await.unwrap(), 0);
assert_eq!(result.manifest.max_fragment_id(), None);
}
/// Loading an existing in-memory dataset through an I/O-tracking store wrapper
/// must perform exactly two read operations.
#[tokio::test]
async fn test_load_manifest_iops() {
    use crate::utils::test::IoTrackingStore;

    let column = ArrowField::new("i", DataType::Int32, false);
    let schema = Arc::new(ArrowSchema::new(vec![column]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_iter_values(0..10_i32))],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
    let written = Dataset::write(reader, "memory://test", None)
        .await
        .unwrap();

    // Reuse the in-memory store that now holds the data, so the reload below
    // reads the dataset that was just written.
    let memory_store = written.object_store.inner.clone();
    let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper();
    let read_params = ReadParams {
        store_options: Some(ObjectStoreParams {
            object_store_wrapper: Some(io_stats_wrapper),
            ..Default::default()
        }),
        ..Default::default()
    };
    let builder = DatasetBuilder::from_uri("memory://test")
        .with_read_params(read_params)
        .with_object_store(
            memory_store,
            Url::parse("memory://test").unwrap(),
            Arc::new(RenameCommitHandler),
        );
    let _reloaded = builder.load().await.unwrap();

    let observed_read_iops = io_stats.lock().unwrap().read_iops;
    assert_eq!(observed_read_iops, 2);
}
/// `max_rows_per_file` and `max_rows_per_group` must shape the written data:
/// 1000 rows at 100 rows per file give 10 fragments of 100 rows, and (legacy
/// format only) each fragment holds 10 batches of 10 rows.
#[rstest]
#[tokio::test]
async fn test_write_params(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    use fragment::FragReadConfig;

    let scratch_dir = tempdir().unwrap();
    let uri = scratch_dir.path().to_str().unwrap();

    let column = ArrowField::new("i", DataType::Int32, false);
    let schema = Arc::new(ArrowSchema::new(vec![column]));

    let num_rows: usize = 1_000;
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_iter_values(0..num_rows as i32))],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

    let params = WriteParams {
        max_rows_per_file: 100,
        max_rows_per_group: 10,
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let dataset = Dataset::write(reader, uri, Some(params)).await.unwrap();
    assert_eq!(dataset.count_rows(None).await.unwrap(), num_rows);

    let fragments = dataset.get_fragments();
    assert_eq!(fragments.len(), 10);
    assert_eq!(dataset.count_fragments(), 10);

    let is_legacy = data_storage_version == LanceFileVersion::Legacy;
    for fragment in &fragments {
        assert_eq!(fragment.count_rows().await.unwrap(), 100);
        let reader = fragment
            .open(dataset.schema(), FragReadConfig::default(), None)
            .await
            .unwrap();
        // Batch-level layout is only inspected for the legacy format.
        if is_legacy {
            let batch_count = reader.legacy_num_batches();
            assert_eq!(batch_count, 10);
            for batch_idx in 0..batch_count as u32 {
                assert_eq!(reader.legacy_num_rows_in_batch(batch_idx).unwrap(), 10);
            }
        }
    }
}
/// Checks the feature flags recorded in the manifest, and that a manifest carrying
/// an unknown feature flag makes both opening and appending fail with `NotSupported`.
#[rstest]
#[tokio::test]
async fn test_write_manifest(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
use lance_table::feature_flags::FLAG_UNKNOWN;
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let write_fut = Dataset::write(
batches,
test_uri,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
}),
);
// Compile-time check that the write future is `Send`.
let write_fut = require_send(write_fut);
let mut dataset = write_fut.await.unwrap();
// Read the latest manifest straight from storage.
let manifest = read_manifest(
dataset.object_store(),
&dataset
.commit_handler
.resolve_latest_version(&dataset.base, dataset.object_store())
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
manifest.data_storage_format,
DataStorageFormat::new(data_storage_version)
);
// A freshly written dataset has no reader feature flags set.
assert_eq!(manifest.reader_feature_flags, 0);
// Deleting rows is expected to turn on the deletion-files flag (asserted below).
dataset.delete("i < 10").await.unwrap();
dataset.validate().await.unwrap();
let mut manifest = read_manifest(
dataset.object_store(),
&dataset
.commit_handler
.resolve_latest_version(&dataset.base, dataset.object_store())
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
manifest.writer_feature_flags,
feature_flags::FLAG_DELETION_FILES
);
assert_eq!(
manifest.reader_feature_flags,
feature_flags::FLAG_DELETION_FILES
);
// Forge a newer manifest version that sets an unknown flag for both readers and writers.
manifest.writer_feature_flags |= FLAG_UNKNOWN; manifest.reader_feature_flags |= FLAG_UNKNOWN;
manifest.version += 1;
// `auto_set_feature_flags: false` keeps `write_manifest_file` from recomputing
// the flags, so the forged ones are written as-is.
write_manifest_file(
dataset.object_store(),
dataset.commit_handler.as_ref(),
&dataset.base,
&mut manifest,
None,
&ManifestWriteConfig {
auto_set_feature_flags: false,
timestamp: None,
use_move_stable_row_ids: false,
use_legacy_format: None,
storage_format: None,
},
dataset.manifest_naming_scheme,
)
.await
.unwrap();
// Opening the dataset must now be rejected ...
let read_result = Dataset::open(test_uri).await;
assert!(matches!(read_result, Err(Error::NotSupported { .. })));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
// ... and so must appending to it.
let write_result = Dataset::write(
batches,
test_uri,
Some(WriteParams {
mode: WriteMode::Append,
data_storage_version: Some(data_storage_version),
..Default::default()
}),
)
.await;
assert!(matches!(write_result, Err(Error::NotSupported { .. })));
}
/// Writes 20 rows, appends 20 more through `Dataset::write` in append mode, and
/// checks version, schema, contents and fragment ids of the reopened dataset.
#[rstest]
#[tokio::test]
async fn append_dataset(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
// First write: values 0..20.
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
// Second write: values 20..40, appended to the same URI.
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(20..40))],
)
.unwrap()];
write_params.mode = WriteMode::Append;
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let expected_batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..40))],
)
.unwrap();
let actual_ds = Dataset::open(test_uri).await.unwrap();
// Two writes, hence version 2.
assert_eq!(actual_ds.version().version, 2);
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
// Sort by "i" before comparing so the check does not depend on scan order.
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = arrow_select::take::take(&struct_arr, &sorted_indices, None).unwrap();
let expected_struct_arr: StructArray = expected_batch.into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
// Each write produced one fragment; the ids are expected to be 0 and 1.
assert_eq!(
actual_ds
.fragments()
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
(0..2).collect::<Vec<_>>()
)
}
/// Same scenario as appending through `Dataset::write`, but the second batch is
/// added with `Dataset::append` on the dataset handle returned by the first write.
#[rstest]
#[tokio::test]
async fn test_self_dataset_append(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
// First write: values 0..20.
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let mut ds = Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
// Second batch: values 20..40, appended through the existing handle.
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(20..40))],
)
.unwrap()];
write_params.mode = WriteMode::Append;
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
ds.append(batches, Some(write_params.clone()))
.await
.unwrap();
let expected_batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..40))],
)
.unwrap();
// Reopen from storage so the checks exercise the persisted state.
let actual_ds = Dataset::open(test_uri).await.unwrap();
assert_eq!(actual_ds.version().version, 2);
assert_eq!(actual_ds.fragments().len(), 2);
assert_eq!(
actual_ds
.fragments()
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
(0..2).collect::<Vec<_>>()
);
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
// Sort by "i" before comparing so the check does not depend on scan order.
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = arrow_select::take::take(&struct_arr, &sorted_indices, None).unwrap();
let expected_struct_arr: StructArray = expected_batch.into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));
// The appended dataset must also pass the structural validation.
actual_ds.validate().await.unwrap();
}
#[rstest]
#[tokio::test]
async fn test_self_dataset_append_schema_different(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let other_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int64,
false,
)]));
let other_batches = vec![RecordBatch::try_new(
other_schema.clone(),
vec![Arc::new(Int64Array::from_iter_values(0..20))],
)
.unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let mut ds = Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
write_params.mode = WriteMode::Append;
let other_batches =
RecordBatchIterator::new(other_batches.into_iter().map(Ok), other_schema.clone());
let result = ds.append(other_batches, Some(write_params.clone())).await;
assert!(matches!(result, Err(Error::SchemaMismatch { .. })))
}
#[tokio::test]
async fn append_dictionary() {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"x",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
false,
)]));
let dictionary = Arc::new(StringArray::from(vec!["a", "b"]));
let indices = Int8Array::from(vec![0, 1, 0]);
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(
Int8DictionaryArray::try_new(indices, dictionary.clone()).unwrap(),
)],
)
.unwrap()];
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let indices = Int8Array::from(vec![1, 0, 1]);
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(
Int8DictionaryArray::try_new(indices, dictionary).unwrap(),
)],
)
.unwrap()];
write_params.mode = WriteMode::Append;
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let dictionary = Arc::new(StringArray::from(vec!["d", "c"]));
let indices = Int8Array::from(vec![1, 0, 1]);
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(
Int8DictionaryArray::try_new(indices, dictionary).unwrap(),
)],
)
.unwrap()];
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let result = Dataset::write(batches, test_uri, Some(write_params)).await;
assert!(result.is_err());
}
/// Overwrites a dataset with data of a different schema and checks that the new
/// version replaces schema and contents while version 1 stays loadable.
#[rstest]
#[tokio::test]
async fn overwrite_dataset(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
// Version 1: a single Int32 column "i".
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
let batches = vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let dataset = Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert_eq!(dataset.manifest.max_fragment_id(), Some(0));
// Version 2: a single Utf8 column "s", i.e. a completely different schema.
let new_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"s",
DataType::Utf8,
false,
)]));
let new_batches = vec![RecordBatch::try_new(
new_schema.clone(),
vec![Arc::new(StringArray::from_iter_values(
(20..40).map(|v| v.to_string()),
))],
)
.unwrap()];
write_params.mode = WriteMode::Overwrite;
let new_batch_reader =
RecordBatchIterator::new(new_batches.into_iter().map(Ok), new_schema.clone());
let dataset = Dataset::write(new_batch_reader, test_uri, Some(write_params.clone()))
.await
.unwrap();
// After the overwrite there is again a single fragment, and its id is 0.
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert_eq!(fragments[0].id(), 0);
assert_eq!(dataset.manifest.max_fragment_id(), Some(0));
// Reopen and verify that the latest version exposes the new schema and data.
let actual_ds = Dataset::open(test_uri).await.unwrap();
assert_eq!(actual_ds.version().version, 2);
let actual_schema = ArrowSchema::from(actual_ds.schema());
assert_eq!(&actual_schema, new_schema.as_ref());
let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual_batch = concat_batches(&new_schema, &actual_batches).unwrap();
assert_eq!(new_schema.clone(), actual_batch.schema());
let arr = actual_batch.column_by_name("s").unwrap();
assert_eq!(
&StringArray::from_iter_values((20..40).map(|v| v.to_string())),
as_string_array(arr)
);
assert_eq!(actual_ds.version().version, 2);
// The overwritten version must still be loadable with its original schema.
let first_ver = DatasetBuilder::from_uri(test_uri)
.with_version(1)
.load()
.await
.unwrap();
assert_eq!(first_ver.version().version, 1);
assert_eq!(&ArrowSchema::from(first_ver.schema()), schema.as_ref());
}
/// Row counting on a 400-row, 10-fragment dataset: the unfiltered count is 400 and
/// the count with the filter `i < 200` is 200.
#[rstest]
#[tokio::test]
async fn test_fast_count_rows(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let scratch_dir = tempdir().unwrap();
    let uri = scratch_dir.path().to_str().unwrap();

    let column = ArrowField::new("i", DataType::Int32, false);
    let schema = Arc::new(ArrowSchema::new(vec![column]));

    // 20 batches of 20 consecutive integers: 0..400 overall.
    let mut input: Vec<RecordBatch> = Vec::with_capacity(20);
    for batch_no in 0..20 {
        let values = Int32Array::from_iter_values(batch_no * 20..(batch_no + 1) * 20);
        input.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap());
    }

    let params = WriteParams {
        max_rows_per_file: 40,
        max_rows_per_group: 10,
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let reader = RecordBatchIterator::new(input.into_iter().map(Ok), schema.clone());
    Dataset::write(reader, uri, Some(params)).await.unwrap();

    let dataset = Dataset::open(uri).await.unwrap();
    dataset.validate().await.unwrap();
    assert_eq!(10, dataset.fragments().len());
    assert_eq!(400, dataset.count_rows(None).await.unwrap());

    let filtered_count = dataset
        .count_rows(Some("i < 200".to_string()))
        .await
        .unwrap();
    assert_eq!(200, filtered_count);
}
#[rstest]
#[tokio::test]
async fn test_create_index(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = tempdir().unwrap();
let dimension = 16;
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"embeddings",
DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float32, true)),
dimension,
),
false,
)]));
let float_arr = generate_random_array(512 * dimension as usize);
let vectors = Arc::new(
<arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
float_arr, dimension,
)
.unwrap(),
);
let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()];
let test_uri = test_dir.path().to_str().unwrap();
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
}),
)
.await
.unwrap();
dataset.validate().await.unwrap();
let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 50);
dataset
.create_index(&["embeddings"], IndexType::Vector, None, ¶ms, true)
.await
.unwrap();
dataset.validate().await.unwrap();
let indices = dataset.load_indices().await.unwrap();
let actual = indices.first().unwrap().dataset_version;
let expected = dataset.manifest.version - 1;
assert_eq!(actual, expected);
let fragment_bitmap = indices.first().unwrap().fragment_bitmap.as_ref().unwrap();
assert_eq!(fragment_bitmap.len(), 1);
assert!(fragment_bitmap.contains(0));
let write_params = WriteParams {
mode: WriteMode::Append,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()];
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
let indices = dataset.load_indices().await.unwrap();
let actual = indices.first().unwrap().dataset_version;
let expected = dataset.manifest.version - 2;
assert_eq!(actual, expected);
dataset.validate().await.unwrap();
let fragment_bitmap = indices.first().unwrap().fragment_bitmap.as_ref().unwrap();
assert_eq!(fragment_bitmap.len(), 1);
assert!(fragment_bitmap.contains(0));
let actual_statistics: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("embeddings_idx").await.unwrap())
.unwrap();
let actual_statistics = actual_statistics.as_object().unwrap();
assert_eq!(actual_statistics["index_type"].as_str().unwrap(), "IVF_PQ");
let deltas = actual_statistics["indices"].as_array().unwrap();
assert_eq!(deltas.len(), 1);
assert_eq!(deltas[0]["metric_type"].as_str().unwrap(), "l2");
assert_eq!(deltas[0]["num_partitions"].as_i64().unwrap(), 10);
assert!(dataset.index_statistics("non-existent_idx").await.is_err());
assert!(dataset.index_statistics("").await.is_err());
let write_params = WriteParams {
mode: WriteMode::Overwrite,
data_storage_version: Some(data_storage_version),
..Default::default()
};
let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap()];
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();
assert!(dataset.manifest.index_section.is_none());
assert!(dataset.load_indices().await.unwrap().is_empty());
dataset.validate().await.unwrap();
let fragment_bitmap = indices.first().unwrap().fragment_bitmap.as_ref().unwrap();
assert_eq!(fragment_bitmap.len(), 1);
assert!(fragment_bitmap.contains(0));
}
/// Creates a named scalar index on an integer column and verifies the
/// metadata stored for it.
#[rstest]
#[tokio::test]
async fn test_create_scalar_index(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] use_stable_row_id: bool,
) {
    let tmp = tempdir().unwrap();
    let test_uri = tmp.path().to_str().unwrap();

    let generator = gen().col("int", array::step::<Int32Type>());
    let write_params = WriteParams {
        data_storage_version: Some(data_storage_version),
        enable_move_stable_row_ids: use_stable_row_id,
        ..Default::default()
    };
    let mut dataset = Dataset::write(
        generator.into_reader_rows(RowCount::from(16 * 1024), BatchCount::from(4)),
        test_uri,
        Some(write_params),
    )
    .await
    .unwrap();

    let index_name = "my_index".to_string();
    let index_params = ScalarIndexParams::default();
    dataset
        .create_index(
            &["int"],
            IndexType::Scalar,
            Some(index_name.clone()),
            &index_params,
            false,
        )
        .await
        .unwrap();

    // Exactly one index with that name, built from version 1 on field id 0.
    let found = dataset.load_indices_by_name(&index_name).await.unwrap();
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].dataset_version, 1);
    assert_eq!(found[0].fields, vec![0]);
    assert_eq!(found[0].name, index_name);

    // Statistics must be retrievable for the new index.
    dataset.index_statistics(&index_name).await.unwrap();
}
/// Attempts to write a dataset whose only column is named `a.b.c` and returns
/// the outcome of that write unchanged, so callers can inspect the error.
async fn create_bad_file(data_storage_version: LanceFileVersion) -> Result<Dataset> {
    let tmp = tempdir().unwrap();

    let dotted_field = ArrowField::new("a.b.c", DataType::Int32, false);
    let schema = Arc::new(ArrowSchema::new(vec![dotted_field]));

    // 20 batches of 20 consecutive integers each.
    let mut batches: Vec<RecordBatch> = Vec::with_capacity(20);
    for batch_idx in 0..20 {
        let values = Int32Array::from_iter_values(batch_idx * 20..(batch_idx + 1) * 20);
        batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(values)]).unwrap());
    }

    let test_uri = tmp.path().to_str().unwrap();
    let params = WriteParams {
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    Dataset::write(reader, test_uri, Some(params)).await
}
/// Builds an inverted (full-text) index on a dataset with no rows and checks
/// that a full-text query returns an empty result instead of failing.
#[tokio::test]
async fn test_create_fts_index_with_empty_table() {
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "text",
        DataType::Utf8,
        false,
    )]));

    // No batches at all: the dataset is created with a schema but zero rows.
    let batches: Vec<RecordBatch> = vec![];
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    let mut dataset = Dataset::write(reader, test_uri, None)
        .await
        .expect("write dataset");

    let params = InvertedIndexParams::default();
    dataset
        .create_index(&["text"], IndexType::Inverted, None, &params, true)
        .await
        .unwrap();

    let batch = dataset
        .scan()
        .full_text_search(FullTextSearchQuery::new("lance".to_owned()))
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    assert_eq!(batch.num_rows(), 0);
}
/// Builds an inverted (full-text) index over rows that contain only empty
/// strings and checks that a full-text query matches nothing.
#[tokio::test]
async fn test_create_fts_index_with_empty_strings() {
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "text",
        DataType::Utf8,
        false,
    )]));

    // Three rows, each an empty string.
    let batches: Vec<RecordBatch> = vec![RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec!["", "", ""]))],
    )
    .unwrap()];
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    let mut dataset = Dataset::write(reader, test_uri, None)
        .await
        .expect("write dataset");

    let params = InvertedIndexParams::default();
    dataset
        .create_index(&["text"], IndexType::Inverted, None, &params, true)
        .await
        .unwrap();

    let batch = dataset
        .scan()
        .full_text_search(FullTextSearchQuery::new("lance".to_owned()))
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    assert_eq!(batch.num_rows(), 0);
}
/// Writing the dataset produced by `create_bad_file` must be rejected.
#[rstest]
#[tokio::test]
async fn test_bad_field_name(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let outcome = create_bad_file(data_storage_version).await;
    assert!(outcome.is_err());
}
#[tokio::test]
async fn test_open_dataset_not_found() {
let result = Dataset::open(".").await;
assert!(matches!(result.unwrap_err(), Error::DatasetNotFound { .. }));
}
/// Asserts that every entry in `<test_dir>/_versions` is named according to
/// `scheme`, printing the entry names on failure.
fn assert_all_manifests_use_scheme(test_dir: &TempDir, scheme: ManifestNamingScheme) {
    let versions_dir = test_dir.path().join("_versions");

    let mut entries_names = Vec::new();
    for entry in versions_dir.read_dir().unwrap() {
        entries_names.push(entry.unwrap().file_name().into_string().unwrap());
    }

    let all_match = entries_names
        .iter()
        .all(|name| ManifestNamingScheme::detect_scheme(name) == Some(scheme));
    assert!(all_match, "Entries: {:?}", entries_names);
}
/// Creates a dataset with V2 manifest paths enabled and checks that a later
/// append and an update keep using the V2 naming scheme.
#[tokio::test]
async fn test_v2_manifest_path_create() {
    let data = lance_datagen::gen()
        .col("key", array::step::<Int32Type>())
        .into_batch_rows(RowCount::from(10))
        .unwrap();
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    // Each write consumes its own single-batch reader over the same data.
    let make_reader = || RecordBatchIterator::new([Ok(data.clone())], data.schema().clone());

    // Initial write opts into V2 manifest paths explicitly.
    let create_params = WriteParams {
        enable_v2_manifest_paths: true,
        ..Default::default()
    };
    Dataset::write(make_reader(), test_uri, Some(create_params))
        .await
        .unwrap();
    assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2);

    // The append does not set the flag.
    let append_params = WriteParams {
        mode: WriteMode::Append,
        ..Default::default()
    };
    let dataset = Dataset::write(make_reader(), test_uri, Some(append_params))
        .await
        .unwrap();
    assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2);

    // Neither does the update.
    let update_job = UpdateBuilder::new(Arc::new(dataset))
        .update_where("key = 5")
        .unwrap()
        .set("key", "200")
        .unwrap()
        .build()
        .unwrap();
    update_job.execute().await.unwrap();
    assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2);
}
/// Commits an empty overwrite through `Dataset::commit` and checks that the
/// resulting dataset and its manifest files use the V2 naming scheme.
#[tokio::test]
async fn test_v2_manifest_path_commit() {
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Int32, false)]);
    let schema = Schema::try_from(&arrow_schema).unwrap();
    let operation = Operation::Overwrite {
        fragments: vec![],
        schema,
        config_upsert_values: None,
    };

    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();
    let registry = Arc::new(ObjectStoreRegistry::default());

    // NOTE(review): the trailing `true` is presumably the flag that opts into
    // V2 manifest paths; confirm against the `Dataset::commit` signature.
    let dataset = Dataset::commit(test_uri, operation, None, None, None, registry, true)
        .await
        .unwrap();

    assert!(dataset.manifest_naming_scheme == ManifestNamingScheme::V2);
    assert_all_manifests_use_scheme(&test_dir, ManifestNamingScheme::V2);
}
#[rstest]
#[tokio::test]
async fn test_merge(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
#[values(false, true)] use_stable_row_id: bool,
) {
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", DataType::Int32, false),
ArrowField::new("x", DataType::Float32, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(Float32Array::from(vec![1.0, 2.0])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 2])),
Arc::new(Float32Array::from(vec![3.0, 4.0])),
],
)
.unwrap();
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let write_params = WriteParams {
mode: WriteMode::Append,
data_storage_version: Some(data_storage_version),
enable_move_stable_row_ids: use_stable_row_id,
..Default::default()
};
let batches = RecordBatchIterator::new(vec![batch1].into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let batches = RecordBatchIterator::new(vec![batch2].into_iter().map(Ok), schema.clone());
Dataset::write(batches, test_uri, Some(write_params.clone()))
.await
.unwrap();
let dataset = Dataset::open(test_uri).await.unwrap();
assert_eq!(dataset.fragments().len(), 2);
assert_eq!(dataset.manifest.max_fragment_id(), Some(1));
let right_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i2", DataType::Int32, false),
ArrowField::new("y", DataType::Utf8, true),
]));
let right_batch1 = RecordBatch::try_new(
right_schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batches =
RecordBatchIterator::new(vec![right_batch1].into_iter().map(Ok), right_schema.clone());
let mut dataset = Dataset::open(test_uri).await.unwrap();
dataset.merge(batches, "i", "i2").await.unwrap();
dataset.validate().await.unwrap();
assert_eq!(dataset.version().version, 3);
assert_eq!(dataset.fragments().len(), 2);
assert_eq!(dataset.fragments()[0].files.len(), 2);
assert_eq!(dataset.fragments()[1].files.len(), 2);
assert_eq!(dataset.manifest.max_fragment_id(), Some(1));
let actual_batches = dataset
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual = concat_batches(&actual_batches[0].schema(), &actual_batches).unwrap();
let expected = RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", DataType::Int32, false),
ArrowField::new("x", DataType::Float32, false),
ArrowField::new("y", DataType::Utf8, true),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 2])),
Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0])),
Arc::new(StringArray::from(vec![
Some("a"),
Some("b"),
None,
Some("b"),
])),
],
)
.unwrap();
assert_eq!(actual, expected);
let dataset = Dataset::open(test_uri).await.unwrap();
let actual_batches = dataset
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let actual = concat_batches(&actual_batches[0].schema(), &actual_batches).unwrap();
assert_eq!(actual, expected);
}
/// Merges 10,000 generated rows into a ten-fragment dataset and validates the
/// result.
#[rstest]
#[tokio::test]
async fn test_large_merge(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] use_stable_row_id: bool,
) {
    let left_reader = lance_datagen::gen()
        .col("key", array::step::<Int32Type>())
        .col("value", array::fill_utf8("value".to_string()))
        .into_reader_rows(RowCount::from(1_000), BatchCount::from(10));

    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let write_params = WriteParams {
        mode: WriteMode::Append,
        data_storage_version: Some(data_storage_version),
        max_rows_per_file: 1024,
        max_rows_per_group: 150,
        enable_move_stable_row_ids: use_stable_row_id,
        ..Default::default()
    };
    Dataset::write(left_reader, test_uri, Some(write_params.clone()))
        .await
        .unwrap();

    let mut dataset = Dataset::open(test_uri).await.unwrap();
    assert_eq!(dataset.fragments().len(), 10);
    assert_eq!(dataset.manifest.max_fragment_id(), Some(9));

    // The right-hand keys start at 500, so only part of the left side matches.
    let right_reader = lance_datagen::gen()
        .col("key2", array::step_custom::<Int32Type>(500, 1))
        .col("new_value", array::fill_utf8("new_value".to_string()))
        .into_reader_rows(RowCount::from(1_000), BatchCount::from(10));

    dataset.merge(right_reader, "key", "key2").await.unwrap();
    dataset.validate().await.unwrap();
}
// Exercises `Dataset::delete` through a sequence of predicates on a
// two-fragment dataset (rows 0..50 and 50..100), checking deletion files,
// deleted-row counts and fragment ids after each step, optionally with a
// scalar index on `i`.
#[rstest]
#[tokio::test]
async fn test_delete(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
#[values(false, true)] with_scalar_index: bool,
) {
use std::collections::HashSet;
// Builds a batch with `i` = the range values and `x` = twice each value.
fn sequence_data(range: Range<u32>) -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", DataType::UInt32, false),
ArrowField::new("x", DataType::UInt32, false),
]));
RecordBatch::try_new(
schema,
vec![
Arc::new(UInt32Array::from_iter_values(range.clone())),
Arc::new(UInt32Array::from_iter_values(range.map(|v| v * 2))),
],
)
.unwrap()
}
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", DataType::UInt32, false),
ArrowField::new("x", DataType::UInt32, false),
]));
// Two 50-row slices; the assertions below expect them to become two fragments.
let data = sequence_data(0..100);
let batches = vec![data.slice(0, 50), data.slice(50, 50)];
let mut dataset = TestDatasetGenerator::new(batches, data_storage_version)
.make_hostile(test_uri)
.await;
if with_scalar_index {
dataset
.create_index(
&["i"],
IndexType::Scalar,
Some("scalar_index".to_string()),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();
}
// Step 1: a predicate that matches nothing must not create deletion files.
dataset.delete("i < 0").await.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 2);
assert_eq!(dataset.count_fragments(), 2);
assert_eq!(dataset.count_deleted_rows().await.unwrap(), 0);
assert_eq!(dataset.manifest.max_fragment_id(), Some(1));
assert!(fragments[0].metadata.deletion_file.is_none());
assert!(fragments[1].metadata.deletion_file.is_none());
// Step 2: delete ten rows from each fragment.
dataset.delete("i < 10 OR i >= 90").await.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 2);
assert_eq!(dataset.count_fragments(), 2);
assert!(fragments[0].metadata.deletion_file.is_some());
assert!(fragments[1].metadata.deletion_file.is_some());
assert_eq!(
fragments[0]
.metadata
.deletion_file
.as_ref()
.unwrap()
.num_deleted_rows,
Some(10)
);
assert_eq!(
fragments[1]
.metadata
.deletion_file
.as_ref()
.unwrap()
.num_deleted_rows,
Some(10)
);
assert_eq!(dataset.count_deleted_rows().await.unwrap(), 20);
// Read the deletion vectors back from storage and compare their contents.
let store = dataset.object_store().clone();
let path = Path::from_filesystem_path(test_uri).unwrap();
let deletion_vector = read_deletion_file(&path, &fragments[0].metadata, &store)
.await
.unwrap()
.unwrap();
assert_eq!(deletion_vector.len(), 10);
assert_eq!(
deletion_vector.into_iter().collect::<HashSet<_>>(),
(0..10).collect::<HashSet<_>>()
);
// Offsets are fragment-local: values 90..100 sit at offsets 40..50 of the
// second fragment.
let deletion_vector = read_deletion_file(&path, &fragments[1].metadata, &store)
.await
.unwrap()
.unwrap();
assert_eq!(deletion_vector.len(), 10);
assert_eq!(
deletion_vector.into_iter().collect::<HashSet<_>>(),
(40..50).collect::<HashSet<_>>()
);
// Step 3: a delete that only touches the first fragment must extend its
// deletion vector and leave the second fragment's deletion file unchanged.
let second_deletion_file = fragments[1].metadata.deletion_file.clone().unwrap();
dataset.delete("i < 20").await.unwrap();
dataset.validate().await.unwrap();
assert_eq!(dataset.count_deleted_rows().await.unwrap(), 30);
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 2);
assert!(fragments[0].metadata.deletion_file.is_some());
let deletion_vector = read_deletion_file(&path, &fragments[0].metadata, &store)
.await
.unwrap()
.unwrap();
assert_eq!(deletion_vector.len(), 20);
assert_eq!(
deletion_vector.into_iter().collect::<HashSet<_>>(),
(0..20).collect::<HashSet<_>>()
);
assert_eq!(
fragments[1].metadata.deletion_file.as_ref().unwrap(),
&second_deletion_file
);
// Step 4: deleting every row of the second fragment removes that fragment,
// so only the first fragment's 20 deleted rows are still counted.
dataset.delete("i >= 50").await.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 1);
assert_eq!(dataset.count_fragments(), 1);
assert_eq!(fragments[0].id(), 0);
assert_eq!(dataset.count_deleted_rows().await.unwrap(), 20);
// Step 5: appending after the removal must not reuse fragment id 1; the new
// fragment is expected to get id 2.
let data = sequence_data(0..100);
let batches = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema.clone());
let write_params = WriteParams {
mode: WriteMode::Append,
..Default::default()
};
let dataset = Dataset::write(batches, test_uri, Some(write_params))
.await
.unwrap();
dataset.validate().await.unwrap();
let fragments = dataset.get_fragments();
assert_eq!(fragments.len(), 2);
assert_eq!(dataset.count_fragments(), 2);
assert_eq!(fragments[0].id(), 0);
assert_eq!(fragments[1].id(), 2);
assert_eq!(dataset.manifest.max_fragment_id(), Some(2));
}
/// Checks out an old version, restores it as the newest version and verifies
/// that further writes continue from the restored state.
#[rstest]
#[tokio::test]
async fn test_restore(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let field = ArrowField::new("i", DataType::UInt32, false);
    let schema = Arc::new(ArrowSchema::new(vec![field]));

    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let data = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(UInt32Array::from_iter_values(0..100))],
    );
    let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
    let write_params = WriteParams {
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let mut dataset = Dataset::write(reader, test_uri, Some(write_params))
        .await
        .unwrap();
    assert_eq!(dataset.manifest.version, 1);
    let original_manifest = dataset.manifest.clone();

    // Version 2: delete part of the data.
    dataset.delete("i > 50").await.unwrap();
    assert_eq!(dataset.manifest.version, 2);

    // Checking out version 1 gives back the original, deletion-free state.
    let mut dataset = dataset.checkout_version(1).await.unwrap();
    assert_eq!(dataset.manifest.version, 1);
    let fragments = dataset.get_fragments();
    assert_eq!(fragments.len(), 1);
    assert_eq!(dataset.count_fragments(), 1);
    assert_eq!(fragments[0].metadata.deletion_file, None);
    assert_eq!(dataset.manifest, original_manifest);

    // Moving back to the latest version is still possible.
    dataset.checkout_latest().await.unwrap();
    assert_eq!(dataset.manifest.version, 2);

    // Restoring version 1 creates version 3 with version 1's contents.
    let mut dataset = dataset.checkout_version(1).await.unwrap();
    dataset.restore().await.unwrap();
    assert_eq!(dataset.manifest.version, 3);
    assert_eq!(dataset.manifest.fragments, original_manifest.fragments);
    assert_eq!(dataset.manifest.schema, original_manifest.schema);

    // Writes after the restore build on top of it.
    dataset.delete("i > 30").await.unwrap();
    assert_eq!(dataset.manifest.version, 4);

    let fragments = dataset.get_fragments();
    assert_eq!(fragments.len(), 1);
    assert_eq!(dataset.count_fragments(), 1);
    assert!(fragments[0].metadata.deletion_file.is_some());
}
#[rstest]
#[tokio::test]
async fn test_update_config() {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::UInt32,
false,
)]));
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(UInt32Array::from_iter_values(0..100))],
);
let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
let mut desired_config = HashMap::new();
desired_config.insert("lance:test".to_string(), "value".to_string());
desired_config.insert("other-key".to_string(), "other-value".to_string());
dataset.update_config(desired_config.clone()).await.unwrap();
assert_eq!(dataset.manifest.config, desired_config);
desired_config.remove("other-key");
dataset.delete_config_keys(&["other-key"]).await.unwrap();
assert_eq!(dataset.manifest.config, desired_config);
}
// Exercises the tag API (create, delete, list, update, checkout by tag) on a
// dataset with two versions, including the error messages returned for
// missing versions, missing tags and duplicate tags.
#[rstest]
#[tokio::test]
async fn test_tag(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::UInt32,
false,
)]));
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(UInt32Array::from_iter_values(0..100))],
);
let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
}),
)
.await
.unwrap();
assert_eq!(dataset.manifest.version, 1);
// The delete produces version 2, so versions 1 and 2 exist from here on.
dataset.delete("i > 50").await.unwrap();
assert_eq!(dataset.manifest.version, 2);
assert_eq!(dataset.tags.list().await.unwrap().len(), 0);
// Tagging a version that does not exist is rejected.
let bad_tag_creation = dataset.tags.create("tag1", 3).await;
assert_eq!(
bad_tag_creation.err().unwrap().to_string(),
"Version not found error: version 3 does not exist"
);
// Deleting a tag that does not exist is rejected.
let bad_tag_deletion = dataset.tags.delete("tag1").await;
assert_eq!(
bad_tag_deletion.err().unwrap().to_string(),
"Ref not found error: tag tag1 does not exist"
);
dataset.tags.create("tag1", 1).await.unwrap();
assert_eq!(dataset.tags.list().await.unwrap().len(), 1);
// Creating the same tag twice is a conflict.
let another_bad_tag_creation = dataset.tags.create("tag1", 1).await;
assert_eq!(
another_bad_tag_creation.err().unwrap().to_string(),
"Ref conflict error: tag tag1 already exists"
);
dataset.tags.delete("tag1").await.unwrap();
assert_eq!(dataset.tags.list().await.unwrap().len(), 0);
// Several tags may point at the same version.
dataset.tags.create("tag1", 1).await.unwrap();
dataset.tags.create("tag2", 1).await.unwrap();
dataset.tags.create("v1.0.0-rc1", 1).await.unwrap();
assert_eq!(dataset.tags.list().await.unwrap().len(), 3);
// Checking out by an unknown tag name fails.
let bad_checkout = dataset.checkout_version("tag3").await;
assert_eq!(
bad_checkout.err().unwrap().to_string(),
"Ref not found error: tag tag3 does not exist"
);
// Checking out by tag resolves to the tagged version.
dataset = dataset.checkout_version("tag1").await.unwrap();
assert_eq!(dataset.manifest.version, 1);
// The builder can also load a dataset by tag.
let first_ver = DatasetBuilder::from_uri(test_uri)
.with_tag("tag1")
.load()
.await
.unwrap();
assert_eq!(first_ver.version().version, 1);
// Updating an unknown tag, or pointing a tag at an unknown version, fails.
let bad_tag_update = dataset.tags.update("tag3", 1).await;
assert_eq!(
bad_tag_update.err().unwrap().to_string(),
"Ref not found error: tag tag3 does not exist"
);
let another_bad_tag_update = dataset.tags.update("tag1", 3).await;
assert_eq!(
another_bad_tag_update.err().unwrap().to_string(),
"Version not found error: version 3 does not exist"
);
// A tag can be moved to another existing version and back again.
dataset.tags.update("tag1", 2).await.unwrap();
dataset = dataset.checkout_version("tag1").await.unwrap();
assert_eq!(dataset.manifest.version, 2);
dataset.tags.update("tag1", 1).await.unwrap();
dataset = dataset.checkout_version("tag1").await.unwrap();
assert_eq!(dataset.manifest.version, 1);
}
/// Runs a nearest-neighbour search against a dataset with zero rows and
/// checks the schema of any batch the scan yields.
#[rstest]
#[tokio::test]
async fn test_search_empty(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let vec_type = DataType::FixedSizeList(
        Arc::new(ArrowField::new("item", DataType::Float32, true)),
        128,
    );
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "vec",
        vec_type.clone(),
        false,
    )]));

    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    // An empty list array: zero vectors of width 128.
    let empty_vectors = Arc::new(
        <arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
            Float32Array::from_iter_values(vec![]),
            128,
        )
        .unwrap(),
    );
    let data = RecordBatch::try_new(schema.clone(), vec![empty_vectors]);
    let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
    let write_params = WriteParams {
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let dataset = Dataset::write(reader, test_uri, Some(write_params))
        .await
        .unwrap();

    let query = Float32Array::from_iter_values((0..128).map(|_| 0.1));
    let mut stream = dataset
        .scan()
        .nearest("vec", &query, 1)
        .unwrap()
        .try_into_stream()
        .await
        .unwrap();

    // Every yielded batch must expose the vector column plus the distance column.
    let expected_vec_field = ArrowField::new("vec", vec_type, false);
    let expected_dist_field = ArrowField::new(DIST_COL, DataType::Float32, true);
    while let Some(batch) = stream.next().await {
        let batch_schema = batch.unwrap().schema();
        assert_eq!(batch_schema.fields.len(), 2);
        assert_eq!(
            batch_schema.field_with_name("vec").unwrap(),
            &expected_vec_field
        );
        assert_eq!(
            batch_schema.field_with_name(DIST_COL).unwrap(),
            &expected_dist_field
        );
    }
}
#[rstest]
#[tokio::test]
async fn test_search_empty_after_delete(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
#[values(false, true)] use_stable_row_id: bool,
) {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let data = gen().col("vec", array::rand_vec::<Float32Type>(Dimension::from(32)));
let reader = data.into_reader_rows(RowCount::from(1000), BatchCount::from(10));
let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
enable_move_stable_row_ids: use_stable_row_id,
..Default::default()
}),
)
.await
.unwrap();
let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 50);
dataset
.create_index(&["vec"], IndexType::Vector, None, ¶ms, true)
.await
.unwrap();
dataset.delete("true").await.unwrap();
let indices = dataset.load_indices().await.unwrap();
assert_eq!(indices.len(), 0);
let mut stream = dataset
.scan()
.nearest(
"vec",
&Float32Array::from_iter_values((0..128).map(|_| 0.1)),
1,
)
.unwrap()
.try_into_stream()
.await
.unwrap();
while let Some(batch) = stream.next().await {
let schema = batch.unwrap().schema();
assert_eq!(schema.fields.len(), 2);
assert_eq!(
schema.field_with_name("vec").unwrap(),
&ArrowField::new(
"vec",
DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float32, true)),
128
),
false,
)
);
assert_eq!(
schema.field_with_name(DIST_COL).unwrap(),
&ArrowField::new(DIST_COL, DataType::Float32, true)
);
}
dataset.delete(" True").await.unwrap();
let mut stream = dataset
.scan()
.nearest(
"vec",
&Float32Array::from_iter_values((0..128).map(|_| 0.1)),
1,
)
.unwrap()
.try_into_stream()
.await
.unwrap();
while let Some(batch) = stream.next().await {
let schema = batch.unwrap().schema();
assert_eq!(schema.fields.len(), 2);
assert_eq!(
schema.field_with_name("vec").unwrap(),
&ArrowField::new(
"vec",
DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float32, true)),
128
),
false,
)
);
assert_eq!(
schema.field_with_name(DIST_COL).unwrap(),
&ArrowField::new(DIST_COL, DataType::Float32, true)
);
}
}
/// Writes 512 vectors into a dataset and checks `num_small_files` against a
/// threshold above and a threshold at the row count.
#[rstest]
#[tokio::test]
async fn test_num_small_files(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = tempdir().unwrap();
    let dimensions = 16;
    let column_name = "vec";

    let item_field = Arc::new(ArrowField::new("item", DataType::Float32, true));
    let vec_field = ArrowField::new(
        column_name,
        DataType::FixedSizeList(item_field, dimensions),
        false,
    );
    let schema = Arc::new(ArrowSchema::new(vec![vec_field]));

    let values = generate_random_array(512 * dimensions as usize);
    let vectors =
        arrow_array::FixedSizeListArray::try_new_from_values(values, dimensions).unwrap();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vectors)]).unwrap();
    let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

    let test_uri = test_dir.path().to_str().unwrap();
    let write_params = WriteParams {
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let dataset = Dataset::write(reader, test_uri, Some(write_params))
        .await
        .unwrap();
    dataset.validate().await.unwrap();

    assert!(dataset.num_small_files(1024).await > 0);
    assert!(dataset.num_small_files(512).await == 0);
}
/// Round-trips a struct column whose only child is a dictionary-encoded
/// string array and checks the scanned batches equal the written ones.
#[tokio::test]
async fn test_read_struct_of_dictionary_arrays() {
    let test_dir = tempdir().unwrap();

    let dict_field = ArrowField::new(
        "d",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
    );
    let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "s",
        DataType::Struct(ArrowFields::from(vec![dict_field.clone()])),
        true,
    )]));

    let mut batches: Vec<RecordBatch> = Vec::new();
    // The range `1..2` yields a single iteration, so exactly one batch is built.
    for _ in 1..2 {
        let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
        for value in ["a", "b", "c", "d"] {
            dict_builder.append(value).unwrap();
        }

        let struct_array = Arc::new(StructArray::from(vec![(
            Arc::new(dict_field.clone()),
            Arc::new(dict_builder.finish()) as ArrayRef,
        )]));

        let batch =
            RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();
        batches.push(batch);
    }

    let test_uri = test_dir.path().to_str().unwrap();
    let batch_reader =
        RecordBatchIterator::new(batches.clone().into_iter().map(Ok), arrow_schema.clone());
    Dataset::write(batch_reader, test_uri, Some(WriteParams::default()))
        .await
        .unwrap();

    let scanned = scan_dataset(test_uri).await.unwrap();
    assert_eq!(batches, scanned);
}
/// Opens the dataset at `uri`, scans it in full and returns every batch.
///
/// Any error from opening, planning or reading the scan is propagated.
async fn scan_dataset(uri: &str) -> Result<Vec<RecordBatch>> {
    let dataset = Dataset::open(uri).await?;
    let stream = dataset.scan().try_into_stream().await?;
    let batches = stream.try_collect::<Vec<_>>().await?;
    Ok(batches)
}
/// Recursively copies the directory tree rooted at `src` into `dst`.
///
/// `dst` (and any missing parents) is created first. Sub-directories are
/// copied by recursion; every other entry is copied with `std::fs::copy`.
/// The first I/O error encountered is returned.
fn copy_dir_all(
    src: impl AsRef<std::path::Path>,
    dst: impl AsRef<std::path::Path>,
) -> std::io::Result<()> {
    let target_root = dst.as_ref();
    std::fs::create_dir_all(target_root)?;

    for item in std::fs::read_dir(src)? {
        let item = item?;
        let is_dir = item.file_type()?.is_dir();
        let target = target_root.join(item.file_name());
        if is_dir {
            copy_dir_all(item.path(), target)?;
        } else {
            std::fs::copy(item.path(), target)?;
        }
    }
    Ok(())
}
/// Copies `<CARGO_MANIFEST_DIR>/../../test_data/<table_path>` into a fresh
/// temporary directory and returns that directory's handle.
///
/// Copy errors are returned; failing to create the temporary directory panics.
fn copy_test_data_to_tmp(table_path: &str) -> std::io::Result<TempDir> {
    let src = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
        .join("../../test_data")
        .join(table_path);

    let test_dir = tempdir().unwrap();
    copy_dir_all(src.as_path(), test_dir.path())?;
    Ok(test_dir)
}
/// Opens a dataset written by v0.7.5 that contains deletions, appends to it
/// and checks that row counts, physical row counts and contents stay right.
#[rstest]
#[tokio::test]
async fn test_v0_7_5_migration() {
    let test_dir = copy_test_data_to_tmp("v0.7.5/with_deletions").unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    // State as read from the old files: 100 physical rows, 10 of them deleted.
    let dataset = Dataset::open(test_uri).await.unwrap();
    assert_eq!(dataset.count_rows(None).await.unwrap(), 90);
    assert_eq!(dataset.count_deleted_rows().await.unwrap(), 10);
    let physical_before = futures::stream::iter(dataset.get_fragments())
        .then(|fragment| async move { fragment.physical_rows().await })
        .try_fold(0, |sum, rows| async move { Ok(sum + rows) })
        .await
        .unwrap();
    assert_eq!(physical_before, 100);

    // Append five more rows using the dataset's own schema.
    let schema = Arc::new(ArrowSchema::from(dataset.schema()));
    let appended = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(100..105))],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(appended)], schema.clone());
    let append_params = WriteParams {
        mode: WriteMode::Append,
        ..Default::default()
    };
    let dataset = Dataset::write(reader, test_uri, Some(append_params))
        .await
        .unwrap();

    assert_eq!(dataset.count_rows(None).await.unwrap(), 95);
    assert_eq!(dataset.count_deleted_rows().await.unwrap(), 10);
    let physical_after = futures::stream::iter(dataset.get_fragments())
        .then(|fragment| async move { fragment.physical_rows().await })
        .try_fold(0, |sum, rows| async move { Ok(sum + rows) })
        .await
        .unwrap();
    assert_eq!(physical_after, 105);

    dataset.validate().await.unwrap();

    // Values 10..20 are the deleted ones; everything else must be present.
    let expected = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(
            (0..10).chain(20..105),
        ))],
    )
    .unwrap();
    let scanned = dataset
        .scan()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    let actual = concat_batches(&scanned[0].schema(), &scanned).unwrap();
    assert_eq!(actual, expected);
}
/// Exercises the `v0.8.0/migrated_from_v0.7.5` fixture.
///
/// Checks the row counts of the fixture as opened, appends five rows in the
/// legacy file format, then verifies the per-fragment `physical_rows` and
/// `num_deleted_rows` metadata, the total row count and the scanned values.
#[rstest]
#[tokio::test]
async fn test_fix_v0_8_0_broken_migration() {
    let test_dir = copy_test_data_to_tmp("v0.8.0/migrated_from_v0.7.5").unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    // State of the fixture before any write.
    let original = Dataset::open(test_uri).await.unwrap();
    assert_eq!(original.count_rows(None).await.unwrap(), 92);
    assert_eq!(original.count_deleted_rows().await.unwrap(), 10);
    let mut physical_before = 0;
    for fragment in original.get_fragments() {
        physical_before += fragment.physical_rows().await.unwrap();
    }
    assert_eq!(physical_before, 102);

    // Append the values 100..105 as a legacy-format write.
    let schema = Arc::new(ArrowSchema::from(original.schema()));
    let appended_values = Int64Array::from_iter_values(100..105);
    let appended_batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(appended_values)]).unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(appended_batch)], schema.clone());
    let append_params = WriteParams {
        mode: WriteMode::Append,
        data_storage_version: Some(LanceFileVersion::Legacy),
        ..Default::default()
    };
    let appended = Dataset::write(reader, test_uri, Some(append_params))
        .await
        .unwrap();

    // Fragment metadata after the write.
    let fragments = appended.get_fragments();
    let physical_rows: Vec<_> = fragments
        .iter()
        .map(|fragment| fragment.metadata.physical_rows)
        .collect();
    assert_eq!(physical_rows, vec![Some(100), Some(2), Some(5)]);
    let num_deletions: Vec<_> = fragments
        .iter()
        .map(|fragment| {
            let deletion_file = fragment.metadata.deletion_file.as_ref();
            deletion_file.and_then(|df| df.num_deleted_rows)
        })
        .collect();
    assert_eq!(num_deletions, vec![Some(10), None, None]);
    assert_eq!(appended.count_rows(None).await.unwrap(), 97);

    // Scanned contents, fragment by fragment.
    let expected_values =
        Int64Array::from_iter_values((0..10).chain(20..100).chain(0..2).chain(100..105));
    let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(expected_values)]).unwrap();
    let stream = appended.scan().try_into_stream().await.unwrap();
    let scanned: Vec<_> = stream.try_collect().await.unwrap();
    let actual = concat_batches(&scanned[0].schema(), &scanned).unwrap();
    assert_eq!(actual, expected);
}
#[rstest]
#[tokio::test]
async fn test_v0_8_14_invalid_index_fragment_bitmap(
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
data_storage_version: LanceFileVersion,
) {
let test_dir = copy_test_data_to_tmp("v0.8.14/corrupt_index").unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let mut dataset = Dataset::open(test_uri).await.unwrap();
let mut scan = dataset.scan();
let data = scan
.limit(Some(10), None)
.unwrap()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let schema = data[0].schema();
let data = RecordBatchIterator::new(data.into_iter().map(arrow::error::Result::Ok), schema);
let broken_version = dataset.version().version;
dataset
.append(
data,
Some(WriteParams {
data_storage_version: Some(data_storage_version),
..Default::default()
}),
)
.await
.unwrap();
for idx in dataset.load_indices().await.unwrap().iter() {
assert!(idx.fragment_bitmap.as_ref().unwrap().contains(0));
}
let mut dataset = dataset.checkout_version(broken_version).await.unwrap();
dataset.restore().await.unwrap();
compact_files(&mut dataset, CompactionOptions::default(), None)
.await
.unwrap();
for idx in dataset.load_indices().await.unwrap().iter() {
assert!(idx.fragment_bitmap.as_ref().unwrap().contains(0));
}
let mut scan = dataset.scan();
let query_vec = Float32Array::from(vec![0_f32; 128]);
let batches = scan
.nearest("vector", &query_vec, 2000)
.unwrap()
.nprobs(4)
.prefilter(true)
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
assert_eq!(row_count, 1900);
}
/// Exercises the `v0.10.5/corrupt_schema` fixture.
///
/// The fixture must fail validation as opened. After a delete with the
/// predicate `"false"`, validation must succeed and columns `b` and `c` must
/// hold the expected values.
#[tokio::test]
async fn test_fix_v0_10_5_corrupt_schema() {
    let test_dir = copy_test_data_to_tmp("v0.10.5/corrupt_schema").unwrap();
    let test_uri = test_dir.path().to_str().unwrap();
    let mut dataset = Dataset::open(test_uri).await.unwrap();

    // As shipped, the fixture does not validate.
    assert!(dataset.validate().await.is_err());

    // A delete whose predicate is the literal `false`, followed by a second
    // validation that must now pass.
    dataset.delete("false").await.unwrap();
    dataset.validate().await.unwrap();

    let data = dataset.scan().try_into_batch().await.unwrap();
    let column_b = data["b"].as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(column_b.values(), &[0, 4, 8, 12]);
    let column_c = data["c"].as_any().downcast_ref::<Int64Array>().unwrap();
    assert_eq!(column_c.values(), &[0, 5, 10, 15]);
}
/// Writes a fixed-size-list column of bfloat16 values (stored as
/// `FixedSizeBinary(2)` with the bfloat16 extension metadata) and checks
/// that scanning the dataset returns an identical batch, for both file
/// versions.
#[rstest]
#[tokio::test]
async fn test_bfloat16_roundtrip(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) -> Result<()> {
    // Item field carrying the extension-type metadata.
    let item_field = Arc::new(
        ArrowField::new("item", DataType::FixedSizeBinary(2), true).with_metadata(
            [
                (ARROW_EXT_NAME_KEY.into(), BFLOAT16_EXT_NAME.into()),
                (ARROW_EXT_META_KEY.into(), "".into()),
            ]
            .into(),
        ),
    );
    let list_field = ArrowField::new(
        "fsl",
        DataType::FixedSizeList(item_field.clone(), 2),
        false,
    );
    let schema = Arc::new(ArrowSchema::new(vec![list_field]));

    // Six values 0.0..=5.0, grouped into three lists of two.
    let bf16_values = bfloat16::BFloat16Array::from_iter_values(
        (0..6).map(|i| i as f32).map(half::bf16::from_f32),
    );
    let list_array =
        FixedSizeListArray::new(item_field, 2, Arc::new(bf16_values.into_inner()), None);
    let written = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_array)]).unwrap();

    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(written.clone())], schema.clone());
    let write_params = WriteParams {
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let dataset = Dataset::write(reader, test_uri, Some(write_params)).await?;

    let read_back = dataset.scan().try_into_batch().await?;
    assert_eq!(written, read_back);
    Ok(())
}
/// Creates a dataset in the legacy file format, overwrites it without
/// specifying a storage version, and checks that the manifest still reports
/// the legacy format after the overwrite.
#[tokio::test]
async fn test_overwrite_mixed_version() {
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let field_a = ArrowField::new("a", DataType::Int32, false);
    let schema = Arc::new(ArrowSchema::new(vec![field_a]));
    let values = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();

    // Initial write, explicitly legacy.
    let first_reader =
        RecordBatchIterator::new(vec![batch.clone()].into_iter().map(Ok), schema.clone());
    let legacy_params = WriteParams {
        data_storage_version: Some(LanceFileVersion::Legacy),
        ..Default::default()
    };
    let created = Dataset::write(first_reader, test_uri, Some(legacy_params))
        .await
        .unwrap();
    let created_version = created
        .manifest
        .data_storage_format
        .lance_file_version()
        .unwrap();
    assert_eq!(created_version, LanceFileVersion::Legacy);

    // Overwrite with default storage version.
    let second_reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
    let overwrite_params = WriteParams {
        mode: WriteMode::Overwrite,
        ..Default::default()
    };
    let overwritten = Dataset::write(second_reader, test_uri, Some(overwrite_params))
        .await
        .unwrap();
    let overwritten_version = overwritten
        .manifest
        .data_storage_format
        .lance_file_version()
        .unwrap();
    assert_eq!(overwritten_version, LanceFileVersion::Legacy);
}
/// Opening a path that does not exist must fail and must not create the
/// directory as a side effect.
#[tokio::test]
async fn test_open_nonexisting_dataset() {
    let scratch = tempdir().unwrap();
    let missing_dir = scratch.path().join("non_existing");
    let missing_uri = missing_dir.to_str().unwrap();

    let open_result = Dataset::open(missing_uri).await;

    assert!(open_result.is_err());
    assert!(!missing_dir.exists());
}
/// Writes a dictionary-encoded column with a 1000-entry string dictionary,
/// reopens the dataset and checks the row count.
///
/// NOTE(review): judging by the name, the dictionary is presumably meant to
/// make the manifest larger than what a single tail read covers; confirm
/// against the manifest reader.
#[tokio::test]
async fn test_manifest_partially_fits() {
    let dict_field = ArrowField::new(
        "x",
        DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
        false,
    );
    let schema = Arc::new(ArrowSchema::new(vec![dict_field]));

    // Keys 0..1000, each pointing at its own decimal string.
    let dictionary = Arc::new(StringArray::from_iter_values(
        (0..1000).map(|i| i.to_string()),
    ));
    let keys = Int16Array::from_iter_values(0..1000);
    let dict_array = Int16DictionaryArray::try_new(keys, dictionary.clone()).unwrap();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();

    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();
    let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());
    Dataset::write(reader, test_uri, None).await.unwrap();

    let reopened = Dataset::open(test_uri).await.unwrap();
    assert_eq!(1000, reopened.count_rows(None).await.unwrap());
}
#[tokio::test]
async fn test_dataset_uri_roundtrips() {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
DataType::Int32,
false,
)]));
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let vectors = Arc::new(Int32Array::from_iter_values(vec![]));
let data = RecordBatch::try_new(schema.clone(), vec![vectors]);
let reader = RecordBatchIterator::new(vec![data.unwrap()].into_iter().map(Ok), schema);
let dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
..Default::default()
}),
)
.await
.unwrap();
let uri = dataset.uri();
assert_eq!(uri, test_uri);
let ds2 = Dataset::open(uri).await.unwrap();
assert_eq!(
ds2.latest_version_id().await.unwrap(),
dataset.latest_version_id().await.unwrap()
);
}
/// Builds inverted indices on two string columns and checks that full-text
/// search matches terms from either column.
///
/// "title" appears in all three `title` values, "content" in all three
/// `content` values, and "common" in exactly one value of each column, so
/// the expected hit counts are 3, 3 and 2.
#[tokio::test]
async fn test_fts_on_multiple_columns() {
    let tempdir = tempfile::tempdir().unwrap();
    let params = InvertedIndexParams::default();

    let title_col =
        GenericStringArray::<i32>::from(vec!["title hello", "title lance", "title common"]);
    let content_col = GenericStringArray::<i32>::from(vec![
        "content world",
        "content database",
        "content common",
    ]);
    // Each field takes its data type from its own column.
    let batch = RecordBatch::try_new(
        arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("title", title_col.data_type().to_owned(), false),
            arrow_schema::Field::new("content", content_col.data_type().to_owned(), false),
        ])
        .into(),
        vec![
            Arc::new(title_col) as ArrayRef,
            Arc::new(content_col) as ArrayRef,
        ],
    )
    .unwrap();
    let schema = batch.schema();
    let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
    let mut dataset = Dataset::write(batches, tempdir.path().to_str().unwrap(), None)
        .await
        .unwrap();

    // One inverted index per column.
    dataset
        .create_index(&["title"], IndexType::Inverted, None, &params, true)
        .await
        .unwrap();
    dataset
        .create_index(&["content"], IndexType::Inverted, None, &params, true)
        .await
        .unwrap();

    // Term present only in the `title` column.
    let results = dataset
        .scan()
        .full_text_search(FullTextSearchQuery::new("title".to_owned()))
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    assert_eq!(results.num_rows(), 3);

    // Term present only in the `content` column.
    let results = dataset
        .scan()
        .full_text_search(FullTextSearchQuery::new("content".to_owned()))
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    assert_eq!(results.num_rows(), 3);

    // Term present once in each column.
    let results = dataset
        .scan()
        .full_text_search(FullTextSearchQuery::new("common".to_owned()))
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    assert_eq!(results.num_rows(), 2);
}
/// Checks that full-text search also returns rows appended after the
/// inverted index was built.
///
/// An index is created on `title` over three rows; a fourth row is then
/// appended without rebuilding the index. Searching "title" must return all
/// four rows and searching "new" must return the appended row.
#[tokio::test]
async fn test_fts_unindexed_data() {
    let tempdir = tempfile::tempdir().unwrap();
    let params = InvertedIndexParams::default();

    let title_col =
        GenericStringArray::<i32>::from(vec!["title hello", "title lance", "title common"]);
    let content_col = GenericStringArray::<i32>::from(vec![
        "content world",
        "content database",
        "content common",
    ]);
    // Each field takes its data type from its own column.
    let batch = RecordBatch::try_new(
        arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("title", title_col.data_type().to_owned(), false),
            arrow_schema::Field::new("content", content_col.data_type().to_owned(), false),
        ])
        .into(),
        vec![
            Arc::new(title_col) as ArrayRef,
            Arc::new(content_col) as ArrayRef,
        ],
    )
    .unwrap();
    let schema = batch.schema();
    let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
    let mut dataset = Dataset::write(batches, tempdir.path().to_str().unwrap(), None)
        .await
        .unwrap();
    dataset
        .create_index(&["title"], IndexType::Inverted, None, &params, true)
        .await
        .unwrap();

    // Baseline: all three indexed rows match "title".
    let results = dataset
        .scan()
        .full_text_search(FullTextSearchQuery::new("title".to_owned()))
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    assert_eq!(results.num_rows(), 3);

    // Append one more row; no index call follows, so it stays unindexed.
    let title_col = GenericStringArray::<i32>::from(vec!["new title"]);
    let content_col = GenericStringArray::<i32>::from(vec!["new content"]);
    let batch = RecordBatch::try_new(
        arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("title", title_col.data_type().to_owned(), false),
            arrow_schema::Field::new("content", content_col.data_type().to_owned(), false),
        ])
        .into(),
        vec![
            Arc::new(title_col) as ArrayRef,
            Arc::new(content_col) as ArrayRef,
        ],
    )
    .unwrap();
    let schema = batch.schema();
    let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
    let dataset = Dataset::write(
        batches,
        tempdir.path().to_str().unwrap(),
        Some(WriteParams {
            mode: WriteMode::Append,
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    // The appended row is found alongside the indexed ones.
    let results = dataset
        .scan()
        .full_text_search(FullTextSearchQuery::new("title".to_owned()))
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    assert_eq!(results.num_rows(), 4);

    // A term that only exists in the appended row.
    let results = dataset
        .scan()
        .full_text_search(FullTextSearchQuery::new("new".to_owned()))
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    assert_eq!(results.num_rows(), 1);
}
#[tokio::test]
async fn concurrent_create() {
async fn write(uri: &str) -> Result<()> {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"a",
DataType::Int32,
false,
)]));
let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
Dataset::write(empty_reader, uri, None).await?;
Ok(())
}
for _ in 0..5 {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let (res1, res2) = tokio::join!(write(test_uri), write(test_uri));
assert!(res1.is_ok() || res2.is_ok());
if res1.is_err() {
assert!(
matches!(res1, Err(Error::DatasetAlreadyExists { .. })),
"{:?}",
res1
);
} else {
assert!(
matches!(res2, Err(Error::DatasetAlreadyExists { .. })),
"{:?}",
res2
);
}
}
}
/// Appends batches that carry only a subset of the dataset's columns.
///
/// With `a` non-nullable and `b` nullable: appending only `b` must be
/// rejected with `Error::SchemaMismatch`, appending only `a` must succeed
/// and read back with `b` as null. A later full-schema append and a
/// compaction must keep the data intact and leave a single fragment whose
/// single file holds both fields.
#[tokio::test]
async fn test_insert_subschema() {
    let schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("a", DataType::Int32, false),
        ArrowField::new("b", DataType::Int32, true),
    ]));
    let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
    let mut dataset = Dataset::write(empty_reader, "memory://", None)
        .await
        .unwrap();
    dataset.validate().await.unwrap();

    // Only `b`: the non-nullable `a` is missing, so the append is refused.
    let only_b = Arc::new(schema.project(&[1]).unwrap());
    let only_b_batch =
        RecordBatch::try_new(only_b.clone(), vec![Arc::new(Int32Array::from(vec![1]))]).unwrap();
    let only_b_reader = RecordBatchIterator::new(vec![Ok(only_b_batch)], only_b.clone());
    let res = dataset.append(only_b_reader, None).await;
    assert!(
        matches!(res, Err(Error::SchemaMismatch { .. })),
        "Expected Error::SchemaMismatch, got {:?}",
        res
    );

    // Only `a`: accepted.
    let only_a = Arc::new(schema.project(&[0]).unwrap());
    let only_a_batch =
        RecordBatch::try_new(only_a.clone(), vec![Arc::new(Int32Array::from(vec![1]))]).unwrap();
    let only_a_reader = RecordBatchIterator::new(vec![Ok(only_a_batch)], only_a.clone());
    dataset.append(only_a_reader, None).await.unwrap();
    dataset.validate().await.unwrap();

    // The new fragment has one data file that stores field 0 only.
    let fragments = dataset.get_fragments();
    assert_eq!(fragments.len(), 1);
    assert_eq!(fragments[0].metadata.files.len(), 1);
    assert_eq!(&fragments[0].metadata.files[0].fields, &[0]);

    // Reading the full schema fills `b` with null.
    let scanned = dataset.scan().try_into_batch().await.unwrap();
    let expected_partial = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(Int32Array::from(vec![None])),
        ],
    )
    .unwrap();
    assert_eq!(scanned, expected_partial);

    // Append a row that carries both columns.
    let full_batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![2])),
            Arc::new(Int32Array::from(vec![3])),
        ],
    )
    .unwrap();
    let full_reader = RecordBatchIterator::new(vec![Ok(full_batch)], schema.clone());
    dataset.append(full_reader, None).await.unwrap();
    dataset.validate().await.unwrap();

    let scanned = dataset.scan().try_into_batch().await.unwrap();
    let expected_all = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(Int32Array::from(vec![None, Some(3)])),
        ],
    )
    .unwrap();
    assert_eq!(scanned, expected_all);

    // Compaction merges everything into one fragment with both fields.
    compact_files(&mut dataset, CompactionOptions::default(), None)
        .await
        .unwrap();
    dataset.validate().await.unwrap();
    let fragments = dataset.get_fragments();
    assert_eq!(fragments.len(), 1);
    assert_eq!(fragments[0].metadata.files.len(), 1);
    assert_eq!(&fragments[0].metadata.files[0].fields, &[0, 1]);
    let scanned = dataset.scan().try_into_batch().await.unwrap();
    assert_eq!(scanned, expected_all);
}
/// Appends struct batches that carry only some of the struct's children,
/// in an order different from the dataset schema.
///
/// The struct `s` has children `a` (nullable), `b` (non-nullable) and `c`
/// (nullable). Appends of `{b, a}` and `{c, b}` must succeed; an append of
/// `{a, c}`, which omits `b`, must fail with `Error::SchemaMismatch`. Scan
/// and take must return the missing children as null.
#[tokio::test]
async fn test_insert_nested_subschemas() {
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let field_a = Arc::new(ArrowField::new("a", DataType::Int32, true));
    let field_b = Arc::new(ArrowField::new("b", DataType::Int32, false));
    let field_c = Arc::new(ArrowField::new("c", DataType::Int32, true));
    let full_struct_type =
        DataType::Struct(vec![field_a.clone(), field_b.clone(), field_c.clone()].into());
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "s",
        full_struct_type,
        true,
    )]));

    // Start from an empty dataset with the full schema.
    let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
    let empty_ds = Dataset::write(empty_reader, test_uri, None).await.unwrap();
    empty_ds.validate().await.unwrap();

    let append_options = WriteParams {
        mode: WriteMode::Append,
        ..Default::default()
    };

    // First append: children `b` then `a`.
    let just_b_a = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "s",
        DataType::Struct(vec![field_b.clone(), field_a.clone()].into()),
        true,
    )]));
    let b_a_struct = StructArray::from(vec![
        (
            field_b.clone(),
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ),
        (field_a.clone(), Arc::new(Int32Array::from(vec![2]))),
    ]);
    let b_a_batch = RecordBatch::try_new(just_b_a.clone(), vec![Arc::new(b_a_struct)]).unwrap();
    let b_a_reader = RecordBatchIterator::new(vec![Ok(b_a_batch)], just_b_a.clone());
    let after_b_a = Dataset::write(b_a_reader, test_uri, Some(append_options.clone()))
        .await
        .unwrap();
    after_b_a.validate().await.unwrap();
    let fragments = after_b_a.get_fragments();
    assert_eq!(fragments.len(), 1);
    assert_eq!(fragments[0].metadata.files.len(), 1);
    assert_eq!(&fragments[0].metadata.files[0].fields, &[0, 2, 1]);
    assert_eq!(&fragments[0].metadata.files[0].column_indices, &[0, 1, 2]);

    // Second append: children `c` then `b`.
    let just_c_b = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "s",
        DataType::Struct(vec![field_c.clone(), field_b.clone()].into()),
        true,
    )]));
    let c_b_struct = StructArray::from(vec![
        (
            field_c.clone(),
            Arc::new(Int32Array::from(vec![4])) as ArrayRef,
        ),
        (field_b.clone(), Arc::new(Int32Array::from(vec![3]))),
    ]);
    let c_b_batch = RecordBatch::try_new(just_c_b.clone(), vec![Arc::new(c_b_struct)]).unwrap();
    let c_b_reader = RecordBatchIterator::new(vec![Ok(c_b_batch)], just_c_b.clone());
    let after_c_b = Dataset::write(c_b_reader, test_uri, Some(append_options.clone()))
        .await
        .unwrap();
    after_c_b.validate().await.unwrap();
    let fragments = after_c_b.get_fragments();
    assert_eq!(fragments.len(), 2);
    assert_eq!(fragments[1].metadata.files.len(), 1);
    assert_eq!(&fragments[1].metadata.files[0].fields, &[0, 3, 2]);
    assert_eq!(&fragments[1].metadata.files[0].column_indices, &[0, 1, 2]);

    // Third append: children `a` and `c` only; non-nullable `b` is missing.
    let just_a_c = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "s",
        DataType::Struct(vec![field_a.clone(), field_c.clone()].into()),
        true,
    )]));
    let a_c_struct = StructArray::from(vec![
        (
            field_a.clone(),
            Arc::new(Int32Array::from(vec![5])) as ArrayRef,
        ),
        (field_c.clone(), Arc::new(Int32Array::from(vec![6]))),
    ]);
    let a_c_batch = RecordBatch::try_new(just_a_c.clone(), vec![Arc::new(a_c_struct)]).unwrap();
    let a_c_reader = RecordBatchIterator::new(vec![Ok(a_c_batch)], just_a_c.clone());
    let res = Dataset::write(a_c_reader, test_uri, Some(append_options)).await;
    assert!(
        matches!(res, Err(Error::SchemaMismatch { .. })),
        "Expected Error::SchemaMismatch, got {:?}",
        res
    );

    // Scan of the state after the second append: absent children are null.
    let scanned = after_c_b.scan().try_into_batch().await.unwrap();
    let expected_scan_struct = StructArray::from(vec![
        (
            field_a.clone(),
            Arc::new(Int32Array::from(vec![Some(2), None])) as ArrayRef,
        ),
        (field_b.clone(), Arc::new(Int32Array::from(vec![1, 3]))),
        (
            field_c.clone(),
            Arc::new(Int32Array::from(vec![None, Some(4)])),
        ),
    ]);
    let expected_scan =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(expected_scan_struct)]).unwrap();
    assert_eq!(scanned, expected_scan);

    // Take rows in reverse order with the full dataset schema.
    let taken = after_c_b
        .take(&[1, 0], Arc::new(after_c_b.schema().clone()))
        .await
        .unwrap();
    let expected_take_struct = StructArray::from(vec![
        (
            field_a.clone(),
            Arc::new(Int32Array::from(vec![None, Some(2)])) as ArrayRef,
        ),
        (field_b.clone(), Arc::new(Int32Array::from(vec![3, 1]))),
        (
            field_c.clone(),
            Arc::new(Int32Array::from(vec![Some(4), None])),
        ),
    ]);
    let expected_take =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(expected_take_struct)]).unwrap();
    assert_eq!(taken, expected_take);
}
#[tokio::test]
async fn test_insert_balanced_subschemas() {
let test_dir = tempdir().unwrap();
let test_uri = test_dir.path().to_str().unwrap();
let field_a = ArrowField::new("a", DataType::Int32, true);
let field_b = ArrowField::new("b", DataType::Int64, true);
let schema = Arc::new(ArrowSchema::new(vec![
field_a.clone(),
field_b.clone().with_metadata(
[(
LANCE_STORAGE_CLASS_SCHEMA_META_KEY.to_string(),
"blob".to_string(),
)]
.into(),
),
]));
let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
let options = WriteParams {
enable_move_stable_row_ids: true,
enable_v2_manifest_paths: true,
..Default::default()
};
let mut dataset = Dataset::write(empty_reader, test_uri, Some(options))
.await
.unwrap();
dataset.validate().await.unwrap();
let just_a = Arc::new(ArrowSchema::new(vec![field_a.clone()]));
let batch = RecordBatch::try_new(just_a.clone(), vec![Arc::new(Int32Array::from(vec![1]))])
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(batch)], just_a.clone());
let result = dataset.append(reader, None).await;
assert!(result.is_err());
assert!(matches!(result, Err(Error::SchemaMismatch { .. })));
let just_b = Arc::new(ArrowSchema::new(vec![field_b.clone()]));
let batch = RecordBatch::try_new(just_b.clone(), vec![Arc::new(Int64Array::from(vec![2]))])
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(batch)], just_b.clone());
let result = dataset.append(reader, None).await;
assert!(result.is_err());
assert!(matches!(result, Err(Error::SchemaMismatch { .. })));
}
}