use std::sync::Arc;
use snafu::{location, Location};
use futures::future::Either;
use futures::{StreamExt, TryStreamExt};
use lance_core::{
format::{pb, Index, Manifest},
io::commit::{CommitConfig, CommitError},
Error, Result,
};
use object_store::path::Path;
use prost::Message;
use super::deletion::read_deletion_file;
use super::ObjectStore;
use crate::dataset::fragment::FileFragment;
use crate::dataset::transaction::{Operation, Transaction};
use crate::dataset::{write_manifest_file, ManifestWriteConfig};
use crate::format::{DeletionFile, Fragment};
use crate::index::DatasetIndexInternalExt;
use crate::Dataset;
// Sub-modules of the commit layer.
// NOTE(review): `target_feature` tests CPU/codegen features, not Cargo features, and the
// predicate set for test builds is `test` (singular). As written this module looks like it
// is never compiled — confirm whether `#[cfg(all(feature = "dynamodb", test))]` was intended.
#[cfg(all(target_feature = "dynamodb", tests))]
mod dynamodb;
// NOTE(review): same concern as above — `cfg(tests)` is presumably meant to be `cfg(test)`;
// verify before relying on anything declared in this module.
#[cfg(tests)]
mod external_manifest;
// Re-export so callers can reach `latest_manifest_path` through this module.
pub use lance_core::io::commit::latest_manifest_path;
/// Load the transaction stored at `<base_path>/_transactions/<transaction_file>`.
///
/// The object's bytes are decoded as a `pb::Transaction` protobuf message and then converted
/// into a [`Transaction`]. Failures from the store read, the protobuf decode, or the conversion
/// are returned to the caller.
async fn read_transaction_file(
    object_store: &ObjectStore,
    base_path: &Path,
    transaction_file: &str,
) -> Result<Transaction> {
    let txn_path = base_path.child("_transactions").child(transaction_file);
    let raw = object_store.inner.get(&txn_path).await?.bytes().await?;
    let message = pb::Transaction::decode(raw)?;
    (&message).try_into()
}
/// Persist `transaction` under `<base_path>/_transactions/`.
///
/// The file is named `<read_version>-<uuid>.txn` and holds the protobuf encoding of the
/// transaction.
///
/// Returns the bare file name (not the full path) on success.
async fn write_transaction_file(
    object_store: &ObjectStore,
    base_path: &Path,
    transaction: &Transaction,
) -> Result<String> {
    let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid);
    let destination = base_path.child("_transactions").child(file_name.as_str());
    let payload = pb::Transaction::from(transaction).encode_to_vec();
    object_store.inner.put(&destination, payload.into()).await?;
    Ok(file_name)
}
fn check_transaction(
transaction: &Transaction,
other_version: u64,
other_transaction: &Option<Transaction>,
) -> Result<()> {
if other_transaction.is_none() {
return Err(crate::Error::Internal {
message: format!(
"There was a conflicting transaction at version {}, \
and it was missing transaction metadata.",
other_version
),
location: location!(),
});
}
if transaction.conflicts_with(other_transaction.as_ref().unwrap()) {
return Err(crate::Error::CommitConflict {
version: other_version,
source: format!(
"There was a concurrent commit that conflicts with this one and it \
cannot be automatically resolved. Please rerun the operation off the latest version \
of the table.\n Transaction: {:?}\n Conflicting Transaction: {:?}",
transaction, other_transaction
)
.into(),
location: location!(),
});
}
Ok(())
}
/// Commit the first version of a dataset that does not exist yet.
///
/// Writes the transaction file, builds a manifest from `transaction` with no previous manifest
/// and no existing indices, and writes that manifest.
///
/// Returns the manifest that was written.
pub(crate) async fn commit_new_dataset(
    object_store: &ObjectStore,
    base_path: &Path,
    transaction: &Transaction,
    write_config: &ManifestWriteConfig,
) -> Result<Manifest> {
    let transaction_file = write_transaction_file(object_store, base_path, transaction).await?;
    let (mut manifest, indices) =
        transaction.build_manifest(None, vec![], &transaction_file, write_config)?;

    // An empty index list is passed as `None`. The vector is owned and not used afterwards,
    // so it is moved rather than cloned.
    let indices = if indices.is_empty() {
        None
    } else {
        Some(indices)
    };

    write_manifest_file(
        object_store,
        base_path,
        &mut manifest,
        indices,
        write_config,
    )
    .await?;
    Ok(manifest)
}
/// Ensure every fragment in `manifest` carries row statistics.
///
/// Fragments are rebuilt through `migrate_fragments` when `recompute_stats` is set, or when any
/// fragment lacks `physical_rows` or has a deletion file without `num_deleted_rows`. Otherwise
/// the manifest is left untouched.
async fn migrate_manifest(
    dataset: &Dataset,
    manifest: &mut Manifest,
    recompute_stats: bool,
) -> Result<()> {
    let needs_migration = recompute_stats
        || !manifest.fragments.iter().all(|fragment| {
            let deletions_counted = fragment
                .deletion_file
                .as_ref()
                .map_or(true, |file| file.num_deleted_rows.is_some());
            fragment.physical_rows.is_some() && deletions_counted
        });

    if needs_migration {
        let migrated = migrate_fragments(dataset, &manifest.fragments, recompute_stats).await?;
        manifest.fragments = Arc::new(migrated);
    }
    Ok(())
}
/// Produce copies of `fragments` with `physical_rows` and the deletion file's
/// `num_deleted_rows` filled in.
///
/// Values already stored on a fragment are reused unless `recompute_stats` is true, in which
/// case they are computed again: physical rows via `FileFragment::physical_rows`, deleted rows
/// by reading the deletion file and taking its length.
///
/// Fragments are processed concurrently (up to `num_cpus::get() * 2` at a time); the first
/// error aborts the collection and is returned.
pub(crate) async fn migrate_fragments(
dataset: &Dataset,
fragments: &[Fragment],
recompute_stats: bool,
) -> Result<Vec<Fragment>> {
// `FileFragment::new` takes an `Arc<Dataset>`, so wrap one clone and share it.
let dataset = Arc::new(dataset.clone());
let new_fragments = futures::stream::iter(fragments)
.map(|fragment| async {
// Discard the stored value when a recompute was requested.
let physical_rows = if recompute_stats {
None
} else {
fragment.physical_rows
};
// `Either` lets the "already known" and "must be computed" futures share one type.
let physical_rows = if let Some(physical_rows) = physical_rows {
Either::Right(futures::future::ready(Ok(physical_rows)))
} else {
let file_fragment = FileFragment::new(dataset.clone(), fragment.clone());
Either::Left(async move { file_fragment.physical_rows().await })
};
let num_deleted_rows = match &fragment.deletion_file {
// No deletion file: there is no deleted-row count to record.
None => Either::Left(futures::future::ready(Ok(None))),
// Count already stored and no recompute requested: reuse it.
Some(DeletionFile {
num_deleted_rows: Some(deleted_rows),
..
}) if !recompute_stats => {
Either::Left(futures::future::ready(Ok(Some(*deleted_rows))))
}
// Count missing, or a recompute was requested: read the deletion file.
Some(_) => Either::Right(async {
let deletion_vector =
read_deletion_file(&dataset.base, fragment, dataset.object_store()).await?;
if let Some(deletion_vector) = deletion_vector {
Ok(Some(deletion_vector.len()))
} else {
Ok(None)
}
}),
};
// Resolve both statistics together; the first error wins.
let (physical_rows, num_deleted_rows) =
futures::future::try_join(physical_rows, num_deleted_rows).await?;
// Rebuild the deletion file (when present) with the resolved count.
let deletion_file = fragment
.deletion_file
.as_ref()
.map(|deletion_file| DeletionFile {
num_deleted_rows,
..deletion_file.clone()
});
Ok::<_, Error>(Fragment {
physical_rows: Some(physical_rows),
deletion_file,
..fragment.clone()
})
})
// `buffered` bounds concurrency while yielding results in input order.
.buffered(num_cpus::get() * 2)
.boxed();
new_fragments.try_collect().await
}
async fn migrate_indices(dataset: &Dataset, indices: &mut [Index]) -> Result<()> {
for index in indices {
let mut must_recalculate_fragment_bitmap = index.fragment_bitmap.is_none();
must_recalculate_fragment_bitmap |=
if let Some(writer_version) = &dataset.manifest.writer_version {
writer_version.older_than(0, 8, 15)
} else {
true
};
if must_recalculate_fragment_bitmap {
debug_assert_eq!(index.fields.len(), 1);
let idx_field = dataset.schema().field_by_id(index.fields[0]).ok_or_else(|| Error::Internal { message: format!("Index with uuid {} referred to field with id {} which did not exist in dataset", index.uuid, index.fields[0]), location: location!() })?;
let idx = dataset
.open_generic_index(&idx_field.name, &index.uuid.to_string())
.await?;
index.fragment_bitmap = Some(idx.calculate_included_frags().await?);
}
}
Ok(())
}
/// Commit `transaction` on top of `dataset`, resolving concurrent commits where possible.
///
/// Steps:
/// 1. Write the transaction file.
/// 2. Check out every version after `transaction.read_version` until one is not found,
///    collecting the transaction recorded for each.
/// 3. Check `transaction` against each collected transaction with `check_transaction`.
/// 4. Try up to `commit_config.num_retries` times to build and write the manifest for the next
///    free version. On a commit conflict, load the version that won, check it for conflicts,
///    and retry at the following version.
///
/// Returns the manifest that was written.
///
/// # Errors
///
/// * Any error from `check_transaction` (an unresolvable conflict, or a concurrent commit with
///   no transaction metadata).
/// * `Error::CommitConflict` when all retries are used up (including `num_retries == 0`).
/// * Any other error from reading, building, migrating or writing the manifest.
pub(crate) async fn commit_transaction(
    dataset: &Dataset,
    object_store: &ObjectStore,
    transaction: &Transaction,
    write_config: &ManifestWriteConfig,
    commit_config: &CommitConfig,
) -> Result<Manifest> {
    let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?;

    // Walk forward from the version this transaction read until a version does not exist,
    // remembering what each intervening commit did.
    let mut dataset = dataset.clone();
    let mut other_transactions = Vec::new();
    let mut version = transaction.read_version;
    loop {
        version += 1;
        match dataset.checkout_version(version).await {
            Ok(next_dataset) => {
                let other_txn = if let Some(txn_file) = &next_dataset.manifest.transaction_file {
                    Some(read_transaction_file(object_store, &next_dataset.base, txn_file).await?)
                } else {
                    None
                };
                other_transactions.push(other_txn);
                dataset = next_dataset;
            }
            Err(crate::Error::NotFound { .. }) | Err(crate::Error::DatasetNotFound { .. }) => {
                break;
            }
            Err(e) => {
                return Err(e);
            }
        }
    }

    // `version` is now the first version that does not exist yet.
    let mut target_version = version;

    for (version_offset, other_transaction) in other_transactions.iter().enumerate() {
        let other_version = transaction.read_version + version_offset as u64 + 1;
        check_transaction(transaction, other_version, other_transaction)?;
    }

    for _ in 0..commit_config.num_retries {
        // Rebuild the manifest on every attempt, since `dataset` may have advanced.
        let (mut manifest, mut indices) = match transaction.operation {
            Operation::Restore { version } => {
                Transaction::restore_old_manifest(
                    object_store,
                    &dataset.base,
                    version,
                    write_config,
                    &transaction_file,
                )
                .await?
            }
            _ => transaction.build_manifest(
                Some(dataset.manifest.as_ref()),
                dataset.load_indices().await?,
                &transaction_file,
                write_config,
            )?,
        };

        manifest.version = target_version;

        // Manifests with no recorded writer version get their statistics recomputed.
        let previous_writer_version = &dataset.manifest.writer_version;
        let recompute_stats = previous_writer_version.is_none();
        migrate_manifest(&dataset, &mut manifest, recompute_stats).await?;
        migrate_indices(&dataset, &mut indices).await?;

        // An empty index list is passed as `None`. `indices` is rebuilt on every attempt and
        // not read again in this iteration, so it is moved rather than cloned.
        let indices = if indices.is_empty() {
            None
        } else {
            Some(indices)
        };

        let result = write_manifest_file(
            object_store,
            &dataset.base,
            &mut manifest,
            indices,
            write_config,
        )
        .await;

        match result {
            Ok(()) => {
                return Ok(manifest);
            }
            Err(CommitError::CommitConflict) => {
                // Another writer took `target_version`: load it, check it does not conflict
                // with this transaction, then aim for the next version.
                dataset = dataset.checkout_version(target_version).await?;
                let other_transaction =
                    if let Some(txn_file) = dataset.manifest.transaction_file.as_ref() {
                        Some(read_transaction_file(object_store, &dataset.base, txn_file).await?)
                    } else {
                        None
                    };
                check_transaction(transaction, target_version, &other_transaction)?;
                target_version += 1;
            }
            Err(CommitError::OtherError(err)) => {
                return Err(err);
            }
        }
    }

    Err(crate::Error::CommitConflict {
        version: target_version,
        source: format!(
            "Failed to commit the transaction after {} retries.",
            commit_config.num_retries
        )
        .into(),
        location: location!(),
    })
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use futures::future::join_all;
use lance_arrow::FixedSizeListArrayExt;
use lance_core::io::commit::{
CommitError, CommitHandler, CommitLease, CommitLock, RenameCommitHandler,
UnsafeCommitHandler,
};
use lance_index::IndexType;
use lance_linalg::distance::MetricType;
use lance_testing::datagen::generate_random_array;
use super::*;
use crate::dataset::{transaction::Operation, WriteMode, WriteParams};
use crate::index::{vector::VectorIndexParams, DatasetIndexExt};
use crate::io::object_store::ObjectStoreParams;
use crate::Dataset;
/// Shared driver: run ten concurrent `delete` calls against an in-memory dataset that uses
/// `handler`, then inspect the versions reported by the tasks that succeeded.
///
/// With `should_succeed` set, no two successful tasks may report the same version. Otherwise
/// only the bound `successes >= distinct versions` is asserted, which always holds.
async fn test_commit_handler(handler: Arc<dyn CommitHandler>, should_succeed: bool) {
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "x",
        DataType::Int64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);

    let store_params = ObjectStoreParams {
        commit_handler: Some(handler),
        ..Default::default()
    };
    let write_params = WriteParams {
        store_params: Some(store_params),
        ..Default::default()
    };
    let dataset = Dataset::write(reader, "memory://test", Some(write_params))
        .await
        .unwrap();

    // Every task starts from its own clone of the same dataset, so they all race to commit.
    let mut handles = Vec::with_capacity(10);
    for _ in 0..10 {
        let mut local = dataset.clone();
        handles.push(tokio::task::spawn(async move {
            local
                .delete("x = 2")
                .await
                .map(|_| local.manifest.version)
        }));
    }

    // A task counts as successful only if it neither panicked nor returned an error.
    let task_results: Vec<Option<u64>> = join_all(handles)
        .await
        .iter()
        .map(|joined| match joined {
            Ok(Ok(version)) => Some(*version),
            _ => None,
        })
        .collect();

    let success_count = task_results.iter().flatten().count();
    let unique_versions: HashSet<_> = task_results.iter().flatten().collect();

    if should_succeed {
        assert_eq!(
            success_count,
            unique_versions.len(),
            "Expected no two tasks to succeed for the same version. Got {:?}",
            task_results
        );
    } else {
        assert!(success_count >= unique_versions.len(),);
    }
}
/// `RenameCommitHandler` must never let two writers commit the same version.
#[tokio::test]
async fn test_rename_commit_handler() {
    test_commit_handler(Arc::new(RenameCommitHandler), true).await;
}
/// A user-supplied `CommitLock` must never let two writers commit the same version.
///
/// The lock here is a single shared slot holding the version currently being committed.
#[tokio::test]
async fn test_custom_commit() {
    #[derive(Debug)]
    struct CustomCommitHandler {
        locked_version: Arc<Mutex<Option<u64>>>,
    }

    struct CustomCommitLease {
        version: u64,
        locked_version: Arc<Mutex<Option<u64>>>,
    }

    #[async_trait::async_trait]
    impl CommitLock for CustomCommitHandler {
        type Lease = CustomCommitLease;

        /// Take the slot for `version`, or report a conflict if it is already taken.
        async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError> {
            let mut slot = self.locked_version.lock().unwrap();
            if slot.is_none() {
                *slot = Some(version);
                Ok(CustomCommitLease {
                    version,
                    locked_version: self.locked_version.clone(),
                })
            } else {
                Err(CommitError::CommitConflict)
            }
        }
    }

    #[async_trait::async_trait]
    impl CommitLease for CustomCommitLease {
        /// Free the slot, but only if it still holds this lease's version.
        async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
            let mut slot = self.locked_version.lock().unwrap();
            if *slot == Some(self.version) {
                *slot = None;
                Ok(())
            } else {
                Err(CommitError::CommitConflict)
            }
        }
    }

    let handler = CustomCommitHandler {
        locked_version: Arc::new(Mutex::new(None)),
    };
    test_commit_handler(Arc::new(handler), true).await;
}
/// `UnsafeCommitHandler` is run through the same driver with `should_succeed` off, so unique
/// versions are not required.
#[tokio::test]
async fn test_unsafe_commit_handler() {
    test_commit_handler(Arc::new(UnsafeCommitHandler), false).await;
}
/// A transaction written with `write_transaction_file` reads back with the same read version,
/// uuid, operation kind and tag.
#[tokio::test]
async fn test_roundtrip_transaction_file() {
    let object_store = ObjectStore::memory();
    let base_path = Path::from("test");

    let written = Transaction::new(
        42,
        Operation::Append { fragments: vec![] },
        Some("hello world".to_string()),
    );

    let file_name = write_transaction_file(&object_store, &base_path, &written)
        .await
        .unwrap();
    let restored = read_transaction_file(&object_store, &base_path, &file_name)
        .await
        .unwrap();

    assert_eq!(written.read_version, restored.read_version);
    assert_eq!(written.uuid, restored.uuid);
    assert!(matches!(restored.operation, Operation::Append { .. }));
    assert_eq!(written.tag, restored.tag);
}
/// Three concurrent `create_index` calls (two on `vector1`, one on `vector2`) must all succeed
/// and land as versions 2, 3 and 4 with a consistent set of indices.
#[tokio::test]
async fn test_concurrent_create_index() {
    let test_dir = tempfile::tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();

    let dimension = 16;
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new(
            "vector1",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        ),
        Field::new(
            "vector2",
            DataType::FixedSizeList(
                Arc::new(Field::new("item", DataType::Float32, true)),
                dimension,
            ),
            false,
        ),
    ]));

    // 512 rows; both columns reuse the same random vectors.
    let float_arr = generate_random_array(512 * dimension as usize);
    let vectors = Arc::new(
        <arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
            float_arr, dimension,
        )
        .unwrap(),
    );
    let batches =
        vec![
            RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()])
                .unwrap(),
        ];

    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    let dataset = Dataset::write(reader, test_uri, None).await.unwrap();
    dataset.validate().await.unwrap();

    let params = VectorIndexParams::ivf_pq(10, 8, 2, false, MetricType::L2, 50);
    let futures: Vec<_> = ["vector1", "vector1", "vector2"]
        .iter()
        .map(|col_name| {
            // Each task gets its own dataset handle and parameter copy.
            let mut dataset = dataset.clone();
            let params = params.clone();
            tokio::spawn(async move {
                dataset
                    .create_index(&[col_name], IndexType::Vector, None, &params, true)
                    .await
            })
        })
        .collect();

    let results = join_all(futures).await;
    for result in results {
        assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
    }

    // Version 1 is the initial write, before any index exists.
    let dataset = dataset.checkout_version(1).await.unwrap();
    assert!(dataset.load_indices().await.unwrap().is_empty());

    let dataset = dataset.checkout_version(2).await.unwrap();
    assert_eq!(dataset.load_indices().await.unwrap().len(), 1);

    // Version 3 holds one or two indices depending on commit order — presumably one when
    // both `vector1` builds landed first; confirm against `create_index` replace semantics.
    let dataset = dataset.checkout_version(3).await.unwrap();
    let indices = dataset.load_indices().await.unwrap();
    assert!(!indices.is_empty() && indices.len() <= 2);
    if indices.len() == 2 {
        let mut fields: Vec<i32> = indices.iter().flat_map(|i| i.fields.clone()).collect();
        fields.sort();
        assert_eq!(fields, vec![0, 1]);
    } else {
        assert_eq!(indices[0].fields, vec![0]);
    }

    // After all three commits there is exactly one index per column.
    let dataset = dataset.checkout_version(4).await.unwrap();
    let indices = dataset.load_indices().await.unwrap();
    assert_eq!(indices.len(), 2);
    let mut fields: Vec<i32> = indices.iter().flat_map(|i| i.fields.clone()).collect();
    fields.sort();
    assert_eq!(fields, vec![0, 1]);
}
/// Five concurrent writers in `Append` and then `Overwrite` mode must all commit, producing
/// version 6 with five fragments (append) or one fragment (overwrite).
#[tokio::test]
async fn test_concurrent_writes() {
    for write_mode in [WriteMode::Append, WriteMode::Overwrite] {
        let test_dir = tempfile::tempdir().unwrap();
        let test_uri = test_dir.path().to_str().unwrap();

        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "i",
            DataType::Int32,
            false,
        )]));

        // Version 1: an empty dataset that every writer races to extend.
        let empty_reader = RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone());
        let dataset = Dataset::write(empty_reader, test_uri, None)
            .await
            .unwrap();

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let mut handles = Vec::with_capacity(5);
        for _ in 0..5 {
            let batch = batch.clone();
            let schema = schema.clone();
            let uri = test_uri.to_string();
            handles.push(tokio::spawn(async move {
                let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
                let params = WriteParams {
                    mode: write_mode,
                    ..Default::default()
                };
                Dataset::write(reader, &uri, Some(params)).await
            }));
        }

        for outcome in join_all(handles).await {
            assert!(matches!(outcome, Ok(Ok(_))), "{:?}", outcome);
        }

        // One initial write plus five concurrent writes gives version 6.
        let dataset = dataset.checkout_version(6).await.unwrap();
        let expected_fragments = match write_mode {
            WriteMode::Append => 5,
            WriteMode::Overwrite => 1,
            _ => unreachable!(),
        };
        assert_eq!(dataset.get_fragments().len(), expected_fragments);

        dataset.validate().await.unwrap();
    }
}
}