mod clustering;
mod column_mapping;
mod ctas;
mod ict;
mod partitioned;
mod row_tracking;
mod timestamp_ntz;
mod variant;
use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::table_features::{
TableFeature, TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
};
use delta_kernel::table_properties::TableProperties;
use delta_kernel::transaction::create_table::{create_table, CreateTableTransaction};
use delta_kernel::DeltaResult;
use rstest::rstest;
use serde_json::Value;
use test_utils::{assert_result_error_with_message, test_table_setup};
/// Builds the two-column (`id` INT, `value` STRING) nullable schema shared by
/// several tests in this module.
pub(crate) fn simple_schema() -> DeltaResult<Arc<StructType>> {
    let fields = vec![
        StructField::new("id", DataType::INTEGER, true),
        StructField::new("value", DataType::STRING, true),
    ];
    Ok(Arc::new(StructType::try_new(fields)?))
}
/// Builds a three-column nullable schema (`id`, `date`, `value`) intended for
/// partitioning tests (the `date` column is the natural partition key).
pub(crate) fn partition_test_schema() -> DeltaResult<Arc<StructType>> {
    let fields = vec![
        StructField::new("id", DataType::INTEGER, true),
        StructField::new("date", DataType::DATE, true),
        StructField::new("value", DataType::STRING, true),
    ];
    Ok(Arc::new(StructType::try_new(fields)?))
}
/// Creating a brand-new table yields a version-0 snapshot whose schema,
/// protocol versions, feature sets, and table properties all match the
/// defaults for a feature-less table.
#[tokio::test]
async fn test_create_simple_table() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    // Expected column names; also used below to verify schema membership.
    const EXPECTED_FIELDS: [&str; 5] =
        ["event_id", "user_id", "event_type", "timestamp", "properties"];
    let schema = Arc::new(StructType::try_new(vec![
        StructField::new("event_id", DataType::LONG, true),
        StructField::new("user_id", DataType::LONG, true),
        StructField::new("event_type", DataType::STRING, true),
        StructField::new("timestamp", DataType::TIMESTAMP, true),
        StructField::new("properties", DataType::STRING, true),
    ])?);
    let _ = create_table(&table_path, schema.clone(), "DeltaKernel-RS/0.17.0")
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;

    let table_url = delta_kernel::try_parse_uri(&table_path)?;
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
    assert_eq!(snapshot.version(), 0);
    assert_eq!(snapshot.schema().fields().len(), EXPECTED_FIELDS.len());

    // A fresh table is written at the table-features protocol versions with
    // present-but-empty feature sets and default table properties.
    let protocol = snapshot.table_configuration().protocol();
    assert_eq!(
        protocol.min_reader_version(),
        TABLE_FEATURES_MIN_READER_VERSION
    );
    assert_eq!(
        protocol.min_writer_version(),
        TABLE_FEATURES_MIN_WRITER_VERSION
    );
    assert!(protocol.reader_features().is_some_and(|f| f.is_empty()));
    assert!(protocol.writer_features().is_some_and(|f| f.is_empty()));
    assert_eq!(snapshot.table_properties(), &TableProperties::default());

    // Verify membership with `iter().any` instead of allocating a fresh
    // String per `contains(&"...".to_string())` check.
    let field_names: Vec<_> = snapshot
        .schema()
        .fields()
        .map(|f| f.name().to_string())
        .collect();
    for expected in EXPECTED_FIELDS {
        assert!(
            field_names.iter().any(|name| name == expected),
            "schema should contain field {expected}, got: {field_names:?}"
        );
    }
    Ok(())
}
/// Domain metadata attached to the create-table transaction must be committed
/// with version 0 and be readable back from the resulting snapshot.
#[tokio::test]
async fn test_create_table_with_user_domain_metadata() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;

    let domain = "app.settings";
    let config = r#"{"version": 1, "enabled": true}"#;

    // Opt into the domainMetadata feature, then attach a domain before commit.
    let txn = create_table(&table_path, simple_schema()?, "Test/1.0")
        .with_table_properties([("delta.feature.domainMetadata", "supported")])
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;
    let _ = txn
        .with_domain_metadata(domain.to_string(), config.to_string())
        .commit(engine.as_ref())?;

    let table_url = delta_kernel::try_parse_uri(&table_path)?;
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
    assert!(
        snapshot
            .table_configuration()
            .is_feature_supported(&TableFeature::DomainMetadata),
        "DomainMetadata feature should be enabled"
    );

    let retrieved_config = snapshot.get_domain_metadata(domain, engine.as_ref())?;
    assert_eq!(
        retrieved_config,
        Some(config.to_string()),
        "Domain metadata should be persisted and retrievable"
    );

    // The stored configuration must still be valid JSON with the same values.
    let parsed: Value = serde_json::from_str(retrieved_config.as_ref().unwrap())?;
    assert_eq!(parsed["version"], 1);
    assert_eq!(parsed["enabled"], true);

    // Unknown domains come back as None rather than as an error.
    let missing = snapshot.get_domain_metadata("nonexistent.domain", engine.as_ref())?;
    assert!(missing.is_none(), "Non-existent domain should return None");
    Ok(())
}
/// A second `create_table` at the same location must be rejected at build
/// time once the first create has been committed.
#[tokio::test]
async fn test_create_table_already_exists() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let fields = vec![
        StructField::new("user_id", DataType::LONG, true),
        StructField::new("username", DataType::STRING, true),
        StructField::new("email", DataType::STRING, true),
        StructField::new("created_at", DataType::TIMESTAMP, true),
        StructField::new("is_active", DataType::BOOLEAN, true),
    ];
    let schema = Arc::new(StructType::try_new(fields)?);

    // First creation succeeds and commits version 0.
    let _ = create_table(&table_path, schema.clone(), "UserManagementService/1.2.0")
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;

    // Attempting to create the same table again must fail.
    let result = create_table(&table_path, schema.clone(), "UserManagementService/1.2.0")
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
    assert_result_error_with_message(result, "already exists");
    Ok(())
}
/// An empty struct schema is invalid; building the transaction must fail.
#[tokio::test]
async fn test_create_table_empty_schema_not_supported() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let empty_schema = Arc::new(StructType::try_new(vec![])?);
    let result = create_table(&table_path, empty_schema, "InvalidApp/0.1.0")
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
    assert_result_error_with_message(result, "cannot be empty");
    Ok(())
}
/// Schema with a non-nullable top-level column, used to exercise the
/// invariants-feature requirement for non-null columns.
fn top_level_non_null_schema() -> Arc<StructType> {
    let fields = vec![
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("value", DataType::STRING, true),
    ];
    let schema = StructType::try_new(fields).expect("non-null top-level schema should be valid");
    Arc::new(schema)
}
/// Schema whose non-nullable column sits one level down inside a nullable
/// struct, to check that the non-null validation recurses into nested fields.
fn nested_non_null_schema() -> Arc<StructType> {
    let inner = StructType::try_new(vec![StructField::new("child", DataType::INTEGER, false)])
        .expect("nested non-null schema should be valid");
    let outer_field = StructField::new("nested", DataType::Struct(Box::new(inner)), true);
    let outer =
        StructType::try_new(vec![outer_field]).expect("top-level nested schema should be valid");
    Arc::new(outer)
}
/// Non-nullable columns (top-level or nested) require the invariants table
/// feature, which `create_table` does not enable, so the build must fail.
#[rstest]
#[case::top_level_non_null(top_level_non_null_schema())]
#[case::nested_non_null(nested_non_null_schema())]
fn test_create_table_non_null_columns_require_invariants_feature(
    #[case] schema: Arc<StructType>,
) -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let build_result = create_table(&table_path, schema, "InvalidApp/0.1.0")
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
    assert_result_error_with_message(build_result, "Non-null column");
    Ok(())
}
/// Verifies the exact action sequence written to the version-0 commit file:
/// commitInfo, then protocol, then metaData — and the key payload fields of
/// each action.
#[tokio::test]
async fn test_create_table_log_actions() -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let schema = Arc::new(StructType::try_new(vec![
        StructField::new("user_id", DataType::LONG, true),
        StructField::new("action", DataType::STRING, true),
    ])?);
    let engine_info = "AuditService/2.1.0";
    let _ = create_table(&table_path, schema, engine_info)
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;

    // Read the first commit file and parse each newline-delimited JSON action.
    let log_file_path = format!("{table_path}/_delta_log/00000000000000000000.json");
    let log_contents = std::fs::read_to_string(&log_file_path).expect("Failed to read log file");
    let actions: Vec<Value> = log_contents
        .lines()
        .map(|line| serde_json::from_str(line).expect("Failed to parse JSON"))
        .collect();
    assert_eq!(
        actions.len(),
        3,
        "Expected 3 actions (commitInfo, protocol, metaData), found {}",
        actions.len()
    );

    // Action 0: commitInfo — all commitInfo assertions are grouped here
    // (previously split across two distant sections of the test).
    let commit_info_action = &actions[0];
    assert!(
        commit_info_action.get("commitInfo").is_some(),
        "First action should be commitInfo"
    );
    let commit_info = commit_info_action.get("commitInfo").unwrap();
    assert!(
        commit_info.get("timestamp").is_some(),
        "CommitInfo should have timestamp"
    );
    assert!(
        commit_info.get("engineInfo").is_some(),
        "CommitInfo should have engineInfo"
    );
    assert!(
        commit_info.get("operation").is_some(),
        "CommitInfo should have operation"
    );
    assert_eq!(
        commit_info["operation"], "CREATE TABLE",
        "Operation should be CREATE TABLE"
    );
    assert_eq!(
        commit_info["engineInfo"], engine_info,
        "CommitInfo should contain the engine info we provided"
    );
    assert!(
        commit_info.get("txnId").is_some(),
        "CommitInfo should have txnId"
    );
    let kernel_version = commit_info.get("kernelVersion");
    assert!(
        kernel_version.is_some(),
        "CommitInfo should have kernelVersion"
    );
    // Use a char pattern (clippy::single_char_pattern) for the prefix check.
    assert!(
        kernel_version.unwrap().as_str().unwrap().starts_with('v'),
        "Kernel version should start with 'v'"
    );

    // Action 1: protocol — written at the table-features protocol versions.
    let protocol_action = &actions[1];
    assert!(
        protocol_action.get("protocol").is_some(),
        "Second action should be protocol"
    );
    let protocol = protocol_action.get("protocol").unwrap();
    assert_eq!(
        protocol["minReaderVersion"],
        TABLE_FEATURES_MIN_READER_VERSION
    );
    assert_eq!(
        protocol["minWriterVersion"],
        TABLE_FEATURES_MIN_WRITER_VERSION
    );

    // Action 2: metaData — must carry an id, schema string, and created time.
    let metadata_action = &actions[2];
    assert!(
        metadata_action.get("metaData").is_some(),
        "Third action should be metaData"
    );
    let metadata = metadata_action.get("metaData").unwrap();
    assert!(metadata.get("id").is_some(), "Metadata should have id");
    assert!(
        metadata.get("schemaString").is_some(),
        "Metadata should have schemaString"
    );
    assert!(
        metadata.get("createdTime").is_some(),
        "Metadata should have createdTime"
    );
    Ok(())
}
/// Builds an un-committed create-table transaction, returning it together
/// with the engine and the tempdir that keeps the table path alive.
fn create_test_create_table_txn() -> DeltaResult<(
    Arc<impl delta_kernel::Engine>,
    CreateTableTransaction,
    tempfile::TempDir,
)> {
    let (tempdir, table_path, engine) = test_table_setup()?;
    let fields = vec![
        StructField::nullable("id", DataType::INTEGER),
        StructField::nullable("name", DataType::STRING),
    ];
    let schema = Arc::new(StructType::try_new(fields).expect("valid schema"));
    let txn = create_table(&table_path, schema, "test_engine")
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;
    Ok((engine, txn, tempdir))
}
/// The transaction's `Debug` output should identify it as a create_table
/// Transaction.
#[tokio::test]
async fn test_create_table_txn_debug() -> DeltaResult<()> {
    let (_engine, txn, _tempdir) = create_test_create_table_txn()?;
    let debug_str = format!("{txn:?}");
    let looks_right = debug_str.contains("Transaction") && debug_str.contains("create_table");
    assert!(
        looks_right,
        "Debug output should contain Transaction info: {debug_str}"
    );
    Ok(())
}
/// Setting `delta.feature.<name>=supported` must register the feature in the
/// protocol. Whether `is_feature_enabled` also reports true depends on the
/// feature: some are enabled merely by being supported, others need an
/// explicit enablement property as well.
#[rstest]
#[case("vacuumProtocolCheck", TableFeature::VacuumProtocolCheck, true, true)]
#[case("v2Checkpoint", TableFeature::V2Checkpoint, true, true)]
#[case("deletionVectors", TableFeature::DeletionVectors, true, false)]
#[case("typeWidening", TableFeature::TypeWidening, true, false)]
#[case("appendOnly", TableFeature::AppendOnly, false, false)]
#[case("changeDataFeed", TableFeature::ChangeDataFeed, false, false)]
#[case("rowTracking", TableFeature::RowTracking, false, false)]
fn test_create_table_with_feature_signal(
    #[case] feature_name: &str,
    #[case] feature: TableFeature,
    #[case] is_reader_writer: bool,
    #[case] enabled_when_supported: bool,
) -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let property_key = format!("delta.feature.{feature_name}");
    let _ = create_table(&table_path, simple_schema()?, "Test/1.0")
        .with_table_properties([(property_key.as_str(), "supported")])
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;

    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
    let config = snapshot.table_configuration();
    assert!(
        config.is_feature_supported(&feature),
        "{feature_name} should be supported"
    );
    assert_eq!(
        config.is_feature_enabled(&feature),
        enabled_when_supported,
        "{feature_name}: is_feature_enabled should be {enabled_when_supported}"
    );

    // Every feature lands in writer features; reader/writer features must
    // additionally appear in reader features.
    let protocol = config.protocol();
    let in_writer = protocol
        .writer_features()
        .is_some_and(|f| f.contains(&feature));
    assert!(in_writer, "{feature_name} should be in writer features");
    if is_reader_writer {
        let in_reader = protocol
            .reader_features()
            .is_some_and(|f| f.contains(&feature));
        assert!(in_reader, "{feature_name} should be in reader features");
    }
    Ok(())
}
/// Both checkpoint-stats properties must round-trip through the snapshot's
/// table properties for every true/false combination.
#[rstest]
fn test_create_table_with_checkpoint_stats_properties(
    #[values(true, false)] write_stats_as_json: bool,
    #[values(true, false)] write_stats_as_struct: bool,
) -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let json_val = write_stats_as_json.to_string();
    let struct_val = write_stats_as_struct.to_string();
    let properties = [
        ("delta.checkpoint.writeStatsAsJson", json_val.as_str()),
        ("delta.checkpoint.writeStatsAsStruct", struct_val.as_str()),
    ];
    let _ = create_table(&table_path, simple_schema()?, "Test/1.0")
        .with_table_properties(properties)
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;

    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
    let props = snapshot.table_properties();
    assert_eq!(
        props.checkpoint_write_stats_as_json,
        Some(write_stats_as_json)
    );
    assert_eq!(
        props.checkpoint_write_stats_as_struct,
        Some(write_stats_as_struct)
    );
    Ok(())
}
/// An enablement property (e.g. `delta.enableDeletionVectors=true`) must
/// both support AND enable the corresponding feature; setting it to `false`
/// must leave the feature out entirely.
#[rstest]
#[case("delta.enableDeletionVectors", TableFeature::DeletionVectors, true)]
#[case("delta.enableTypeWidening", TableFeature::TypeWidening, true)]
#[case("delta.enableChangeDataFeed", TableFeature::ChangeDataFeed, false)]
#[case("delta.appendOnly", TableFeature::AppendOnly, false)]
#[case("delta.enableRowTracking", TableFeature::RowTracking, false)]
fn test_create_table_with_enablement_property(
    #[case] property: &str,
    #[case] feature: TableFeature,
    #[case] is_reader_writer: bool,
    #[values(true, false)] expect_enabled: bool,
) -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let value = expect_enabled.to_string();
    let _ = create_table(&table_path, simple_schema()?, "Test/1.0")
        .with_table_properties([(property, value.as_str())])
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;

    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
    let config = snapshot.table_configuration();
    assert_eq!(
        config.is_feature_supported(&feature),
        expect_enabled,
        "{property}={value}: feature supported should be {expect_enabled}"
    );
    assert_eq!(
        config.is_feature_enabled(&feature),
        expect_enabled,
        "{property}={value}: feature enabled should be {expect_enabled}"
    );

    // The protocol feature lists must track the enablement flag too.
    let protocol = config.protocol();
    let in_writer = protocol
        .writer_features()
        .is_some_and(|f| f.contains(&feature));
    assert_eq!(
        in_writer, expect_enabled,
        "{property}={value}: in writer features should be {expect_enabled}"
    );
    if is_reader_writer {
        let in_reader = protocol
            .reader_features()
            .is_some_and(|f| f.contains(&feature));
        assert_eq!(
            in_reader, expect_enabled,
            "{property}={value}: in reader features should be {expect_enabled}"
        );
    }
    Ok(())
}
/// Column names with characters such as spaces are only legal when column
/// mapping is enabled; without it, table creation must fail at build time.
#[rstest]
#[case::without_cm(false)]
#[case::with_cm(true)]
fn test_create_table_special_char_column_name(#[case] cm_enabled: bool) -> DeltaResult<()> {
    let (_temp_dir, table_path, engine) = test_table_setup()?;
    let fields = vec![
        StructField::new("valid_col", DataType::INTEGER, true),
        StructField::new("bad column", DataType::STRING, true),
    ];
    let schema = Arc::new(StructType::try_new(fields)?);

    let mut builder = create_table(&table_path, schema, "Test/1.0");
    if cm_enabled {
        builder = builder.with_table_properties([("delta.columnMapping.mode", "name")]);
    }
    let result = builder.build(engine.as_ref(), Box::new(FileSystemCommitter::new()));

    if !cm_enabled {
        // Without column mapping the special character is rejected outright.
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid character"),
            "Expected invalid character error, got: {err}"
        );
        return Ok(());
    }

    // With column mapping enabled the commit succeeds and the logical column
    // name survives into the snapshot schema.
    let txn = result?;
    let _ = txn.commit(engine.as_ref())?;
    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
    assert_eq!(snapshot.version(), 0);
    let field_names: Vec<_> = snapshot
        .schema()
        .fields()
        .map(|f| f.name().clone())
        .collect();
    assert!(
        field_names.contains(&"bad column".to_string()),
        "Schema should contain field 'bad column', got: {field_names:?}"
    );
    Ok(())
}