use std::collections::HashMap;
use std::sync::Arc;
use lance_core::utils::mask::RowAddrTreeMap;
use lance_file::version::LanceFileVersion;
use lance_io::object_store::{ObjectStore, ObjectStoreParams};
use lance_table::{
format::{DataStorageFormat, is_detached_version},
io::commit::{CommitConfig, CommitHandler, ManifestNamingScheme},
};
use crate::{
Dataset, Error, Result,
dataset::{
ManifestWriteConfig, ReadParams,
builder::DatasetBuilder,
commit_detached_transaction, commit_new_dataset, commit_transaction,
refs::Refs,
transaction::{Operation, Transaction},
},
session::Session,
};
use super::{WriteDestination, resolve_commit_handler};
use crate::dataset::branch_location::BranchLocation;
use crate::dataset::transaction::validate_operation;
use lance_core::utils::tracing::{DATASET_COMMITTED_EVENT, TRACE_DATASET_EVENTS};
use tracing::info;
/// Builder for committing a [`Transaction`] to a dataset, either one that is
/// already open ([`WriteDestination::Dataset`]) or one addressed by URI
/// ([`WriteDestination::Uri`]).
#[derive(Debug, Clone)]
pub struct CommitBuilder<'a> {
    // Where the commit is written: an open dataset or a URI.
    dest: WriteDestination<'a>,
    // Whether a newly created dataset uses stable row ids. Ignored for
    // existing datasets (the manifest's setting wins); `None` means `false`.
    use_stable_row_ids: Option<bool>,
    // Use the V2 manifest naming scheme when creating a new dataset
    // (`new` defaults this to `true`).
    enable_v2_manifest_paths: bool,
    // Requested storage format; `execute` rejects a mismatch with an existing
    // dataset's format unless the operation is an Overwrite.
    storage_format: Option<LanceFileVersion>,
    // Explicit commit handler; otherwise resolved from the URI/store params.
    commit_handler: Option<Arc<dyn CommitHandler>>,
    // Parameters used when an object store must be built from the URI.
    store_params: Option<ObjectStoreParams>,
    // Pre-built object store; skips store resolution from the URI.
    object_store: Option<Arc<ObjectStore>>,
    // Session providing caches and the store registry; `execute` falls back to
    // the destination dataset's session, then to a default session.
    session: Option<Arc<Session>>,
    // Commit a detached version instead of extending the main lineage.
    detached: bool,
    // Retry/cleanup configuration forwarded to the commit functions.
    commit_config: CommitConfig,
    // Row addresses touched by the transaction; forwarded to
    // `commit_transaction` (presumably for conflict detection — TODO confirm).
    affected_rows: Option<RowAddrTreeMap>,
    // Arbitrary key/value properties for the commit.
    // NOTE(review): not currently read by `execute` — confirm intended use.
    transaction_properties: Option<Arc<HashMap<String, String>>>,
}
impl<'a> CommitBuilder<'a> {
pub fn new(dest: impl Into<WriteDestination<'a>>) -> Self {
Self {
dest: dest.into(),
use_stable_row_ids: None,
enable_v2_manifest_paths: true,
storage_format: None,
commit_handler: None,
store_params: None,
object_store: None,
session: None,
detached: false,
commit_config: Default::default(),
affected_rows: None,
transaction_properties: None,
}
}
pub fn use_stable_row_ids(mut self, use_stable_row_ids: bool) -> Self {
self.use_stable_row_ids = Some(use_stable_row_ids);
self
}
pub fn with_storage_format(mut self, storage_format: LanceFileVersion) -> Self {
self.storage_format = Some(storage_format);
self
}
pub fn with_object_store(mut self, object_store: Arc<ObjectStore>) -> Self {
self.object_store = Some(object_store);
self
}
pub fn with_commit_handler(mut self, commit_handler: Arc<dyn CommitHandler>) -> Self {
self.commit_handler = Some(commit_handler);
self
}
pub fn with_store_params(mut self, store_params: ObjectStoreParams) -> Self {
self.store_params = Some(store_params);
self
}
pub fn with_session(mut self, session: Arc<Session>) -> Self {
self.session = Some(session);
self
}
pub fn enable_v2_manifest_paths(mut self, enable: bool) -> Self {
self.enable_v2_manifest_paths = enable;
self
}
pub fn with_detached(mut self, detached: bool) -> Self {
self.detached = detached;
self
}
/// Set the maximum number of retries used when committing the transaction.
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
    self.commit_config.num_retries = max_retries;
    self
}
/// Skip the automatic cleanup step that would otherwise run as part of the
/// commit.
pub fn with_skip_auto_cleanup(mut self, skip_auto_cleanup: bool) -> Self {
    self.commit_config.skip_auto_cleanup = skip_auto_cleanup;
    self
}
pub fn with_affected_rows(mut self, affected_rows: RowAddrTreeMap) -> Self {
self.affected_rows = Some(affected_rows);
self
}
pub fn with_transaction_properties(
mut self,
transaction_properties: HashMap<String, String>,
) -> Self {
self.transaction_properties = Some(Arc::new(transaction_properties));
self
}
/// Commit `transaction` against the destination and return the [`Dataset`]
/// at the newly committed version.
///
/// Infrastructure is resolved in this order:
/// - session: the builder's session, else the destination dataset's session,
///   else a fresh default session;
/// - object store / base path / commit handler: taken from an open dataset,
///   otherwise resolved from the URI and [`ObjectStoreParams`].
///
/// # Errors
/// - `DatasetNotFound` if the destination does not exist and the operation is
///   neither `Overwrite` nor `Clone`;
/// - `NotSupported` for a detached commit with V1 manifest paths, or a
///   detached commit that would create a new dataset;
/// - `InvalidInput` if the requested storage format conflicts with an existing
///   dataset's format (except for `Overwrite`).
pub async fn execute(self, transaction: Transaction) -> Result<Dataset> {
    // NOTE(review): `self.transaction_properties` is never read here, so
    // `with_transaction_properties` currently has no effect on the committed
    // transaction — confirm whether it should be merged into `transaction`.
    let session = self
        .session
        .or_else(|| self.dest.dataset().map(|ds| ds.session.clone()))
        .unwrap_or_default();
    let (object_store, base_path, commit_handler) = match &self.dest {
        // An open dataset already carries its store, path, and handler.
        WriteDestination::Dataset(dataset) => (
            dataset.object_store.clone(),
            dataset.base.clone(),
            dataset.commit_handler.clone(),
        ),
        WriteDestination::Uri(uri) => {
            // Only trust a passed-in commit handler when the object store was
            // also passed in; otherwise resolve the handler from the URI.
            let commit_handler = if let (Some(_), Some(commit_handler)) =
                (&self.object_store, &self.commit_handler)
            {
                commit_handler.clone()
            } else {
                resolve_commit_handler(uri, self.commit_handler.clone(), &self.store_params)
                    .await?
            };
            let (object_store, base_path) = if let Some(passed_store) = self.object_store {
                (
                    passed_store,
                    ObjectStore::extract_path_from_uri(session.store_registry(), uri)?,
                )
            } else {
                ObjectStore::from_uri_and_params(
                    session.store_registry(),
                    uri,
                    &self.store_params.clone().unwrap_or_default(),
                )
                .await?
            };
            (object_store, base_path, commit_handler)
        }
    };
    // Try to open the destination as a dataset. A URI that doesn't resolve to
    // an existing dataset stays a URI (valid for Overwrite/Clone below).
    let dest = match &self.dest {
        WriteDestination::Dataset(dataset) => WriteDestination::Dataset(dataset.clone()),
        WriteDestination::Uri(uri) => {
            let mut builder = DatasetBuilder::from_uri(uri)
                .with_read_params(ReadParams {
                    store_options: self.store_params.clone(),
                    commit_handler: self.commit_handler.clone(),
                    ..Default::default()
                })
                .with_session(session.clone());
            // A detached read version must be loaded explicitly; it is not
            // reachable from the main lineage.
            if is_detached_version(transaction.read_version) {
                builder = builder.with_version(transaction.read_version)
            }
            match builder.load().await {
                Ok(dataset) => WriteDestination::Dataset(Arc::new(dataset)),
                Err(Error::DatasetNotFound { .. } | Error::NotFound { .. }) => {
                    WriteDestination::Uri(uri)
                }
                Err(e) => return Err(e),
            }
        }
    };
    // Only Overwrite and Clone may create a dataset that does not exist yet.
    if dest.dataset().is_none()
        && !matches!(
            transaction.operation,
            Operation::Overwrite { .. } | Operation::Clone { .. }
        )
    {
        return Err(Error::dataset_not_found(
            base_path.to_string(),
            "The dataset must already exist unless the operation is Overwrite or Clone".into(),
        ));
    }
    if let Some(dataset) = dest.dataset() {
        validate_operation(Some(&dataset.manifest), &transaction.operation)?;
    } else {
        validate_operation(None, &transaction.operation)?;
    }
    // Reuse an open dataset's caches; otherwise derive per-dataset caches
    // from the session.
    let (metadata_cache, index_cache) = match &dest {
        WriteDestination::Dataset(ds) => (ds.metadata_cache.clone(), ds.index_cache.clone()),
        WriteDestination::Uri(uri) => (
            Arc::new(session.metadata_cache.for_dataset(uri)),
            Arc::new(session.index_cache.for_dataset(uri)),
        ),
    };
    // Existing datasets keep their naming scheme; new ones follow the builder.
    let manifest_naming_scheme = if let Some(ds) = dest.dataset() {
        ds.manifest_location.naming_scheme
    } else if self.enable_v2_manifest_paths {
        ManifestNamingScheme::V2
    } else {
        ManifestNamingScheme::V1
    };
    // Likewise, the stable-row-id setting of an existing dataset wins.
    let use_stable_row_ids = if let Some(ds) = dest.dataset() {
        ds.manifest.uses_stable_row_ids()
    } else {
        self.use_stable_row_ids.unwrap_or(false)
    };
    // A storage-format mismatch is only allowed when overwriting.
    if let Some(ds) = dest.dataset()
        && let Some(storage_format) = self.storage_format
    {
        let passed_storage_format = DataStorageFormat::new(storage_format);
        if ds.manifest.data_storage_format != passed_storage_format
            && !matches!(transaction.operation, Operation::Overwrite { .. })
        {
            return Err(Error::invalid_input_source(format!(
                "Storage format mismatch. Existing dataset uses {:?}, but new data uses {:?}",
                ds.manifest.data_storage_format,
                passed_storage_format
            ).into()));
        }
    }
    let manifest_config = ManifestWriteConfig {
        use_stable_row_ids,
        storage_format: self.storage_format.map(DataStorageFormat::new),
        ..Default::default()
    };
    let (manifest, manifest_location) = if let Some(dataset) = dest.dataset() {
        if self.detached {
            if matches!(manifest_naming_scheme, ManifestNamingScheme::V1) {
                return Err(Error::not_supported_source(
                    "detached commits cannot be used with v1 manifest paths".into(),
                ));
            }
            commit_detached_transaction(
                dataset,
                object_store.as_ref(),
                commit_handler.as_ref(),
                &transaction,
                &manifest_config,
                &self.commit_config,
            )
            .await?
        } else {
            commit_transaction(
                dataset,
                object_store.as_ref(),
                commit_handler.as_ref(),
                &transaction,
                &manifest_config,
                &self.commit_config,
                manifest_naming_scheme,
                self.affected_rows.as_ref(),
            )
            .await?
        }
    } else if self.detached {
        return Err(Error::not_supported_source(
            "detached commits cannot currently be used to create new datasets".into(),
        ));
    } else {
        commit_new_dataset(
            object_store.as_ref(),
            commit_handler.as_ref(),
            &base_path,
            &transaction,
            &manifest_config,
            manifest_naming_scheme,
            metadata_cache.as_ref(),
            session.store_registry(),
        )
        .await?
    };
    info!(
        target: TRACE_DATASET_EVENTS,
        event=DATASET_COMMITTED_EVENT,
        uri=dest.uri(),
        read_version=transaction.read_version,
        committed_version=manifest.version,
        detached=self.detached,
        operation=&transaction.operation.name()
    );
    let fragment_bitmap = Arc::new(manifest.fragments.iter().map(|f| f.id as u32).collect());
    // Build the returned Dataset: clone-and-patch an open dataset, or
    // assemble one from the resolved pieces for a URI destination.
    match &self.dest {
        WriteDestination::Dataset(dataset) => Ok(Dataset {
            manifest: Arc::new(manifest),
            manifest_location,
            session,
            fragment_bitmap,
            ..dataset.as_ref().clone()
        }),
        WriteDestination::Uri(uri) => {
            let refs = Refs::new(
                object_store.clone(),
                commit_handler.clone(),
                BranchLocation {
                    path: base_path.clone(),
                    uri: uri.to_string(),
                    branch: manifest.branch.clone(),
                },
            );
            Ok(Dataset {
                object_store,
                base: base_path,
                uri: uri.to_string(),
                manifest: Arc::new(manifest),
                manifest_location,
                session,
                commit_handler,
                refs,
                index_cache,
                fragment_bitmap,
                metadata_cache,
                file_reader_options: None,
                store_params: self.store_params.clone().map(Box::new),
            })
        }
    }
}
/// Merge a batch of append transactions into one transaction and commit it.
///
/// Only [`Operation::Append`] transactions are accepted; their fragments are
/// concatenated in the order given, and the merged transaction's read version
/// is the minimum read version across the batch.
///
/// # Errors
/// - `InvalidInput` if `transactions` is empty;
/// - `NotSupported` if any transaction is not an append.
pub async fn execute_batch(self, transactions: Vec<Transaction>) -> Result<BatchCommitResult> {
    // The merged read version is the oldest in the batch; an empty batch has
    // nothing to commit.
    let Some(read_version) = transactions.iter().map(|t| t.read_version).min() else {
        return Err(Error::invalid_input_source(
            "No transactions to commit".into(),
        ));
    };
    // Gather fragments from every append, rejecting any other operation
    // before anything is committed.
    let mut fragments = Vec::new();
    for txn in &transactions {
        match &txn.operation {
            Operation::Append { fragments: f } => fragments.extend(f.iter().cloned()),
            _ => {
                return Err(Error::not_supported_source(
                    "Only append transactions are supported in batch commits".into(),
                ));
            }
        }
    }
    let merged = Transaction {
        uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
        operation: Operation::Append { fragments },
        read_version,
        tag: None,
        transaction_properties: None,
    };
    let dataset = self.execute(merged.clone()).await?;
    Ok(BatchCommitResult { dataset, merged })
}
}
/// Result of [`CommitBuilder::execute_batch`].
pub struct BatchCommitResult {
    // The dataset at the version produced by the merged commit.
    pub dataset: Dataset,
    // The single merged transaction that was committed.
    pub merged: Transaction,
}
#[cfg(test)]
mod tests {
use arrow::array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use lance_io::utils::CachedFileSize;
use lance_io::{assert_io_eq, assert_io_gt};
use lance_table::format::{DataFile, Fragment};
use std::time::Duration;
use object_store::throttle::ThrottleConfig;
use crate::utils::test::ThrottledStoreWrapper;
use crate::dataset::{InsertBuilder, WriteParams};
use super::*;
/// Build a minimal single-file fragment (10 physical rows) for use in
/// sample append transactions.
fn sample_fragment() -> Fragment {
    let (file_major_version, file_minor_version) = LanceFileVersion::Stable.to_numbers();
    let file = DataFile {
        path: String::from("file.lance"),
        fields: vec![0],
        column_indices: vec![0],
        file_major_version,
        file_minor_version,
        file_size_bytes: CachedFileSize::new(100),
        base_id: None,
    };
    Fragment {
        id: 0,
        files: vec![file],
        deletion_file: None,
        row_id_meta: None,
        physical_rows: Some(10),
        last_updated_at_version_meta: None,
        created_at_version_meta: None,
    }
}
/// Build an append transaction containing one sample fragment, based on
/// `read_version`.
fn sample_transaction(read_version: u64) -> Transaction {
    let fragments = vec![sample_fragment()];
    Transaction {
        uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
        operation: Operation::Append { fragments },
        read_version,
        tag: None,
        transaction_properties: None,
    }
}
#[tokio::test]
async fn test_reuse_session() {
    // A session reused across commits should serve dataset metadata from its
    // caches, keeping per-commit I/O constant; a cold session must re-read.
    let session = Arc::new(Session::default());
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "i",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_iter_values(0..10_i32))],
    )
    .unwrap();
    // Initial write creates version 1.
    let dataset = InsertBuilder::new("memory://test")
        .with_params(&WriteParams {
            session: Some(session.clone()),
            enable_v2_manifest_paths: true,
            ..Default::default()
        })
        .execute(vec![batch])
        .await
        .unwrap();
    let dataset = Arc::new(dataset);
    // Drain the counters accumulated by the initial write.
    let io_stats = dataset.object_store().io_stats_incremental();
    assert_io_gt!(io_stats, read_iops, 0);
    assert_io_gt!(io_stats, write_iops, 0);
    // Committing through an open dataset: one read (latest-version check) and
    // two writes (transaction file + manifest) per commit, versions 2..=6.
    for i in 0..5 {
        let new_ds = CommitBuilder::new(dataset.clone())
            .execute(sample_transaction(1))
            .await
            .unwrap();
        assert_eq!(new_ds.manifest.version, i + 2);
        let io_stats = dataset.object_store().io_stats_incremental();
        assert_io_eq!(io_stats, read_iops, 1, "check latest version, i = {} ", i);
        assert_io_eq!(io_stats, write_iops, 2, "write txn + manifest, i = {}", i);
    }
    // Committing by URI with the same (warm) session keeps the dataset load
    // cheap: 5 reads total, still 2 writes.
    let new_ds = CommitBuilder::new("memory://test")
        .with_session(dataset.session.clone())
        .execute(sample_transaction(1))
        .await
        .unwrap();
    assert_eq!(new_ds.manifest().version, 7);
    let io_stats = dataset.object_store().io_stats_incremental();
    assert_io_eq!(io_stats, read_iops, 5, "load dataset + check version");
    assert_io_eq!(io_stats, write_iops, 2, "write txn + manifest");
    // A session with zero-sized caches must re-read everything from storage,
    // so reads jump while the write count stays the same.
    let new_session = Arc::new(Session::new(0, 0, session.store_registry()));
    let new_ds = CommitBuilder::new("memory://test")
        .with_session(new_session)
        .execute(sample_transaction(1))
        .await
        .unwrap();
    assert_eq!(new_ds.manifest().version, 8);
    let io_stats = dataset.object_store().io_stats_incremental();
    assert_io_gt!(io_stats, read_iops, 10);
    assert_io_eq!(io_stats, write_iops, 2, "write txn + manifest");
}
/// A single uncontended commit should cost exactly one read (latest-version
/// check), two writes (transaction file + manifest), and three I/O stages.
#[tokio::test]
async fn test_commit_iops() {
    let session = Arc::new(Session::default());
    let params = WriteParams {
        session: Some(session.clone()),
        ..Default::default()
    };
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![0; 5]))]).unwrap();
    let dataset = InsertBuilder::new("memory://")
        .with_params(&params)
        .execute(vec![batch])
        .await
        .unwrap();
    let read_version = dataset.manifest().version;
    // Reset the counters so only the commit below is measured.
    dataset.object_store().io_stats_incremental();
    let committed = CommitBuilder::new(Arc::new(dataset))
        .execute(sample_transaction(read_version))
        .await
        .unwrap();
    let io_stats = committed.object_store().io_stats_incremental();
    assert_io_eq!(io_stats, read_iops, 1);
    assert_io_eq!(io_stats, write_iops, 2);
    assert_io_eq!(io_stats, num_stages, 3);
}
#[tokio::test]
#[rstest::rstest]
async fn test_commit_conflict_iops(#[values(true, false)] use_cache: bool) {
    // Measure the I/O cost of a commit that must resolve conflicts with
    // concurrent transactions, with and without a session cache.
    let cache_size = if use_cache { 1_000_000 } else { 0 };
    let session = Arc::new(Session::new(0, cache_size, Default::default()));
    // Throttle every list/get/put call by 5 ms so that the number of
    // sequential I/O stages is observable.
    let throttled = Arc::new(ThrottledStoreWrapper {
        config: ThrottleConfig {
            wait_list_per_call: Duration::from_millis(5),
            wait_get_per_call: Duration::from_millis(5),
            wait_put_per_call: Duration::from_millis(5),
            ..Default::default()
        },
    });
    let write_params = WriteParams {
        store_params: Some(ObjectStoreParams {
            object_store_wrapper: Some(throttled),
            ..Default::default()
        }),
        session: Some(session.clone()),
        ..Default::default()
    };
    let data = RecordBatch::try_new(
        Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "a",
            DataType::Int32,
            false,
        )])),
        vec![Arc::new(Int32Array::from(vec![0; 5]))],
    )
    .unwrap();
    let mut dataset = InsertBuilder::new("memory://")
        .with_params(&write_params)
        .execute(vec![data])
        .await
        .unwrap();
    // Stack several commits on top of the original version so the final
    // commit (also based on the original version) conflicts with all of them.
    let original_dataset = Arc::new(dataset.clone());
    let num_other_txns = 3;
    for _ in 0..num_other_txns {
        dataset = CommitBuilder::new(original_dataset.clone())
            .execute(sample_transaction(dataset.manifest().version))
            .await
            .unwrap();
    }
    // Reset counters so only the conflicting commit is measured.
    dataset.object_store().io_stats_incremental();
    let new_ds = CommitBuilder::new(original_dataset.clone())
        .execute(sample_transaction(original_dataset.manifest().version))
        .await
        .unwrap();
    let io_stats = new_ds.object_store().io_stats_incremental();
    if use_cache {
        // Warm cache: only the latest-version check hits storage.
        assert_io_eq!(io_stats, read_iops, 1);
        assert_io_eq!(io_stats, num_stages, 3);
    } else {
        use lance_io::assert_io_lt;
        // Cold cache: two extra reads per conflicting transaction
        // (presumably manifest + transaction file — TODO confirm), but they
        // should be batched into fewer than 6 stages.
        assert_io_eq!(io_stats, read_iops, 1 + num_other_txns * 2);
        assert_io_lt!(io_stats, num_stages, 6);
    }
    assert_io_eq!(io_stats, write_iops, 2);
}
/// `execute_batch` rejects empty batches and non-append operations, and
/// merges multiple appends into a single transaction at the minimum read
/// version.
#[tokio::test]
async fn test_commit_batch() {
    // Seed a dataset with one appended batch.
    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "i",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_iter_values(0..10_i32))],
    )
    .unwrap();
    let dataset = Arc::new(
        InsertBuilder::new("memory://test")
            .execute(vec![batch])
            .await
            .unwrap(),
    );
    // An empty batch is rejected as invalid input.
    let res = CommitBuilder::new(dataset.clone()).execute_batch(vec![]).await;
    assert!(matches!(res, Err(Error::InvalidInput { .. })));
    // A non-append operation is rejected as unsupported.
    let update_transaction = Transaction {
        uuid: uuid::Uuid::new_v4().hyphenated().to_string(),
        operation: Operation::Update {
            updated_fragments: vec![],
            new_fragments: vec![],
            removed_fragment_ids: vec![],
            fields_modified: vec![],
            merged_generations: Vec::new(),
            fields_for_preserving_frag_bitmap: vec![],
            update_mode: None,
            inserted_rows_filter: None,
        },
        read_version: 1,
        tag: None,
        transaction_properties: None,
    };
    let res = CommitBuilder::new(dataset.clone())
        .execute_batch(vec![update_transaction])
        .await;
    assert!(matches!(res, Err(Error::NotSupported { .. })));
    // Two appends merge into one transaction carrying both fragment lists.
    let append1 = sample_transaction(1);
    let append2 = sample_transaction(2);
    let expected_fragments: Vec<_> = [&append1, &append2]
        .iter()
        .flat_map(|txn| match &txn.operation {
            Operation::Append { fragments } => fragments.clone(),
            _ => unreachable!(),
        })
        .collect();
    let res = CommitBuilder::new(dataset.clone())
        .execute_batch(vec![append1, append2])
        .await
        .unwrap();
    let merged = res.merged;
    assert!(
        matches!(merged.operation, Operation::Append { fragments } if fragments == expected_fragments)
    );
    // The merged read version is the minimum of the batch.
    assert_eq!(merged.read_version, 1);
}
}