use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::array::{Int32Array, StringArray};
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::row_tracking::RowTrackingDomainMetadata;
use delta_kernel::snapshot::Snapshot;
use delta_kernel::table_features::{
TableFeature, TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
};
use delta_kernel::transaction::create_table::create_table;
use delta_kernel::transaction::data_layout::DataLayout;
use delta_kernel::DeltaResult;
use rstest::rstest;
use test_utils::{
get_materialized_row_tracking_column_names, get_row_tracking_add_actions, insert_data,
read_actions_from_commit, test_table_setup,
};
use url::Url;
fn assert_row_tracking_protocol(snapshot: &Snapshot) {
let protocol = snapshot.table_configuration().protocol();
assert!(protocol.min_reader_version() >= TABLE_FEATURES_MIN_READER_VERSION);
assert!(protocol.min_writer_version() >= TABLE_FEATURES_MIN_WRITER_VERSION);
assert!(
protocol
.writer_features()
.is_some_and(|f| f.contains(&TableFeature::RowTracking)),
"rowTracking should be in writer features"
);
assert!(
!protocol
.reader_features()
.is_some_and(|f| f.contains(&TableFeature::RowTracking)),
"rowTracking should NOT be in reader features"
);
assert!(
protocol
.writer_features()
.is_some_and(|f| f.contains(&TableFeature::DomainMetadata)),
"domainMetadata should be in writer features"
);
}
/// Creates a table with row tracking requested through one of two table properties
/// (`delta.enableRowTracking=true` or `delta.feature.rowTracking=supported`), optionally
/// writing a single five-row file in the create commit, and then checks commit 0:
///
/// * the protocol, both on the post-commit snapshot and on a snapshot reloaded from disk;
/// * the row tracking high water mark (`4` with data, `-1` without);
/// * the add actions' `baseRowId` / `defaultRowCommitVersion`;
/// * the materialized row tracking column names, which the test expects only when the
///   enablement property (not the bare feature signal) was used.
#[rstest]
#[tokio::test]
async fn test_create_table_with_row_tracking(
    #[values(
        ("delta.enableRowTracking", "true"),
        ("delta.feature.rowTracking", "supported")
    )]
    activation: (&str, &str),
    #[values(false, true)] with_data: bool,
) -> DeltaResult<()> {
    let (key, value) = activation;
    // The feature-signal activation is expected to leave the enablement property (and the
    // materialized column names) unset; only the explicit property sets them.
    let expect_property_enabled = key == "delta.enableRowTracking";
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let schema = super::simple_schema()?;
    let mut txn = create_table(&table_path, schema.clone(), "Test/1.0")
        .with_table_properties([(key, value)])
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;

    if with_data {
        let arrow_schema = Arc::new(schema.as_ref().try_into_arrow()?);
        let batch = RecordBatch::try_new(
            arrow_schema,
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
            ],
        )
        .map_err(|e| delta_kernel::Error::generic(e.to_string()))?;
        let write_context = Arc::new(txn.unpartitioned_write_context()?);
        let add_files = engine
            .write_parquet(&ArrowEngineData::new(batch), write_context.as_ref())
            .await?;
        txn.add_files(add_files);
    }

    let committed = txn.commit(engine.as_ref())?.unwrap_committed();
    let snapshot = committed
        .post_commit_snapshot()
        .expect("should have snapshot");
    assert_row_tracking_protocol(snapshot);

    // Reload the table from disk once and reuse that snapshot for every on-disk check. The
    // protocol is verified for both the empty and the with-data create, since the snapshot
    // is needed for the high water mark check in either case.
    let table_url = Url::from_directory_path(&table_path).expect("valid path");
    let disk_snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
    assert_row_tracking_protocol(&disk_snapshot);

    // Five rows occupy row ids 0..=4; an empty create leaves the mark at -1.
    let expected_high_water_mark: i64 = if with_data { 4 } else { -1 };
    assert_eq!(
        RowTrackingDomainMetadata::get_high_water_mark(&disk_snapshot, engine.as_ref())?,
        Some(expected_high_water_mark),
    );

    let add_actions = get_row_tracking_add_actions(&table_url, 0).expect("failed to read commit");
    if with_data {
        assert_eq!(add_actions.len(), 1, "Expected one add action");
        assert_eq!(add_actions[0].base_row_id, Some(0), "baseRowId should be 0");
        assert_eq!(
            add_actions[0].default_row_commit_version,
            Some(0),
            "defaultRowCommitVersion should be 0"
        );
    } else {
        assert!(
            add_actions.is_empty(),
            "Expected no add actions for empty create"
        );
    }

    let col_names =
        get_materialized_row_tracking_column_names(&table_url, 0).expect("failed to read commit");
    if expect_property_enabled {
        let row_id_col = col_names
            .row_id_column_name
            .as_deref()
            .expect("materializedRowIdColumnName should be set");
        assert!(row_id_col.starts_with("_row-id-col-"), "got {row_id_col}");
        let commit_version_col = col_names
            .row_commit_version_column_name
            .as_deref()
            .expect("materializedRowCommitVersionColumnName should be set");
        assert!(
            commit_version_col.starts_with("_row-commit-version-col-"),
            "got {commit_version_col}"
        );
    } else {
        assert!(
            col_names.row_id_column_name.is_none(),
            "materializedRowIdColumnName should NOT be set for feature-signal-only tables"
        );
        assert!(
            col_names.row_commit_version_column_name.is_none(),
            "materializedRowCommitVersionColumnName should NOT be set for feature-signal-only tables"
        );
        let metadata_actions =
            read_actions_from_commit(&table_url, 0, "metaData").expect("failed to read commit");
        // Fail with a descriptive message rather than an index-out-of-bounds panic when the
        // commit carries no metaData action at all.
        let metadata = metadata_actions
            .first()
            .expect("commit 0 should contain a metaData action");
        assert!(
            metadata
                .get("configuration")
                .and_then(|c| c.get("delta.enableRowTracking"))
                .is_none(),
            "delta.enableRowTracking should NOT be set for feature-signal-only tables"
        );
    }
    Ok(())
}
#[tokio::test]
async fn test_create_table_with_multiple_files_and_row_tracking() -> DeltaResult<()> {
let (_temp_dir, table_path, engine) = test_table_setup()?;
let schema = super::simple_schema()?;
let mut txn = create_table(&table_path, schema.clone(), "Test/1.0")
.with_table_properties([("delta.enableRowTracking", "true")])
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;
let arrow_schema: Arc<delta_kernel::arrow::datatypes::Schema> =
Arc::new(schema.as_ref().try_into_arrow()?);
let batch1 = RecordBatch::try_new(
arrow_schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
],
)
.map_err(|e| delta_kernel::Error::generic(e.to_string()))?;
let batch2 = RecordBatch::try_new(
arrow_schema,
vec![
Arc::new(Int32Array::from(vec![4, 5, 6, 7, 8])),
Arc::new(StringArray::from(vec!["d", "e", "f", "g", "h"])),
],
)
.map_err(|e| delta_kernel::Error::generic(e.to_string()))?;
let write_context = Arc::new(txn.unpartitioned_write_context()?);
let adds1 = engine
.write_parquet(&ArrowEngineData::new(batch1), write_context.as_ref())
.await?;
let adds2 = engine
.write_parquet(&ArrowEngineData::new(batch2), write_context.as_ref())
.await?;
txn.add_files(adds1);
txn.add_files(adds2);
let committed = txn.commit(engine.as_ref())?.unwrap_committed();
assert_eq!(committed.commit_version(), 0);
let table_url = Url::from_directory_path(&table_path).expect("valid path");
let add_actions = get_row_tracking_add_actions(&table_url, 0).expect("failed to read commit");
assert_eq!(add_actions.len(), 2, "Expected two add actions");
assert_eq!(add_actions[0].base_row_id, Some(0));
assert_eq!(add_actions[0].default_row_commit_version, Some(0));
assert_eq!(add_actions[1].base_row_id, Some(3));
assert_eq!(add_actions[1].default_row_commit_version, Some(0));
let disk_snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
assert_eq!(
RowTrackingDomainMetadata::get_high_water_mark(&disk_snapshot, engine.as_ref())?,
Some(7),
"HWM should be 7 for 8 total rows (3 + 5) starting from -1"
);
Ok(())
}
/// Creates an empty table with both row tracking and clustering on `id`, then checks that the
/// writer features contain `RowTracking`, `ClusteredTable` and exactly one `DomainMetadata`
/// entry, and that commit 0 holds domain metadata actions for both `delta.rowTracking` and
/// `delta.clustering`.
#[test]
fn test_create_table_with_row_tracking_and_clustering() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let txn = create_table(&table_path, super::simple_schema()?, "Test/1.0")
        .with_table_properties([("delta.enableRowTracking", "true")])
        .with_data_layout(DataLayout::clustered(["id"]))
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;
    let committed = txn.commit(engine.as_ref())?.unwrap_committed();
    let snapshot = committed
        .post_commit_snapshot()
        .expect("should have snapshot");
    let table_url = Url::from_directory_path(&table_path).expect("valid path");

    let writer_features = snapshot
        .table_configuration()
        .protocol()
        .writer_features()
        .expect("should have writer features");
    assert!(writer_features.contains(&TableFeature::RowTracking));
    assert!(writer_features.contains(&TableFeature::ClusteredTable));
    // Both features are expected to rely on domainMetadata; it must still be listed once.
    let domain_metadata_entries = writer_features
        .iter()
        .filter(|&feature| *feature == TableFeature::DomainMetadata)
        .count();
    assert_eq!(
        domain_metadata_entries, 1,
        "DomainMetadata should not be duplicated"
    );

    let dm_actions =
        read_actions_from_commit(&table_url, 0, "domainMetadata").expect("failed to read commit");
    for (domain, missing_msg) in [
        (
            "delta.rowTracking",
            "row tracking domain metadata should be present",
        ),
        (
            "delta.clustering",
            "clustering domain metadata should be present",
        ),
    ] {
        assert!(
            dm_actions.iter().any(|dm| dm["domain"] == domain),
            "{missing_msg}"
        );
    }
    Ok(())
}
/// Creates a table with row tracking and clustering on `id`, writing one five-row file in the
/// create commit, and checks the writer features, the domain metadata actions of commit 0,
/// the single add action's row tracking fields, and the high water mark read back from disk.
#[tokio::test]
async fn test_create_table_with_row_tracking_and_clustering_and_data() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let schema = super::simple_schema()?;
    let mut txn = create_table(&table_path, schema.clone(), "Test/1.0")
        .with_table_properties([("delta.enableRowTracking", "true")])
        .with_data_layout(DataLayout::clustered(["id"]))
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;

    // Stage a single parquet file holding five rows.
    let arrow_schema = Arc::new(schema.as_ref().try_into_arrow()?);
    let rows = RecordBatch::try_new(
        arrow_schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
            Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
        ],
    )
    .map_err(|e| delta_kernel::Error::generic(e.to_string()))?;
    let write_context = Arc::new(txn.unpartitioned_write_context()?);
    let staged = engine
        .write_parquet(&ArrowEngineData::new(rows), write_context.as_ref())
        .await?;
    txn.add_files(staged);

    let committed = txn.commit(engine.as_ref())?.unwrap_committed();
    let snapshot = committed
        .post_commit_snapshot()
        .expect("should have snapshot");
    let table_url = Url::from_directory_path(&table_path).expect("valid path");

    let protocol = snapshot.table_configuration().protocol();
    let writer_features = protocol
        .writer_features()
        .expect("should have writer features");
    assert!(writer_features.contains(&TableFeature::RowTracking));
    assert!(writer_features.contains(&TableFeature::ClusteredTable));
    let domain_metadata_entries = writer_features
        .iter()
        .filter(|&feature| *feature == TableFeature::DomainMetadata)
        .count();
    assert_eq!(
        domain_metadata_entries, 1,
        "DomainMetadata should appear exactly once"
    );

    let dm_actions =
        read_actions_from_commit(&table_url, 0, "domainMetadata").expect("failed to read commit");
    // True when commit 0 contains a domainMetadata action for the given domain.
    let has_domain = |domain: &str| dm_actions.iter().any(|dm| dm["domain"] == domain);
    assert!(
        has_domain("delta.rowTracking"),
        "row tracking domain metadata should be present"
    );
    assert!(
        has_domain("delta.clustering"),
        "clustering domain metadata should be present"
    );

    let adds = get_row_tracking_add_actions(&table_url, 0).expect("failed to read commit");
    assert_eq!(adds.len(), 1, "Expected one add action");
    assert_eq!(adds[0].base_row_id, Some(0));
    assert_eq!(adds[0].default_row_commit_version, Some(0));

    let reloaded = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
    let high_water_mark =
        RowTrackingDomainMetadata::get_high_water_mark(&reloaded, engine.as_ref())?;
    assert_eq!(high_water_mark, Some(4), "5 rows -> high water mark = 4");
    Ok(())
}
/// Creates an empty table using only the `delta.feature.rowTracking=supported` signal, then
/// appends three rows in a second commit and checks that:
///
/// * version 0 reports a row tracking high water mark of `-1`;
/// * the add action in commit 1 starts at `baseRowId` 0 with `defaultRowCommitVersion` 1;
/// * version 1 reports a high water mark of `2`.
#[tokio::test]
async fn test_feature_signal_create_then_append_assigns_correct_base_row_id() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    // Unwrap the commit result instead of discarding it, so a create that did not actually
    // commit fails here rather than surfacing later as a confusing snapshot error.
    let committed = create_table(&table_path, super::simple_schema()?, "Test/1.0")
        .with_table_properties([("delta.feature.rowTracking", "supported")])
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?
        .unwrap_committed();
    assert_eq!(committed.commit_version(), 0);

    let table_url = Url::from_directory_path(&table_path).expect("valid path");
    let v0_snapshot = Snapshot::builder_for(&table_path)
        .at_version(0)
        .build(engine.as_ref())?;
    assert_eq!(
        RowTrackingDomainMetadata::get_high_water_mark(&v0_snapshot, engine.as_ref())?,
        Some(-1),
        "Initial high water mark should be -1"
    );

    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
    // NOTE(review): the value returned by `insert_data` is discarded; its type is not visible
    // here. The commit is verified indirectly by reading commit 1 below.
    let _ = insert_data(
        snapshot,
        &engine,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .await?;

    let add_actions = get_row_tracking_add_actions(&table_url, 1).expect("failed to read commit");
    assert_eq!(add_actions.len(), 1, "Expected one add action");
    assert_eq!(
        add_actions[0].base_row_id,
        Some(0),
        "First file after create should start at baseRowId 0 (high water mark was -1)"
    );
    assert_eq!(add_actions[0].default_row_commit_version, Some(1));

    // Pin version 1 explicitly, mirroring the version-0 snapshot above, so the check reads
    // exactly the append commit rather than whatever "latest" happens to be.
    let v1_snapshot = Snapshot::builder_for(&table_path)
        .at_version(1)
        .build(engine.as_ref())?;
    assert_eq!(
        RowTrackingDomainMetadata::get_high_water_mark(&v1_snapshot, engine.as_ref())?,
        Some(2),
        "3 rows starting from 0 -> high water mark = 2"
    );
    Ok(())
}