use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::array::{Array, AsArray, Int32Array, Int64Array, StringArray};
use delta_kernel::arrow::datatypes::{Int64Type, Schema as ArrowSchema};
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::to_json_bytes;
use delta_kernel::object_store::path::Path;
use delta_kernel::object_store::{DynObjectStore, ObjectStoreExt};
use delta_kernel::schema::{DataType, MetadataColumnSpec, SchemaRef, StructField, StructType};
use delta_kernel::transaction::CommitResult;
use delta_kernel::{DeltaResult, Error, Snapshot};
use itertools::Itertools;
use serde_json::{Deserializer, Value};
use tempfile::{tempdir, TempDir};
use test_utils::{
create_default_engine_mt_executor, create_table, engine_store_setup, read_scan, test_read,
};
use url::Url;
async fn create_row_tracking_table(
tmp_dir: &TempDir,
table_name: &str,
schema: SchemaRef,
) -> DeltaResult<(
Url,
Arc<DefaultEngine<TokioBackgroundExecutor>>,
Arc<DynObjectStore>,
)> {
let tmp_test_dir_url = Url::from_directory_path(tmp_dir.path())
.map_err(|_| Error::generic("Failed to convert directory path to URL"))?;
let (store, engine, table_location) = engine_store_setup(table_name, Some(&tmp_test_dir_url));
let table_url = create_table(
store.clone(),
table_location,
schema,
&[], true, vec![], vec!["domainMetadata", "rowTracking"],
)
.await
.map_err(|e| Error::generic(format!("Failed to create table: {e}")))?;
Ok((table_url, Arc::new(engine), store))
}
async fn write_data_to_table(
table_url: &Url,
engine: Arc<DefaultEngine<TokioBackgroundExecutor>>,
data: Vec<ArrowEngineData>,
) -> DeltaResult<CommitResult> {
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let committer = Box::new(FileSystemCommitter::new());
let mut txn = snapshot
.transaction(committer, engine.as_ref())?
.with_data_change(true);
let write_context = Arc::new(txn.unpartitioned_write_context()?);
let tasks = data.into_iter().map(|data| {
let engine = engine.clone();
let write_context = write_context.clone();
tokio::task::spawn(async move { engine.write_parquet(&data, write_context.as_ref()).await })
});
let add_files_metadata = futures::future::join_all(tasks).await.into_iter().flatten();
for meta in add_files_metadata {
let metadata = meta?;
txn.add_files(metadata);
}
txn.commit(engine.as_ref())
}
async fn setup_number_table(
tmp_dir: &TempDir,
name: &str,
) -> DeltaResult<(
SchemaRef,
Url,
Arc<DefaultEngine<TokioBackgroundExecutor>>,
Arc<DynObjectStore>,
)> {
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"number",
DataType::INTEGER,
)])?);
let (table_url, engine, store) =
create_row_tracking_table(tmp_dir, name, schema.clone()).await?;
Ok((schema, table_url, engine, store))
}
fn int32_array(data: Vec<i32>) -> Arc<dyn Array> {
Arc::new(Int32Array::from(data))
}
fn int64_array(data: Vec<i64>) -> Arc<dyn Array> {
Arc::new(Int64Array::from(data))
}
fn string_array(data: Vec<String>) -> Arc<dyn Array> {
Arc::new(StringArray::from(data))
}
fn generate_data<I>(schema: SchemaRef, batches: I) -> DeltaResult<Vec<ArrowEngineData>>
where
I: IntoIterator<Item = Vec<Arc<dyn Array>>>,
{
let arrow_schema: Arc<ArrowSchema> = Arc::new(schema.as_ref().try_into_arrow()?);
batches
.into_iter()
.map(|batch_columns| -> DeltaResult<ArrowEngineData> {
let record_batch = RecordBatch::try_new(arrow_schema.clone(), batch_columns)?;
Ok(ArrowEngineData::new(record_batch))
})
.collect::<Result<Vec<_>, _>>()
}
async fn verify_row_tracking_in_commit(
store: &Arc<DynObjectStore>,
table_url: &Url,
commit_version: u64,
expected_base_row_ids: Vec<i64>,
expected_row_id_high_water_mark: i64,
) -> DeltaResult<()> {
let commit_url = table_url.join(&format!("_delta_log/{commit_version:020}.json"))?;
let commit = store.get(&Path::from_url_path(commit_url.path())?).await?;
let parsed_actions: Vec<_> = Deserializer::from_slice(&commit.bytes().await?)
.into_iter::<Value>()
.try_collect()?;
let (mut base_row_ids, default_commit_versions): (Vec<_>, Vec<_>) = parsed_actions
.iter()
.filter_map(|action| {
action.get("add").map(|add| {
let base_row_id = add
.get("baseRowId")
.cloned()
.expect("Add action should have baseRowId field")
.as_i64()
.expect("baseRowId should be an i64");
let default_commit_version = add
.get("defaultRowCommitVersion")
.cloned()
.expect("Add action should have defaultRowCommitVersion field")
.as_i64()
.expect("defaultRowCommitVersion should be an i64");
(base_row_id, default_commit_version)
})
})
.unzip();
base_row_ids.sort();
assert_eq!(base_row_ids, expected_base_row_ids);
assert_eq!(
default_commit_versions,
vec![commit_version as i64; default_commit_versions.len()]
);
let row_tracking_domain_config = parsed_actions
.iter()
.filter_map(|action| {
action.get("domainMetadata").and_then(|meta| {
let domain = meta
.get("domain")
.expect("Domain metadata must have a domain");
match domain.as_str() {
Some("delta.rowTracking") => Some(
meta.get("configuration")
.expect("Domain metadata must have a configuration")
.as_str()
.expect("Configuration should be a string"),
),
_ => None,
}
})
})
.collect::<Vec<_>>();
assert_eq!(
row_tracking_domain_config.len(),
1,
"There must be exactly one row tracking domain metadata action"
);
let row_id_high_water_mark = serde_json::from_str::<Value>(row_tracking_domain_config[0])?
.get("rowIdHighWaterMark")
.expect("rowIdHighWaterMark should be present")
.as_i64()
.expect("rowIdHighWaterMark should be an i64");
assert_eq!(
row_id_high_water_mark, expected_row_id_high_water_mark,
"rowIdHighWaterMark should match expected value"
);
Ok(())
}
#[tokio::test]
async fn test_row_tracking_append() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let (schema, table_url, engine, store) =
setup_number_table(&tmp_test_dir, "test_append").await?;
let data = generate_data(
schema.clone(),
[
vec![int32_array(vec![1, 2, 3])],
vec![int32_array(vec![4, 5, 6])],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
1, vec![0, 3], 5, )
.await?;
test_read(
&ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]))],
)?),
&table_url,
engine,
)?;
Ok(())
}
#[tokio::test]
async fn test_row_tracking_single_record_batches() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let (schema, table_url, engine, store) =
setup_number_table(&tmp_test_dir, "test_single_records").await?;
let data = generate_data(
schema.clone(),
[
vec![int32_array(vec![1])],
vec![int32_array(vec![2])],
vec![int32_array(vec![3])],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
1, vec![0, 1, 2], 2, )
.await?;
Ok(())
}
#[tokio::test]
async fn test_row_tracking_large_batch() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let (schema, table_url, engine, store) =
setup_number_table(&tmp_test_dir, "test_large_batch").await?;
let large_batch: Vec<i32> = (1..=1000).collect();
let data = generate_data(schema.clone(), [vec![int32_array(large_batch.clone())]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
1, vec![0], 999, )
.await?;
test_read(
&ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(large_batch))],
)?),
&table_url,
engine,
)?;
Ok(())
}
#[tokio::test]
async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let (schema, table_url, engine, store) =
setup_number_table(&tmp_test_dir, "test_consecutive_commits").await?;
let data_1 = generate_data(
schema.clone(),
[
vec![int32_array(vec![1, 2, 3])],
vec![int32_array(vec![4, 5, 6])],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data_1)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
1, vec![0, 3], 5, )
.await?;
let data_2 = generate_data(schema.clone(), [vec![int32_array(vec![7, 8])]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data_2)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
2, vec![6], 7, )
.await?;
test_read(
&ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![7, 8, 1, 2, 3, 4, 5, 6]))],
)?),
&table_url,
engine,
)?;
Ok(())
}
#[tokio::test]
async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("id", DataType::LONG),
StructField::nullable("name", DataType::STRING),
])?);
let (table_url, engine, store) =
create_row_tracking_table(&tmp_test_dir, "test_three_transactions", schema.clone()).await?;
let data_1 = generate_data(
schema.clone(),
[
vec![int64_array(vec![1]), string_array(vec!["a".to_string()])],
vec![
int64_array(vec![2, 3, 4]),
string_array(vec!["b".to_string(), "c".to_string(), "d".to_string()]),
],
vec![
int64_array(vec![5, 6]),
string_array(vec!["e".to_string(), "f".to_string()]),
],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data_1)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
1, vec![0, 1, 4], 5, )
.await?;
let data_2 = generate_data(
schema.clone(),
[vec![
int64_array(vec![7, 8]),
string_array(vec!["g".to_string(), "h".to_string()]),
]],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data_2)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
2, vec![6], 7, )
.await?;
let data_3 = generate_data(
schema.clone(),
[
vec![
int64_array(vec![9, 10]),
string_array(vec!["i".to_string(), "j".to_string()]),
],
vec![
int64_array(vec![11, 12]),
string_array(vec!["k".to_string(), "l".to_string()]),
],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data_3)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
3, vec![8, 10], 11, )
.await?;
Ok(())
}
#[tokio::test]
async fn test_row_tracking_with_regular_and_empty_adds() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let (schema, table_url, engine, store) =
setup_number_table(&tmp_test_dir, "test_append").await?;
let data = generate_data(
schema.clone(),
[
vec![int32_array(vec![1, 2, 3])],
vec![int32_array(Vec::<i32>::new())],
vec![int32_array(vec![4, 5, 6])],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
1, vec![0, 3, 3], 5, )
.await?;
test_read(
&ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]))],
)?),
&table_url,
engine,
)?;
Ok(())
}
#[tokio::test]
async fn test_row_tracking_with_empty_adds() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let (schema, table_url, engine, store) =
setup_number_table(&tmp_test_dir, "test_append").await?;
let data = generate_data(
schema.clone(),
[
vec![int32_array(Vec::<i32>::new())],
vec![int32_array(Vec::<i32>::new())],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
verify_row_tracking_in_commit(
&store,
&table_url,
1, vec![0, 0], -1, )
.await?;
let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let batches = read_scan(&scan, engine)?;
assert!(batches.is_empty(), "Table should be empty");
Ok(())
}
#[tokio::test]
async fn test_row_tracking_without_adds() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let (_schema, table_url, engine, store) =
setup_number_table(&tmp_test_dir, "test_consecutive_commits").await?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?;
assert!(txn.commit(engine.as_ref())?.is_committed());
let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?;
let commit = store.get(&Path::from_url_path(commit_url.path())?).await?;
let parsed_actions: Vec<_> = Deserializer::from_slice(&commit.bytes().await?)
.into_iter::<Value>()
.try_collect()?;
assert_eq!(parsed_actions.len(), 1, "Expected only one action");
assert!(parsed_actions[0].get("commitInfo").is_some());
Ok(())
}
#[tokio::test]
async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let (schema, table_url, engine, store) =
setup_number_table(&tmp_test_dir, "test_parallel_row_tracking").await?;
let engine1 = engine.clone();
let engine2 = engine;
let snapshot1 = Snapshot::builder_for(table_url.clone()).build(engine1.as_ref())?;
let snapshot2 = Snapshot::builder_for(table_url.clone()).build(engine2.as_ref())?;
let mut txn1 = snapshot1
.transaction(Box::new(FileSystemCommitter::new()), engine1.as_ref())?
.with_engine_info("transaction 1")
.with_data_change(true);
let mut txn2 = snapshot2
.transaction(Box::new(FileSystemCommitter::new()), engine2.as_ref())?
.with_engine_info("transaction 2")
.with_data_change(true);
let data1 = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?;
let data2 = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![4, 5]))],
)?;
let write_context1 = Arc::new(txn1.unpartitioned_write_context()?);
let write_context2 = Arc::new(txn2.unpartitioned_write_context()?);
let metadata1 = engine1
.write_parquet(&ArrowEngineData::new(data1), write_context1.as_ref())
.await?;
let metadata2 = engine2
.write_parquet(&ArrowEngineData::new(data2), write_context2.as_ref())
.await?;
txn1.add_files(metadata1);
txn2.add_files(metadata2);
let result1 = txn1.commit(engine1.as_ref())?;
match result1 {
CommitResult::CommittedTransaction(committed) => {
assert_eq!(
committed.commit_version(),
1,
"First transaction should commit at version 1"
);
}
CommitResult::ConflictedTransaction(conflicted) => {
panic!(
"First transaction should not conflict, got conflict at version {}",
conflicted.conflict_version()
);
}
CommitResult::RetryableTransaction(_) => {
panic!("First transaction should not be retryable error");
}
}
let result2 = txn2.commit(engine2.as_ref())?;
match result2 {
CommitResult::CommittedTransaction(committed) => {
panic!(
"Second transaction should conflict, but got committed at version {}",
committed.commit_version()
);
}
CommitResult::ConflictedTransaction(conflicted) => {
assert_eq!(
conflicted.conflict_version(),
1,
"Conflict should be at version 1"
);
}
CommitResult::RetryableTransaction(_) => {
panic!("Second transaction should not be retryable error");
}
}
verify_row_tracking_in_commit(
&store,
&table_url,
1, vec![0], 2, )
.await?;
test_read(
&ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], )?),
&table_url,
engine1,
)?;
Ok(())
}
#[tokio::test]
async fn test_no_row_tracking_fields_without_feature() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_test_dir = tempdir()?;
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"number",
DataType::INTEGER,
)])?);
let tmp_test_dir_url = Url::from_directory_path(tmp_test_dir.path())
.map_err(|_| Error::generic("Failed to convert directory path to URL"))?;
let (store, engine, table_location) =
engine_store_setup("test_no_row_tracking", Some(&tmp_test_dir_url));
let table_url = create_table(
store.clone(),
table_location,
schema.clone(),
&[],
true,
vec![], vec![], )
.await
.map_err(|e| Error::generic(format!("Failed to create table: {e}")))?;
let engine = Arc::new(engine);
let data = generate_data(
schema.clone(),
[
vec![int32_array(vec![1, 2, 3])],
vec![int32_array(vec![4, 5, 6])],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?;
let commit = store.get(&Path::from_url_path(commit_url.path())?).await?;
let parsed_actions: Vec<_> = Deserializer::from_slice(&commit.bytes().await?)
.into_iter::<Value>()
.try_collect()?;
let add_actions: Vec<_> = parsed_actions
.iter()
.filter_map(|action| action.get("add"))
.collect();
assert!(!add_actions.is_empty(), "Expected at least one add action");
for add_action in add_actions {
assert!(
add_action.get("baseRowId").is_none(),
"Add action should not have baseRowId field when row tracking is disabled"
);
assert!(
add_action.get("defaultRowCommitVersion").is_none(),
"Add action should not have defaultRowCommitVersion field when row tracking is disabled"
);
}
let row_tracking_domain_metadata: Vec<_> = parsed_actions
.iter()
.filter_map(|action| {
action.get("domainMetadata").and_then(|meta| {
let domain = meta.get("domain")?;
match domain.as_str() {
Some("delta.rowTracking") => Some(meta),
_ => None,
}
})
})
.collect();
assert!(
row_tracking_domain_metadata.is_empty(),
"Should not have any row tracking domain metadata when row tracking is disabled"
);
Ok(())
}
fn read_row_id_scan(
snapshot: Arc<Snapshot>,
engine: Arc<dyn delta_kernel::Engine>,
) -> DeltaResult<Vec<RecordBatch>> {
let scan_schema = Arc::new(
snapshot
.schema()
.add_metadata_column("row_id", MetadataColumnSpec::RowId)?,
);
let scan = snapshot.scan_builder().with_schema(scan_schema).build()?;
read_scan(&scan, engine)
}
fn collect_row_ids(batches: &[RecordBatch]) -> Vec<i64> {
batches
.iter()
.flat_map(|b| {
b.column_by_name("row_id")
.expect("row_id column not found in batch")
.as_primitive::<Int64Type>()
.values()
.to_vec()
})
.collect()
}
#[tokio::test]
async fn test_read_row_ids_basic() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
let (schema, table_url, engine, _store) =
setup_number_table(&tmp_dir, "test_read_row_ids_basic").await?;
let data = generate_data(schema.clone(), [vec![int32_array(vec![10, 20, 30])]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let batches = read_row_id_scan(snapshot, engine)?;
let mut row_ids = collect_row_ids(&batches);
row_ids.sort_unstable();
assert_eq!(row_ids, vec![0, 1, 2], "Row IDs must be sequential from 0");
Ok(())
}
#[tokio::test]
async fn test_read_row_ids_multiple_files_one_commit() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
let (schema, table_url, engine, _store) =
setup_number_table(&tmp_dir, "test_read_row_ids_multiple_files").await?;
let data = generate_data(
schema.clone(),
[
vec![int32_array(vec![1, 2, 3])],
vec![int32_array(vec![4, 5, 6, 7])],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let batches = read_row_id_scan(snapshot, engine)?;
let mut all_ids = collect_row_ids(&batches);
all_ids.sort_unstable();
assert_eq!(
all_ids,
(0i64..7).collect::<Vec<_>>(),
"Row IDs must be non-overlapping across files"
);
for batch in &batches {
let ids: Vec<i64> = batch
.column_by_name("row_id")
.expect("row_id column not found")
.as_primitive::<Int64Type>()
.values()
.to_vec();
let min = *ids.iter().min().unwrap();
let expected = (min..min + ids.len() as i64).collect::<Vec<_>>();
assert_eq!(ids, expected, "IDs within a file must be contiguous");
}
Ok(())
}
#[tokio::test]
async fn test_read_row_ids_multiple_commits() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
let (schema, table_url, engine, _store) =
setup_number_table(&tmp_dir, "test_read_row_ids_multiple_commits").await?;
let data1 = generate_data(schema.clone(), [vec![int32_array(vec![1, 2, 3])]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data1)
.await?
.is_committed());
let data2 = generate_data(schema.clone(), [vec![int32_array(vec![4, 5])]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data2)
.await?
.is_committed());
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let batches = read_row_id_scan(snapshot, engine)?;
let mut all_ids = collect_row_ids(&batches);
all_ids.sort_unstable();
assert_eq!(
all_ids,
vec![0, 1, 2, 3, 4],
"Row IDs must be globally unique and monotonically increasing across commits"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_read_row_ids_after_checkpoint() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
let (schema, table_url, engine, _store) =
setup_number_table(&tmp_dir, "test_read_row_ids_after_checkpoint").await?;
let data = generate_data(schema.clone(), [vec![int32_array(vec![1, 2, 3])]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
let mt_engine = create_default_engine_mt_executor(&table_url)?;
let snapshot = Snapshot::builder_for(table_url.clone()).build(mt_engine.as_ref())?;
snapshot.checkpoint(mt_engine.as_ref())?;
let fresh_snapshot = Snapshot::builder_for(table_url.clone()).build(mt_engine.as_ref())?;
let batches = read_row_id_scan(fresh_snapshot, mt_engine.clone())?;
let mut ids_after_ckpt = collect_row_ids(&batches);
ids_after_ckpt.sort_unstable();
assert_eq!(
ids_after_ckpt,
vec![0, 1, 2],
"Row IDs must be unchanged after loading from checkpoint"
);
let data2 = generate_data(schema.clone(), [vec![int32_array(vec![4, 5])]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data2)
.await?
.is_committed());
let snapshot2 = Snapshot::builder_for(table_url.clone()).build(mt_engine.as_ref())?;
let batches2 = read_row_id_scan(snapshot2, mt_engine)?;
let mut all_ids = collect_row_ids(&batches2);
all_ids.sort_unstable();
assert_eq!(
all_ids,
vec![0, 1, 2, 3, 4],
"Row IDs must continue from the high watermark after checkpoint with no resets or duplicates"
);
Ok(())
}
#[tokio::test]
async fn test_read_row_ids_coexist_with_row_index() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
let (schema, table_url, engine, _store) =
setup_number_table(&tmp_dir, "test_read_row_ids_coexist_with_row_index").await?;
let data = generate_data(
schema.clone(),
[
vec![int32_array(vec![1, 2, 3])],
vec![int32_array(vec![4, 5])],
],
)?;
assert!(write_data_to_table(&table_url, engine.clone(), data)
.await?
.is_committed());
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan_schema = Arc::new(
snapshot
.schema()
.add_metadata_column("row_id", MetadataColumnSpec::RowId)?
.add_metadata_column("row_index", MetadataColumnSpec::RowIndex)?,
);
let scan = snapshot.scan_builder().with_schema(scan_schema).build()?;
let batches = read_scan(&scan, engine)?;
for batch in &batches {
assert_eq!(
batch.num_columns(),
3,
"Expected number | row_id | row_index"
);
let row_ids: Vec<i64> = batch
.column_by_name("row_id")
.expect("row_id column not found")
.as_primitive::<Int64Type>()
.values()
.to_vec();
let row_indexes: Vec<i64> = batch
.column_by_name("row_index")
.expect("row_index column not found")
.as_primitive::<Int64Type>()
.values()
.to_vec();
let n = batch.num_rows() as i64;
assert_eq!(
row_indexes,
(0..n).collect::<Vec<_>>(),
"Row index must reset to 0 for each file"
);
let base = *row_ids.iter().min().unwrap();
assert_eq!(
row_ids,
(base..base + n).collect::<Vec<_>>(),
"Row IDs within a file must equal baseRowId + row_index"
);
}
let mut all_ids = collect_row_ids(&batches);
all_ids.sort_unstable();
assert_eq!(
all_ids,
vec![0, 1, 2, 3, 4],
"Row IDs must be globally unique when coexisting with row index"
);
Ok(())
}
#[tokio::test]
#[ignore = "log compaction is not yet supported, tracked in #2337"]
async fn test_read_row_ids_after_log_compaction() -> DeltaResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let tmp_dir = tempdir()?;
let (schema, table_url, engine, store) =
setup_number_table(&tmp_dir, "test_read_row_ids_after_log_compaction").await?;
let data1 = generate_data(schema.clone(), [vec![int32_array(vec![1, 2, 3])]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data1)
.await?
.is_committed());
let data2 = generate_data(schema.clone(), [vec![int32_array(vec![4, 5])]])?;
assert!(write_data_to_table(&table_url, engine.clone(), data2)
.await?
.is_committed());
let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let mut writer = snapshot.log_compaction_writer(0, 2)?;
let compaction_path = writer.compaction_path().clone();
let batches = writer
.compaction_data(engine.as_ref())?
.collect::<DeltaResult<Vec<_>>>()?;
let json_bytes = to_json_bytes(batches.into_iter().map(Ok))?;
store
.put(
&Path::from_url_path(compaction_path.path())?,
json_bytes.into(),
)
.await
.map_err(|e| Error::generic(e.to_string()))?;
let fresh_snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
let scan_batches = read_row_id_scan(fresh_snapshot, engine)?;
let mut all_ids = collect_row_ids(&scan_batches);
all_ids.sort_unstable();
assert_eq!(
all_ids,
vec![0, 1, 2, 3, 4],
"Row IDs must be preserved and correct after log compaction"
);
Ok(())
}