use std::path::PathBuf;
use std::sync::Arc;
use std::vec;
use delta_kernel::actions::deletion_vector::split_vector;
use delta_kernel::arrow::array::{ArrayRef, AsArray as _, RecordBatch, TimestampMicrosecondArray};
use delta_kernel::arrow::compute::{concat_batches, filter_record_batch};
use delta_kernel::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Int64Type, Schema as ArrowSchema, TimeUnit,
};
use delta_kernel::engine::arrow_conversion::TryFromKernel as _;
use delta_kernel::engine::arrow_data::EngineDataArrowExt as _;
use delta_kernel::expressions::{
column_expr, column_pred, Expression as Expr, ExpressionRef, Predicate as Pred, PredicateRef,
Scalar,
};
use delta_kernel::log_segment::LogSegment;
use delta_kernel::object_store::memory::InMemory;
use delta_kernel::object_store::path::Path;
use delta_kernel::object_store::ObjectStoreExt as _;
use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties};
use delta_kernel::path::ParsedLogPath;
use delta_kernel::scan::state::{transform_to_logical, ScanFile};
use delta_kernel::scan::{Scan, StatsOptions};
use delta_kernel::schema::{DataType, MetadataColumnSpec, Schema, StructField, StructType};
use delta_kernel::{Engine, FileMeta, Snapshot};
use itertools::Itertools;
use test_utils::delta_kernel_default_engine::DefaultEngineBuilder;
use test_utils::{
actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
load_test_data, read_scan, record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray,
TestAction, METADATA,
};
use url::Url;
/// Object-store path / add-action path of the first parquet data file used by the in-memory tests.
const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet";
/// Object-store path / add-action path of the second parquet data file used by the in-memory tests.
const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet";
/// Name of a third parquet data file (not referenced in the tests visible in this chunk).
const PARQUET_FILE3: &str = "part-00002-c506e79a-0bf8-4e2b-a42b-9731b2e490ff-c000.snappy.parquet";
#[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
/// Polyfill for `Path::join` on the arrow-57 dependency set, whose object_store predates 0.13
/// (where `join` was introduced and `Path::child` deprecated), so call sites can use `join`
/// regardless of the enabled arrow feature.
trait PathExt {
    /// Returns `self` extended by `segment`; delegates to `Path::child`.
    fn join(&self, segment: &str) -> Self;
}
#[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
impl PathExt for Path {
    fn join(&self, segment: &str) -> Self {
        Path::child(self, segment)
    }
}
/// Convert all top-level fields in a RecordBatch to nullable, matching Delta table schema
/// conventions where the table metadata declares columns as nullable.
fn make_top_level_fields_nullable(batch: &RecordBatch) -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(
batch
.schema()
.fields()
.iter()
.map(|f| ArrowField::new(f.name(), f.data_type().clone(), true))
.collect::<Vec<_>>(),
));
RecordBatch::try_new(schema, batch.columns().to_vec()).unwrap()
}
/// A single commit adding two files yields both files' data when the table is scanned.
#[tokio::test]
async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> {
    let batch = generate_simple_batch()?;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    // Serialize once so the size recorded in the add actions matches the uploaded bytes exactly.
    let parquet_bytes = record_batch_to_bytes(&batch);
    let file_size = parquet_bytes.len() as u64;
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![
            TestAction::Metadata,
            TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
            TestAction::AddWithSize(PARQUET_FILE2.to_string(), file_size),
        ]),
    )
    .await?;
    for file in [PARQUET_FILE1, PARQUET_FILE2] {
        storage
            .put(&Path::from(file), parquet_bytes.clone().into())
            .await?;
    }
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let expected = make_top_level_fields_nullable(&batch);
    let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    let scan = snapshot.scan_builder().build()?;
    // Drain the whole scan before checking it: zipping the stream with the expected batches
    // would stop at the shorter side and silently ignore any unexpected extra files.
    let mut scanned = Vec::new();
    for data in scan.execute(engine)? {
        scanned.push(into_record_batch(data?));
    }
    assert_eq!(2, scanned.len(), "Expected to have scanned two files");
    for actual in &scanned {
        assert_eq!(actual, &expected);
    }
    Ok(())
}
/// Files added across two separate commits are both returned by a scan of the latest snapshot.
#[tokio::test]
async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
    let batch = generate_simple_batch()?;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    // Serialize once so the size recorded in the add actions matches the uploaded bytes exactly.
    let parquet_bytes = record_batch_to_bytes(&batch);
    let file_size = parquet_bytes.len() as u64;
    // Commit 0 creates the table with the first file; commit 1 appends the second file.
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![
            TestAction::Metadata,
            TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
        ]),
    )
    .await?;
    add_commit(
        table_root,
        storage.as_ref(),
        1,
        actions_to_string(vec![TestAction::AddWithSize(
            PARQUET_FILE2.to_string(),
            file_size,
        )]),
    )
    .await?;
    for file in [PARQUET_FILE1, PARQUET_FILE2] {
        storage
            .put(&Path::from(file), parquet_bytes.clone().into())
            .await?;
    }
    let engine = DefaultEngineBuilder::new(storage.clone()).build();
    let expected = make_top_level_fields_nullable(&batch);
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    let scan = snapshot.scan_builder().build()?;
    // Drain the whole scan before checking it: zipping the stream with the expected batches
    // would stop at the shorter side and silently ignore any unexpected extra files.
    let mut scanned = Vec::new();
    for data in scan.execute(Arc::new(engine))? {
        scanned.push(into_record_batch(data?));
    }
    assert_eq!(2, scanned.len(), "Expected to have scanned two files");
    for actual in &scanned {
        assert_eq!(actual, &expected);
    }
    Ok(())
}
/// A file that is added and later removed must not be returned by a scan of the latest snapshot.
#[tokio::test]
async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
    let batch = generate_simple_batch()?;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    // Serialize once so the size recorded in the actions matches the uploaded bytes exactly.
    let parquet_bytes = record_batch_to_bytes(&batch);
    let file_size = parquet_bytes.len() as u64;
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![
            TestAction::Metadata,
            TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
        ]),
    )
    .await?;
    add_commit(
        table_root,
        storage.as_ref(),
        1,
        actions_to_string(vec![TestAction::AddWithSize(
            PARQUET_FILE2.to_string(),
            file_size,
        )]),
    )
    .await?;
    add_commit(
        table_root,
        storage.as_ref(),
        2,
        actions_to_string(vec![TestAction::RemoveWithSize(
            PARQUET_FILE2.to_string(),
            file_size,
        )]),
    )
    .await?;
    // PARQUET_FILE2 is deliberately never uploaded: it is removed in commit 2, so the scan
    // should not touch it.
    storage
        .put(&Path::from(PARQUET_FILE1), parquet_bytes.into())
        .await?;
    let engine = DefaultEngineBuilder::new(storage.clone()).build();
    let expected = make_top_level_fields_nullable(&batch);
    let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
    let scan = snapshot.scan_builder().build()?;
    // Drain the whole scan before checking it: zipping the stream with the single expected
    // batch would stop after one item and hide the removed file if it were still scanned.
    let mut scanned = Vec::new();
    for data in scan.execute(Arc::new(engine))? {
        scanned.push(into_record_batch(data?));
    }
    assert_eq!(1, scanned.len(), "Expected to have scanned one file");
    for actual in &scanned {
        assert_eq!(actual, &expected);
    }
    Ok(())
}
/// Data skipping on `id` min/max stats prunes exactly the files whose ranges cannot match.
#[tokio::test]
async fn stats() -> Result<(), Box<dyn std::error::Error>> {
    /// Serializes `actions` into newline-terminated commit JSON lines. Every `add` action written
    /// here carries stats `numRecords=2`, `nullCount.id=0`, `minValues.id=5`, `maxValues.id=7`;
    /// `remove` actions carry no stats.
    fn generate_commit2(actions: Vec<TestAction>) -> String {
        actions
            .into_iter()
            .map(|test_action| match test_action {
                TestAction::Add(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 5}},\"maxValues\":{{\"id\":7}}}}"}}}}"#, action = "add", path = path),
                TestAction::Remove(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true}}}}"#, action = "remove", path = path),
                TestAction::Metadata => METADATA.into(),
                TestAction::AddWithSize(path, size) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":{size},"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 5}},\"maxValues\":{{\"id\":7}}}}"}}}}"#, action = "add", path = path),
                TestAction::RemoveWithSize(path, size) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":{size},"modificationTime":1587968586000,"dataChange":true}}}}"#, action = "remove", path = path),
            })
            .fold(String::new(), |a, b| a + &b + "\n")
    }
    let batch1 = make_top_level_fields_nullable(&generate_simple_batch()?);
    let batch2 = make_top_level_fields_nullable(&generate_batch(vec![
        ("id", vec![5, 7].into_array()),
        ("val", vec!["e", "g"].into_array()),
    ])?);
    let file_size1 = record_batch_to_bytes(&batch1).len() as u64;
    let file_size2 = record_batch_to_bytes(&batch2).len() as u64;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    // Commit 0: metadata plus the first file, whose `id` stats the cases below expect to span
    // [1, 3].
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![
            TestAction::Metadata,
            TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size1),
        ]),
    )
    .await?;
    // Commit 1: the second file, with `id` stats [5, 7] written by `generate_commit2`.
    add_commit(
        table_root,
        storage.as_ref(),
        1,
        generate_commit2(vec![TestAction::AddWithSize(
            PARQUET_FILE2.to_string(),
            file_size2,
        )]),
    )
    .await?;
    storage
        .put(
            &Path::from(PARQUET_FILE1),
            record_batch_to_bytes(&batch1).into(),
        )
        .await?;
    storage
        .put(
            &Path::from(PARQUET_FILE2),
            record_batch_to_bytes(&batch2).into(),
        )
        .await?;
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    // The first file has id between 1 and 3; the second has id between 5 and 7. For each operator,
    // we validate the boundary values where we expect the set of matched files to change.
    //
    // NOTE: For cases that match both batch1 and batch2, we list batch2 first because log replay
    // returns most recently added files first.
    #[allow(clippy::type_complexity)] // otherwise it's even more complex because no `_`
    let test_cases: Vec<(fn(Expr, Expr) -> _, _, _)> = vec![
        (Pred::eq, 0i32, vec![]),
        (Pred::eq, 1, vec![&batch1]),
        (Pred::eq, 3, vec![&batch1]),
        (Pred::eq, 4, vec![]),
        (Pred::eq, 5, vec![&batch2]),
        (Pred::eq, 7, vec![&batch2]),
        (Pred::eq, 8, vec![]),
        (Pred::lt, 1, vec![]),
        (Pred::lt, 2, vec![&batch1]),
        (Pred::lt, 5, vec![&batch1]),
        (Pred::lt, 6, vec![&batch2, &batch1]),
        (Pred::le, 0, vec![]),
        (Pred::le, 1, vec![&batch1]),
        (Pred::le, 4, vec![&batch1]),
        (Pred::le, 5, vec![&batch2, &batch1]),
        (Pred::gt, 2, vec![&batch2, &batch1]),
        (Pred::gt, 3, vec![&batch2]),
        (Pred::gt, 6, vec![&batch2]),
        (Pred::gt, 7, vec![]),
        (Pred::ge, 3, vec![&batch2, &batch1]),
        (Pred::ge, 4, vec![&batch2]),
        (Pred::ge, 7, vec![&batch2]),
        (Pred::ge, 8, vec![]),
        (Pred::ne, 0, vec![&batch2, &batch1]),
        (Pred::ne, 1, vec![&batch2, &batch1]),
        (Pred::ne, 3, vec![&batch2, &batch1]),
        (Pred::ne, 4, vec![&batch2, &batch1]),
        (Pred::ne, 5, vec![&batch2, &batch1]),
        (Pred::ne, 7, vec![&batch2, &batch1]),
        (Pred::ne, 8, vec![&batch2, &batch1]),
    ];
    for (pred_fn, value, expected_batches) in test_cases {
        let predicate = pred_fn(column_expr!("id"), Expr::literal(value));
        let scan = snapshot
            .clone()
            .scan_builder()
            .with_predicate(Arc::new(predicate.clone()))
            .build()?;
        // Collect everything the scan yields before comparing. Zipping the stream with the
        // expected batches would stop at the shorter side, which made every "no files" case
        // vacuous: a file that should have been pruned would never be counted.
        let mut scanned = Vec::new();
        for data in scan.execute(engine.clone())? {
            scanned.push(into_record_batch(data?));
        }
        assert_eq!(expected_batches.len(), scanned.len(), "{predicate:?}");
        for (actual, expected) in scanned.iter().zip(expected_batches) {
            assert_eq!(actual, expected, "{predicate:?}");
        }
    }
    Ok(())
}
/// Reads `scan` end-to-end through `read_scan` and checks the rows against `expected`.
///
/// An empty `expected` requires the scan to produce no batches at all. Otherwise the batches are
/// concatenated under the Arrow form of the scan's logical schema and compared with
/// `assert_batches_sorted_eq!`.
fn read_with_execute(
    engine: Arc<dyn Engine>,
    scan: &Scan,
    expected: &[String],
) -> Result<(), Box<dyn std::error::Error>> {
    let logical_schema = scan.logical_schema();
    let arrow_schema = Arc::new(ArrowSchema::try_from_kernel(logical_schema.as_ref())?);
    let batches = read_scan(scan, engine)?;
    if !expected.is_empty() {
        let combined = concat_batches(&arrow_schema, &batches)?;
        assert_batches_sorted_eq!(expected, &[combined]);
        return Ok(());
    }
    assert_eq!(batches.len(), 0);
    Ok(())
}
/// Visitor passed to `visit_scan_files`: appends each visited [`ScanFile`] to the accumulator.
fn scan_metadata_callback(collected: &mut Vec<ScanFile>, scan_file: ScanFile) {
    collected.push(scan_file);
}
/// Reads `scan` the "manual" way and checks the rows against `expected`.
///
/// Collects every [`ScanFile`] from `scan.scan_metadata`. It then reads each file with the
/// engine's parquet handler, transforms the physical batches into the scan's logical schema,
/// and applies the file's deletion-vector selection. An empty `expected` requires that no
/// batches are produced. Otherwise the batches are concatenated and compared with
/// `assert_batches_sorted_eq!`.
///
/// # Errors
///
/// Propagates failures from scan metadata, deletion-vector loading, parquet reads, the logical
/// transform, row filtering, and batch concatenation (instead of panicking mid-read).
fn read_with_scan_metadata(
    location: &Url,
    engine: &dyn Engine,
    scan: &Scan,
    expected: &[String],
) -> Result<(), Box<dyn std::error::Error>> {
    let result_schema = Arc::new(ArrowSchema::try_from_kernel(
        scan.logical_schema().as_ref(),
    )?);
    let scan_metadata = scan.scan_metadata(engine)?;
    let mut scan_files = vec![];
    for res in scan_metadata {
        let scan_metadata = res?;
        scan_files = scan_metadata.visit_scan_files(scan_files, scan_metadata_callback)?;
    }
    let mut batches = vec![];
    for scan_file in scan_files {
        let file_path = location.join(&scan_file.path)?;
        // `None` means no row of this file is filtered out (see the `None` branch below).
        let mut selection_vector = scan_file.dv_info.get_selection_vector(engine, location)?;
        let meta = FileMeta {
            last_modified: 0,
            size: scan_file.size.try_into()?,
            location: file_path,
        };
        let read_results = engine.parquet_handler().read_parquet_files(
            &[meta],
            scan.physical_schema().clone(),
            scan.physical_predicate().clone(),
        )?;
        for read_result in read_results {
            let read_result = read_result?;
            let len = read_result.len();
            // Transform the physical data into the scan's logical form.
            let logical = transform_to_logical(
                engine,
                read_result,
                scan.physical_schema(),
                scan.logical_schema(),
                scan_file.transform.clone(),
            )?;
            let record_batch = logical.try_into_record_batch()?;
            // Split the selection vector at `len`: `selection_vector` keeps this batch's
            // portion and `rest` covers the rows of the following batches of this file.
            let rest = split_vector(selection_vector.as_mut(), len, Some(true));
            // `take` hands over this batch's mask without cloning; it is replaced by `rest`.
            let batch = match selection_vector.take() {
                Some(mask) => filter_record_batch(&record_batch, &mask.into())?,
                None => record_batch,
            };
            selection_vector = rest;
            batches.push(batch);
        }
    }
    if expected.is_empty() {
        assert_eq!(batches.len(), 0);
    } else {
        let batch = concat_batches(&result_schema, &batches)?;
        assert_batches_sorted_eq!(expected, &[batch]);
    }
    Ok(())
}
/// Opens the on-disk table at `path`, builds a scan, and verifies it against `expected`.
///
/// `select_cols`, when given, projects the read schema to those table columns in the listed
/// order. `predicate`, when given, is attached to the scan. `expected` is line-sorted first,
/// then the same scan is checked twice: via `read_with_scan_metadata` and via
/// `read_with_execute`.
fn read_table_data(
    path: &str,
    select_cols: Option<&[&str]>,
    predicate: Option<Pred>,
    mut expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let table_dir = std::fs::canonicalize(PathBuf::from(path))?;
    let url = url::Url::from_directory_path(table_dir).unwrap();
    let predicate = predicate.map(Arc::new);
    let engine = test_utils::create_default_engine(&url)?;
    let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?;
    let read_schema = match select_cols {
        Some(cols) => {
            let table_schema = snapshot.schema();
            let projected = cols
                .iter()
                .map(|col| table_schema.field(col).cloned().unwrap());
            Some(Arc::new(Schema::new_unchecked(projected)))
        }
        None => None,
    };
    println!("Read {url:?} with schema {read_schema:#?} and predicate {predicate:#?}");
    let scan = snapshot
        .scan_builder()
        .with_schema_opt(read_schema)
        .with_predicate(predicate)
        .build()?;
    sort_lines!(expected);
    read_with_scan_metadata(&url, engine.as_ref(), &scan, &expected)?;
    read_with_execute(engine, &scan, &expected)
}
/// Convenience wrapper around `read_table_data` that takes the expected table as `&str` lines.
fn read_table_data_str(
    path: &str,
    select_cols: Option<&[&str]>,
    predicate: Option<Pred>,
    expected: Vec<&str>,
) -> Result<(), Box<dyn std::error::Error>> {
    let owned_expected: Vec<String> = expected.iter().map(|line| line.to_string()).collect();
    read_table_data(path, select_cols, predicate, owned_expected)
}
/// A plain scan of `basic_partitioned` (no projection, no predicate) returns all six rows.
#[test]
fn data() -> Result<(), Box<dyn std::error::Error>> {
    read_table_data_str(
        "./tests/data/basic_partitioned",
        None,
        None,
        vec![
            "+--------+--------+---------+",
            "| letter | number | a_float |",
            "+--------+--------+---------+",
            "|        | 6      | 6.6     |",
            "| a      | 1      | 1.1     |",
            "| a      | 4      | 4.4     |",
            "| b      | 2      | 2.2     |",
            "| c      | 3      | 3.3     |",
            "| e      | 5      | 5.5     |",
            "+--------+--------+---------+",
        ],
    )
}
/// Projecting every column in a non-table order returns the columns in the requested order.
#[test]
fn column_ordering() -> Result<(), Box<dyn std::error::Error>> {
    // Row order here is irrelevant: `read_table_data` sorts the expected lines before comparing.
    let projection: &[&str] = &["a_float", "letter", "number"];
    read_table_data_str(
        "./tests/data/basic_partitioned",
        Some(projection),
        None,
        vec![
            "+---------+--------+--------+",
            "| a_float | letter | number |",
            "+---------+--------+--------+",
            "| 6.6     |        | 6      |",
            "| 4.4     | a      | 4      |",
            "| 5.5     | e      | 5      |",
            "| 1.1     | a      | 1      |",
            "| 2.2     | b      | 2      |",
            "| 3.3     | c      | 3      |",
            "+---------+--------+--------+",
        ],
    )
}
/// Projecting a reordered subset of columns returns only those columns, in the requested order.
#[test]
fn column_ordering_and_projection() -> Result<(), Box<dyn std::error::Error>> {
    let projection: &[&str] = &["a_float", "number"];
    read_table_data_str(
        "./tests/data/basic_partitioned",
        Some(projection),
        None,
        vec![
            "+---------+--------+",
            "| a_float | number |",
            "+---------+--------+",
            "| 6.6     | 6      |",
            "| 4.4     | 4      |",
            "| 5.5     | 5      |",
            "| 1.1     | 1      |",
            "| 2.2     | 2      |",
            "| 3.3     | 3      |",
            "+---------+--------+",
        ],
    )
}
/// Builds the expected `a_float | number` table of `basic_partitioned` for the given numbers.
///
/// Emits one row per entry of `nums`, in order, with `a_float` rendered as `{num}.{num}`.
fn table_for_numbers(nums: Vec<u32>) -> Vec<String> {
    const BORDER: &str = "+---------+--------+";
    let mut table: Vec<String> = [BORDER, "| a_float | number |", BORDER]
        .iter()
        .map(|line| line.to_string())
        .collect();
    table.extend(nums.iter().map(|n| format!("| {n}.{n}     | {n}      |")));
    table.push(BORDER.to_string());
    table
}
/// Builds the expected `letter | number` table of `basic_partitioned`, keeping only the rows
/// whose letter appears in `letters`.
///
/// Only rows with a non-null letter are considered, so the null-letter row (number 6) is never
/// emitted.
fn table_for_letters(letters: &[char]) -> Vec<String> {
    const BORDER: &str = "+--------+--------+";
    // (number, letter) pairs for the rows of the table that have a letter.
    const ROWS: [(u32, char); 5] = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'a'), (5, 'e')];
    let mut table: Vec<String> = [BORDER, "| letter | number |", BORDER]
        .iter()
        .map(|line| line.to_string())
        .collect();
    table.extend(
        ROWS.iter()
            .filter(|(_, letter)| letters.contains(letter))
            .map(|(num, letter)| format!("| {letter}      | {num}      |")),
    );
    table.push(BORDER.to_string());
    table
}
/// Comparison predicates on the data column `number` return the matching rows of
/// `basic_partitioned`.
#[rstest::rstest]
#[case::less_than(
    column_expr!("number").lt(Expr::literal(4i64)),
    table_for_numbers(vec![1, 2, 3])
)]
#[case::less_than_or_equal(
    column_expr!("number").le(Expr::literal(4i64)),
    table_for_numbers(vec![1, 2, 3, 4])
)]
#[case::greater_than(
    column_expr!("number").gt(Expr::literal(4i64)),
    table_for_numbers(vec![5, 6])
)]
#[case::greater_than_or_equal(
    column_expr!("number").ge(Expr::literal(4i64)),
    table_for_numbers(vec![4, 5, 6])
)]
#[case::equal(
    column_expr!("number").eq(Expr::literal(4i64)),
    table_for_numbers(vec![4])
)]
#[case::not_equal(
    column_expr!("number").ne(Expr::literal(4i64)),
    table_for_numbers(vec![1, 2, 3, 5, 6])
)]
fn predicate_on_number(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Project exactly the columns that `table_for_numbers` renders.
    let projection: &[&str] = &["a_float", "number"];
    read_table_data(
        "./tests/data/basic_partitioned",
        Some(projection),
        Some(pred),
        expected,
    )
}
/// Null checks and comparisons on the partition column `letter` return the matching rows.
#[rstest::rstest]
#[case::is_null(
    column_expr!("letter").is_null(),
    vec![
        "+--------+--------+",
        "| letter | number |",
        "+--------+--------+",
        "|        | 6      |",
        "+--------+--------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
#[case::is_not_null(
    column_expr!("letter").is_not_null(),
    table_for_letters(&['a', 'b', 'c', 'e'])
)]
#[case::less_than(
    column_expr!("letter").lt(Expr::literal("c")),
    table_for_letters(&['a', 'b'])
)]
#[case::less_than_or_equal(
    column_expr!("letter").le(Expr::literal("c")),
    table_for_letters(&['a', 'b', 'c'])
)]
#[case::greater_than(
    column_expr!("letter").gt(Expr::literal("c")),
    table_for_letters(&['e'])
)]
#[case::greater_than_or_equal(
    column_expr!("letter").ge(Expr::literal("c")),
    table_for_letters(&['c', 'e'])
)]
#[case::equal(
    column_expr!("letter").eq(Expr::literal("c")),
    table_for_letters(&['c'])
)]
#[case::not_equal(
    column_expr!("letter").ne(Expr::literal("c")),
    table_for_letters(&['a', 'b', 'e'])
)]
fn predicate_on_letter(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Only the wiring of partition pruning is under test here; the predicate evaluation
    // machinery itself is covered elsewhere.
    let projection: &[&str] = &["letter", "number"];
    read_table_data(
        "./tests/data/basic_partitioned",
        Some(projection),
        Some(pred),
        expected,
    )
}
/// Predicates mixing the partition column `letter` with the data column `number` (AND, OR, and
/// nested combinations) prune files using both kinds of information at once.
#[rstest::rstest]
#[case::or_with_pruning(
    Pred::or(
        column_expr!("letter").gt(Expr::literal("a")),
        column_expr!("number").gt(Expr::literal(3i64)),
    ),
    // Unified data skipping evaluates partition + data predicates in a single pass.
    // File a/1 (letter='a', max(number)=1): OR('a'>'a', 1>3) = FALSE -> pruned
    vec![
        "+--------+--------+",
        "| letter | number |",
        "+--------+--------+",
        "|        | 6      |",
        "| a      | 4      |",
        "| b      | 2      |",
        "| c      | 3      |",
        "| e      | 5      |",
        "+--------+--------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
#[case::and_with_pruning(
    Pred::and(
        column_expr!("letter").gt(Expr::literal("a")), // numbers 2, 3, 5
        column_expr!("number").gt(Expr::literal(3i64)), // letters a, e
    ),
    table_for_letters(&['e'])
)]
#[case::and_with_nested_or(
    Pred::and(
        column_expr!("letter").gt(Expr::literal("a")), // numbers 2, 3, 5
        Pred::or(
            column_expr!("letter").eq(Expr::literal("c")),
            column_expr!("number").eq(Expr::literal(3i64)),
        ),
    ),
    // Unified data skipping evaluates the full expression:
    // b/2: AND(TRUE, OR(FALSE, FALSE)) = FALSE -> pruned
    // c/3: AND(TRUE, OR(TRUE, TRUE)) = TRUE -> kept
    // e/5: AND(TRUE, OR(FALSE, FALSE)) = FALSE -> pruned
    table_for_letters(&['c'])
)]
fn predicate_on_letter_and_number(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Partition values and data statistics are evaluated together in one columnar
    // data-skipping pass, which is what lets OR over mixed columns still prune.
    let projection: &[&str] = &["letter", "number"];
    read_table_data(
        "./tests/data/basic_partitioned",
        Some(projection),
        Some(pred),
        expected,
    )
}
/// Regression test for issue #2468: a predicate may reference table columns that are not part of
/// the projected read schema (only `a_float` is projected here).
#[rstest::rstest]
#[case::predicate_on_unprojected_data_column_in_table_schema_succeeds(
    // `number` is a data column not present in the projected schema (`a_float`).
    column_expr!("number").gt(Expr::literal(4i64)),
    vec![
        "+---------+",
        "| a_float |",
        "+---------+",
        "| 5.5     |",
        "| 6.6     |",
        "+---------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
#[case::predicate_on_partition_column_in_table_schema_succeeds(
    column_expr!("letter").eq(Expr::literal("a")),
    vec![
        "+---------+",
        "| a_float |",
        "+---------+",
        "| 1.1     |",
        "| 4.4     |",
        "+---------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
#[case::predicate_on_mixed_projected_and_unprojected_columns_succeeds(
    // a_float is projected, number is unprojected
    Pred::and(
        column_expr!("a_float").gt(Expr::literal(4.0)),
        column_expr!("number").lt(Expr::literal(6i64)),
    ),
    vec![
        "+---------+",
        "| a_float |",
        "+---------+",
        "| 4.4     |",
        "| 5.5     |",
        "+---------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
fn predicate_on_unprojected_column(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let projection: &[&str] = &["a_float"];
    read_table_data(
        "./tests/data/basic_partitioned",
        Some(projection),
        Some(pred),
        expected,
    )
}
/// Partition pruning against a checkpoint that stores `partitionValues_parsed`.
///
/// On this path the typed partition values come straight from the checkpoint's parquet column
/// instead of being parsed out of the string map via `MapToStruct`.
///
/// Table: app-txn-checkpoint (checkpoint at v1, partition column: `modified` (string))
/// - 2 files with modified=2021-02-01 (value 4-11, 8 rows each)
/// - 2 files with modified=2021-02-02 (value 1-3, 3 rows each)
#[rstest::rstest]
#[case::partition_only_prunes_one_partition(
    // Partition-only predicate: modified = '2021-02-02' should prune 2021-02-01 files
    column_expr!("modified").eq(Expr::literal("2021-02-02")),
    vec![
        "+----+------------+-------+",
        "| id | modified   | value |",
        "+----+------------+-------+",
        "| A  | 2021-02-02 | 1     |",
        "| A  | 2021-02-02 | 1     |",
        "| A  | 2021-02-02 | 3     |",
        "| A  | 2021-02-02 | 3     |",
        "| B  | 2021-02-02 | 2     |",
        "| B  | 2021-02-02 | 2     |",
        "+----+------------+-------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
#[case::partition_prunes_other_partition(
    // modified = '2021-02-01' should prune 2021-02-02 files, keeping all 2021-02-01 rows
    column_expr!("modified").eq(Expr::literal("2021-02-01")),
    vec![
        "+----+------------+-------+",
        "| id | modified   | value |",
        "+----+------------+-------+",
        "| A  | 2021-02-01 | 10    |",
        "| A  | 2021-02-01 | 10    |",
        "| A  | 2021-02-01 | 11    |",
        "| A  | 2021-02-01 | 11    |",
        "| A  | 2021-02-01 | 5     |",
        "| A  | 2021-02-01 | 5     |",
        "| A  | 2021-02-01 | 6     |",
        "| A  | 2021-02-01 | 6     |",
        "| A  | 2021-02-01 | 7     |",
        "| A  | 2021-02-01 | 7     |",
        "| B  | 2021-02-01 | 4     |",
        "| B  | 2021-02-01 | 4     |",
        "| B  | 2021-02-01 | 8     |",
        "| B  | 2021-02-01 | 8     |",
        "| B  | 2021-02-01 | 9     |",
        "| B  | 2021-02-01 | 9     |",
        "+----+------------+-------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
fn partition_pruning_with_checkpoint_parsed_values(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let projection: &[&str] = &["id", "modified", "value"];
    read_table_data(
        "./tests/data/app-txn-checkpoint",
        Some(projection),
        Some(pred),
        expected,
    )
}
/// Mixed predicates (partition value + data stats) against a checkpoint that stores both
/// `partitionValues_parsed` and `stats_parsed`.
///
/// This drives the unified columnar data-skipping pass, which evaluates partition values and
/// data column statistics together.
///
/// Table: app-txn-checkpoint (checkpoint at v1, partition column: `modified` (string))
/// - 2 files: modified=2021-02-02 -- 3 rows each, value in [1, 3]
/// - 2 files: modified=2021-02-01 -- 8 rows each, value in [4, 11]
#[rstest::rstest]
#[case::and_keeps_partition_matched_files(
    // Data skipping keeps 2021-02-01 files (partition matches, max(value)=11 > 9) and
    // prunes 2021-02-02 files (partition mismatch). All rows from kept files are returned
    // since kernel does not apply row-level predicate filtering.
    Pred::and(
        column_expr!("modified").eq(Expr::literal("2021-02-01")),
        column_expr!("value").gt(Expr::literal(9i32)),
    ),
    vec![
        "+----+------------+-------+",
        "| id | modified   | value |",
        "+----+------------+-------+",
        "| A  | 2021-02-01 | 10    |",
        "| A  | 2021-02-01 | 10    |",
        "| A  | 2021-02-01 | 11    |",
        "| A  | 2021-02-01 | 11    |",
        "| A  | 2021-02-01 | 5     |",
        "| A  | 2021-02-01 | 5     |",
        "| A  | 2021-02-01 | 6     |",
        "| A  | 2021-02-01 | 6     |",
        "| A  | 2021-02-01 | 7     |",
        "| A  | 2021-02-01 | 7     |",
        "| B  | 2021-02-01 | 4     |",
        "| B  | 2021-02-01 | 4     |",
        "| B  | 2021-02-01 | 8     |",
        "| B  | 2021-02-01 | 8     |",
        "| B  | 2021-02-01 | 9     |",
        "| B  | 2021-02-01 | 9     |",
        "+----+------------+-------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
#[case::and_prunes_all_files(
    // 2021-02-02: partition matches but data stats fail (max value=3, NOT > 3).
    // 2021-02-01: partition mismatch. All 4 files pruned.
    Pred::and(
        column_expr!("modified").eq(Expr::literal("2021-02-02")),
        column_expr!("value").gt(Expr::literal(3i32)),
    ),
    vec![]
)]
#[case::or_prunes_by_both_partition_and_stats(
    // 2021-02-01 pruned: partition mismatch AND max(value)=11 NOT > 11.
    // 2021-02-02 kept by partition match. Only 2021-02-02 rows returned.
    Pred::or(
        column_expr!("modified").eq(Expr::literal("2021-02-02")),
        column_expr!("value").gt(Expr::literal(11i32)),
    ),
    vec![
        "+----+------------+-------+",
        "| id | modified   | value |",
        "+----+------------+-------+",
        "| A  | 2021-02-02 | 1     |",
        "| A  | 2021-02-02 | 1     |",
        "| A  | 2021-02-02 | 3     |",
        "| A  | 2021-02-02 | 3     |",
        "| B  | 2021-02-02 | 2     |",
        "| B  | 2021-02-02 | 2     |",
        "+----+------------+-------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
#[case::or_keeps_all_files(
    // 2021-02-02 kept by partition match, 2021-02-01 kept by data stats (max=11 > 9).
    // All rows from all 4 files are returned.
    Pred::or(
        column_expr!("modified").eq(Expr::literal("2021-02-02")),
        column_expr!("value").gt(Expr::literal(9i32)),
    ),
    vec![
        "+----+------------+-------+",
        "| id | modified   | value |",
        "+----+------------+-------+",
        "| A  | 2021-02-01 | 10    |",
        "| A  | 2021-02-01 | 10    |",
        "| A  | 2021-02-01 | 11    |",
        "| A  | 2021-02-01 | 11    |",
        "| A  | 2021-02-01 | 5     |",
        "| A  | 2021-02-01 | 5     |",
        "| A  | 2021-02-01 | 6     |",
        "| A  | 2021-02-01 | 6     |",
        "| A  | 2021-02-01 | 7     |",
        "| A  | 2021-02-01 | 7     |",
        "| A  | 2021-02-02 | 1     |",
        "| A  | 2021-02-02 | 1     |",
        "| A  | 2021-02-02 | 3     |",
        "| A  | 2021-02-02 | 3     |",
        "| B  | 2021-02-01 | 4     |",
        "| B  | 2021-02-01 | 4     |",
        "| B  | 2021-02-01 | 8     |",
        "| B  | 2021-02-01 | 8     |",
        "| B  | 2021-02-01 | 9     |",
        "| B  | 2021-02-01 | 9     |",
        "| B  | 2021-02-02 | 2     |",
        "| B  | 2021-02-02 | 2     |",
        "+----+------------+-------+",
    ]
    .into_iter()
    .map(String::from)
    .collect()
)]
fn mixed_predicate_with_checkpoint_parsed_columns(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Both `partitionValues_parsed` and `stats_parsed` are read from the checkpoint parquet in
    // a single columnar data-skipping pass.
    let projection: &[&str] = &["id", "modified", "value"];
    read_table_data(
        "./tests/data/app-txn-checkpoint",
        Some(projection),
        Some(pred),
        expected,
    )
}
/// Test partition pruning on a table with column mapping (name mode). The logical partition
/// column "category" has physical name "phys_category". With column mapping, `partitionValues`
/// in the log uses physical column names, and the partition schema + predicate must also use
/// physical names for `MapToStruct` extraction and data skipping to work correctly.
#[rstest::rstest]
#[case::partition_only(
// Partition-only predicate: category = 'A' prunes the category=B file
Arc::new(Pred::eq(column_expr!("category"), Expr::literal("A"))),
None,
1
)]
#[case::mixed_partition_and_data(
// Mixed predicate: category = 'A' OR val > 'z'. Category=A kept by partition match.
// Category=B: partition mismatch, but max(val)='z' NOT > 'z', so data skipping prunes it.
Arc::new(Pred::or(
Pred::eq(column_expr!("category"), Expr::literal("A")),
Pred::gt(column_expr!("val"), Expr::literal("z")),
)),
None,
1
)]
#[case::predicate_on_unprojected_data_column(
// Project only "category"; predicate references unprojected "val" (logical) whose
// physical name is "phys_val". Kernel must resolve to phys_val via the full table
// schema and prune both files: max(phys_val)='z' is NOT > 'z'.
Arc::new(Pred::gt(column_expr!("val"), Expr::literal("z"))),
Some(vec!["category"]),
0
)]
#[case::predicate_on_unprojected_partition_column(
// Project only "val"; predicate references unprojected partition column "category"
// (logical, physical name "phys_category"). Partition pruning must still kick in
// using the physical partition name, keeping only the category=A file.
Arc::new(Pred::eq(column_expr!("category"), Expr::literal("A"))),
Some(vec!["val"]),
1
)]
#[tokio::test]
async fn test_partition_pruning_with_column_mapping(
#[case] predicate: Arc<Pred>,
#[case] select_cols: Option<Vec<&'static str>>,
#[case] expected_files: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_batch(vec![("phys_val", vec!["x", "y", "z"].into_array())])?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
// Column mapping name mode: logical "category" -> physical "phys_category",
// logical "val" -> physical "phys_val"
let schema_str = r#"{"type":"struct","fields":[{"name":"category","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":1,"delta.columnMapping.physicalName":"phys_category"}},{"name":"val","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":2,"delta.columnMapping.physicalName":"phys_val"}}]}"#;
let actions = [
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["columnMapping"],"writerFeatures":["columnMapping"]}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","isBlindAppend":true}}"#.to_string(),
format!(
r#"{{"metaData":{{"id":"test-cm","format":{{"provider":"parquet","options":{{}}}},"schemaString":"{schema}","partitionColumns":["category"],"configuration":{{"delta.columnMapping.mode":"name","delta.columnMapping.maxColumnId":"2"}},"createdTime":1587968585495}}}}"#,
schema = schema_str.replace('"', r#"\""#),
),
// partitionValues uses physical column name when column mapping is enabled
format!(
r#"{{"add":{{"path":"phys_category=A/{PARQUET_FILE1}","partitionValues":{{"phys_category":"A"}},"size":0,"modificationTime":1587968586000,"dataChange":true,"stats":"{{\"numRecords\":3,\"nullCount\":{{\"phys_val\":0}},\"minValues\":{{\"phys_val\":\"x\"}},\"maxValues\":{{\"phys_val\":\"z\"}}}}" }}}}"#
),
format!(
r#"{{"add":{{"path":"phys_category=B/{PARQUET_FILE2}","partitionValues":{{"phys_category":"B"}},"size":0,"modificationTime":1587968586000,"dataChange":true,"stats":"{{\"numRecords\":3,\"nullCount\":{{\"phys_val\":0}},\"minValues\":{{\"phys_val\":\"x\"}},\"maxValues\":{{\"phys_val\":\"z\"}}}}" }}}}"#
),
];
add_commit(table_root, storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from("phys_category=A").join(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from("phys_category=B").join(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
// Predicates use logical column names -- kernel must map to physical names. The
// optional projection narrows the output schema; when set, the predicate may still
// reference columns outside it
let projection = select_cols
.as_ref()
.map(|cols| snapshot.schema().project(cols))
.transpose()?;
let scan = snapshot
.scan_builder()
.with_schema_opt(projection)
.with_predicate(predicate)
.build()?;
let stream = scan.execute(engine)?;
let mut files_scanned = 0;
// "category" is in the output iff there's no projection (SELECT *) or the projection
// explicitly includes it.
let expect_category = select_cols
.as_ref()
.is_none_or(|cols| cols.contains(&"category"));
for engine_data in stream {
let result_batch = into_record_batch(engine_data?);
// The "category" partition column should be present exactly when projected, and
// should be 'A' for every surviving row.
match result_batch.schema().index_of("category") {
Ok(category_idx) => {
assert!(
expect_category,
"category present but not expected in projection"
);
let category_col = result_batch.column(category_idx).as_string::<i32>();
for i in 0..result_batch.num_rows() {
assert_eq!(category_col.value(i), "A");
}
}
Err(_) => assert!(
!expect_category,
"category missing but expected in projection"
),
}
files_scanned += 1;
}
assert_eq!(
expected_files, files_scanned,
"Expected partition pruning to return {expected_files} file(s)"
);
Ok(())
}
#[rstest::rstest]
#[case::not_less_than(
    Pred::not(column_expr!("number").lt(Expr::literal(4i64))),
    table_for_numbers(vec![4, 5, 6])
)]
#[case::not_less_than_or_equal(
    Pred::not(column_expr!("number").le(Expr::literal(4i64))),
    table_for_numbers(vec![5, 6])
)]
#[case::not_greater_than(
    Pred::not(column_expr!("number").gt(Expr::literal(4i64))),
    table_for_numbers(vec![1, 2, 3, 4])
)]
#[case::not_greater_than_or_equal(
    Pred::not(column_expr!("number").ge(Expr::literal(4i64))),
    table_for_numbers(vec![1, 2, 3])
)]
#[case::not_equal(
    Pred::not(column_expr!("number").eq(Expr::literal(4i64))),
    table_for_numbers(vec![1, 2, 3, 5, 6])
)]
#[case::not_not_equal(
    Pred::not(column_expr!("number").ne(Expr::literal(4i64))),
    table_for_numbers(vec![4])
)]
fn predicate_on_number_not(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Every case negates a comparison of `number` against 4, so the surviving rows are the
    // complement of what the inner comparison alone would keep.
    let table_path = "./tests/data/basic_partitioned";
    let projection = &["a_float", "number"];
    read_table_data(table_path, Some(projection), Some(pred), expected)?;
    Ok(())
}
#[test]
fn predicate_on_number_with_not_null() -> Result<(), Box<dyn std::error::Error>> {
    // `number IS NOT NULL AND number < 3`: only the rows with number 1 and 2 are expected.
    let not_null = column_expr!("number").is_not_null();
    let below_three = column_expr!("number").lt(Expr::literal(3i64));
    let expected = vec![
        "+---------+--------+",
        "| a_float | number |",
        "+---------+--------+",
        "| 1.1 | 1 |",
        "| 2.2 | 2 |",
        "+---------+--------+",
    ];
    read_table_data_str(
        "./tests/data/basic_partitioned",
        Some(&["a_float", "number"]),
        Some(Pred::and(not_null, below_three)),
        expected,
    )?;
    Ok(())
}
#[test]
fn predicate_null() -> Result<(), Box<dyn std::error::Error>> {
    // `number` is never null in this table, so an IS NULL filter must produce no output.
    let is_null = column_expr!("number").is_null();
    read_table_data_str(
        "./tests/data/basic_partitioned",
        Some(&["a_float", "number"]),
        Some(is_null),
        vec![],
    )?;
    Ok(())
}
#[test]
fn mixed_null() -> Result<(), Box<dyn std::error::Error>> {
    // Filter on `n IS NULL`. The expected rows come from parts 0 and 2 and still include
    // non-null `n` values from part 2. This therefore checks that part 1 is skipped as a whole,
    // not that individual rows are filtered.
    let predicate = column_expr!("n").is_null();
    let expected = vec![
        "+------+--------------+",
        "| part | n |",
        "+------+--------------+",
        "| 0 | |",
        "| 0 | |",
        "| 0 | |",
        "| 0 | |",
        "| 0 | |",
        "| 2 | |",
        "| 2 | non-null-mix |",
        "| 2 | |",
        "| 2 | non-null-mix |",
        "| 2 | |",
        "+------+--------------+",
    ];
    read_table_data_str(
        "./tests/data/mixed-nulls",
        Some(&["part", "n"]),
        Some(predicate),
        expected,
    )
}
#[test]
fn mixed_not_null() -> Result<(), Box<dyn std::error::Error>> {
    // Filter on `n IS NOT NULL`. Part 0 is absent from the expected output, while part 2 keeps
    // its null rows. The check is again skipping at the part level, not row filtering.
    let predicate = column_expr!("n").is_not_null();
    let expected = vec![
        "+------+--------------+",
        "| part | n |",
        "+------+--------------+",
        "| 1 | non-null |",
        "| 1 | non-null |",
        "| 1 | non-null |",
        "| 1 | non-null |",
        "| 1 | non-null |",
        "| 2 | |",
        "| 2 | |",
        "| 2 | |",
        "| 2 | non-null-mix |",
        "| 2 | non-null-mix |",
        "+------+--------------+",
    ];
    read_table_data_str(
        "./tests/data/mixed-nulls",
        Some(&["part", "n"]),
        Some(predicate),
        expected,
    )
}
#[rstest::rstest]
#[case::and_both_conditions(
    Pred::and(
        column_expr!("number").gt(Expr::literal(4i64)),
        column_expr!("a_float").gt(Expr::literal(5.5)),
    ),
    table_for_numbers(vec![6])
)]
#[case::and_with_negation(
    Pred::and(
        column_expr!("number").gt(Expr::literal(4i64)),
        Pred::not(column_expr!("a_float").gt(Expr::literal(5.5))),
    ),
    table_for_numbers(vec![5])
)]
#[case::or_either_condition(
    Pred::or(
        column_expr!("number").gt(Expr::literal(4i64)),
        column_expr!("a_float").gt(Expr::literal(5.5)),
    ),
    table_for_numbers(vec![5, 6])
)]
#[case::or_with_negation(
    Pred::or(
        column_expr!("number").gt(Expr::literal(4i64)),
        Pred::not(column_expr!("a_float").gt(Expr::literal(5.5))),
    ),
    table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
fn and_or_predicates(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // AND/OR combinations of `number > 4` and `a_float > 5.5`, with one side optionally
    // negated.
    let table_path = "./tests/data/basic_partitioned";
    let projection = &["a_float", "number"];
    read_table_data(table_path, Some(projection), Some(pred), expected)?;
    Ok(())
}
#[rstest::rstest]
#[case::not_and_both_conditions(
    Pred::not(Pred::and(
        column_expr!("number").gt(Expr::literal(4i64)),
        column_expr!("a_float").gt(Expr::literal(5.5)),
    )),
    table_for_numbers(vec![1, 2, 3, 4, 5])
)]
#[case::not_and_with_negation(
    Pred::not(Pred::and(
        column_expr!("number").gt(Expr::literal(4i64)),
        Pred::not(column_expr!("a_float").gt(Expr::literal(5.5))),
    )),
    table_for_numbers(vec![1, 2, 3, 4, 6])
)]
#[case::not_or_either_condition(
    Pred::not(Pred::or(
        column_expr!("number").gt(Expr::literal(4i64)),
        column_expr!("a_float").gt(Expr::literal(5.5)),
    )),
    table_for_numbers(vec![1, 2, 3, 4])
)]
#[case::not_or_with_negation(
    Pred::not(Pred::or(
        column_expr!("number").gt(Expr::literal(4i64)),
        Pred::not(column_expr!("a_float").gt(Expr::literal(5.5))),
    )),
    vec![]
)]
fn not_and_or_predicates(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // AND/OR combinations of `number > 4` and `a_float > 5.5`, wrapped in an outer NOT.
    let table_path = "./tests/data/basic_partitioned";
    let projection = &["a_float", "number"];
    read_table_data(table_path, Some(projection), Some(pred), expected)?;
    Ok(())
}
#[rstest::rstest]
#[case::literal_false(Pred::literal(false), table_for_numbers(vec![]))]
#[case::and_with_literal_false(
    Pred::and(column_pred!("number"), Pred::literal(false)),
    table_for_numbers(vec![])
)]
#[case::literal_true(
    Pred::literal(true),
    table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
#[case::from_literal_expr(
    Pred::from_expr(Expr::literal(3i64)),
    table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
#[case::distinct_value(
    column_expr!("number").distinct(Expr::literal(3i64)),
    table_for_numbers(vec![1, 2, 4, 5, 6])
)]
#[case::distinct_null(
    column_expr!("number").distinct(Expr::null_literal(DataType::LONG)),
    table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
#[case::not_distinct_value(
    Pred::not(column_expr!("number").distinct(Expr::literal(3i64))),
    table_for_numbers(vec![3])
)]
#[case::not_distinct_null(
    Pred::not(column_expr!("number").distinct(Expr::null_literal(DataType::LONG))),
    table_for_numbers(vec![])
)]
#[case::gt_empty_struct(
    column_expr!("number").gt(Expr::struct_from(Vec::<ExpressionRef>::new())),
    table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
#[case::not_gt_empty_struct(
    Pred::not(column_expr!("number").gt(Expr::struct_from(Vec::<ExpressionRef>::new()))),
    table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
fn invalid_skips_none_predicates(
    #[case] pred: Pred,
    #[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Literal, from-expression, DISTINCT and ill-typed predicates. The ill-typed ones (a long
    // compared with an empty struct) must not be used to skip data, so every row comes back
    // whether or not they are negated.
    let table_path = "./tests/data/basic_partitioned";
    let projection = &["a_float", "number"];
    read_table_data(table_path, Some(projection), Some(pred), expected)?;
    Ok(())
}
#[test]
fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
    // Filter `value > 3` on the `table-with-dv-small` fixture. Values 1-3 are still expected,
    // so the predicate is only used for skipping here, never as a row filter.
    let predicate = Pred::gt(column_expr!("value"), Expr::literal(3));
    let expected = vec![
        "+-------+",
        "| value |",
        "+-------+",
        "| 1 |",
        "| 2 |",
        "| 3 |",
        "| 4 |",
        "| 5 |",
        "| 6 |",
        "| 7 |",
        "| 8 |",
        "+-------+",
    ];
    read_table_data_str(
        "./tests/data/table-with-dv-small/",
        None,
        Some(predicate),
        expected,
    )
}
#[tokio::test]
async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
    // Test for https://github.com/delta-io/delta-kernel-rs/issues/698
    // Two partitions (id=1, id=2) of a table whose partition column `id` is non-nullable. A
    // predicate on `id` must keep exactly one of them.
    let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    let actions = [
        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
        r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
        r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
        format!(r#"{{"add":{{"path":"id=1/{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
        format!(r#"{{"add":{{"path":"id=2/{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
    ];
    add_commit(table_root, storage.as_ref(), 0, actions.iter().join("\n")).await?;
    // Both partition directories hold an identical copy of `batch`.
    for (partition_dir, file_name) in [("id=1", PARQUET_FILE1), ("id=2", PARQUET_FILE2)] {
        storage
            .put(
                &Path::from(partition_dir).join(file_name),
                record_batch_to_bytes(&batch).into(),
            )
            .await?;
    }
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    let scan = snapshot
        .scan_builder()
        .with_predicate(Arc::new(Pred::eq(column_expr!("id"), Expr::literal(2))))
        .build()?;
    let mut files_scanned = 0;
    for engine_data in scan.execute(engine)? {
        let mut result_batch = into_record_batch(engine_data?);
        // Drop the materialized partition column so the remainder matches what was written.
        let id_idx = result_batch.schema().index_of("id")?;
        let _ = result_batch.remove_column(id_idx);
        assert_eq!(&batch, &result_batch);
        files_scanned += 1;
    }
    assert_eq!(1, files_scanned);
    Ok(())
}
#[tokio::test]
async fn predicate_on_non_nullable_column_missing_stats() -> Result<(), Box<dyn std::error::Error>>
{
    // The first file's stats (min "a", max "c") rule out `val = 'g'`, so it can be skipped. The
    // second file has empty stats, so it must be read even though it has no match either.
    let batch_1 = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
    let batch_2 = generate_batch(vec![("val", vec!["d", "e", "f"].into_array())])?;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    let actions = [
        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
        r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
        r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
        // Add one file with stats, one file without
        format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
        format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
    ];
    add_commit(table_root, storage.as_ref(), 0, actions.iter().join("\n")).await?;
    // Parquet-level statistics are disabled so row-group pruning cannot hide the second file.
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();
    for (file_name, data) in [(PARQUET_FILE1, &batch_1), (PARQUET_FILE2, &batch_2)] {
        storage
            .put(
                &Path::from(file_name),
                record_batch_to_bytes_with_props(data, writer_props.clone()).into(),
            )
            .await?;
    }
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    let scan = snapshot
        .scan_builder()
        .with_predicate(Arc::new(Pred::eq(column_expr!("val"), Expr::literal("g"))))
        .build()?;
    let mut files_scanned = 0;
    for engine_data in scan.execute(engine)? {
        let result_batch = into_record_batch(engine_data?);
        assert_eq!(&batch_2, &result_batch);
        files_scanned += 1;
    }
    // One file is scanned as stats are missing so we don't know the predicate isn't satisfied
    assert_eq!(1, files_scanned);
    Ok(())
}
#[test]
fn short_dv() -> Result<(), Box<dyn std::error::Error>> {
    // Full read with no projection and no predicate. Ids 0-2 do not appear in the expected
    // output, presumably because the table's deletion vector removed them.
    let table_path = "./tests/data/with-short-dv/";
    let expected = vec![
        "+----+-------+--------------------------+---------------------+",
        "| id | value | timestamp | rand |",
        "+----+-------+--------------------------+---------------------+",
        "| 3 | 3 | 2023-05-31T18:58:33.633Z | 0.7918174793484931 |",
        "| 4 | 4 | 2023-05-31T18:58:33.633Z | 0.9281049271981882 |",
        "| 5 | 5 | 2023-05-31T18:58:33.633Z | 0.27796520310701633 |",
        "| 6 | 6 | 2023-05-31T18:58:33.633Z | 0.15263801464228832 |",
        "| 7 | 7 | 2023-05-31T18:58:33.633Z | 0.1981143710215575 |",
        "| 8 | 8 | 2023-05-31T18:58:33.633Z | 0.3069439236599195 |",
        "| 9 | 9 | 2023-05-31T18:58:33.633Z | 0.5175919190815845 |",
        "+----+-------+--------------------------+---------------------+",
    ];
    read_table_data_str(table_path, None, None, expected)
}
#[test]
fn basic_decimal() -> Result<(), Box<dyn std::error::Error>> {
    // Full read of decimal columns with differing scales, including negative values and zero.
    let table_path = "./tests/data/basic-decimal-table/";
    let expected = vec![
        "+----------------+---------+--------------+------------------------+",
        "| part | col1 | col2 | col3 |",
        "+----------------+---------+--------------+------------------------+",
        "| -2342342.23423 | -999.99 | -99999.99999 | -9999999999.9999999999 |",
        "| 0.00004 | 0.00 | 0.00000 | 0.0000000000 |",
        "| 234.00000 | 1.00 | 2.00000 | 3.0000000000 |",
        "| 2342222.23454 | 111.11 | 22222.22222 | 3333333333.3333333333 |",
        "+----------------+---------+--------------+------------------------+",
    ];
    read_table_data_str(table_path, None, None, expected)
}
#[test]
fn timestamp_ntz() -> Result<(), Box<dyn std::error::Error>> {
    // timestamp_ntz values read from `tsNtz` and from `tsNtzPartition`. The latter is, by its
    // name, a partition column. Both columns contain nulls.
    let table_path = "./tests/data/data-reader-timestamp_ntz/";
    let expected = vec![
        "+----+----------------------------+----------------------------+",
        "| id | tsNtz | tsNtzPartition |",
        "+----+----------------------------+----------------------------+",
        "| 0 | 2021-11-18T02:30:00.123456 | 2021-11-18T02:30:00.123456 |",
        "| 1 | 2013-07-05T17:01:00.123456 | 2021-11-18T02:30:00.123456 |",
        "| 2 | | 2021-11-18T02:30:00.123456 |",
        "| 3 | 2021-11-18T02:30:00.123456 | 2013-07-05T17:01:00.123456 |",
        "| 4 | 2013-07-05T17:01:00.123456 | 2013-07-05T17:01:00.123456 |",
        "| 5 | | 2013-07-05T17:01:00.123456 |",
        "| 6 | 2021-11-18T02:30:00.123456 | |",
        "| 7 | 2013-07-05T17:01:00.123456 | |",
        "| 8 | | |",
        "+----+----------------------------+----------------------------+",
    ];
    read_table_data_str(table_path, None, None, expected)
}
#[test]
fn type_widening_basic() -> Result<(), Box<dyn std::error::Error>> {
    // Column names appear to encode `<original type>_<widened type>`. The values must come back
    // in the widened type.
    let columns: &[&str] = &[
        "byte_long",
        "int_long",
        "float_double",
        "byte_double",
        "short_double",
        "int_double",
        "date_timestamp_ntz",
    ];
    let expected = vec![
        "+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
        "| byte_long | int_long | float_double | byte_double | short_double | int_double | date_timestamp_ntz |",
        "+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
        "| 1 | 2 | 3.4000000953674316 | 5.0 | 6.0 | 7.0 | 2024-09-09T00:00:00 |",
        "| 9223372036854775807 | 9223372036854775807 | 1.234567890123 | 1.234567890123 | 1.234567890123 | 1.234567890123 | 2024-09-09T12:34:56.123456 |",
        "+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
    ];
    read_table_data_str("./tests/data/type-widening/", Some(columns), None, expected)
}
#[test]
fn type_widening_decimal() -> Result<(), Box<dyn std::error::Error>> {
    // Decimal-targeted widening: decimal-to-decimal (same or greater scale) and
    // integer-to-decimal columns from the same fixture.
    let columns: &[&str] = &[
        "decimal_decimal_same_scale",
        "decimal_decimal_greater_scale",
        "byte_decimal",
        "short_decimal",
        "int_decimal",
        "long_decimal",
    ];
    let expected = vec![
        "+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
        "| decimal_decimal_same_scale | decimal_decimal_greater_scale | byte_decimal | short_decimal | int_decimal | long_decimal |",
        "+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
        "| 123.45 | 67.89000 | 1.0 | 2.0 | 3.0 | 4.0 |",
        "| 12345678901234.56 | 12345678901.23456 | 123.4 | 12345.6 | 1234567890.1 | 123456789012345678.9 |",
        "+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
    ];
    read_table_data_str("./tests/data/type-widening/", Some(columns), None, expected)
}
// Verify predicates over missing or unknown columns:
// - a column that is in the table schema but absent from the data files must not cause
//   incorrect skipping;
// - a column that is not in the table schema at all must make the read fail.
#[test]
fn predicate_references_invalid_missing_column() -> Result<(), Box<dyn std::error::Error>> {
    // Attempted skipping over a logically valid but physically missing column. We should be able to
    // skip the data file because the missing column is inferred to be all-null.
    //
    // WARNING: https://github.com/delta-io/delta-kernel-rs/issues/434 -- currently disabled.
    // Until that is fixed, no skipping happens. Every row is returned, with `missing` null.
    //
    // Desired result once the issue is fixed:
    //let expected = vec![
    // "+--------+",
    // "| chrono |",
    // "+--------+",
    // "+--------+",
    //];
    let columns = &["chrono", "missing"];
    let expected = vec![
        "+-------------------------------------------------------------------------------------------+---------+",
        "| chrono | missing |",
        "+-------------------------------------------------------------------------------------------+---------+",
        "| {date32: 1971-01-01, timestamp: 1970-02-01T08:00:00Z, timestamp_ntz: 1970-01-02T00:00:00} | |",
        "| {date32: 1971-01-02, timestamp: 1970-02-01T09:00:00Z, timestamp_ntz: 1970-01-02T00:01:00} | |",
        "| {date32: 1971-01-03, timestamp: 1970-02-01T10:00:00Z, timestamp_ntz: 1970-01-02T00:02:00} | |",
        "| {date32: 1971-01-04, timestamp: 1970-02-01T11:00:00Z, timestamp_ntz: 1970-01-02T00:03:00} | |",
        "| {date32: 1971-01-05, timestamp: 1970-02-01T12:00:00Z, timestamp_ntz: 1970-01-02T00:04:00} | |",
        "+-------------------------------------------------------------------------------------------+---------+",
    ];
    let predicate = column_expr!("missing").lt(Expr::literal(10i64));
    read_table_data_str(
        "./tests/data/parquet_row_group_skipping/",
        Some(columns),
        Some(predicate),
        expected,
    )?;
    // Skipping over an invalid column, i.e. one that is not in the table schema. This must be
    // rejected with an error rather than silently skipping (or not skipping) data, so the call
    // below is required to fail. `expected` lists every row of the table, unfiltered.
    let expected = vec![
        "+-------------------------------------------------------------------------------------------+",
        "| chrono |",
        "+-------------------------------------------------------------------------------------------+",
        "| {date32: 1971-01-01, timestamp: 1970-02-01T08:00:00Z, timestamp_ntz: 1970-01-02T00:00:00} |",
        "| {date32: 1971-01-02, timestamp: 1970-02-01T09:00:00Z, timestamp_ntz: 1970-01-02T00:01:00} |",
        "| {date32: 1971-01-03, timestamp: 1970-02-01T10:00:00Z, timestamp_ntz: 1970-01-02T00:02:00} |",
        "| {date32: 1971-01-04, timestamp: 1970-02-01T11:00:00Z, timestamp_ntz: 1970-01-02T00:03:00} |",
        "| {date32: 1971-01-05, timestamp: 1970-02-01T12:00:00Z, timestamp_ntz: 1970-01-02T00:04:00} |",
        "+-------------------------------------------------------------------------------------------+",
    ];
    let predicate = column_expr!("invalid").lt(Expr::literal(10));
    read_table_data_str(
        "./tests/data/parquet_row_group_skipping/",
        Some(columns),
        Some(predicate),
        expected,
    )
    .expect_err("predicate on a column not in the table schema should make the read fail");
    Ok(())
}
// Note: This test is disabled on Windows because it creates a directory named
// `time=1971-07-22T03:06:40.000000Z`, and Windows does not allow `:` in directory names.
#[cfg(not(windows))]
#[test]
fn timestamp_partitioned_table() -> Result<(), Box<dyn std::error::Error>> {
    let expected = vec![
        "+----+-----+---+----------------------+",
        "| id | x | s | time |",
        "+----+-----+---+----------------------+",
        "| 1 | 0.5 | | 1971-07-22T03:06:40Z |",
        "+----+-----+---+----------------------+",
    ];
    // The fixture is unpacked into a temporary directory. `fixture_dir` must stay alive until
    // the read completes.
    let test_name = "timestamp-partitioned-table";
    let fixture_dir = load_test_data("./tests/data", test_name).unwrap();
    let table_path = fixture_dir.path().join(test_name);
    let table_location = table_path.to_str().unwrap();
    read_table_data_str(table_location, None, None, expected)
}
#[test]
fn compacted_log_files_table() -> Result<(), Box<dyn std::error::Error>> {
    let expected = vec![
        "+----+--------------------+",
        "| id | comment |",
        "+----+--------------------+",
        "| 0 | new |",
        "| 1 | after-large-delete |",
        "| 2 | |",
        "| 10 | merge1-insert |",
        "| 12 | merge2-insert |",
        "+----+--------------------+",
    ];
    // Unpack the fixture into a temporary directory and read the table from inside it.
    let test_name = "compacted-log-files-table";
    let fixture_dir = load_test_data("./tests/data", test_name).unwrap();
    let table_path = fixture_dir.path().join(test_name);
    let table_location = table_path.to_str().unwrap();
    read_table_data_str(table_location, None, None, expected)
}
#[test]
fn unshredded_variant_table() -> Result<(), Box<dyn std::error::Error>> {
    // The expected table is large, so it is pulled in from a fixture file at compile time.
    let expected = include!("../data/unshredded-variant.expected.in");
    let test_name = "unshredded-variant";
    let fixture_dir = load_test_data("./tests/data", test_name).unwrap();
    let table_path = fixture_dir.path().join(test_name);
    let table_location = table_path.to_str().unwrap();
    read_table_data_str(table_location, None, None, expected)
}
#[tokio::test]
async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Error>> {
    // Set up an in-memory table with a different number of rows in each file. This catches a
    // row index that fails to restart per file, and a file that returns the wrong row count.
    let batch1 = generate_batch(vec![
        ("id", vec![1i32, 2, 3, 4, 5].into_array()),
        ("value", vec!["a", "b", "c", "d", "e"].into_array()),
    ])?;
    let batch2 = generate_batch(vec![
        ("id", vec![10i32, 20, 30].into_array()),
        ("value", vec!["x", "y", "z"].into_array()),
    ])?;
    let batch3 = generate_batch(vec![
        ("id", vec![100i32, 200, 300, 400].into_array()),
        ("value", vec!["p", "q", "r", "s"].into_array()),
    ])?;
    let file_size1 = record_batch_to_bytes(&batch1).len() as u64;
    let file_size2 = record_batch_to_bytes(&batch2).len() as u64;
    let file_size3 = record_batch_to_bytes(&batch3).len() as u64;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![
            TestAction::Metadata,
            TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size1),
            TestAction::AddWithSize(PARQUET_FILE2.to_string(), file_size2),
            TestAction::AddWithSize(PARQUET_FILE3.to_string(), file_size3),
        ]),
    )
    .await?;
    for (parquet_file, batch) in [
        (PARQUET_FILE1, &batch1),
        (PARQUET_FILE2, &batch2),
        (PARQUET_FILE3, &batch3),
    ] {
        storage
            .put(
                &Path::from(parquet_file),
                record_batch_to_bytes(batch).into(),
            )
            .await?;
    }
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    // Create a schema that places a row index metadata column between two data columns
    let schema = Arc::new(StructType::try_new([
        StructField::nullable("id", DataType::INTEGER),
        StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
        StructField::nullable("value", DataType::STRING),
    ])?);
    let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    let scan = snapshot.scan_builder().with_schema(schema).build()?;
    let mut file_count = 0;
    // Row counts of batch1..batch3 above. The test assumes one batch per file, arriving in the
    // order the files were added.
    let expected_row_counts: [usize; 3] = [5, 3, 4];
    let stream = scan.execute(engine.clone())?;
    for data in stream {
        let batch = into_record_batch(data?);
        file_count += 1;
        // Verify the schema structure
        assert_eq!(batch.num_columns(), 3, "Expected 3 columns in the batch");
        assert_eq!(
            batch.schema().field(0).name(),
            "id",
            "First column should be 'id'"
        );
        assert_eq!(
            batch.schema().field(1).name(),
            "row_index",
            "Second column should be 'row_index'"
        );
        assert_eq!(
            batch.schema().field(2).name(),
            "value",
            "Third column should be 'value'"
        );
        // Check the batch size explicitly. The row-index check below derives its expectation
        // from `num_rows()`, so on its own it cannot detect missing or extra rows.
        let expected_rows = *expected_row_counts
            .get(file_count - 1)
            .expect("scan returned more batches than data files");
        assert_eq!(
            batch.num_rows(),
            expected_rows,
            "File {file_count} returned the wrong number of rows"
        );
        // Each file should have row indexes starting from 0 (file-local indexing)
        let row_index_array = batch.column(1).as_primitive::<Int64Type>();
        let expected_values: Vec<i64> = (0..batch.num_rows() as i64).collect();
        assert_eq!(
            row_index_array.values().to_vec(),
            expected_values,
            "Row index values incorrect for file {file_count} (expected {expected_rows} rows)"
        );
    }
    assert_eq!(file_count, 3, "Expected to scan 3 files");
    Ok(())
}
#[tokio::test]
async fn test_file_path_metadata_column() -> Result<(), Box<dyn std::error::Error>> {
    use delta_kernel::arrow::array::{Array, StringArray};
    // Two data files with distinct row counts. Each scanned batch must carry its own file's
    // full path (table root + file name) in every row of the `_file` metadata column.
    let batch1 = generate_batch(vec![
        ("id", vec![1i32, 2, 3].into_array()),
        ("value", vec!["a", "b", "c"].into_array()),
    ])?;
    let batch2 = generate_batch(vec![
        ("id", vec![10i32, 20].into_array()),
        ("value", vec!["x", "y"].into_array()),
    ])?;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![
            TestAction::Metadata,
            TestAction::AddWithSize(
                PARQUET_FILE1.to_string(),
                record_batch_to_bytes(&batch1).len() as u64,
            ),
            TestAction::AddWithSize(
                PARQUET_FILE2.to_string(),
                record_batch_to_bytes(&batch2).len() as u64,
            ),
        ]),
    )
    .await?;
    let data_files = [(PARQUET_FILE1, &batch1), (PARQUET_FILE2, &batch2)];
    for (file_name, contents) in data_files {
        storage
            .put(
                &Path::from(file_name),
                record_batch_to_bytes(contents).into(),
            )
            .await?;
    }
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    // Read schema with the file-path metadata column between the two data columns.
    let read_schema = Arc::new(StructType::try_new([
        StructField::nullable("id", DataType::INTEGER),
        StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
        StructField::nullable("value", DataType::STRING),
    ])?);
    let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    let scan = snapshot.scan_builder().with_schema(read_schema).build()?;
    let expected_files = [PARQUET_FILE1, PARQUET_FILE2];
    let expected_row_counts = [3, 2];
    let mut file_count = 0;
    for data in scan.execute(engine.clone())? {
        let batch = into_record_batch(data?);
        let batch_schema = batch.schema();
        assert_eq!(batch.num_columns(), 3, "Expected 3 columns in the batch");
        assert_eq!(
            batch_schema.field(0).name(),
            "id",
            "First column should be 'id'"
        );
        assert_eq!(
            batch_schema.field(1).name(),
            "_file",
            "Second column should be '_file'"
        );
        assert_eq!(
            batch_schema.field(2).name(),
            "value",
            "Third column should be 'value'"
        );
        let expected_file_name = expected_files[file_count];
        let expected_path = format!("{table_root}{expected_file_name}");
        // The path column is expected to be a plain StringArray, with the path repeated per row.
        let string_array = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("File path column should be a StringArray");
        assert_eq!(
            string_array.len(),
            expected_row_counts[file_count],
            "File {} should have {} rows",
            expected_file_name,
            expected_row_counts[file_count]
        );
        let every_row_matches = string_array
            .iter()
            .all(|path| path == Some(expected_path.as_str()));
        assert!(
            every_row_matches,
            "All rows should contain file path '{expected_path}'"
        );
        file_count += 1;
    }
    assert_eq!(file_count, 2, "Expected to scan 2 files");
    Ok(())
}
#[tokio::test]
async fn test_unsupported_metadata_columns() -> Result<(), Box<dyn std::error::Error>> {
    // Requesting a metadata column this table cannot supply must make `ScanBuilder::build` fail,
    // and the error message must contain the listed text.
    let batch = generate_simple_batch()?;
    let storage = Arc::new(InMemory::new());
    let table_root = "memory:///";
    add_commit(
        table_root,
        storage.as_ref(),
        0,
        actions_to_string(vec![
            TestAction::Metadata,
            TestAction::Add(PARQUET_FILE1.to_string()),
        ]),
    )
    .await?;
    storage
        .put(
            &Path::from(PARQUET_FILE1),
            record_batch_to_bytes(&batch).into(),
        )
        .await?;
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let test_cases = [
        (
            "row_id",
            MetadataColumnSpec::RowId,
            "Row ids are not enabled on this table",
        ),
        (
            "row_commit_version",
            MetadataColumnSpec::RowCommitVersion,
            "Row commit versions not supported",
        ),
    ];
    for (column_name, metadata_spec, error_text) in test_cases {
        let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
        let fields = [
            StructField::nullable("id", DataType::INTEGER),
            StructField::create_metadata_column(column_name, metadata_spec),
        ];
        let schema = Arc::new(StructType::try_new(fields)?);
        let error_msg = snapshot
            .scan_builder()
            .with_schema(schema)
            .build()
            .unwrap_err()
            .to_string();
        assert!(
            error_msg.contains(error_text),
            "Expected {error_msg} to contain {error_text}"
        );
    }
    Ok(())
}
#[tokio::test]
async fn test_invalid_files_are_skipped() -> Result<(), Box<dyn std::error::Error>> {
    // The table root is a `const` rather than a local so the nested helper fns below can use
    // it. Those fns cannot capture locals.
    const TABLE_ROOT: &str = "memory:///";
    let batch = generate_simple_batch()?;
    let storage = Arc::new(InMemory::new());
    add_commit(
        TABLE_ROOT,
        storage.as_ref(),
        0,
        actions_to_string(vec![
            TestAction::Metadata,
            TestAction::Add(PARQUET_FILE1.to_string()),
            TestAction::Add(PARQUET_FILE2.to_string()),
        ]),
    )
    .await?;
    storage
        .put(
            &Path::from(PARQUET_FILE1),
            record_batch_to_bytes(&batch).into(),
        )
        .await?;
    storage
        .put(
            &Path::from(PARQUET_FILE2),
            record_batch_to_bytes(&batch).into(),
        )
        .await?;
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    // Paths relative to the table root that resemble log files. The snapshot's log segment
    // must not list any of them.
    let invalid_files = [
        "_delta_log/0.zip",
        "_delta_log/_copy_into_log/0.zip",
        "_delta_log/_ignore_me/00000000000000000000.json",
        "_delta_log/_and_me/00000000000000000000.checkpoint.parquet",
        "_delta_log/02184.json",
        "_delta_log/0x000000000000000000.checkpoint.parquet",
        "00000000000000000000.json",
        "_delta_log/_staged_commits/_staged_commits/00000000000000000000.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
        "_delta_log/my_random_dir/_staged_commits/00000000000000000000.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
        "_delta_log/my_random_dir/_delta_log/_staged_commits/00000000000000000000.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
        "_delta_log/_delta_log/00000000000000000000.json",
        "_delta_log/_delta_log/00000000000000000000.checkpoint.parquet",
        "_delta_log/something/_delta_log/00000000000000000000.crc",
        "_delta_log/something/_delta_log/00000000000000000000.json",
        "_delta_log/something/_delta_log/00000000000000000000.checkpoint.parquet",
    ];
    /// Returns the location of `path` relative to `TABLE_ROOT`, for comparison with
    /// `invalid_files`.
    ///
    /// # Panics
    /// Panics if the location does not start with `TABLE_ROOT`. Slicing off a fixed byte count
    /// would yield wrong paths if the root changed. Every containment check would then pass
    /// vacuously.
    fn get_file_path_for_test(path: &ParsedLogPath) -> &str {
        let location = path.location.location.as_str();
        location.strip_prefix(TABLE_ROOT).unwrap_or_else(|| {
            panic!("log path {location} is not under table root {TABLE_ROOT}")
        })
    }
    /// Asserts that none of the commit, compaction, checkpoint, latest-CRC or latest-commit
    /// files listed in `segment` is one of `invalid_files`.
    fn ensure_segment_does_not_contain(invalid_files: &[&str], segment: &LogSegment) {
        let is_invalid = |p: &ParsedLogPath| invalid_files.contains(&get_file_path_for_test(p));
        assert!(
            !segment.listed.ascending_commit_files.iter().any(is_invalid),
            "ascending_commit_files contained invalid file"
        );
        assert!(
            !segment.listed.ascending_compaction_files.iter().any(is_invalid),
            "ascending_compaction_files contained invalid file"
        );
        assert!(
            !segment.listed.checkpoint_parts.iter().any(is_invalid),
            "checkpoint_parts contained invalid file"
        );
        if let Some(ref crc) = segment.listed.latest_crc_file {
            assert!(!is_invalid(crc), "Latest crc contained invalid file");
        }
        if let Some(ref latest_commit) = segment.listed.latest_commit_file {
            assert!(
                !is_invalid(latest_commit),
                "Latest commit contained invalid file"
            );
        }
    }
    // Each invalid file on its own...
    for invalid_file in invalid_files.iter() {
        let invalid_path = Path::from(*invalid_file);
        storage.put(&invalid_path, vec![1u8].into()).await?;
        let snapshot = Snapshot::builder_for(TABLE_ROOT).build(engine.as_ref())?;
        ensure_segment_does_not_contain(&invalid_files, snapshot.log_segment());
        storage.delete(&invalid_path).await?;
    }
    // ...and a final test with _all_ the files we should ignore present at once
    for invalid_file in invalid_files.iter() {
        let invalid_path = Path::from(*invalid_file);
        storage.put(&invalid_path, vec![1u8].into()).await?;
    }
    let snapshot = Snapshot::builder_for(TABLE_ROOT).build(engine.as_ref())?;
    ensure_segment_does_not_contain(&invalid_files, snapshot.log_segment());
    Ok(())
}
// Verifies data skipping works via stats_parsed across all checkpoint types.
// All tables have writeStatsAsStruct=true, writeStatsAsJson=false (struct stats only),
// schema (id: long, value: string), 5 rows (id 1-5), checkpoint at v5.
// Predicate id > 3 skips files where max(id) <= 3, returning only rows 4 and 5.
#[rstest::rstest]
#[test]
fn checkpoint_stats_skipping(
#[values(
"v1-single-part",
"v1-multi-part",
"v2-parquet-sidecars",
"v2-json-sidecars",
"v2-classic-parquet"
)]
checkpoint_type: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let table_path = format!("./tests/data/{checkpoint_type}-struct-stats-only/");
let expected = vec![
"+----+---------+",
"| id | value |",
"+----+---------+",
"| 4 | value_4 |",
"| 5 | value_5 |",
"+----+---------+",
];
let predicate = column_expr!("id").gt(Expr::literal(3i64));
read_table_data_str(&table_path, None, Some(predicate), expected)?;
Ok(())
}
// Verifies `ScanFile.stats` handling for parsed-stats checkpoints. The tables have
// writeStatsAsStruct=true and writeStatsAsJson=false (no JSON stats in the checkpoint),
// schema (id: long, value: string), and 5 single-row files checkpointed at v5.
//
// Every checkpoint variant is crossed with four stats-option shapes. `ScanFile.stats` is
// expected to be populated (through the COALESCE/ToJson fallback) only when both
// `json=true` and `struct_stats=All` are requested; otherwise it stays null on these
// struct-stats-only checkpoints.
#[rstest::rstest]
#[case::default_json_only(StatsOptions::default(), false)]
#[case::all_both(StatsOptions::all(), true)]
#[case::all_struct_only(StatsOptions::all_struct(), false)]
#[case::none(StatsOptions::none(), false)]
fn struct_stats_surfaced_in_scan_file(
    #[values(
        "v1-single-part",
        "v1-multi-part",
        "v2-parquet-sidecars",
        "v2-json-sidecars",
        "v2-classic-parquet"
    )]
    checkpoint_type: &str,
    #[case] stats: StatsOptions,
    #[case] expect_json_stats: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let table_dir = PathBuf::from(format!("./tests/data/{checkpoint_type}-struct-stats-only/"));
    let url = url::Url::from_directory_path(std::fs::canonicalize(table_dir)?).unwrap();
    let engine = test_utils::create_default_engine(&url)?;
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
    let scan = snapshot.scan_builder().with_stats(stats).build()?;

    // Accumulate every scan file across all metadata batches.
    let mut scan_files = Vec::new();
    for metadata in scan.scan_metadata(engine.as_ref())? {
        scan_files = metadata?.visit_scan_files(scan_files, scan_metadata_callback)?;
    }
    assert_eq!(scan_files.len(), 5, "expected 5 files in checkpoint");

    for scan_file in &scan_files {
        match (&scan_file.stats, expect_json_stats) {
            (Some(file_stats), true) => assert_eq!(
                file_stats.num_records,
                1,
                "each file has exactly 1 row, path: {}",
                scan_file.path
            ),
            (None, true) => panic!(
                "ScanFile.stats should be populated, path: {}",
                scan_file.path
            ),
            (file_stats, false) => assert!(
                file_stats.is_none(),
                "ScanFile.stats should be None, path: {}",
                scan_file.path
            ),
        }
    }
    Ok(())
}
// On a JSON-only table (no parsed-stats checkpoint) `has_stats_parsed` is false, so the
// COALESCE branch is never taken. Choosing `StatsOptions::all_struct` instead of
// `StatsOptions::default` therefore makes no difference to the `stats` JSON column:
// `add.stats` is read directly from the commit JSON in both cases.
#[rstest::rstest]
fn struct_stats_only_no_op_on_json_only_table(
    #[values(StatsOptions::default(), StatsOptions::all_struct())] stats: StatsOptions,
) -> Result<(), Box<dyn std::error::Error>> {
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/"))?;
    let url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = test_utils::create_default_engine(&url)?;
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
    let scan = snapshot.scan_builder().with_stats(stats).build()?;

    let mut scan_files = Vec::new();
    for metadata in scan.scan_metadata(engine.as_ref())? {
        scan_files = metadata?.visit_scan_files(scan_files, scan_metadata_callback)?;
    }

    assert!(!scan_files.is_empty());
    assert!(
        scan_files.iter().all(|scan_file| scan_file.stats.is_some()),
        "JSON commits populate stats from add.stats"
    );
    Ok(())
}
// Confirms `stats_parsed` is still consumed by data skipping when only struct
// stats are requested (no JSON synthesis). Predicate `id > 3` should skip files
// where `max(id) <= 3`, returning rows 4 and 5 only. If `all_struct` accidentally
// disabled the stats_parsed read path, all 5 rows would come back.
#[rstest::rstest]
fn struct_stats_only_preserves_data_skipping(
#[values(
"v1-single-part",
"v1-multi-part",
"v2-parquet-sidecars",
"v2-json-sidecars",
"v2-classic-parquet"
)]
checkpoint_type: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let table_path = format!("./tests/data/{checkpoint_type}-struct-stats-only/");
let path = std::fs::canonicalize(PathBuf::from(table_path))?;
let url = url::Url::from_directory_path(path).unwrap();
let engine = test_utils::create_default_engine(&url)?;
let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
let predicate: PredicateRef = Arc::new(column_expr!("id").gt(Expr::literal(3i64)));
let scan = snapshot
.scan_builder()
.with_stats(StatsOptions::all_struct())
.with_predicate(predicate)
.build()?;
let mut scan_files = vec![];
for res in scan.scan_metadata(engine.as_ref())? {
let scan_metadata = res?;
scan_files = scan_metadata.visit_scan_files(scan_files, scan_metadata_callback)?;
}
assert_eq!(
scan_files.len(),
2,
"data skipping via stats_parsed should leave only 2 files (id=4, id=5)"
);
for scan_file in &scan_files {
assert!(
scan_file.stats.is_none(),
"ScanFile.stats must remain null when synthesis is skipped, path: {}",
scan_file.path
);
}
Ok(())
}
// Multi-part V1 checkpoint with partitionValues_parsed on a partitioned table.
// Schema: (id: long, value: string, part: int), partitioned by `part`.
// Each commit inserts one row with part = i % 3 (parts 0, 1, 2).
#[test]
fn partition_values_parsed_skipping() -> Result<(), Box<dyn std::error::Error>> {
    let table_path = "./tests/data/v1-multi-part-partitioned-struct-stats-only/";
    // `part = 0` should prune partitions 1 and 2. The part=0 row expected here is
    // i=3 -> (3, "value_3", 0).
    let part_is_zero = column_expr!("part").eq(Expr::literal(0i32));
    let expected = vec![
        "+----+---------+------+",
        "| id | value   | part |",
        "+----+---------+------+",
        "| 3  | value_3 | 0    |",
        "+----+---------+------+",
    ];
    read_table_data_str(table_path, None, Some(part_is_zero), expected)?;
    Ok(())
}
// In-memory test with crafted truncated JSON stats. Three files:
// file 1: ts_col [1s, 2s] -- max at ms boundary
// file 2: ts_col [3s, 4.000500s] -- JSON max truncated to 4.000s
// file 3: ts_col [7s, 8s] -- max at ms boundary
//
// Predicate `ts_col > 4_000_400us`:
// file 1: max=2s << adjusted predicate (3_999_401) -> pruned (skipping works)
// file 2: truncated max=4s > adjusted predicate (3_999_401) -> kept (truncation safe)
// file 3: max=8s >> adjusted predicate -> kept
#[tokio::test]
async fn timestamp_max_stat_truncation_does_not_over_prune(
) -> Result<(), Box<dyn std::error::Error>> {
let ts_metadata = "{\
\"id\":\"test-ts-table\",\
\"format\":{\"provider\":\"parquet\",\"options\":{}},\
\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[\
{\\\"name\\\":\\\"ts_col\\\",\\\"type\\\":\\\"timestamp\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}\
]}\",\
\"partitionColumns\":[],\
\"configuration\":{},\
\"createdTime\":1700000000000\
}";
let ts_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"ts_col",
delta_kernel::arrow::datatypes::DataType::Timestamp(
TimeUnit::Microsecond,
Some("UTC".into()),
),
true,
)]));
// file1: max at ms boundary, will be pruned
let file1_stats = r#"{"numRecords":2,"nullCount":{"ts_col":0},"minValues":{"ts_col":"1970-01-01T00:00:01.000Z"},"maxValues":{"ts_col":"1970-01-01T00:00:02.000Z"}}"#;
// file2: max truncated from 4.000500s to 4.000s -- truncation adjustment must keep this
let file2_stats = r#"{"numRecords":2,"nullCount":{"ts_col":0},"minValues":{"ts_col":"1970-01-01T00:00:03.000Z"},"maxValues":{"ts_col":"1970-01-01T00:00:04.000Z"}}"#;
// file3: max clearly above predicate
let file3_stats = r#"{"numRecords":2,"nullCount":{"ts_col":0},"minValues":{"ts_col":"1970-01-01T00:00:07.000Z"},"maxValues":{"ts_col":"1970-01-01T00:00:08.000Z"}}"#;
let batch1 = RecordBatch::try_new(
ts_schema.clone(),
vec![Arc::new(
TimestampMicrosecondArray::from(vec![1_000_000i64, 2_000_000]).with_timezone("UTC"),
)],
)?;
let batch2 = RecordBatch::try_new(
ts_schema.clone(),
vec![Arc::new(
TimestampMicrosecondArray::from(vec![3_000_000i64, 4_000_500]).with_timezone("UTC"),
)],
)?;
let batch3 = RecordBatch::try_new(
ts_schema,
vec![Arc::new(
TimestampMicrosecondArray::from(vec![7_000_000i64, 8_000_000]).with_timezone("UTC"),
)],
)?;
let file1_bytes = record_batch_to_bytes(&batch1);
let file2_bytes = record_batch_to_bytes(&batch2);
let file3_bytes = record_batch_to_bytes(&batch3);
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
let make_add = |name: &str, size: usize, stats: &str| -> String {
format!(
"{{\"add\":{{\"path\":\"{name}\",\"partitionValues\":{{}},\"size\":{size},\
\"modificationTime\":1700000000000,\"dataChange\":true,\
\"stats\":\"{stats_escaped}\"}}}}",
stats_escaped = stats.replace('"', "\\\""),
)
};
let commit0 = format!(
"{{\"protocol\":{{\"minReaderVersion\":1,\"minWriterVersion\":2}}}}\n\
{{\"metaData\":{ts_metadata}}}\n\
{}",
make_add("file1.parquet", file1_bytes.len(), file1_stats)
);
add_commit(table_root, storage.as_ref(), 0, commit0).await?;
add_commit(
table_root,
storage.as_ref(),
1,
make_add("file2.parquet", file2_bytes.len(), file2_stats),
)
.await?;
add_commit(
table_root,
storage.as_ref(),
2,
make_add("file3.parquet", file3_bytes.len(), file3_stats),
)
.await?;
storage
.put(&Path::from("file1.parquet"), file1_bytes.into())
.await?;
storage
.put(&Path::from("file2.parquet"), file2_bytes.into())
.await?;
storage
.put(&Path::from("file3.parquet"), file3_bytes.into())
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let row_count = |predicate_us: i64| -> Result<usize, Box<dyn std::error::Error>> {
let predicate = Arc::new(Pred::gt(
column_expr!("ts_col"),
Expr::literal(Scalar::Timestamp(predicate_us)),
));
let scan = snapshot
.clone()
.scan_builder()
.with_predicate(predicate)
.build()?;
let batches = read_scan(&scan, engine.clone())?;
Ok(batches.iter().map(|b| b.num_rows()).sum())
};
// Mid-ms value (4.000400s): adjusted to 3_999_401
// file1 max=2s < 3_999_401 -> pruned; file2+3 kept (4 rows)
assert_eq!(row_count(4_000_400)?, 4, "mid-ms: file2+file3 kept");
// Exact ms boundary (4.000000s = truncated max of file2): adjusted to 3_999_001
// file1 max=2s < 3_999_001 -> pruned; file2 max=4s > 3_999_001 -> kept (4 rows)
assert_eq!(
row_count(4_000_000)?,
4,
"exact ms boundary: file2+file3 kept"
);
// 1us above ms boundary (4.000001s): adjusted to 3_999_002
// file1 pruned; file2 max=4s > 3_999_002 -> kept (4 rows)
assert_eq!(row_count(4_000_001)?, 4, "1us above ms: file2+file3 kept");
// 998us above ms boundary (4.000998s): adjusted to 3_999_999
// file2 max=4s > 3_999_999 -> kept (just not prunable)
assert_eq!(
row_count(4_000_998)?,
4,
"just not prunable: file2+file3 kept"
);
// 999us above ms boundary (4.000999s): adjusted to 4_000_000
// file2 max=4s == 4_000_000 -> NOT strictly greater -> pruned (just prunable)
assert_eq!(row_count(4_000_999)?, 2, "just prunable: only file3 kept");
// Next ms boundary (4.001000s): adjusted to 4_000_001
// file2 max=4s < 4_000_001 -> pruned (2 rows)
assert_eq!(
row_count(4_001_000)?,
2,
"next ms boundary: only file3 kept"
);
Ok(())
}
// Regression test for https://github.com/delta-io/delta-kernel-rs/issues/739
// Void columns must be present in scan results as all-null columns.
#[tokio::test]
async fn read_table_with_void_column() -> Result<(), Box<dyn std::error::Error>> {
    let table_root = "memory:///";
    // Parquet cannot represent `void`, so the data file only holds the `id` column.
    let data = generate_batch(vec![("id", vec![1, 2, 3].into_array())])?;
    let storage = Arc::new(InMemory::new());

    let protocol = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#;
    let commit_info = r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#;
    let metadata = r#"{"metaData":{"id":"test-void","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"void_col\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#;
    let add = format!(
        r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true}}}}"#
    );
    let commit = [protocol, commit_info, metadata, add.as_str()].join("\n");
    add_commit(table_root, storage.as_ref(), 0, commit).await?;
    storage
        .put(&Path::from(PARQUET_FILE1), record_batch_to_bytes(&data).into())
        .await?;

    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let snapshot = Snapshot::builder_for(Url::parse(table_root)?).build(engine.as_ref())?;

    // The table schema exposes both `id` and `void_col`.
    let table_schema = snapshot.schema();
    assert!(table_schema.field("void_col").is_some());
    assert!(table_schema.field("id").is_some());

    // The scan's logical schema keeps the void column too.
    let scan = snapshot.scan_builder().build()?;
    let logical_schema = scan.logical_schema();
    assert!(logical_schema.field("void_col").is_some());
    assert!(logical_schema.field("id").is_some());
    assert_eq!(logical_schema.fields().count(), 2);

    // Reading the data yields the void column as an Arrow `Null` column.
    let batches = read_scan(&scan, engine)?;
    assert_eq!(batches.len(), 1);
    let out = &batches[0];
    let out_schema = out.schema();
    assert_eq!(out.num_columns(), 2);
    assert_eq!(out_schema.field(0).name(), "id");
    assert_eq!(out_schema.field(1).name(), "void_col");
    assert_eq!(out.num_rows(), 3);
    assert_eq!(*out_schema.field(1).data_type(), ArrowDataType::Null);
    Ok(())
}
// End-to-end tests using a Spark-written Delta table with real truncated JSON stats.
// The table has three files:
//   file 1: id=[1,2], ts_col=[1s, 2s]         -- max at a ms boundary
//   file 2: id=[3,4], ts_col=[3s, 4.000500s]  -- max truncated to 4.000s in JSON stats
//   file 3: id=[5,6], ts_col=[7s, 8s]         -- max at a ms boundary
//
// The predicate value 4.000400s lies between file 2's truncated max (4.000s) and its
// actual max (4.000500s), which exercises the truncation adjustment.
// GT: file1 pruned (max=2s < adjusted 3.999401s), file2+3 kept
#[test]
fn timestamp_truncation_real_table_gt() -> Result<(), Box<dyn std::error::Error>> {
    let ts_gt = Pred::gt(
        column_expr!("ts_col"),
        Expr::literal(Scalar::Timestamp(4_000_400)),
    );
    let expected = vec![
        "+----+-----------------------------+",
        "| id | ts_col                      |",
        "+----+-----------------------------+",
        "| 3  | 1970-01-01T00:00:03Z        |",
        "| 4  | 1970-01-01T00:00:04.000500Z |",
        "| 5  | 1970-01-01T00:00:07Z        |",
        "| 6  | 1970-01-01T00:00:08Z        |",
        "+----+-----------------------------+",
    ];
    read_table_data_str(
        "./tests/data/timestamp-truncation-stats",
        None,
        Some(ts_gt),
        expected,
    )
}
// Verify that an explicit projection that includes a void column returns it as null.
#[tokio::test]
async fn explicit_projection_with_void_column_returns_nulls(
) -> Result<(), Box<dyn std::error::Error>> {
    let table_root = "memory:///";
    let data = generate_batch(vec![("id", vec![1, 2, 3].into_array())])?;
    let storage = Arc::new(InMemory::new());

    let protocol = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#;
    let commit_info = r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#;
    let metadata = r#"{"metaData":{"id":"test-void","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"void_col\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#;
    let add = format!(
        r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true}}}}"#
    );
    let commit = [protocol, commit_info, metadata, add.as_str()].join("\n");
    add_commit(table_root, storage.as_ref(), 0, commit).await?;
    storage
        .put(&Path::from(PARQUET_FILE1), record_batch_to_bytes(&data).into())
        .await?;

    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let snapshot = Snapshot::builder_for(Url::parse(table_root)?).build(engine.as_ref())?;

    // Project the full table schema explicitly, void column included.
    let full_schema = snapshot.schema();
    let scan = snapshot.scan_builder().with_schema(full_schema).build()?;

    // The void column stays in the logical schema.
    let logical_schema = scan.logical_schema();
    assert!(logical_schema.field("void_col").is_some());
    assert!(logical_schema.field("id").is_some());
    assert_eq!(logical_schema.fields().count(), 2);

    // ...and is materialized as an Arrow `Null` column.
    let batches = read_scan(&scan, engine)?;
    assert_eq!(batches.len(), 1);
    let out = &batches[0];
    let out_schema = out.schema();
    assert_eq!(out.num_columns(), 2);
    assert_eq!(out_schema.field(0).name(), "id");
    assert_eq!(out_schema.field(1).name(), "void_col");
    assert_eq!(*out_schema.field(1).data_type(), ArrowDataType::Null);
    Ok(())
}
// Verify that void fields inside nested structs are preserved in the scan schema and data.
// Delta schema: {id: int, info: struct<name: string, v: void>}
// Parquet schema: {id: int, info: struct<name: string>} (void field missing from Parquet)
// Result: void field appears as all-null in the returned data.
#[tokio::test]
async fn read_table_with_void_in_nested_struct() -> Result<(), Box<dyn std::error::Error>> {
// Parquet batch: {id: int, info: struct<name: string>} (without the void field)
let name_array: ArrayRef = Arc::new(delta_kernel::arrow::array::StringArray::from(vec![
"alice", "bob",
]));
let inner_fields = vec![delta_kernel::arrow::datatypes::Field::new(
"name",
ArrowDataType::Utf8,
true,
)];
let struct_array =
delta_kernel::arrow::array::StructArray::new(inner_fields.into(), vec![name_array], None);
let batch = RecordBatch::try_from_iter(vec![
(
"id",
Arc::new(delta_kernel::arrow::array::Int32Array::from(vec![1, 2])) as ArrayRef,
),
("info", Arc::new(struct_array) as ArrayRef),
])?;
let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"test-void-nested","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true}}}}"#),
];
add_commit("memory:///", storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
let location = Url::parse("memory:///")?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?;
// Table schema has the void field inside nested struct
let table_schema = snapshot.schema();
let info_field = table_schema.field("info").expect("info should exist");
if let DataType::Struct(inner) = info_field.data_type() {
assert!(
inner.field("v").is_some(),
"table schema should have void field 'v'"
);
}
let scan = snapshot.scan_builder().build()?;
// Scan logical schema should preserve the void field in the nested struct
let logical_schema = scan.logical_schema();
let info_field = logical_schema
.field("info")
.expect("info should exist in scan");
if let DataType::Struct(inner) = info_field.data_type() {
assert!(
inner.field("v").is_some(),
"void field 'v' should be preserved"
);
assert!(
inner.field("name").is_some(),
"non-void field 'name' should remain"
);
assert_eq!(inner.fields().count(), 2);
} else {
panic!("info should be a struct type");
}
// Execute the scan and verify void field appears as null in the struct
let batches = read_scan(&scan, engine)?;
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 2);
// The info struct should have 2 fields: name (string) and v (null)
let info_col = batches[0].column_by_name("info").expect("info column");
let info_struct = info_col
.as_any()
.downcast_ref::<delta_kernel::arrow::array::StructArray>()
.expect("info should be struct");
assert_eq!(info_struct.num_columns(), 2);
assert_eq!(info_struct.column_by_name("name").unwrap().len(), 2);
let v_col = info_struct
.column_by_name("v")
.expect("v field should exist");
assert_eq!(*v_col.data_type(), ArrowDataType::Null);
assert_eq!(v_col.len(), 2);
Ok(())
}
// Integration test against a real Spark-created Delta table that has a void column.
// Reads must return the void column with every value null.
#[test]
fn read_spark_table_with_void_column() -> Result<(), Box<dyn std::error::Error>> {
    let expected = vec![
        "+----+----------+",
        "| id | void_col |",
        "+----+----------+",
        "| 1  |          |",
        "| 2  |          |",
        "| 3  |          |",
        "+----+----------+",
    ];
    // No projection (SELECT *) and no predicate: the void column still shows up, rendered
    // as empty (null) cells.
    read_table_data_str("./tests/data/void-column", None, None, expected)
}
// GE on the Spark-written truncated-stats table: file1 pruned
// (max=2s < adjusted 3.999401s), file2+3 kept.
#[test]
fn timestamp_truncation_real_table_ge() -> Result<(), Box<dyn std::error::Error>> {
    let ts_ge = Pred::ge(
        column_expr!("ts_col"),
        Expr::literal(Scalar::Timestamp(4_000_400)),
    );
    let expected = vec![
        "+----+-----------------------------+",
        "| id | ts_col                      |",
        "+----+-----------------------------+",
        "| 3  | 1970-01-01T00:00:03Z        |",
        "| 4  | 1970-01-01T00:00:04.000500Z |",
        "| 5  | 1970-01-01T00:00:07Z        |",
        "| 6  | 1970-01-01T00:00:08Z        |",
        "+----+-----------------------------+",
    ];
    read_table_data_str(
        "./tests/data/timestamp-truncation-stats",
        None,
        Some(ts_ge),
        expected,
    )
}
// LT on the Spark-written truncated-stats table: file3 pruned (min=7s > 4.000400s),
// file1+2 kept.
#[test]
fn timestamp_truncation_real_table_lt() -> Result<(), Box<dyn std::error::Error>> {
    let ts_lt = Pred::lt(
        column_expr!("ts_col"),
        Expr::literal(Scalar::Timestamp(4_000_400)),
    );
    let expected = vec![
        "+----+-----------------------------+",
        "| id | ts_col                      |",
        "+----+-----------------------------+",
        "| 1  | 1970-01-01T00:00:01Z        |",
        "| 2  | 1970-01-01T00:00:02Z        |",
        "| 3  | 1970-01-01T00:00:03Z        |",
        "| 4  | 1970-01-01T00:00:04.000500Z |",
        "+----+-----------------------------+",
    ];
    read_table_data_str(
        "./tests/data/timestamp-truncation-stats",
        None,
        Some(ts_lt),
        expected,
    )
}
// LE on the Spark-written truncated-stats table: file3 pruned (min=7s > 4.000400s),
// file1+2 kept.
#[test]
fn timestamp_truncation_real_table_le() -> Result<(), Box<dyn std::error::Error>> {
    let ts_le = Pred::le(
        column_expr!("ts_col"),
        Expr::literal(Scalar::Timestamp(4_000_400)),
    );
    let expected = vec![
        "+----+-----------------------------+",
        "| id | ts_col                      |",
        "+----+-----------------------------+",
        "| 1  | 1970-01-01T00:00:01Z        |",
        "| 2  | 1970-01-01T00:00:02Z        |",
        "| 3  | 1970-01-01T00:00:03Z        |",
        "| 4  | 1970-01-01T00:00:04.000500Z |",
        "+----+-----------------------------+",
    ];
    read_table_data_str(
        "./tests/data/timestamp-truncation-stats",
        None,
        Some(ts_le),
        expected,
    )
}
// EQ on the Spark-written truncated-stats table: file1 pruned (max=2s < adjusted
// 3.999401s) and file3 pruned (min=7s > 4.000400s). Only file2 is kept (ids 3,4).
#[test]
fn timestamp_truncation_real_table_eq() -> Result<(), Box<dyn std::error::Error>> {
    let ts_eq = Pred::eq(
        column_expr!("ts_col"),
        Expr::literal(Scalar::Timestamp(4_000_400)),
    );
    let expected = vec![
        "+----+-----------------------------+",
        "| id | ts_col                      |",
        "+----+-----------------------------+",
        "| 3  | 1970-01-01T00:00:03Z        |",
        "| 4  | 1970-01-01T00:00:04.000500Z |",
        "+----+-----------------------------+",
    ];
    read_table_data_str(
        "./tests/data/timestamp-truncation-stats",
        None,
        Some(ts_eq),
        expected,
    )
}
/// Creates a single-commit in-memory table and returns the batches produced by a full scan.
///
/// `schema_string` is spliced verbatim into the metadata action's `schemaString`, so it
/// must already be JSON-escaped. `table_id` becomes the metadata `id`. The single data
/// file holds only an `id` column with the values `1..=num_rows`.
async fn scan_with_void_schema(
    schema_string: &str,
    table_id: &str,
    num_rows: i32,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let table_root = "memory:///";
    let ids: Vec<i32> = (1..=num_rows).collect();
    let data = generate_batch(vec![("id", ids.into_array())])?;
    let storage = Arc::new(InMemory::new());

    let protocol = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string();
    let metadata = format!(
        r#"{{"metaData":{{"id":"{table_id}","format":{{"provider":"parquet","options":{{}}}},"schemaString":"{schema_string}","partitionColumns":[],"configuration":{{}},"createdTime":1587968585495}}}}"#
    );
    let add = format!(
        r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true}}}}"#
    );
    let commit = [protocol, metadata, add].join("\n");
    add_commit(table_root, storage.as_ref(), 0, commit).await?;
    storage
        .put(&Path::from(PARQUET_FILE1), record_batch_to_bytes(&data).into())
        .await?;

    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let snapshot = Snapshot::builder_for(Url::parse(table_root)?).build(engine.as_ref())?;
    let scan = snapshot.scan_builder().build()?;
    Ok(read_scan(&scan, engine)?)
}
// Verify that a table with void nested inside an Array can be read at runtime.
// Write-time validation rejects void-in-array, but reads and metadata ops always work.
// The `arr` column is absent from the Parquet schema, so the kernel synthesizes it at
// read time. The whole column is NULL, not an array of null elements.
#[tokio::test]
async fn read_void_in_array_type_ok() -> Result<(), Box<dyn std::error::Error>> {
    let schema_string = r#"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"arr\",\"type\":{\"type\":\"array\",\"elementType\":\"void\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]}"#;
    let batches = scan_with_void_schema(schema_string, "test-void-array", 2).await?;
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 2);

    let arr_col = batch
        .column_by_name("arr")
        .expect("arr column must be in RecordBatch");
    assert_eq!(
        arr_col.null_count(),
        arr_col.len(),
        "entire column should be null"
    );
    let ArrowDataType::List(element) = arr_col.data_type() else {
        panic!("expected List<Null>, got {:?}", arr_col.data_type());
    };
    assert_eq!(*element.data_type(), ArrowDataType::Null);
    Ok(())
}
// Verify that a table with void nested inside a Map value can be read at runtime.
// Write-time validation rejects void-in-map, but reads and metadata ops always work.
// The `m` column is absent from the Parquet schema, so the kernel synthesizes it at read
// time. The whole column is NULL, not a map with null entries.
#[tokio::test]
async fn read_void_in_map_type_ok() -> Result<(), Box<dyn std::error::Error>> {
    let schema_string = r#"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"m\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"void\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}}]}"#;
    let batches = scan_with_void_schema(schema_string, "test-void-map", 2).await?;
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 2);

    let m_col = batch
        .column_by_name("m")
        .expect("m column must be in RecordBatch");
    assert_eq!(
        m_col.null_count(),
        m_col.len(),
        "entire column should be null"
    );

    // Expect Map<Utf8, Null>: the entries field must be a struct of (key, value).
    let ArrowDataType::Map(entries_field, _) = m_col.data_type() else {
        panic!("expected Map<String,Null>, got {:?}", m_col.data_type());
    };
    let ArrowDataType::Struct(entry_fields) = entries_field.data_type() else {
        panic!(
            "expected Struct inside Map, got {:?}",
            entries_field.data_type()
        );
    };
    assert_eq!(*entry_fields[0].data_type(), ArrowDataType::Utf8);
    assert_eq!(*entry_fields[1].data_type(), ArrowDataType::Null);
    Ok(())
}
// Verify the runtime read of a struct whose fields are all void.
// Delta schema: {id: int, s: struct<x: void, y: void>}
// Parquet holds only {id}. The whole `s` struct is missing from the file and must be
// synthesized with both children typed as Arrow `Null`.
#[tokio::test]
async fn read_all_void_struct() -> Result<(), Box<dyn std::error::Error>> {
    let schema_string = r#"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"s\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"x\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}},{\"name\":\"y\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}"#;
    let batches = scan_with_void_schema(schema_string, "test-all-void-struct", 3).await?;
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    assert_eq!(batch.num_rows(), 3);

    let s_struct = batch
        .column_by_name("s")
        .expect("s column")
        .as_any()
        .downcast_ref::<delta_kernel::arrow::array::StructArray>()
        .expect("s should be struct");
    assert_eq!(s_struct.num_columns(), 2);

    let (x_col, y_col) = (
        s_struct.column_by_name("x").expect("x field"),
        s_struct.column_by_name("y").expect("y field"),
    );
    assert_eq!(*x_col.data_type(), ArrowDataType::Null);
    assert_eq!(*y_col.data_type(), ArrowDataType::Null);
    assert_eq!(x_col.len(), 3);
    assert_eq!(y_col.len(), 3);
    Ok(())
}
// Verify that a table where ALL fields are void can still be read (returns all-null rows).
// Reads always succeed; only writes fail for all-void tables.
#[tokio::test]
async fn read_all_void_table() -> Result<(), Box<dyn std::error::Error>> {
    let table_root = "memory:///";
    // Empty Parquet file with zero columns and zero rows. Void columns have no physical
    // Parquet representation.
    let data = RecordBatch::new_empty(Arc::new(ArrowSchema::empty()));
    let storage = Arc::new(InMemory::new());

    let protocol = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#;
    let commit_info = r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#;
    let metadata = r#"{"metaData":{"id":"test-all-void","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#;
    let add = format!(
        r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true}}}}"#
    );
    let commit = [protocol, commit_info, metadata, add.as_str()].join("\n");
    add_commit(table_root, storage.as_ref(), 0, commit).await?;
    storage
        .put(&Path::from(PARQUET_FILE1), record_batch_to_bytes(&data).into())
        .await?;

    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let snapshot = Snapshot::builder_for(Url::parse(table_root)?).build(engine.as_ref())?;

    // Both void columns are part of the table schema.
    let table_schema = snapshot.schema();
    assert_eq!(table_schema.fields().count(), 2);
    assert!(table_schema.field("a").is_some());
    assert!(table_schema.field("b").is_some());

    let scan = snapshot.scan_builder().build()?;
    assert_eq!(scan.logical_schema().fields().count(), 2);

    // The Parquet file has 0 columns and 0 rows, so the reader produces no data. The point
    // of this check is that `read_scan` does not fail on an all-void schema.
    let total_rows: usize = read_scan(&scan, engine)?
        .iter()
        .map(|b| b.num_rows())
        .sum();
    assert_eq!(total_rows, 0);
    Ok(())
}
// Reading a table that is partitioned on a void column. The Add action supplies no value for the
// partition column (it is missing, i.e. null), so parse_partition_value_raw yields
// Scalar::Null(VOID).
#[tokio::test]
async fn read_table_with_void_partition_column() -> Result<(), Box<dyn std::error::Error>> {
    // Only `id` is physically stored: it is neither a partition column nor void.
    let data = generate_batch(vec![("id", vec![1, 2].into_array())])?;
    let storage = Arc::new(InMemory::new());
    let commit_lines = [
        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
        r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"void_part\"]"},"isBlindAppend":true}}"#.to_string(),
        r#"{"metaData":{"id":"test-void-part","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"void_part\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["void_part"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
        format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true}}}}"#),
    ];
    add_commit("memory:///", storage.as_ref(), 0, commit_lines.iter().join("\n")).await?;
    storage
        .put(&Path::from(PARQUET_FILE1), record_batch_to_bytes(&data).into())
        .await?;

    let table_root = Url::parse("memory:///")?;
    let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
    let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    let scan = snapshot.scan_builder().build()?;

    // The logical schema exposes the void partition column next to the stored column.
    let logical_schema = scan.logical_schema();
    for name in ["void_part", "id"] {
        assert!(logical_schema.field(name).is_some());
    }

    let batches = read_scan(&scan, engine)?;
    assert_eq!(batches.len(), 1);
    let only_batch = &batches[0];
    assert_eq!(only_batch.num_rows(), 2);

    // The void partition column materializes as an Arrow Null column covering every row.
    let void_col = only_batch
        .column_by_name("void_part")
        .expect("void partition column");
    assert_eq!(*void_col.data_type(), ArrowDataType::Null);
    assert_eq!(void_col.len(), 2);
    Ok(())
}
// Verify that predicate pushdown on a void column works correctly.
// A predicate like `void_col IS NULL` is always true for void columns, so no rows are skipped.
#[tokio::test]
async fn read_with_predicate_on_void_column() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_batch(vec![("id", vec![1, 2, 3].into_array())])?;
let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"test-void-pred","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"void_col\",\"type\":\"void\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true}}}}"#),
];
add_commit("memory:///", storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
let location = Url::parse("memory:///")?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(location).build(engine.as_ref())?;
// Predicate: void_col IS NULL — always true for void, should return all rows
let predicate = Arc::new(column_expr!("void_col").is_null());
let scan = snapshot
.clone()
.scan_builder()
.with_predicate(predicate)
.build()?;
let batches = read_scan(&scan, engine.clone())?;
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 3, "IS NULL on void should return all rows");
// Predicate: void_col IS NOT NULL — always false for void. The Add action above has no
// `stats` string, so kernel has nothing to skip on. All rows are returned.
// Skipping driven by `nullCount` is exercised by `void_predicate_skips_via_null_count`.
let predicate_not_null = Arc::new(column_expr!("void_col").is_not_null());
let scan_not_null = snapshot
.scan_builder()
.with_predicate(predicate_not_null)
.build()?;
let batches_not_null = read_scan(&scan_not_null, engine)?;
let total_rows_not_null: usize = batches_not_null.iter().map(|b| b.num_rows()).sum();
assert_eq!(
total_rows_not_null, 3,
"IS NOT NULL on void: no row-level filtering, all rows returned"
);
Ok(())
}
// File-level data skipping for void columns, driven by the Delta `nullCount` stat. Each of the 3
// Add actions in the Spark fixture records `nullCount.void_col == numRecords == 1`, which lets the
// kernel prune every file for `IS NOT NULL`. `read_with_predicate_on_void_column` runs the same
// predicate against an Add action without `stats` and gets all 3 rows back. The empty result
// here therefore comes from file pruning, not row-level filtering.
#[rstest::rstest]
#[case::is_null(
    column_expr!("void_col").is_null(),
    vec![
        "+----+----------+",
        "| id | void_col |",
        "+----+----------+",
        "| 1 | |",
        "| 2 | |",
        "| 3 | |",
        "+----+----------+",
    ]
)]
#[case::is_not_null(column_expr!("void_col").is_not_null(), Vec::new())]
fn void_predicate_skips_via_null_count(
    #[case] predicate: Pred,
    #[case] expected: Vec<&str>,
) -> Result<(), Box<dyn std::error::Error>> {
    let table_path = "./tests/data/void-column";
    read_table_data_str(table_path, None, Some(predicate), expected)
}
// Direct evidence that void-column predicates prune at the file level. Instead of relying on the
// empty-batches contrast, this counts the ScanFiles that survive `scan_metadata`. With the Spark
// fixture's `nullCount.void_col == numRecords` stats, `IS NOT NULL` prunes all 3 files and
// `IS NULL` keeps all 3.
#[rstest::rstest]
#[case::is_null_keeps_all(column_expr!("void_col").is_null(), 3)]
#[case::is_not_null_prunes_all(column_expr!("void_col").is_not_null(), 0)]
fn void_predicate_pruning_scan_file_count(
    #[case] predicate: Pred,
    #[case] expected_files: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/void-column"))?;
    let table_url = Url::from_directory_path(table_dir).unwrap();
    let engine = test_utils::create_default_engine(&table_url)?;
    let scan = Snapshot::builder_for(table_url)
        .build(engine.as_ref())?
        .scan_builder()
        .with_predicate(Arc::new(predicate))
        .build()?;

    // Thread a single accumulator through every scan-metadata result. `scan_metadata_callback`
    // (defined elsewhere in this file) is expected to collect each surviving ScanFile into it.
    let surviving_files = scan.scan_metadata(engine.as_ref())?.try_fold(
        Vec::<ScanFile>::new(),
        |found, scan_metadata| scan_metadata?.visit_scan_files(found, scan_metadata_callback),
    )?;
    assert_eq!(surviving_files.len(), expected_files);
    Ok(())
}