//! Commit machinery for Lance datasets: writing transaction files, building
//! and migrating manifests, and committing them through a [`CommitHandler`],
//! with rebase-and-retry on conflict.

use std::collections::{HashMap, HashSet};
use std::num::NonZero;
use std::sync::Arc;
use std::time::Instant;
use conflict_resolver::TransactionRebase;
use lance_core::utils::backoff::{Backoff, SlotBackoff};
use lance_core::utils::mask::RowAddrTreeMap;
use lance_file::version::LanceFileVersion;
use lance_index::metrics::NoOpMetricsCollector;
use lance_io::utils::CachedFileSize;
use lance_table::format::{
DETACHED_VERSION_MASK, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest,
WriterVersion, is_detached_version, list_index_files_with_sizes, pb,
};
use lance_table::io::commit::{
CommitConfig, CommitError, CommitHandler, ManifestLocation, ManifestNamingScheme,
};
use rand::{Rng, rng};
use super::ObjectStore;
use crate::Dataset;
use crate::dataset::cleanup::auto_cleanup_hook;
use crate::dataset::fragment::FileFragment;
use crate::dataset::transaction::{Operation, Transaction};
use crate::dataset::{
ManifestWriteConfig, NewTransactionResult, TRANSACTIONS_DIR, load_new_transactions,
write_manifest_file,
};
use crate::index::DatasetIndexInternalExt;
use crate::io::deletion::read_dataset_deletion_file;
use crate::session::Session;
use crate::session::caches::DSMetadataCache;
use crate::session::index_caches::IndexMetadataKey;
use futures::future::Either;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use lance_core::{Error, Result};
use lance_index::{DatasetIndexExt, is_system_index};
use lance_io::object_store::ObjectStoreRegistry;
use log;
use object_store::path::Path;
use prost::Message;
pub mod conflict_resolver;
#[cfg(all(feature = "dynamodb_tests", test))]
mod dynamodb;
#[cfg(test)]
mod external_manifest;
pub mod namespace_manifest;
#[cfg(all(feature = "dynamodb_tests", test))]
mod s3_test;
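
/// Reads a transaction file from the dataset's `_transactions` directory and
/// decodes it from its protobuf representation.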
#[allow(dead_code)]
pub(crate) async fn read_transaction_file(
object_store: &ObjectStore,
base_path: &Path,
transaction_file: &str,
) -> Result<Transaction> {
let path = base_path.child(TRANSACTIONS_DIR).child(transaction_file);
let result = object_store.inner.get(&path).await?;
let data = result.bytes().await?;
let transaction = pb::Transaction::decode(data)?;
transaction.try_into()
}
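
/// Serializes a transaction to protobuf and writes it to
/// `{base}/_transactions/{read_version}-{uuid}.txn`, returning the file name.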
pub(crate) async fn write_transaction_file(
object_store: &ObjectStore,
base_path: &Path,
transaction: &Transaction,
) -> Result<String> {
let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid);
let path = base_path.child(TRANSACTIONS_DIR).child(file_name.as_str());
let message = pb::Transaction::from(transaction);
let buf = message.encode_to_vec();
object_store.inner.put(&path, buf.into()).await?;
Ok(file_name)
}
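
/// Writes the initial manifest for a new dataset. `Operation::Clone` is
/// handled specially: a shallow clone keeps base-path references to the source
/// dataset (indices are re-pointed at the new base id), while a deep clone
/// strips all base-path references so the new dataset is self-contained.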
#[allow(clippy::too_many_arguments)]
async fn do_commit_new_dataset(
object_store: &ObjectStore,
commit_handler: &dyn CommitHandler,
base_path: &Path,
transaction: &Transaction,
write_config: &ManifestWriteConfig,
manifest_naming_scheme: ManifestNamingScheme,
metadata_cache: &DSMetadataCache,
store_registry: Arc<ObjectStoreRegistry>,
) -> Result<(Manifest, ManifestLocation)> {
let transaction_file = if !write_config.disable_transaction_file() {
write_transaction_file(object_store, base_path, transaction).await?
} else {
String::new()
};
let (mut manifest, indices) = if let Operation::Clone {
is_shallow,
ref_name,
ref_version,
ref_path,
branch_name,
..
} = &transaction.operation
{
let source_base_path =
ObjectStore::extract_path_from_uri(store_registry, ref_path.as_str())?;
let source_manifest_location = commit_handler
.resolve_version_location(&source_base_path, *ref_version, &object_store.inner)
.await?;
let source_manifest = Dataset::load_manifest(
object_store,
&source_manifest_location,
base_path.to_string().as_str(),
&Session::default(),
)
.await?;
if *is_shallow {
let new_base_id = source_manifest
.base_paths
.keys()
.max()
.map(|id| *id + 1)
.unwrap_or(0);
let new_manifest = source_manifest.shallow_clone(
ref_name.clone(),
ref_path.clone(),
new_base_id,
branch_name.clone(),
transaction_file,
);
let updated_indices = if let Some(index_section_pos) = source_manifest.index_section {
let reader = object_store.open(&source_manifest_location.path).await?;
let section: pb::IndexSection =
lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?;
section
.indices
.into_iter()
.map(|index_pb| {
let mut index = IndexMetadata::try_from(index_pb)?;
index.base_id = Some(new_base_id);
Ok(index)
})
.collect::<Result<Vec<_>>>()?
} else {
vec![]
};
(new_manifest, updated_indices)
} else {
let mut new_manifest = source_manifest.clone();
new_manifest.base_paths.clear();
new_manifest.branch = None;
new_manifest.tag = None;
            new_manifest.index_section = None;

            let mut new_frags = new_manifest.fragments.as_ref().clone();
for f in &mut new_frags {
for df in &mut f.files {
df.base_id = None;
}
if let Some(d) = f.deletion_file.as_mut() {
d.base_id = None;
}
}
new_manifest.fragments = Arc::new(new_frags);
let mut updated_indices = Vec::new();
if let Some(index_section_pos) = source_manifest.index_section {
let reader = object_store.open(&source_manifest_location.path).await?;
let section: pb::IndexSection =
lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?;
updated_indices = section
.indices
.into_iter()
.map(|index_pb| {
let mut index = IndexMetadata::try_from(index_pb)?;
index.base_id = None;
Ok(index)
})
.collect::<Result<Vec<_>>>()?;
}
(new_manifest, updated_indices)
}
    } else {
        transaction.build_manifest(None, vec![], &transaction_file, write_config)?
    };
let result = write_manifest_file(
object_store,
commit_handler,
base_path,
&mut manifest,
if indices.is_empty() {
None
} else {
Some(indices.clone())
},
write_config,
manifest_naming_scheme,
Some(transaction),
)
.await;
match result {
Ok(manifest_location) => {
let tx_key = crate::session::caches::TransactionKey {
version: manifest.version,
};
metadata_cache
.insert_with_key(&tx_key, Arc::new(transaction.clone()))
.await;
let manifest_key = crate::session::caches::ManifestKey {
version: manifest_location.version,
e_tag: manifest_location.e_tag.as_deref(),
};
metadata_cache
.insert_with_key(&manifest_key, Arc::new(manifest.clone()))
.await;
Ok((manifest, manifest_location))
}
Err(CommitError::CommitConflict) => {
Err(crate::Error::dataset_already_exists(base_path.to_string()))
}
Err(CommitError::OtherError(err)) => Err(err),
}
}
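
/// Commits a transaction that creates a new dataset, returning an error if a
/// dataset already exists at `base_path`.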
#[allow(clippy::too_many_arguments)]
pub(crate) async fn commit_new_dataset(
object_store: &ObjectStore,
commit_handler: &dyn CommitHandler,
base_path: &Path,
transaction: &Transaction,
write_config: &ManifestWriteConfig,
manifest_naming_scheme: ManifestNamingScheme,
metadata_cache: &crate::session::caches::DSMetadataCache,
store_registry: Arc<ObjectStoreRegistry>,
) -> Result<(Manifest, ManifestLocation)> {
do_commit_new_dataset(
object_store,
commit_handler,
base_path,
transaction,
write_config,
manifest_naming_scheme,
metadata_cache,
store_registry,
)
.await
}
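
/// Returns true if the manifest is missing metadata that newer writers record:
/// the writer version, fragment row counts, deletion counts, or up-to-date
/// index fragment bitmaps.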
pub fn manifest_needs_migration(manifest: &Manifest, indices: &[IndexMetadata]) -> bool {
manifest.writer_version.is_none()
|| manifest.fragments.iter().any(|f| {
f.physical_rows.is_none()
|| (f
.deletion_file
.as_ref()
.map(|d| d.num_deleted_rows.is_none())
.unwrap_or(false))
})
|| indices
.iter()
.any(|i| must_recalculate_fragment_bitmap(i, manifest.writer_version.as_ref()))
}
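
/// Backfills fragment statistics (physical row counts, deletion counts, and
/// cached file sizes) on the manifest. This is a no-op when the statistics
/// are already present and `recompute_stats` is false.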
async fn migrate_manifest(
dataset: &Dataset,
manifest: &mut Manifest,
recompute_stats: bool,
) -> Result<()> {
if !recompute_stats
&& manifest.fragments.iter().all(|f| {
f.num_rows().map(|n| n > 0).unwrap_or(false)
&& f.files.iter().all(|f| f.file_size_bytes.get().is_some())
})
{
return Ok(());
}
manifest.fragments =
Arc::new(migrate_fragments(dataset, &manifest.fragments, recompute_stats).await?);
Ok(())
}
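
/// Verifies that the manifest's data storage version matches the file version
/// of the fragments it references, upgrading legacy manifests when needed.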
fn check_storage_version(manifest: &mut Manifest) -> Result<()> {
    let data_storage_version = manifest.data_storage_format.lance_file_version()?;
    if data_storage_version == LanceFileVersion::Legacy {
        if let Some(actual_file_version) = Fragment::try_infer_version(&manifest.fragments)
            .map_err(|e| {
                Error::internal(format!(
                    "The dataset contains a mixture of file versions. You will need to roll back to an earlier version: {}",
                    e
                ))
            })?
            && actual_file_version > data_storage_version
        {
            log::warn!(
                "Data storage version {} is less than the actual file version {}. This has been automatically updated.",
                data_storage_version,
                actual_file_version
            );
            manifest.data_storage_format = DataStorageFormat::new(actual_file_version);
        }
    } else if let Some(actual_file_version) = Fragment::try_infer_version(&manifest.fragments)?
        && actual_file_version != data_storage_version
    {
        return Err(Error::internal(format!(
            "The operation added files with version {}. However, the data storage version is {}.",
            actual_file_version, data_storage_version
        )));
    }
    Ok(())
}
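
/// Repairs schemas where a field id is claimed by more than one data file in
/// the same fragment. Duplicated ids are remapped to fresh ids (keeping the
/// most recently added data file's copy), and data files that no longer
/// contribute any live field are dropped.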
fn fix_schema(manifest: &mut Manifest) -> Result<()> {
if manifest.fragments.iter().all(|f| f.files.len() <= 1) {
return Ok(());
}
let mut fields_with_duplicate_ids = HashSet::new();
let mut seen_fields = HashSet::new();
for fragment in manifest.fragments.iter() {
for file in fragment.files.iter() {
for field_id in file.fields.iter() {
if *field_id >= 0 && !seen_fields.insert(*field_id) {
fields_with_duplicate_ids.insert(*field_id);
}
}
}
seen_fields.clear();
}
if fields_with_duplicate_ids.is_empty() {
return Ok(());
}
let mut field_id_seed = manifest.max_field_id() + 1;
let mut old_field_id_mapping: HashMap<i32, i32> = HashMap::new();
let mut fields_with_duplicate_ids = fields_with_duplicate_ids.into_iter().collect::<Vec<_>>();
fields_with_duplicate_ids.sort_unstable();
for field_id in fields_with_duplicate_ids {
old_field_id_mapping.insert(field_id, field_id_seed);
field_id_seed += 1;
}
let mut fragments = manifest.fragments.as_ref().clone();
seen_fields.clear();
for fragment in fragments.iter_mut() {
for field_id in fragment
.files
.iter_mut()
.rev()
.flat_map(|file| file.fields.iter_mut())
{
if let Some(new_field_id) = old_field_id_mapping.get(field_id)
&& seen_fields.insert(*field_id)
{
*field_id = *new_field_id;
}
}
seen_fields.clear();
}
for (old_field_id, new_field_id) in &old_field_id_mapping {
let field = manifest.schema.mut_field_by_id(*old_field_id).unwrap();
field.id = *new_field_id;
}
let remaining_field_ids = manifest
.schema
.fields_pre_order()
.map(|f| f.id)
.collect::<HashSet<_>>();
for fragment in fragments.iter_mut() {
fragment.files.retain(|file| {
file.fields
.iter()
.any(|field_id| remaining_field_ids.contains(field_id))
});
}
manifest.fragments = Arc::new(fragments);
Ok(())
}
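
/// Rewrites the given fragments so that physical row counts, deletion counts,
/// and data file sizes are populated, reading from storage where necessary.
/// Fragments whose rows have all been deleted are filtered out.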
pub(crate) async fn migrate_fragments(
dataset: &Dataset,
fragments: &[Fragment],
recompute_stats: bool,
) -> Result<Vec<Fragment>> {
let dataset = Arc::new(dataset.clone());
let new_fragments = futures::stream::iter(fragments)
.map(|fragment| async {
let physical_rows = if recompute_stats {
None
} else {
fragment.physical_rows
};
let physical_rows = if let Some(physical_rows) = physical_rows {
Either::Right(futures::future::ready(Ok(physical_rows)))
} else {
let file_fragment = FileFragment::new(dataset.clone(), fragment.clone());
Either::Left(async move { file_fragment.physical_rows().await })
};
let num_deleted_rows = match &fragment.deletion_file {
None => Either::Left(futures::future::ready(Ok(None))),
Some(DeletionFile {
num_deleted_rows: Some(deleted_rows),
..
}) if !recompute_stats => {
Either::Left(futures::future::ready(Ok(Some(*deleted_rows))))
}
Some(deletion_file) => Either::Right(async {
let deletion_vector =
read_dataset_deletion_file(dataset.as_ref(), fragment.id, deletion_file)
.await?;
Ok(Some(deletion_vector.len()))
}),
};
let (physical_rows, num_deleted_rows) =
futures::future::try_join(physical_rows, num_deleted_rows).await?;
let mut data_files = fragment.files.clone();
let object_store = dataset.object_store();
let get_sizes = data_files
.iter()
.map(|file| {
if let Some(size) = file.file_size_bytes.get() {
Either::Left(futures::future::ready(Ok(size)))
} else {
Either::Right(async {
object_store
.size(&dataset.base.child("data").child(file.path.clone()))
.map_ok(|size| {
NonZero::new(size).ok_or_else(|| {
Error::internal(format!("File {} has size 0", file.path))
})
})
.await?
})
}
})
.collect::<Vec<_>>();
let sizes = futures::future::try_join_all(get_sizes).await?;
data_files.iter_mut().zip(sizes).for_each(|(file, size)| {
file.file_size_bytes = CachedFileSize::new(size.into());
});
let deletion_file = fragment
.deletion_file
.as_ref()
.map(|deletion_file| DeletionFile {
num_deleted_rows,
..deletion_file.clone()
});
Ok::<_, Error>(Fragment {
physical_rows: Some(physical_rows),
deletion_file,
files: data_files,
..fragment.clone()
})
})
.buffered(dataset.object_store.io_parallelism())
.try_filter(|frag| futures::future::ready(frag.num_rows().map(|n| n > 0).unwrap_or(false)))
.boxed();
new_fragments.try_collect().await
}
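
/// A fragment bitmap must be recalculated if it is missing, if the writer
/// version is unknown, or if it was written by a `lance` library older than
/// 0.8.15. Bitmaps written by other libraries are trusted as-is.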
fn must_recalculate_fragment_bitmap(
index: &IndexMetadata,
version: Option<&WriterVersion>,
) -> bool {
if index.fragment_bitmap.is_none() {
return true;
}
if let Some(version) = version {
if version.library != "lance" {
return false;
}
let cutoff = semver::Version::new(0, 8, 15);
version
.lance_lib_version()
.map(|lance_lib_version| lance_lib_version < cutoff)
.unwrap_or(true)
} else {
true
}
}
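
/// Migrates index metadata: recalculates fragment bitmaps that are missing,
/// stale, or overlapping between deltas of the same index, and backfills the
/// list of index files (with sizes) for indices that predate it.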
async fn migrate_indices(dataset: &Dataset, indices: &mut [IndexMetadata]) -> Result<()> {
let needs_recalculating = match detect_overlapping_fragments(indices) {
Ok(()) => vec![],
Err(BadFragmentBitmapError { bad_indices }) => {
bad_indices.into_iter().map(|(name, _)| name).collect()
}
};
for index in indices.iter_mut() {
        if (needs_recalculating.contains(&index.name)
            || must_recalculate_fragment_bitmap(index, dataset.manifest.writer_version.as_ref()))
            && !is_system_index(index)
        {
debug_assert_eq!(index.fields.len(), 1);
            let idx_field = dataset.schema().field_by_id(index.fields[0]).ok_or_else(|| {
                Error::internal(format!(
                    "Index with uuid {} referred to field with id {} which did not exist in dataset",
                    index.uuid, index.fields[0]
                ))
            })?;
let idx = dataset
.open_generic_index(
&idx_field.name,
&index.uuid.to_string(),
&NoOpMetricsCollector,
)
.await?;
index.fragment_bitmap = Some(idx.calculate_included_frags().await?);
}
if index.index_details.is_none() {
log::debug!(
"the index with uuid {} is missing index metadata. This probably means it was written with Lance version <= 0.19.2. This is not a problem.",
index.uuid
);
}
if index.files.is_none() && !is_system_index(index) {
let result = async {
let index_dir = dataset
.indice_files_dir(index)?
.child(index.uuid.to_string());
list_index_files_with_sizes(&dataset.object_store, &index_dir).await
}
.await;
match result {
Ok(files) => {
log::debug!(
"Migrated file sizes for index {} (uuid: {}): {} files",
index.name,
index.uuid,
files.len()
);
index.files = Some(files);
}
Err(e) => {
log::debug!(
"Could not collect file sizes for index {} (uuid: {}): {}",
index.name,
index.uuid,
e
);
}
}
}
}
Ok(())
}
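
/// Error returned when deltas of the same index claim overlapping fragments.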
pub(crate) struct BadFragmentBitmapError {
pub bad_indices: Vec<(String, Vec<u32>)>,
}
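
/// Checks, for each index name, whether any fragment id appears in the
/// fragment bitmap of more than one delta of that index.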
pub(crate) fn detect_overlapping_fragments(
indices: &[IndexMetadata],
) -> std::result::Result<(), BadFragmentBitmapError> {
let index_names: HashSet<&str> = indices.iter().map(|i| i.name.as_str()).collect();
    let mut bad_indices = Vec::new();
    for name in index_names {
let mut seen_fragment_ids = HashSet::new();
let mut overlap = Vec::new();
for index in indices.iter().filter(|i| i.name == name) {
if let Some(fragment_bitmap) = index.fragment_bitmap.as_ref() {
for fragment in fragment_bitmap {
if !seen_fragment_ids.insert(fragment) {
overlap.push(fragment);
}
}
}
}
if !overlap.is_empty() {
bad_indices.push((name.to_string(), overlap));
}
}
if bad_indices.is_empty() {
Ok(())
} else {
Err(BadFragmentBitmapError { bad_indices })
}
}
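
/// Commits a transaction at a random detached version (one with
/// [`DETACHED_VERSION_MASK`] set), outside the dataset's linear version
/// history. Retries with backoff if the random version collides.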
pub(crate) async fn do_commit_detached_transaction(
dataset: &Dataset,
object_store: &ObjectStore,
commit_handler: &dyn CommitHandler,
transaction: &Transaction,
write_config: &ManifestWriteConfig,
commit_config: &CommitConfig,
) -> Result<(Manifest, ManifestLocation)> {
let transaction_file = if !write_config.disable_transaction_file() {
write_transaction_file(object_store, &dataset.base, transaction).await?
} else {
String::new()
};
let mut backoff = Backoff::default();
while backoff.attempt() < commit_config.num_retries {
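        // Pick a random version with the detached bit set; a collision with an
        // existing version surfaces as a commit conflict and triggers a retry.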
let random_version = rng().random::<u64>() | DETACHED_VERSION_MASK;
let (mut manifest, mut indices) = match transaction.operation {
Operation::Restore { version } => {
Transaction::restore_old_manifest(
object_store,
commit_handler,
&dataset.base,
version,
write_config,
&transaction_file,
&dataset.manifest,
)
.await?
}
_ => transaction.build_manifest(
Some(dataset.manifest.as_ref()),
dataset.load_indices().await?.as_ref().clone(),
&transaction_file,
write_config,
)?,
};
manifest.version = random_version;
migrate_manifest(dataset, &mut manifest, false).await?;
fix_schema(&mut manifest)?;
check_storage_version(&mut manifest)?;
migrate_indices(dataset, &mut indices).await?;
let result = write_manifest_file(
object_store,
commit_handler,
&dataset.base,
&mut manifest,
if indices.is_empty() {
None
} else {
Some(indices.clone())
},
write_config,
ManifestNamingScheme::V2,
Some(transaction),
)
.await;
match result {
Ok(location) => {
return Ok((manifest, location));
}
Err(CommitError::CommitConflict) => {
tokio::time::sleep(backoff.next_backoff()).await;
}
Err(CommitError::OtherError(err)) => {
return Err(err);
}
}
}
Err(crate::Error::commit_conflict_source(
0,
format!(
"Failed find unused random u64 after {} retries.",
commit_config.num_retries
)
.into(),
))
}
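
/// Commits a transaction as a detached version that is not part of the
/// dataset's linear version history.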
pub(crate) async fn commit_detached_transaction(
dataset: &Dataset,
object_store: &ObjectStore,
commit_handler: &dyn CommitHandler,
transaction: &Transaction,
write_config: &ManifestWriteConfig,
commit_config: &CommitConfig,
) -> Result<(Manifest, ManifestLocation)> {
do_commit_detached_transaction(
dataset,
object_store,
commit_handler,
transaction,
write_config,
commit_config,
)
.await
}
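
/// Loads all transactions committed after the dataset's current version,
/// returning the updated dataset along with the transactions sorted by
/// version.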
async fn load_and_sort_new_transactions(
dataset: &Dataset,
) -> Result<(Dataset, Vec<(u64, Arc<Transaction>)>)> {
let NewTransactionResult {
dataset: new_ds,
new_transactions,
} = load_new_transactions(dataset);
let new_transactions = new_transactions.try_collect::<Vec<_>>();
let (new_ds, mut txns) = futures::future::try_join(new_ds, new_transactions).await?;
txns.sort_by_key(|(version, _)| *version);
Ok((new_ds, txns))
}
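
/// Attempts to commit a transaction against the latest version of the
/// dataset. Each attempt loads any concurrently committed transactions and
/// rebases the transaction on top of them before writing the manifest;
/// conflicts are retried up to `commit_config.num_retries` times with
/// slot-based backoff. Overwrites with retries disabled skip the rebase step.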
#[allow(clippy::too_many_arguments)]
pub(crate) async fn commit_transaction(
dataset: &Dataset,
object_store: &ObjectStore,
commit_handler: &dyn CommitHandler,
transaction: &Transaction,
write_config: &ManifestWriteConfig,
commit_config: &CommitConfig,
manifest_naming_scheme: ManifestNamingScheme,
affected_rows: Option<&RowAddrTreeMap>,
) -> Result<(Manifest, ManifestLocation)> {
let read_version = transaction.read_version;
let mut target_version = read_version + 1;
let original_dataset = dataset.clone();
let strict_overwrite = matches!(transaction.operation, Operation::Overwrite { .. })
&& commit_config.num_retries == 0;
let mut dataset =
if dataset.manifest.version != read_version && (read_version != 0 || strict_overwrite) {
dataset.checkout_version(read_version).await?
} else {
dataset.clone()
};
let mut transaction = transaction.clone();
let num_attempts = std::cmp::max(commit_config.num_retries, 1);
let mut backoff = SlotBackoff::default();
let start = Instant::now();
let mut other_transactions: Vec<(u64, Arc<Transaction>)>;
while backoff.attempt() < num_attempts {
if !strict_overwrite {
(dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?;
let mut rebase =
TransactionRebase::try_new(&original_dataset, transaction, affected_rows).await?;
for (other_version, other_transaction) in other_transactions.iter() {
rebase.check_txn(other_transaction, *other_version)?;
}
transaction = rebase.finish(&dataset).await?;
}
let transaction_file = if !write_config.disable_transaction_file() {
write_transaction_file(object_store, &dataset.base, &transaction).await?
} else {
String::new()
};
target_version = dataset.manifest.version + 1;
if is_detached_version(target_version) {
return Err(Error::internal(
"more than 2^65 versions have been created and so regular version numbers are appearing as 'detached' versions.",
));
}
let (mut manifest, mut indices) = match transaction.operation {
Operation::Restore { version } => {
Transaction::restore_old_manifest(
object_store,
commit_handler,
&dataset.base,
version,
write_config,
&transaction_file,
&dataset.manifest,
)
.await?
}
_ => transaction.build_manifest(
Some(dataset.manifest.as_ref()),
dataset.load_indices().await?.as_ref().clone(),
&transaction_file,
write_config,
)?,
};
manifest.version = target_version;
let previous_writer_version = &dataset.manifest.writer_version;
let recompute_stats = previous_writer_version.is_none();
migrate_manifest(&dataset, &mut manifest, recompute_stats).await?;
fix_schema(&mut manifest)?;
check_storage_version(&mut manifest)?;
migrate_indices(&dataset, &mut indices).await?;
let result = write_manifest_file(
object_store,
commit_handler,
&dataset.base,
&mut manifest,
if indices.is_empty() {
None
} else {
Some(indices.clone())
},
write_config,
manifest_naming_scheme,
Some(&transaction),
)
.await;
match result {
Ok(manifest_location) => {
let tx_key = crate::session::caches::TransactionKey {
version: target_version,
};
dataset
.metadata_cache
.insert_with_key(&tx_key, Arc::new(transaction.clone()))
.await;
let manifest_key = crate::session::caches::ManifestKey {
version: manifest_location.version,
e_tag: manifest_location.e_tag.as_deref(),
};
dataset
.metadata_cache
.insert_with_key(&manifest_key, Arc::new(manifest.clone()))
.await;
if !indices.is_empty() {
let key = IndexMetadataKey {
version: target_version,
};
dataset
.index_cache
.insert_with_key(&key, Arc::new(indices))
.await;
}
if !commit_config.skip_auto_cleanup {
match auto_cleanup_hook(&dataset, &manifest).await {
Ok(Some(stats)) => log::info!("Auto cleanup triggered: {:?}", stats),
Err(e) => log::error!("Error encountered during auto_cleanup_hook: {}", e),
_ => {}
};
}
return Ok((manifest, manifest_location));
}
Err(CommitError::CommitConflict) => {
let next_attempt_i = backoff.attempt() + 1;
if backoff.attempt() == 0 {
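                    // Seed the backoff slot with the first attempt's duration, padded by 10%.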
backoff = backoff.with_unit((start.elapsed().as_millis() * 11 / 10) as u32);
}
if next_attempt_i < num_attempts {
tokio::time::sleep(backoff.next_backoff()).await;
continue;
} else {
break;
}
}
Err(CommitError::OtherError(err)) => {
return Err(err);
}
}
}
Err(crate::Error::commit_conflict_source(
target_version,
format!(
"Failed to commit the transaction after {} retries.",
commit_config.num_retries
)
.into(),
))
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use arrow_array::types::Int32Type;
use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use futures::future::join_all;
use lance_arrow::FixedSizeListArrayExt;
use lance_core::datatypes::{Field, Schema};
use lance_core::utils::tempfile::TempStrDir;
use lance_datagen::{BatchCount, RowCount, array, gen_batch};
use lance_index::IndexType;
use lance_linalg::distance::MetricType;
use lance_table::format::{DataFile, DataStorageFormat};
use lance_table::io::commit::{
CommitLease, CommitLock, RenameCommitHandler, UnsafeCommitHandler,
};
use lance_testing::datagen::generate_random_array;
use super::*;
use crate::Dataset;
use crate::dataset::{WriteMode, WriteParams};
use crate::index::vector::VectorIndexParams;
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
async fn test_commit_handler(handler: Arc<dyn CommitHandler>, should_succeed: bool) {
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"x",
DataType::Int64,
false,
)]));
let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(data)], schema);
let options = WriteParams {
commit_handler: Some(handler),
..Default::default()
};
let dataset = Dataset::write(reader, "memory://test", Some(options))
.await
.unwrap();
let tasks = (0..10).map(|_| {
let mut dataset = dataset.clone();
tokio::task::spawn(async move {
dataset
.delete("x = 2")
.await
.map(|_| dataset.manifest.version)
})
});
let task_results: Vec<Option<u64>> = join_all(tasks)
.await
.iter()
.map(|res| match res {
Ok(Ok(version)) => Some(*version),
_ => None,
})
.collect();
let num_successes = task_results.iter().filter(|x| x.is_some()).count();
let distinct_results: HashSet<_> = task_results.iter().filter_map(|x| x.as_ref()).collect();
if should_succeed {
assert_eq!(
num_successes,
distinct_results.len(),
"Expected no two tasks to succeed for the same version. Got {:?}",
task_results
);
} else {
            assert!(num_successes >= distinct_results.len());
}
}
#[tokio::test]
async fn test_rename_commit_handler() {
let handler = Arc::new(RenameCommitHandler);
test_commit_handler(handler, true).await;
}
#[tokio::test]
async fn test_custom_commit() {
#[derive(Debug)]
struct CustomCommitHandler {
locked_version: Arc<Mutex<Option<u64>>>,
}
struct CustomCommitLease {
version: u64,
locked_version: Arc<Mutex<Option<u64>>>,
}
#[async_trait::async_trait]
impl CommitLock for CustomCommitHandler {
type Lease = CustomCommitLease;
async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError> {
let mut locked_version = self.locked_version.lock().unwrap();
if locked_version.is_some() {
return Err(CommitError::CommitConflict);
}
*locked_version = Some(version);
Ok(CustomCommitLease {
version,
locked_version: self.locked_version.clone(),
})
}
}
#[async_trait::async_trait]
impl CommitLease for CustomCommitLease {
async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
let mut locked_version = self.locked_version.lock().unwrap();
if *locked_version != Some(self.version) {
return Err(CommitError::CommitConflict);
}
*locked_version = None;
Ok(())
}
}
let locked_version = Arc::new(Mutex::new(None));
let handler = Arc::new(CustomCommitHandler { locked_version });
test_commit_handler(handler, true).await;
}
#[tokio::test]
async fn test_unsafe_commit_handler() {
let handler = Arc::new(UnsafeCommitHandler);
test_commit_handler(handler, false).await;
}
#[tokio::test]
async fn test_roundtrip_transaction_file() {
let object_store = ObjectStore::memory();
let base_path = Path::from("test");
let transaction = Transaction::new(
42,
Operation::Append { fragments: vec![] },
Some("hello world".to_string()),
);
let file_name = write_transaction_file(&object_store, &base_path, &transaction)
.await
.unwrap();
let read_transaction = read_transaction_file(&object_store, &base_path, &file_name)
.await
.unwrap();
assert_eq!(transaction.read_version, read_transaction.read_version);
assert_eq!(transaction.uuid, read_transaction.uuid);
assert!(matches!(
read_transaction.operation,
Operation::Append { .. }
));
assert_eq!(transaction.tag, read_transaction.tag);
}
#[tokio::test]
async fn test_concurrent_create_index() {
let test_dir = TempStrDir::default();
let test_uri = test_dir.as_str();
let dimension = 16;
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(
"vector1",
DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float32, true)),
dimension,
),
false,
),
ArrowField::new(
"vector2",
DataType::FixedSizeList(
Arc::new(ArrowField::new("item", DataType::Float32, true)),
dimension,
),
false,
),
]));
let float_arr = generate_random_array(512 * dimension as usize);
let vectors = Arc::new(
<arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
float_arr, dimension,
)
.unwrap(),
);
let batches = vec![
RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()]).unwrap(),
];
let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
let dataset = Dataset::write(reader, test_uri, None).await.unwrap();
dataset.validate().await.unwrap();
let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 50);
let futures: Vec<_> = ["vector1", "vector1", "vector2"]
.iter()
.map(|col_name| {
let mut dataset = dataset.clone();
let params = params.clone();
tokio::spawn(async move {
dataset
.create_index(&[col_name], IndexType::Vector, None, ¶ms, true)
.await
})
})
.collect();
let results = join_all(futures).await;
let success_count = results
.iter()
.filter(|result| matches!(result, Ok(Ok(_))))
.count();
let retryable_count = results
.iter()
.filter(|result| matches!(result, Ok(Err(Error::RetryableCommitConflict { .. }))))
.count();
assert_eq!(success_count, 2, "{results:?}");
assert_eq!(retryable_count, 1, "{results:?}");
let dataset = dataset.checkout_version(1).await.unwrap();
assert!(dataset.load_indices().await.unwrap().is_empty());
let dataset = dataset.checkout_version(2).await.unwrap();
assert_eq!(dataset.load_indices().await.unwrap().len(), 1);
let dataset = dataset.checkout_version(3).await.unwrap();
let indices = dataset.load_indices().await.unwrap();
assert!(!indices.is_empty() && indices.len() <= 2);
if indices.len() == 2 {
let mut fields: Vec<i32> = indices.iter().flat_map(|i| i.fields.clone()).collect();
fields.sort();
assert_eq!(fields, vec![0, 1]);
} else {
assert_eq!(indices[0].fields, vec![0]);
}
assert!(dataset.checkout_version(4).await.is_err());
}
#[tokio::test]
async fn test_load_and_sort_new_transactions() {
let mut dataset = lance_datagen::gen_batch()
.col("i", lance_datagen::array::step::<Int32Type>())
.into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10))
.await
.unwrap();
for i in 0..100 {
dataset
.update_config(vec![(format!("key_{}", i), format!("value_{}", i))])
.await
.unwrap();
}
let dataset_v1 = dataset.checkout_version(1).await.unwrap();
let (_, transactions) = load_and_sort_new_transactions(&dataset_v1).await.unwrap();
let versions: Vec<u64> = transactions.iter().map(|(v, _)| *v).collect();
for i in 1..versions.len() {
assert!(
versions[i] > versions[i - 1],
"Transactions not in order: version {} came after version {}",
versions[i],
versions[i - 1]
);
}
assert_eq!(transactions.len(), 100);
assert_eq!(versions.first(), Some(&2));
assert_eq!(versions.last(), Some(&101));
}
#[tokio::test]
async fn test_concurrent_writes() {
let test_dir = TempStrDir::default();
let test_uri = test_dir.as_str();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
let dataset = Dataset::write(
RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
test_uri,
None,
)
.await
.unwrap();
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let futures: Vec<_> = (0..5)
.map(|_| {
let batch = batch.clone();
let schema = schema.clone();
let uri = test_uri.to_string();
tokio::spawn(async move {
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
Dataset::write(
reader,
&uri,
Some(WriteParams {
mode: WriteMode::Append,
..Default::default()
}),
)
.await
})
})
.collect();
let results = join_all(futures).await;
for result in results {
assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
}
let dataset = dataset.checkout_version(6).await.unwrap();
assert_eq!(dataset.get_fragments().len(), 5);
dataset.validate().await.unwrap()
}
#[tokio::test]
async fn test_restore_does_not_decrease_max_fragment_id() {
let reader = gen_batch()
.col("i", array::step::<Int32Type>())
.into_reader_rows(RowCount::from(3), BatchCount::from(1));
let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap();
for _ in 0..2 {
let reader = gen_batch()
.col("i", array::step::<Int32Type>())
.into_reader_rows(RowCount::from(3), BatchCount::from(1));
dataset.append(reader, None).await.unwrap();
}
let latest_max = dataset.manifest.max_fragment_id().unwrap_or(0);
let mut dataset_v1 = dataset.checkout_version(1).await.unwrap();
dataset_v1.restore().await.unwrap();
let restored_max = dataset_v1.manifest.max_fragment_id().unwrap_or(0);
assert!(
restored_max >= latest_max,
"max_fragment_id should not decrease on restore: before={}, after={}",
latest_max,
restored_max
);
}
async fn get_empty_dataset() -> (TempStrDir, Dataset) {
let test_dir = TempStrDir::default();
let test_uri = test_dir.as_str();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::Int32,
false,
)]));
let ds = Dataset::write(
RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
test_uri,
None,
)
.await
.unwrap();
(test_dir, ds)
}
#[tokio::test]
async fn test_good_concurrent_config_writes() {
let (_tmpdir, dataset) = get_empty_dataset().await;
let original_num_config_keys = dataset.manifest.config.len();
let futures: Vec<_> = ["key1", "key2", "key3", "key4", "key5"]
.iter()
.map(|key| {
let mut dataset = dataset.clone();
tokio::spawn(async move {
dataset
.update_config(HashMap::from([(
key.to_string(),
Some("value".to_string()),
)]))
.await
})
})
.collect();
let results = join_all(futures).await;
for result in results {
assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
}
let dataset = dataset.checkout_version(6).await.unwrap();
assert_eq!(dataset.manifest.config.len(), 5 + original_num_config_keys);
dataset.validate().await.unwrap();
let futures: Vec<_> = ["key1", "key1", "key1", "key2", "key2"]
.iter()
.map(|key| {
let mut dataset = dataset.clone();
tokio::spawn(async move {
dataset
.update_config(HashMap::from([(key.to_string(), None)]))
.await
})
})
.collect();
let results = join_all(futures).await;
for result in results {
assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
}
let dataset = dataset.checkout_version(11).await.unwrap();
assert_eq!(dataset.manifest.config.len(), 3 + original_num_config_keys);
dataset.validate().await.unwrap()
}
#[tokio::test]
async fn test_bad_concurrent_config_writes() {
let (_tmpdir, dataset) = get_empty_dataset().await;
let futures: Vec<_> = ["key1", "key1", "key2", "key3", "key4"]
.iter()
.map(|key| {
let mut dataset = dataset.clone();
tokio::spawn(async move {
dataset
.update_config(HashMap::from([(
key.to_string(),
Some("value".to_string()),
)]))
.await
})
})
.collect();
let results = join_all(futures).await;
let mut first_operation_failed = false;
for (i, result) in results.into_iter().enumerate() {
let result = result.unwrap();
match i {
0 => {
if result.is_err() {
first_operation_failed = true;
assert!(
matches!(&result, &Err(Error::IncompatibleTransaction { .. })),
"{:?}",
result,
);
}
}
1 => match first_operation_failed {
true => assert!(result.is_ok(), "{:?}", result),
false => {
assert!(
matches!(&result, &Err(Error::IncompatibleTransaction { .. })),
"{:?}",
result,
);
}
},
_ => assert!(result.is_ok(), "{:?}", result),
}
}
}
#[test]
fn test_fix_schema() {
let mut field0 =
Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
field0.set_id(-1, &mut 0);
let mut field2 =
Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
field2.set_id(-1, &mut 2);
let schema = Schema {
fields: vec![field0.clone(), field2.clone()],
metadata: Default::default(),
};
let fragments = vec![
Fragment {
id: 0,
files: vec![
DataFile::new_legacy_from_fields("path1", vec![0, 1, 2], None),
DataFile::new_legacy_from_fields("unused", vec![9], None),
],
deletion_file: None,
row_id_meta: None,
physical_rows: None,
last_updated_at_version_meta: None,
created_at_version_meta: None,
},
Fragment {
id: 1,
files: vec![
DataFile::new_legacy_from_fields("path2", vec![0, 1, 2], None),
DataFile::new_legacy_from_fields("path3", vec![2], None),
],
deletion_file: None,
row_id_meta: None,
physical_rows: None,
last_updated_at_version_meta: None,
created_at_version_meta: None,
},
];
let mut manifest = Manifest::new(
schema,
Arc::new(fragments),
DataStorageFormat::default(),
HashMap::new(),
);
fix_schema(&mut manifest).unwrap();
field2.id = 10;
let expected_schema = Schema {
fields: vec![field0, field2],
metadata: Default::default(),
};
assert_eq!(manifest.schema, expected_schema);
let expected_fragments = vec![
Fragment {
id: 0,
files: vec![DataFile::new_legacy_from_fields(
"path1",
vec![0, 1, 10],
None,
)],
deletion_file: None,
row_id_meta: None,
physical_rows: None,
last_updated_at_version_meta: None,
created_at_version_meta: None,
},
Fragment {
id: 1,
files: vec![
DataFile::new_legacy_from_fields("path2", vec![0, 1, 2], None),
DataFile::new_legacy_from_fields("path3", vec![10], None),
],
deletion_file: None,
row_id_meta: None,
physical_rows: None,
last_updated_at_version_meta: None,
created_at_version_meta: None,
},
];
assert_eq!(manifest.fragments.as_ref(), &expected_fragments);
}
}