use std::collections::HashMap;
use std::sync::Arc;
use chrono::{NaiveDate, NaiveDateTime, TimeZone, Utc};
use delta_kernel::actions::{MAX_VALUES, MIN_VALUES, NULL_COUNT};
use delta_kernel::arrow::array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray,
TimestampMicrosecondArray,
};
use delta_kernel::arrow::datatypes::Schema as ArrowSchema;
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::expressions::Scalar;
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::table_features::ColumnMappingMode;
use delta_kernel::transaction::create_table::create_table;
use delta_kernel::transaction::data_layout::DataLayout;
use delta_kernel::Snapshot;
use rstest::rstest;
use test_utils::{
begin_transaction, get_column, read_scan, test_table_setup_mt, write_batch_to_table,
};
use url::Url;
use crate::common::read_utils::read_parquet_file;
/// Writes one row whose partition columns all hold ordinary (non-null) values and checks the
/// resulting add action and scan output.
///
/// Runs for every column mapping mode and for both values of `write_partition_values_parsed`.
/// The checks are:
/// - the snapshot reports one partition column per entry of `PARTITION_COLS`;
/// - with column mapping off, the add path starts with the expected `col=value/` prefix (the
///   `p_timestamp_ntz` segment differs on Windows) and ends with `.parquet`; with column mapping
///   on, the path must satisfy `assert_cm_path`;
/// - the add action's `partitionValues` hold the strings from `EXPECTED_NORMAL_PVS`, keyed by
///   the logical name (mapping off) or by `physical_name(cm_mode)` (mapping on);
/// - the scanned row passes `assert_normal_values` both before and after a checkpoint.
#[rstest]
#[case::cm_none(ColumnMappingMode::None)]
#[case::cm_name(ColumnMappingMode::Name)]
#[case::cm_id(ColumnMappingMode::Id)]
#[tokio::test(flavor = "multi_thread")]
async fn test_write_partitioned_normal_values_roundtrip(
    #[case] cm_mode: ColumnMappingMode,
    #[values(true, false)] write_partition_values_parsed: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, snapshot, engine) = setup_and_write(
        all_types_schema(),
        PARTITION_COLS,
        cm_mode,
        write_partition_values_parsed,
        normal_arrow_columns(),
        normal_partition_values()?,
    )
    .await?;

    // Derive the expected count from the column list that was handed to `setup_and_write`, so
    // this assertion cannot drift from `PARTITION_COLS`.
    assert_eq!(
        snapshot.table_configuration().partition_columns().len(),
        PARTITION_COLS.len()
    );

    let (add, rel_path) = read_single_add(&table_path, 1)?;

    // Relative path of the written file.
    match cm_mode {
        ColumnMappingMode::None => {
            // The expected encoding of the space in the NTZ timestamp depends on the platform.
            let ntz_segment = if cfg!(target_os = "windows") {
                "p_timestamp_ntz=2025-03-31%252015%253A30%253A00.123456/"
            } else {
                "p_timestamp_ntz=2025-03-31%2015%253A30%253A00.123456/"
            };
            let expected_prefix = format!(
                "p_string=hello/p_int=42/p_long=9876543210/p_short=7/\
                 p_byte=3/p_float=1.25/p_double=99.99/p_boolean=true/p_date=2025-03-31/\
                 p_timestamp=2025-03-31T15%253A30%253A00.123456Z/p_decimal=123.45/\
                 p_binary=Hello/{ntz_segment}"
            );
            assert!(
                rel_path.starts_with(&expected_prefix),
                "CM off: relative path mismatch.\n \
                 expected: {expected_prefix}<uuid>.parquet\n got: {rel_path}"
            );
            assert!(rel_path.ends_with(".parquet"));
        }
        ColumnMappingMode::Name | ColumnMappingMode::Id => {
            assert_cm_path(&rel_path);
        }
    }

    // Serialized partition values in the add action.
    let pv = add["partitionValues"].as_object().unwrap();
    match cm_mode {
        ColumnMappingMode::None => {
            for (key, val) in EXPECTED_NORMAL_PVS {
                assert_eq!(
                    pv.get(*key).and_then(|v| v.as_str()),
                    Some(*val),
                    "partitionValues[{key}] mismatch"
                );
            }
        }
        ColumnMappingMode::Name | ColumnMappingMode::Id => {
            let logical_schema = snapshot.schema();
            for (logical_key, expected_val) in EXPECTED_NORMAL_PVS {
                let field = logical_schema.field(logical_key).unwrap();
                let physical_key = field.physical_name(cm_mode);
                assert_eq!(
                    pv.get(physical_key).and_then(|v| v.as_str()),
                    Some(*expected_val),
                    "partitionValues[{physical_key}] (logical: {logical_key}) mismatch"
                );
            }
        }
    }

    verify_and_checkpoint(&snapshot, engine, assert_normal_values)?;
    Ok(())
}
/// Writes one row whose partition values are all null and checks the add action and scan output.
///
/// With column mapping off the add path must start with one
/// `<col>=__HIVE_DEFAULT_PARTITION__/` segment per partition column; with column mapping on it
/// must satisfy `assert_cm_path`. Every entry of `partitionValues` must be JSON null, and the
/// scanned row must pass `assert_all_partition_columns_null` before and after a checkpoint.
#[rstest]
#[case::cm_none(ColumnMappingMode::None)]
#[case::cm_name(ColumnMappingMode::Name)]
#[case::cm_id(ColumnMappingMode::Id)]
#[tokio::test(flavor = "multi_thread")]
async fn test_write_partitioned_null_values_roundtrip(
    #[case] cm_mode: ColumnMappingMode,
    #[values(true, false)] write_partition_values_parsed: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let written = setup_and_write(
        all_types_schema(),
        PARTITION_COLS,
        cm_mode,
        write_partition_values_parsed,
        null_arrow_columns(),
        null_partition_values()?,
    );
    let (_tmp_dir, table_path, snapshot, engine) = written.await?;

    let (add, rel_path) = read_single_add(&table_path, 1)?;

    if matches!(cm_mode, ColumnMappingMode::None) {
        let expected_prefix = hive_prefix(PARTITION_COLS, "__HIVE_DEFAULT_PARTITION__");
        assert!(
            rel_path.starts_with(&expected_prefix),
            "CM off null: relative path mismatch.\n \
             expected: {expected_prefix}<uuid>.parquet\n got: {rel_path}"
        );
    } else {
        assert_cm_path(&rel_path);
    }

    let partition_values = add["partitionValues"].as_object().unwrap();
    assert_eq!(partition_values.len(), PARTITION_COLS.len());
    partition_values.values().for_each(|val| {
        assert!(
            val.is_null(),
            "all partition values should be null, got: {val}"
        )
    });

    verify_and_checkpoint(&snapshot, engine, assert_all_partition_columns_null)?;
    Ok(())
}
/// Picks one of two literals depending on the compilation target: the first when
/// `target_os = "windows"`, the second on every other target.
macro_rules! platform_path {
    ($win:literal, $non_win:literal) => {
        if cfg!(not(target_os = "windows")) {
            $non_win
        } else {
            $win
        }
    };
}
/// Writes one row into a table partitioned by the string column `p`, using a partition value
/// that contains a special character, and checks how that value shows up in the add action.
///
/// For every column mapping mode the `partitionValues` entry (keyed by the physical name of `p`)
/// must equal the raw, unencoded value. With column mapping off the relative path must start
/// with `expected_path_prefix` and end with `.parquet`; with column mapping on it must satisfy
/// `assert_cm_path`. The scanned row must carry the original value before and after a
/// checkpoint.
#[rstest]
#[case::percent("a%b", "p=a%2525b/")]
#[case::quote("a\"b", "p=a%2522b/")]
#[case::hash("a#b", "p=a%2523b/")]
#[case::question("a?b", "p=a%253Fb/")]
#[case::backslash("a\\b", "p=a%255Cb/")]
#[case::caret("a^b", "p=a%255Eb/")]
#[case::left_brace("a{b", "p=a%257Bb/")]
#[case::left_bracket("a[b", "p=a%255Bb/")]
#[case::right_bracket("a]b", "p=a%255Db/")]
#[case::colon("a:b", "p=a%253Ab/")]
#[case::slash("a/b", "p=a%252Fb/")]
#[case::equals("a=b", "p=a%253Db/")]
#[case::apostrophe("a'b", "p=a%2527b/")]
#[case::asterisk("a*b", "p=a%252Ab/")]
#[case::slash_percent("Serbia/srb%", "p=Serbia%252Fsrb%2525/")]
#[case::backtick("a`b", "p=a%60b/")]
#[case::right_brace("a}b", "p=a%7Db/")]
#[case::space("a b", platform_path!("p=a%2520b/", "p=a%20b/"))]
#[case::less_than("a<b", platform_path!("p=a%253Cb/", "p=a%3Cb/"))]
#[case::greater_than("a>b", platform_path!("p=a%253Eb/", "p=a%3Eb/"))]
#[case::pipe("a|b", platform_path!("p=a%257Cb/", "p=a%7Cb/"))]
// Three consecutive spaces: the expected prefix encodes three of them, so the input must too.
#[case::multi_space("a   b", platform_path!("p=a%2520%2520%2520b/", "p=a%20%20%20b/"))]
#[case::null_byte("a\0b", "p=a%2500b/")]
#[case::tab("a\tb", "p=a%2509b/")]
#[case::newline("a\nb", "p=a%250Ab/")]
#[case::del("a\x7Fb", "p=a%257Fb/")]
#[tokio::test(flavor = "multi_thread")]
async fn test_write_partitioned_path_encodes_special_chars(
    #[case] value: &str,
    #[case] expected_path_prefix: &str,
    #[values(
        ColumnMappingMode::None,
        ColumnMappingMode::Name,
        ColumnMappingMode::Id
    )]
    cm_mode: ColumnMappingMode,
) -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(
        StructType::try_new(vec![
            StructField::nullable("value", DataType::INTEGER),
            StructField::nullable("p", DataType::STRING),
        ])
        .unwrap(),
    );
    let (_tmp_dir, table_path, snapshot, engine) = setup_and_write(
        schema,
        &["p"],
        cm_mode,
        true, // write_partition_values_parsed
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(StringArray::from(vec![value])) as ArrayRef,
        ],
        HashMap::from([("p".to_string(), Scalar::String(value.into()))]),
    )
    .await?;

    let (add, rel_path) = read_single_add(&table_path, 1)?;

    // The serialized partition value is looked up under the physical column name and compared
    // with the raw input value.
    let logical_schema = snapshot.schema();
    let physical_key = logical_schema.field("p").unwrap().physical_name(cm_mode);
    let pv = add["partitionValues"].as_object().unwrap();
    assert_eq!(
        pv.get(physical_key).and_then(|v| v.as_str()),
        Some(value),
        "partitionValues[{physical_key}] mismatch"
    );

    match cm_mode {
        ColumnMappingMode::None => {
            assert!(
                rel_path.starts_with(expected_path_prefix),
                "CM off: expected path to start with {expected_path_prefix:?}, got {rel_path:?}"
            );
            assert!(rel_path.ends_with(".parquet"));
        }
        ColumnMappingMode::Name | ColumnMappingMode::Id => {
            assert_cm_path(&rel_path);
        }
    }

    verify_and_checkpoint(&snapshot, engine, |sorted| {
        assert_eq!(sorted.num_rows(), 1);
        assert_eq!(
            sorted
                .column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap()
                .value(0),
            1,
            "value column mismatch"
        );
        assert_eq!(
            sorted
                .column(1)
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap()
                .value(0),
            value,
            "partition column `p` mismatch"
        );
    })?;
    Ok(())
}
fn all_types_schema() -> Arc<StructType> {
Arc::new(
StructType::try_new(vec![
StructField::nullable("value", DataType::INTEGER),
StructField::nullable("p_string", DataType::STRING),
StructField::nullable("p_int", DataType::INTEGER),
StructField::nullable("p_long", DataType::LONG),
StructField::nullable("p_short", DataType::SHORT),
StructField::nullable("p_byte", DataType::BYTE),
StructField::nullable("p_float", DataType::FLOAT),
StructField::nullable("p_double", DataType::DOUBLE),
StructField::nullable("p_boolean", DataType::BOOLEAN),
StructField::nullable("p_date", DataType::DATE),
StructField::nullable("p_timestamp", DataType::TIMESTAMP),
StructField::nullable("p_decimal", DataType::decimal(10, 2).unwrap()),
StructField::nullable("p_binary", DataType::BINARY),
StructField::nullable("p_timestamp_ntz", DataType::TIMESTAMP_NTZ),
])
.unwrap(),
)
}
/// Names of the partition columns of `all_types_schema`, in schema order: every field of that
/// schema except the leading `value` data column.
const PARTITION_COLS: &[&str] = &[
    "p_string",
    "p_int",
    "p_long",
    "p_short",
    "p_byte",
    "p_float",
    "p_double",
    "p_boolean",
    "p_date",
    "p_timestamp",
    "p_decimal",
    "p_binary",
    "p_timestamp_ntz",
];
/// Returns one single-row Arrow array per field of `all_types_schema`, in schema order, holding
/// the non-null sample values used by the "normal values" tests.
fn normal_arrow_columns() -> Vec<ArrayRef> {
    let micros = ts_to_micros("2025-03-31 15:30:00.123456");
    let days = date_to_days("2025-03-31");

    let value: ArrayRef = Arc::new(Int32Array::from(vec![1]));
    let p_string: ArrayRef = Arc::new(StringArray::from(vec!["hello"]));
    let p_int: ArrayRef = Arc::new(Int32Array::from(vec![42]));
    let p_long: ArrayRef = Arc::new(Int64Array::from(vec![9_876_543_210i64]));
    let p_short: ArrayRef = Arc::new(Int16Array::from(vec![7i16]));
    let p_byte: ArrayRef = Arc::new(Int8Array::from(vec![3i8]));
    let p_float: ArrayRef = Arc::new(Float32Array::from(vec![1.25f32]));
    let p_double: ArrayRef = Arc::new(Float64Array::from(vec![99.99f64]));
    let p_boolean: ArrayRef = Arc::new(BooleanArray::from(vec![true]));
    let p_date: ArrayRef = Arc::new(Date32Array::from(vec![days]));
    // The zoned and the NTZ timestamp columns share the same microsecond value.
    let p_timestamp = ts_array(micros);
    let p_decimal = decimal_array(12345, 10, 2);
    let p_binary: ArrayRef = Arc::new(BinaryArray::from_vec(vec![b"Hello"]));
    let p_timestamp_ntz = ts_ntz_array(micros);

    vec![
        value,
        p_string,
        p_int,
        p_long,
        p_short,
        p_byte,
        p_float,
        p_double,
        p_boolean,
        p_date,
        p_timestamp,
        p_decimal,
        p_binary,
        p_timestamp_ntz,
    ]
}
/// Returns the partition-value map (column name -> `Scalar`) with a non-null sample value for
/// every column in `PARTITION_COLS`.
///
/// # Errors
/// Propagates the error from `Scalar::decimal` if the decimal scalar cannot be built.
fn normal_partition_values() -> Result<HashMap<String, Scalar>, Box<dyn std::error::Error>> {
    let micros = ts_to_micros("2025-03-31 15:30:00.123456");
    let decimal = Scalar::decimal(12345, 10, 2)?;
    let entries = [
        ("p_string", Scalar::String("hello".into())),
        ("p_int", Scalar::Integer(42)),
        ("p_long", Scalar::Long(9_876_543_210)),
        ("p_short", Scalar::Short(7)),
        ("p_byte", Scalar::Byte(3)),
        ("p_float", Scalar::Float(1.25)),
        ("p_double", Scalar::Double(99.99)),
        ("p_boolean", Scalar::Boolean(true)),
        ("p_date", Scalar::Date(date_to_days("2025-03-31"))),
        ("p_timestamp", Scalar::Timestamp(micros)),
        ("p_decimal", decimal),
        ("p_binary", Scalar::Binary(b"Hello".to_vec())),
        ("p_timestamp_ntz", Scalar::TimestampNtz(micros)),
    ];
    Ok(entries
        .into_iter()
        .map(|(name, scalar)| (name.to_string(), scalar))
        .collect())
}
/// `(logical column name, expected string)` pairs that the tests compare against the
/// `partitionValues` entries of the add action written from `normal_partition_values`.
const EXPECTED_NORMAL_PVS: &[(&str, &str)] = &[
    ("p_string", "hello"),
    ("p_int", "42"),
    ("p_long", "9876543210"),
    ("p_short", "7"),
    ("p_byte", "3"),
    ("p_float", "1.25"),
    ("p_double", "99.99"),
    ("p_boolean", "true"),
    ("p_date", "2025-03-31"),
    ("p_timestamp", "2025-03-31T15:30:00.123456Z"),
    ("p_decimal", "123.45"),
    ("p_binary", "Hello"),
    ("p_timestamp_ntz", "2025-03-31 15:30:00.123456"),
];
/// Returns one single-row Arrow array per field of `all_types_schema`, in schema order: the
/// `value` column holds `1` and every `p_*` column holds a single null of the matching type.
fn null_arrow_columns() -> Vec<ArrayRef> {
    let value: ArrayRef = Arc::new(Int32Array::from(vec![1]));

    let null_decimal = Decimal128Array::from(vec![None::<i128>])
        .with_precision_and_scale(10, 2)
        .unwrap();
    let null_timestamp_utc =
        TimestampMicrosecondArray::from(vec![None::<i64>]).with_timezone("UTC");
    let null_timestamp_ntz = TimestampMicrosecondArray::from(vec![None::<i64>]);

    let partition_columns: [ArrayRef; 13] = [
        Arc::new(StringArray::from(vec![None::<&str>])),
        Arc::new(Int32Array::from(vec![None::<i32>])),
        Arc::new(Int64Array::from(vec![None::<i64>])),
        Arc::new(Int16Array::from(vec![None::<i16>])),
        Arc::new(Int8Array::from(vec![None::<i8>])),
        Arc::new(Float32Array::from(vec![None::<f32>])),
        Arc::new(Float64Array::from(vec![None::<f64>])),
        Arc::new(BooleanArray::from(vec![None::<bool>])),
        Arc::new(Date32Array::from(vec![None::<i32>])),
        Arc::new(null_timestamp_utc),
        Arc::new(null_decimal),
        Arc::new(BinaryArray::from(vec![None::<&[u8]>])),
        Arc::new(null_timestamp_ntz),
    ];

    std::iter::once(value).chain(partition_columns).collect()
}
/// Returns the partition-value map (column name -> `Scalar`) in which every column of
/// `PARTITION_COLS` maps to a typed `Scalar::Null`.
///
/// # Errors
/// Propagates the error from `DataType::decimal` if the decimal type cannot be built.
fn null_partition_values() -> Result<HashMap<String, Scalar>, Box<dyn std::error::Error>> {
    let decimal_type = DataType::decimal(10, 2)?;
    let typed_columns = [
        ("p_string", DataType::STRING),
        ("p_int", DataType::INTEGER),
        ("p_long", DataType::LONG),
        ("p_short", DataType::SHORT),
        ("p_byte", DataType::BYTE),
        ("p_float", DataType::FLOAT),
        ("p_double", DataType::DOUBLE),
        ("p_boolean", DataType::BOOLEAN),
        ("p_date", DataType::DATE),
        ("p_timestamp", DataType::TIMESTAMP),
        ("p_decimal", decimal_type),
        ("p_binary", DataType::BINARY),
        ("p_timestamp_ntz", DataType::TIMESTAMP_NTZ),
    ];
    Ok(typed_columns
        .into_iter()
        .map(|(name, data_type)| (name.to_string(), Scalar::Null(data_type)))
        .collect())
}
/// Asserts that row 0 of column `$idx` in `$batch`, downcast to the array type `$arr_ty`,
/// equals `$expected`. The failure message names the column index and the field name taken from
/// the batch schema. Panics if the column is not of type `$arr_ty`.
macro_rules! assert_col {
    ($batch:expr, $idx:expr, $arr_ty:ty, $expected:expr) => {{
        let batch = &$batch;
        let idx = $idx;
        let actual = batch
            .column(idx)
            .as_any()
            .downcast_ref::<$arr_ty>()
            .unwrap()
            .value(0);
        assert_eq!(
            actual,
            $expected,
            "column {} ({}) value mismatch",
            idx,
            batch.schema().field(idx).name()
        );
    }};
}
fn assert_normal_values(sorted: &RecordBatch) {
let ts = ts_to_micros("2025-03-31 15:30:00.123456");
assert_eq!(sorted.num_rows(), 1);
assert_col!(sorted, 0, Int32Array, 1); assert_col!(sorted, 1, StringArray, "hello"); assert_col!(sorted, 2, Int32Array, 42); assert_col!(sorted, 3, Int64Array, 9_876_543_210i64); assert_col!(sorted, 4, Int16Array, 7i16); assert_col!(sorted, 5, Int8Array, 3i8); assert_col!(sorted, 6, Float32Array, 1.25f32); assert_col!(sorted, 7, Float64Array, 99.99f64); assert_col!(sorted, 8, BooleanArray, true); assert_col!(sorted, 9, Date32Array, date_to_days("2025-03-31")); assert_col!(sorted, 10, TimestampMicrosecondArray, ts); assert_col!(sorted, 11, Decimal128Array, 12345); assert_eq!(
sorted
.column(12)
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.value(0),
b"Hello"
);
assert_col!(sorted, 13, TimestampMicrosecondArray, ts); }
fn assert_all_partition_columns_null(sorted: &RecordBatch) {
assert_eq!(sorted.num_rows(), 1);
for col_idx in 1..=13 {
assert!(
sorted.column(col_idx).is_null(0),
"partition column at index {col_idx} ({}) should be null",
sorted.schema().field(col_idx).name()
);
}
}
/// Asserts the shape of a relative data-file path written with column mapping enabled:
/// exactly two `/`-separated segments, the first being two ASCII alphanumeric characters and
/// the second ending in `.parquet`. Panics otherwise.
fn assert_cm_path(rel_path: &str) {
    let parts = rel_path.split('/').collect::<Vec<_>>();
    assert_eq!(
        parts.len(),
        2,
        "CM on: path should be <prefix>/<file>, got: {rel_path}"
    );

    let (prefix, file_name) = (parts[0], parts[1]);
    assert_eq!(prefix.len(), 2, "prefix should be 2 chars");
    assert!(prefix.bytes().all(|b| b.is_ascii_alphanumeric()));
    assert!(file_name.ends_with(".parquet"));
}
/// Maps a `ColumnMappingMode` to the lowercase string these tests pass as the value of the
/// `delta.columnMapping.mode` table property.
fn cm_mode_str(mode: ColumnMappingMode) -> &'static str {
    let property_value = match mode {
        ColumnMappingMode::Name => "name",
        ColumnMappingMode::Id => "id",
        ColumnMappingMode::None => "none",
    };
    property_value
}
/// Creates a partitioned table at `table_path` and returns a freshly loaded snapshot of it.
///
/// The table is partitioned by `partition_cols`, and `delta.checkpoint.writeStatsAsStruct` is
/// set to the string form of `write_partition_values_parsed`. `delta.columnMapping.mode` is
/// only set when `cm_mode` is not `ColumnMappingMode::None`.
///
/// # Errors
/// Propagates any error from building or committing the create-table transaction, or from
/// loading the snapshot afterwards.
fn create_partitioned_table(
    table_path: &str,
    engine: &dyn delta_kernel::Engine,
    schema: Arc<StructType>,
    partition_cols: &[&str],
    cm_mode: ColumnMappingMode,
    write_partition_values_parsed: bool,
) -> Result<Arc<Snapshot>, Box<dyn std::error::Error>> {
    let stats_as_struct = write_partition_values_parsed.to_string();
    let base = create_table(table_path, schema, "test/1.0")
        .with_data_layout(DataLayout::partitioned(partition_cols))
        .with_table_properties([("delta.checkpoint.writeStatsAsStruct", stats_as_struct)]);

    let builder = if cm_mode == ColumnMappingMode::None {
        base
    } else {
        base.with_table_properties([("delta.columnMapping.mode", cm_mode_str(cm_mode))])
    };

    let committer = Box::new(FileSystemCommitter::new());
    let _ = builder.build(engine, committer)?.commit(engine)?;

    let snapshot = Snapshot::builder_for(table_path).build(engine)?;
    Ok(snapshot)
}
/// Scans the whole snapshot, concatenates the resulting batches into one and returns it with
/// all rows reordered by ascending sort order of column 0.
///
/// # Panics
/// Panics if the scan yields no batches.
///
/// # Errors
/// Propagates errors from building or reading the scan and from the Arrow `concat_batches`,
/// `sort_to_indices`, `take` and `RecordBatch::try_new` calls.
fn read_sorted(
    snapshot: &Arc<Snapshot>,
    engine: Arc<dyn delta_kernel::Engine>,
) -> Result<RecordBatch, Box<dyn std::error::Error>> {
    let scan = snapshot.clone().scan_builder().build()?;
    let batches = read_scan(&scan, engine)?;
    assert!(!batches.is_empty(), "expected at least one batch");

    let merged = delta_kernel::arrow::compute::concat_batches(&batches[0].schema(), &batches)?;
    let sort_indices = delta_kernel::arrow::compute::sort_to_indices(merged.column(0), None, None)?;

    // Apply the same permutation to every column; a failing `take` is reported through the
    // function's `Result` instead of panicking inside the iterator.
    let sorted_columns = merged
        .columns()
        .iter()
        .map(|col| delta_kernel::arrow::compute::take(col.as_ref(), &sort_indices, None))
        .collect::<Result<Vec<ArrayRef>, _>>()?;

    Ok(RecordBatch::try_new(merged.schema(), sorted_columns)?)
}
/// Parses `s` with the format `%Y-%m-%d %H:%M:%S%.f`, interprets it as a UTC date-time and
/// returns the number of microseconds since the Unix epoch.
///
/// # Panics
/// Panics if `s` does not match the format or the microsecond count is not representable.
fn ts_to_micros(s: &str) -> i64 {
    let naive = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f").unwrap();
    let since_epoch = Utc
        .from_utc_datetime(&naive)
        .signed_duration_since(chrono::DateTime::UNIX_EPOCH);
    since_epoch.num_microseconds().unwrap()
}
/// Parses `s` as a `%Y-%m-%d` date and returns the number of whole days between the Unix epoch
/// and midnight UTC of that date.
///
/// # Panics
/// Panics if `s` is not a valid date in that format.
fn date_to_days(s: &str) -> i32 {
    let midnight = NaiveDate::parse_from_str(s, "%Y-%m-%d")
        .unwrap()
        .and_hms_opt(0, 0, 0)
        .unwrap();
    let since_epoch = Utc
        .from_utc_datetime(&midnight)
        .signed_duration_since(chrono::DateTime::UNIX_EPOCH);
    since_epoch.num_days() as i32
}
/// Wraps `micros` in a single-element microsecond timestamp array tagged with the `"UTC"`
/// timezone.
fn ts_array(micros: i64) -> ArrayRef {
    let zoned = TimestampMicrosecondArray::from(vec![micros]).with_timezone("UTC");
    Arc::new(zoned)
}
/// Wraps `micros` in a single-element microsecond timestamp array with no timezone attached.
fn ts_ntz_array(micros: i64) -> ArrayRef {
    let unzoned = TimestampMicrosecondArray::from(vec![micros]);
    Arc::new(unzoned)
}
/// Builds a single-element `Decimal128Array` holding the unscaled `value` with the given
/// `precision` and `scale`.
///
/// # Panics
/// Panics if `with_precision_and_scale` rejects the precision/scale pair.
fn decimal_array(value: i128, precision: u8, scale: i8) -> ArrayRef {
    let unscaled = Decimal128Array::from(vec![value]);
    let decimal = unscaled
        .with_precision_and_scale(precision, scale)
        .unwrap();
    Arc::new(decimal)
}
/// Creates a partitioned table in a fresh temporary location and writes one batch to it.
///
/// `arrow_columns` must hold one array per field of `schema`, in schema order. Arrays whose
/// field is named in `partition_cols` are dropped, so the written batch contains only the data
/// columns; the partition values are supplied separately through `partition_values`.
///
/// Returns the temporary directory guard, the table path, the snapshot returned by
/// `write_batch_to_table`, and the engine.
///
/// # Errors
/// Propagates any error from table setup, schema conversion, table creation, batch construction
/// or the write itself.
async fn setup_and_write(
    schema: Arc<StructType>,
    partition_cols: &[&str],
    cm_mode: ColumnMappingMode,
    write_partition_values_parsed: bool,
    arrow_columns: Vec<ArrayRef>,
    partition_values: HashMap<String, Scalar>,
) -> Result<
    (
        tempfile::TempDir,
        String,
        Arc<Snapshot>,
        Arc<dyn delta_kernel::Engine>,
    ),
    Box<dyn std::error::Error>,
> {
    let (tmp_dir, table_path, engine) = test_table_setup_mt()?;

    // Keep only the (field, array) pairs that are not partition columns.
    let mut data_fields = Vec::new();
    let mut data_columns: Vec<ArrayRef> = Vec::new();
    for (field, column) in schema.fields().zip(arrow_columns) {
        if partition_cols.contains(&field.name().as_str()) {
            continue;
        }
        data_fields.push(field.clone());
        data_columns.push(column);
    }

    let kernel_data_schema = StructType::try_new(data_fields)?;
    let arrow_data_schema: Arc<ArrowSchema> = Arc::new((&kernel_data_schema).try_into_arrow()?);

    let created = create_partitioned_table(
        &table_path,
        engine.as_ref(),
        schema,
        partition_cols,
        cm_mode,
        write_partition_values_parsed,
    )?;

    let batch = RecordBatch::try_new(arrow_data_schema, data_columns)?;
    let written = write_batch_to_table(&created, engine.as_ref(), batch, partition_values).await?;

    Ok((
        tmp_dir,
        table_path,
        written,
        engine as Arc<dyn delta_kernel::Engine>,
    ))
}
/// Runs `assert_fn` on the sorted scan of `snapshot`, writes a checkpoint, reloads the table
/// from `snapshot.table_root()` and runs `assert_fn` again on the sorted scan of the reloaded
/// snapshot.
///
/// # Errors
/// Propagates errors from scanning, checkpointing, or rebuilding the snapshot.
fn verify_and_checkpoint(
    snapshot: &Arc<Snapshot>,
    engine: Arc<dyn delta_kernel::Engine>,
    assert_fn: impl Fn(&RecordBatch),
) -> Result<(), Box<dyn std::error::Error>> {
    // First pass: data as read from the snapshot that was handed in.
    let before_checkpoint = read_sorted(snapshot, Arc::clone(&engine))?;
    assert_fn(&before_checkpoint);

    snapshot.checkpoint(engine.as_ref(), None)?;

    // Second pass: data as read from a snapshot built after the checkpoint was written.
    let reloaded = Snapshot::builder_for(snapshot.table_root()).build(engine.as_ref())?;
    let after_checkpoint = read_sorted(&reloaded, engine)?;
    assert_fn(&after_checkpoint);

    Ok(())
}
/// Builds a `col=value/` path prefix in which every column of `cols` is assigned the same
/// `value`, e.g. `a=x/b=x/`. For an empty `cols` the result is just `/`.
fn hive_prefix(cols: &[&str], value: &str) -> String {
    let segments: Vec<String> = cols.iter().map(|col| format!("{col}={value}")).collect();
    let mut prefix = segments.join("/");
    prefix.push('/');
    prefix
}
/// Reads the commit file `<table_path>/_delta_log/<version, zero-padded to 20 digits>.json`,
/// parses it as a stream of JSON values and returns the `add` payload of every value that has
/// one, in file order.
///
/// # Errors
/// Returns an error if the file cannot be read or any value in it fails to parse.
fn read_add_actions_json(
    table_path: &str,
    version: u64,
) -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error>> {
    let log_file = format!("{table_path}/_delta_log/{version:020}.json");
    let raw = std::fs::read_to_string(log_file)?;

    let mut adds = Vec::new();
    for action in serde_json::Deserializer::from_str(&raw).into_iter::<serde_json::Value>() {
        let action = action?;
        if let Some(add) = action.get("add") {
            adds.push(add.clone());
        }
    }
    Ok(adds)
}
/// Reads the commit for `version`, asserts it contains exactly one add action and returns that
/// action together with its `path` string.
///
/// # Panics
/// Panics if the commit does not hold exactly one add action, if `path` is not a string, or if
/// the path contains `://` (i.e. is not relative).
///
/// # Errors
/// Propagates errors from `read_add_actions_json`.
fn read_single_add(
    table_path: &str,
    version: u64,
) -> Result<(serde_json::Value, String), Box<dyn std::error::Error>> {
    let mut adds = read_add_actions_json(table_path, version)?;
    assert_eq!(adds.len(), 1, "should have exactly one add action");

    // Exactly one element is present, so popping yields that single add action.
    let add = adds.pop().unwrap();
    let rel_path = add["path"].as_str().unwrap().to_owned();
    assert!(
        !rel_path.contains("://"),
        "should produce relative paths, got: {rel_path}"
    );
    Ok((add, rel_path))
}
#[tokio::test(flavor = "multi_thread")]
async fn test_materialized_partition_columns_excluded_from_stats(
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let partition_col = "partition";
let table_schema = Arc::new(StructType::try_new(vec![
StructField::nullable("number", DataType::INTEGER),
StructField::nullable(partition_col, DataType::STRING),
])?);
let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
let _ = create_table(&table_path, table_schema.clone(), "test/1.0")
.with_data_layout(DataLayout::partitioned([partition_col]))
.with_table_properties([("delta.feature.materializePartitionColumns", "supported")])
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let mut txn = test_utils::load_and_begin_transaction(&table_path, engine.as_ref())?
.with_engine_info("default engine");
let data_schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"number",
DataType::INTEGER,
)])?);
let arrow_schema = Arc::new(data_schema.as_ref().try_into_arrow()?);
let batch = RecordBatch::try_new(
arrow_schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?;
let data = Box::new(ArrowEngineData::new(batch));
let write_context = txn.partitioned_write_context(HashMap::from([(
partition_col.to_string(),
Scalar::String("a".into()),
)]))?;
let result = engine.write_parquet(&data, &write_context).await?;
txn.add_files(result);
assert!(txn.commit(engine.as_ref())?.is_committed());
let (add, _) = read_single_add(&table_path, 1)?;
let stats: serde_json::Value = serde_json::from_str(add["stats"].as_str().unwrap()).unwrap();
assert!(
stats[MIN_VALUES].get("number").is_some(),
"data column 'number' should have minValues"
);
assert!(
stats[MAX_VALUES].get("number").is_some(),
"data column 'number' should have maxValues"
);
assert!(
stats[MIN_VALUES].get(partition_col).is_none(),
"partition column should not have minValues even when materialized"
);
assert!(
stats[MAX_VALUES].get(partition_col).is_none(),
"partition column should not have maxValues even when materialized"
);
assert!(
stats[NULL_COUNT].get(partition_col).is_none(),
"partition column should not have nullCount even when materialized"
);
Ok(())
}
#[rstest]
#[case::cm_none(ColumnMappingMode::None)]
#[case::cm_name(ColumnMappingMode::Name)]
#[case::cm_id(ColumnMappingMode::Id)]
#[tokio::test(flavor = "multi_thread")]
async fn test_materialize_partition_columns_e2e(
#[case] cm_mode: ColumnMappingMode,
) -> Result<(), Box<dyn std::error::Error>> {
let cm = cm_mode_str(cm_mode);
let table_schema = Arc::new(StructType::try_new(vec![
StructField::nullable("d1", DataType::INTEGER),
StructField::nullable("p1", DataType::STRING),
StructField::nullable("p2", DataType::INTEGER),
StructField::nullable("d2", DataType::INTEGER),
])?);
let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
let _ = create_table(&table_path, table_schema, "test/1.0")
.with_data_layout(DataLayout::partitioned(["p1", "p2"]))
.with_table_properties([
("delta.feature.materializePartitionColumns", "supported"),
("delta.columnMapping.mode", cm),
])
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
let kernel_data_schema = StructType::try_new(vec![
StructField::nullable("d1", DataType::INTEGER),
StructField::nullable("d2", DataType::INTEGER),
])?;
let arrow_data_schema: Arc<ArrowSchema> = Arc::new((&kernel_data_schema).try_into_arrow()?);
let make_batch = |d1: Vec<i32>, d2: Vec<i32>| {
RecordBatch::try_new(
arrow_data_schema.clone(),
vec![
Arc::new(Int32Array::from(d1)) as ArrayRef,
Arc::new(Int32Array::from(d2)),
],
)
.unwrap()
};
let partition_values = |p1: &str, p2: i32| {
HashMap::from([
("p1".to_string(), Scalar::String(p1.into())),
("p2".to_string(), Scalar::Integer(p2)),
])
};
let mut txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
.with_engine_info("default engine")
.with_data_change(true);
for (d1, d2, p1, p2) in [
(vec![1, 2, 3], vec![10, 20, 30], "x", 5),
(vec![4, 5], vec![40, 50], "y", 6),
] {
let wc = txn.partitioned_write_context(partition_values(p1, p2))?;
let add = engine
.write_parquet(&ArrowEngineData::new(make_batch(d1, d2)), &wc)
.await?;
txn.add_files(add);
}
let snapshot = txn.commit(engine.as_ref())?.unwrap_post_commit_snapshot();
let logical_schema = snapshot.schema();
let p1_phys = logical_schema.field("p1").unwrap().physical_name(cm_mode);
let p2_phys = logical_schema.field("p2").unwrap().physical_name(cm_mode);
let adds = read_add_actions_json(&table_path, 1)?;
assert_eq!(adds.len(), 2, "one commit should write two partition files");
let mut found: Vec<(String, i32, usize)> = Vec::new();
for add in &adds {
let rel_path = add["path"].as_str().unwrap();
let parquet_path = Url::from_directory_path(&table_path)
.unwrap()
.join(rel_path)?
.to_file_path()
.unwrap();
let file_batch = read_parquet_file(&parquet_path);
let pv = add["partitionValues"].as_object().unwrap();
let p1_from_delta_log = pv.get(p1_phys).and_then(|v| v.as_str()).unwrap();
let p2_from_delta_log: i32 = pv.get(p2_phys).and_then(|v| v.as_str()).unwrap().parse()?;
let p1 = get_column!(file_batch, p1_phys, StringArray);
let p2 = get_column!(file_batch, p2_phys, Int32Array);
assert!(
p1.iter().all(|v| v == Some(p1_from_delta_log)),
"materialized p1 should equal declared '{p1_from_delta_log}' in every row, got {p1:?}"
);
assert!(
p2.iter().all(|v| v == Some(p2_from_delta_log)),
"materialized p2 should equal declared {p2_from_delta_log} in every row, got {p2:?}"
);
found.push((
p1_from_delta_log.to_string(),
p2_from_delta_log,
file_batch.num_rows(),
));
}
found.sort();
assert_eq!(
found,
vec![("x".to_string(), 5, 3), ("y".to_string(), 6, 2)]
);
let sorted = read_sorted(&snapshot, engine.clone() as Arc<dyn delta_kernel::Engine>)?;
let int_col = |name: &str| get_column!(sorted, name, Int32Array).values().to_vec();
let p1_scan: Vec<Option<&str>> = get_column!(sorted, "p1", StringArray).iter().collect();
assert_eq!(int_col("d1"), vec![1, 2, 3, 4, 5]);
assert_eq!(int_col("d2"), vec![10, 20, 30, 40, 50]);
assert_eq!(
p1_scan,
vec![Some("x"), Some("x"), Some("x"), Some("y"), Some("y")]
);
assert_eq!(int_col("p2"), vec![5, 5, 5, 6, 6]);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn test_materialize_all_primitive_partition_types() -> Result<(), Box<dyn std::error::Error>>
{
let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
let _ = create_table(&table_path, all_types_schema(), "test/1.0")
.with_data_layout(DataLayout::partitioned(PARTITION_COLS.iter().copied()))
.with_table_properties([("delta.feature.materializePartitionColumns", "supported")])
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
let data_schema = StructType::try_new(vec![StructField::nullable("value", DataType::INTEGER)])?;
let batch = RecordBatch::try_new(
Arc::new((&data_schema).try_into_arrow()?),
vec![normal_arrow_columns()[0].clone()],
)?;
let snapshot = write_batch_to_table(
&snapshot,
engine.as_ref(),
batch,
normal_partition_values()?,
)
.await?;
let (_add, rel_path) = read_single_add(&table_path, 1)?;
let parquet_path = Url::from_directory_path(&table_path)
.unwrap()
.join(&rel_path)?
.to_file_path()
.unwrap();
assert_normal_values(&read_parquet_file(&parquet_path));
let sorted = read_sorted(&snapshot, engine.clone() as Arc<dyn delta_kernel::Engine>)?;
assert_normal_values(&sorted);
Ok(())
}
#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_input_data_with_partition_column_errors(
#[values(true, false)] materialized: bool,
#[values(
ColumnMappingMode::None,
ColumnMappingMode::Name,
ColumnMappingMode::Id
)]
cm_mode: ColumnMappingMode,
) -> Result<(), Box<dyn std::error::Error>> {
let cm = cm_mode_str(cm_mode);
let partition_col = "partition";
let table_schema = Arc::new(StructType::try_new(vec![
StructField::nullable("number", DataType::INTEGER),
StructField::nullable(partition_col, DataType::STRING),
])?);
let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
let mut properties = vec![("delta.columnMapping.mode", cm)];
if materialized {
properties.push(("delta.feature.materializePartitionColumns", "supported"));
}
let _ = create_table(&table_path, table_schema.clone(), "test/1.0")
.with_data_layout(DataLayout::partitioned([partition_col]))
.with_table_properties(properties)
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
.commit(engine.as_ref())?;
let txn = test_utils::load_and_begin_transaction(&table_path, engine.as_ref())?
.with_engine_info("default engine");
let arrow_schema = Arc::new(table_schema.as_ref().try_into_arrow()?);
let batch = RecordBatch::try_new(
arrow_schema,
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "a", "a"])),
],
)?;
let data = Box::new(ArrowEngineData::new(batch));
let write_context = txn.partitioned_write_context(HashMap::from([(
partition_col.to_string(),
Scalar::String("a".into()),
)]))?;
let err = engine
.write_parquet(&data, &write_context)
.await
.err()
.expect("writing data that includes the partition column must fail")
.to_string();
let needle = if materialized {
"Too few fields in output schema"
} else {
"Passed struct had 2 columns, but transformed column has 1"
};
assert!(
err.contains(needle),
"expected error containing {needle:?} (materialized={materialized}), got: {err}"
);
Ok(())
}
/// Checks how `partitioned_write_context` validates the value given for the NOT NULL partition
/// column `p`, with and without `materializePartitionColumns` and for every column mapping mode.
///
/// A null scalar and an empty string must both be rejected with an error that contains
/// `"not nullable"` and `'p'`; the value `"a"` must be accepted.
#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_partition_null_validation(
    #[values(false, true)] materialized: bool,
    #[values(
        ColumnMappingMode::None,
        ColumnMappingMode::Name,
        ColumnMappingMode::Id
    )]
    cm_mode: ColumnMappingMode,
    #[values(
        (Scalar::Null(DataType::STRING), Some("not nullable")),
        (Scalar::String("a".into()), None),
        (Scalar::String(String::new()), Some("not nullable")),
    )]
    case: (Scalar, Option<&'static str>),
) -> Result<(), Box<dyn std::error::Error>> {
    let _ = tracing_subscriber::fmt::try_init();
    let (value, expected_err) = case;

    let schema = Arc::new(StructType::try_new(vec![
        StructField::nullable("value", DataType::INTEGER),
        StructField::not_null("p", DataType::STRING),
    ])?);
    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;

    let properties: Vec<(&str, &str)> =
        std::iter::once(("delta.columnMapping.mode", cm_mode_str(cm_mode)))
            .chain(
                materialized.then_some(("delta.feature.materializePartitionColumns", "supported")),
            )
            .collect();
    let _ = create_table(&table_path, schema, "test/1.0")
        .with_data_layout(DataLayout::partitioned(["p"]))
        .with_table_properties(properties)
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?;
    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;

    let result = begin_transaction(snapshot, engine.as_ref())?
        .with_engine_info("default engine")
        .partitioned_write_context(HashMap::from([("p".to_string(), value)]));

    if let Some(needle) = expected_err {
        // An `Ok` result here is itself reported as a test error.
        let err = result
            .err()
            .ok_or(
                "expected partitioned_write_context to error for a null-equivalent value into NOT NULL partition",
            )?
            .to_string();
        assert!(err.contains(needle), "{err}");
        assert!(err.contains("'p'"), "{err}");
    } else {
        result?;
    }
    Ok(())
}
#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_partition_null_validation_mixed_nullability(
#[values(
ColumnMappingMode::None,
ColumnMappingMode::Name,
ColumnMappingMode::Id
)]
cm_mode: ColumnMappingMode,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = tracing_subscriber::fmt::try_init();
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("value", DataType::INTEGER),
StructField::not_null("p_required", DataType::STRING),
StructField::nullable("p_optional", DataType::STRING),
])?);
let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
let snapshot = create_partitioned_table(
&table_path,
engine.as_ref(),
schema,
&["p_required", "p_optional"],
cm_mode,
false, )?;
begin_transaction(snapshot.clone(), engine.as_ref())?
.with_engine_info("default engine")
.partitioned_write_context(HashMap::from([
("p_required".to_string(), Scalar::String("a".into())),
("p_optional".to_string(), Scalar::Null(DataType::STRING)),
]))?;
begin_transaction(snapshot.clone(), engine.as_ref())?
.with_engine_info("default engine")
.partitioned_write_context(HashMap::from([
("p_required".to_string(), Scalar::String("a".into())),
("p_optional".to_string(), Scalar::String(String::new())),
]))?;
let err = begin_transaction(snapshot, engine.as_ref())?
.with_engine_info("default engine")
.partitioned_write_context(HashMap::from([
("p_required".to_string(), Scalar::Null(DataType::STRING)),
("p_optional".to_string(), Scalar::String("b".into())),
]))
.unwrap_err()
.to_string();
assert!(err.contains("not nullable"), "{err}");
assert!(err.contains("'p_required'"), "{err}");
Ok(())
}