use std::path::PathBuf;
use std::sync::Arc;
use delta_kernel::actions::deletion_vector::split_vector;
use delta_kernel::arrow::array::{AsArray as _, RecordBatch, TimestampMicrosecondArray};
use delta_kernel::arrow::compute::{concat_batches, filter_record_batch};
use delta_kernel::arrow::datatypes::{
Field as ArrowField, Int64Type, Schema as ArrowSchema, TimeUnit,
};
use delta_kernel::engine::arrow_conversion::TryFromKernel as _;
use delta_kernel::engine::arrow_data::EngineDataArrowExt as _;
use delta_kernel::engine::default::DefaultEngineBuilder;
use delta_kernel::expressions::{
column_expr, column_pred, Expression as Expr, ExpressionRef, Predicate as Pred, Scalar,
};
use delta_kernel::log_segment::LogSegment;
use delta_kernel::object_store::{memory::InMemory, path::Path, ObjectStoreExt as _};
use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties};
use delta_kernel::path::ParsedLogPath;
use delta_kernel::scan::state::{transform_to_logical, ScanFile};
use delta_kernel::scan::Scan;
use delta_kernel::schema::{DataType, MetadataColumnSpec, Schema, StructField, StructType};
use delta_kernel::{Engine, FileMeta, Snapshot};
use itertools::Itertools;
use test_utils::{
actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
load_test_data, read_scan, record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray,
TestAction, METADATA,
};
use url::Url;
mod common;
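// Parquet file names reused by the in-memory table fixtures below.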
const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet";
const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet";
const PARQUET_FILE3: &str = "part-00002-c506e79a-0bf8-4e2b-a42b-9731b2e490ff-c000.snappy.parquet";
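// Shim for the arrow-57 feature set: expose `join` on object_store's `Path` by delegating to
// `child`, so the `Path::from(...).join(...)` call sites below work under either feature.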
#[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
trait PathExt {
fn join(&self, other: &str) -> Self;
}
#[cfg(all(feature = "arrow-57", not(feature = "arrow-58")))]
impl PathExt for Path {
fn join(&self, other: &str) -> Self {
self.child(other)
}
}
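/// Rebuild `batch` with every top-level field marked nullable, to match the schema reported by
/// scan results.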
fn make_top_level_fields_nullable(batch: &RecordBatch) -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(
batch
.schema()
.fields()
.iter()
.map(|f| ArrowField::new(f.name(), f.data_type().clone(), true))
.collect::<Vec<_>>(),
));
RecordBatch::try_new(schema, batch.columns().to_vec()).unwrap()
}
#[tokio::test]
async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_simple_batch()?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
let parquet_bytes = record_batch_to_bytes(&batch);
let file_size = parquet_bytes.len() as u64;
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
TestAction::AddWithSize(PARQUET_FILE2.to_string(), file_size),
]),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let expected = make_top_level_fields_nullable(&batch);
let expected_data = vec![expected.clone(), expected];
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let scan = snapshot.scan_builder().build()?;
let mut files = 0;
let stream = scan.execute(engine)?.zip(expected_data);
for (data, expected) in stream {
files += 1;
assert_eq!(into_record_batch(data?), expected);
}
assert_eq!(2, files, "Expected to have scanned two files");
Ok(())
}
#[tokio::test]
async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_simple_batch()?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
let parquet_bytes = record_batch_to_bytes(&batch);
let file_size = parquet_bytes.len() as u64;
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
]),
)
.await?;
add_commit(
table_root,
storage.as_ref(),
1,
actions_to_string(vec![TestAction::AddWithSize(
PARQUET_FILE2.to_string(),
file_size,
)]),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;
let engine = DefaultEngineBuilder::new(storage.clone()).build();
let expected = make_top_level_fields_nullable(&batch);
let expected_data = vec![expected.clone(), expected];
let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
let scan = snapshot.scan_builder().build()?;
let mut files = 0;
let stream = scan.execute(Arc::new(engine))?.zip(expected_data);
for (data, expected) in stream {
files += 1;
assert_eq!(into_record_batch(data?), expected);
}
assert_eq!(2, files, "Expected to have scanned two files");
Ok(())
}
#[tokio::test]
async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_simple_batch()?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
let parquet_bytes = record_batch_to_bytes(&batch);
let file_size = parquet_bytes.len() as u64;
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size),
]),
)
.await?;
add_commit(
table_root,
storage.as_ref(),
1,
actions_to_string(vec![TestAction::AddWithSize(
PARQUET_FILE2.to_string(),
file_size,
)]),
)
.await?;
add_commit(
table_root,
storage.as_ref(),
2,
actions_to_string(vec![TestAction::RemoveWithSize(
PARQUET_FILE2.to_string(),
file_size,
)]),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
let engine = DefaultEngineBuilder::new(storage.clone()).build();
let expected = make_top_level_fields_nullable(&batch);
let expected_data = vec![expected];
let snapshot = Snapshot::builder_for(table_root).build(&engine)?;
let scan = snapshot.scan_builder().build()?;
let stream = scan.execute(Arc::new(engine))?.zip(expected_data);
let mut files = 0;
for (data, expected) in stream {
files += 1;
assert_eq!(into_record_batch(data?), expected);
}
assert_eq!(1, files, "Expected to have scanned one file");
Ok(())
}
#[tokio::test]
async fn stats() -> Result<(), Box<dyn std::error::Error>> {
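    // Like `actions_to_string`, but hard-codes file stats (numRecords=2, min id=5, max id=7)
    // on add actions so the second file advertises a distinct id range for data skipping.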
fn generate_commit2(actions: Vec<TestAction>) -> String {
actions
.into_iter()
.map(|test_action| match test_action {
TestAction::Add(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 5}},\"maxValues\":{{\"id\":7}}}}"}}}}"#, action = "add", path = path),
TestAction::Remove(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true}}}}"#, action = "remove", path = path),
TestAction::Metadata => METADATA.into(),
TestAction::AddWithSize(path, size) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":{size},"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 5}},\"maxValues\":{{\"id\":7}}}}"}}}}"#, action = "add", path = path),
TestAction::RemoveWithSize(path, size) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":{size},"modificationTime":1587968586000,"dataChange":true}}}}"#, action = "remove", path = path),
})
.fold(String::new(), |a, b| a + &b + "\n")
}
let batch1 = make_top_level_fields_nullable(&generate_simple_batch()?);
let batch2 = make_top_level_fields_nullable(&generate_batch(vec![
("id", vec![5, 7].into_array()),
("val", vec!["e", "g"].into_array()),
])?);
let file_size1 = record_batch_to_bytes(&batch1).len() as u64;
let file_size2 = record_batch_to_bytes(&batch2).len() as u64;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size1),
]),
)
.await?;
add_commit(
table_root,
storage.as_ref(),
1,
generate_commit2(vec![TestAction::AddWithSize(
PARQUET_FILE2.to_string(),
file_size2,
)]),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch1).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes(&batch2).into(),
)
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
    #[allow(clippy::type_complexity)]
    let test_cases: Vec<(fn(Expr, Expr) -> _, _, _)> = vec![
(Pred::eq, 0i32, vec![]),
(Pred::eq, 1, vec![&batch1]),
(Pred::eq, 3, vec![&batch1]),
(Pred::eq, 4, vec![]),
(Pred::eq, 5, vec![&batch2]),
(Pred::eq, 7, vec![&batch2]),
(Pred::eq, 8, vec![]),
(Pred::lt, 1, vec![]),
(Pred::lt, 2, vec![&batch1]),
(Pred::lt, 5, vec![&batch1]),
(Pred::lt, 6, vec![&batch2, &batch1]),
(Pred::le, 0, vec![]),
(Pred::le, 1, vec![&batch1]),
(Pred::le, 4, vec![&batch1]),
(Pred::le, 5, vec![&batch2, &batch1]),
(Pred::gt, 2, vec![&batch2, &batch1]),
(Pred::gt, 3, vec![&batch2]),
(Pred::gt, 6, vec![&batch2]),
(Pred::gt, 7, vec![]),
(Pred::ge, 3, vec![&batch2, &batch1]),
(Pred::ge, 4, vec![&batch2]),
(Pred::ge, 7, vec![&batch2]),
(Pred::ge, 8, vec![]),
(Pred::ne, 0, vec![&batch2, &batch1]),
(Pred::ne, 1, vec![&batch2, &batch1]),
(Pred::ne, 3, vec![&batch2, &batch1]),
(Pred::ne, 4, vec![&batch2, &batch1]),
(Pred::ne, 5, vec![&batch2, &batch1]),
(Pred::ne, 7, vec![&batch2, &batch1]),
(Pred::ne, 8, vec![&batch2, &batch1]),
];
for (pred_fn, value, expected_batches) in test_cases {
let predicate = pred_fn(column_expr!("id"), Expr::literal(value));
let scan = snapshot
.clone()
.scan_builder()
.with_predicate(Arc::new(predicate.clone()))
.build()?;
let expected_files = expected_batches.len();
let mut files_scanned = 0;
let stream = scan.execute(engine.clone())?.zip(expected_batches);
for (batch, expected) in stream {
files_scanned += 1;
assert_eq!(into_record_batch(batch?), expected.clone());
}
assert_eq!(expected_files, files_scanned, "{predicate:?}");
}
Ok(())
}
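/// Read the scan via `Scan::execute` and compare the concatenated batches against the expected
/// pretty-printed table.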
fn read_with_execute(
engine: Arc<dyn Engine>,
scan: &Scan,
expected: &[String],
) -> Result<(), Box<dyn std::error::Error>> {
let result_schema = Arc::new(ArrowSchema::try_from_kernel(
scan.logical_schema().as_ref(),
)?);
let batches = read_scan(scan, engine)?;
if expected.is_empty() {
assert_eq!(batches.len(), 0);
} else {
let batch = concat_batches(&result_schema, &batches)?;
assert_batches_sorted_eq!(expected, &[batch]);
}
Ok(())
}
fn scan_metadata_callback(batches: &mut Vec<ScanFile>, scan_file: ScanFile) {
batches.push(scan_file);
}
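/// Read the scan "manually" via `scan_metadata`: visit each scan file, read its parquet data,
/// apply the logical transform, then apply any deletion-vector selection vector before comparing
/// the result against `expected`.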
fn read_with_scan_metadata(
location: &Url,
engine: &dyn Engine,
scan: &Scan,
expected: &[String],
) -> Result<(), Box<dyn std::error::Error>> {
let result_schema = Arc::new(ArrowSchema::try_from_kernel(
scan.logical_schema().as_ref(),
)?);
let scan_metadata = scan.scan_metadata(engine)?;
let mut scan_files = vec![];
for res in scan_metadata {
let scan_metadata = res?;
scan_files = scan_metadata.visit_scan_files(scan_files, scan_metadata_callback)?;
}
let mut batches = vec![];
for scan_file in scan_files.into_iter() {
let file_path = location.join(&scan_file.path)?;
let mut selection_vector = scan_file
.dv_info
.get_selection_vector(engine, location)
.unwrap();
let meta = FileMeta {
last_modified: 0,
size: scan_file.size.try_into().unwrap(),
location: file_path,
};
let read_results = engine
.parquet_handler()
.read_parquet_files(
&[meta],
scan.physical_schema().clone(),
scan.physical_predicate().clone(),
)
.unwrap();
for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();
let logical = transform_to_logical(
engine,
read_result,
scan.physical_schema(),
scan.logical_schema(),
scan_file.transform.clone(),
)
.unwrap();
let record_batch = logical.try_into_record_batch()?;
let rest = split_vector(selection_vector.as_mut(), len, Some(true));
let batch = if let Some(mask) = selection_vector.clone() {
filter_record_batch(&record_batch, &mask.into()).unwrap()
} else {
record_batch
};
selection_vector = rest;
batches.push(batch);
}
}
if expected.is_empty() {
assert_eq!(batches.len(), 0);
} else {
let batch = concat_batches(&result_schema, &batches)?;
assert_batches_sorted_eq!(expected, &[batch]);
}
Ok(())
}
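/// Read a local test table through both paths (`scan_metadata` and `execute`) and check that
/// each produces `expected`, honoring the optional column projection and predicate.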
fn read_table_data(
path: &str,
select_cols: Option<&[&str]>,
predicate: Option<Pred>,
mut expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let path = std::fs::canonicalize(PathBuf::from(path))?;
let predicate = predicate.map(Arc::new);
let url = url::Url::from_directory_path(path).unwrap();
let engine = test_utils::create_default_engine(&url)?;
let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?;
let read_schema = select_cols.map(|select_cols| {
let table_schema = snapshot.schema();
let selected_fields = select_cols
.iter()
.map(|col| table_schema.field(col).cloned().unwrap());
Arc::new(Schema::new_unchecked(selected_fields))
});
println!("Read {url:?} with schema {read_schema:#?} and predicate {predicate:#?}");
let scan = snapshot
.scan_builder()
.with_schema_opt(read_schema)
.with_predicate(predicate.clone())
.build()?;
sort_lines!(expected);
read_with_scan_metadata(&url, engine.as_ref(), &scan, &expected)?;
read_with_execute(engine, &scan, &expected)?;
Ok(())
}
fn read_table_data_str(
path: &str,
select_cols: Option<&[&str]>,
predicate: Option<Pred>,
expected: Vec<&str>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
path,
select_cols,
predicate,
expected.into_iter().map(String::from).collect(),
)
}
#[test]
fn data() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+--------+--------+---------+",
"| letter | number | a_float |",
"+--------+--------+---------+",
"| | 6 | 6.6 |",
"| a | 1 | 1.1 |",
"| a | 4 | 4.4 |",
"| b | 2 | 2.2 |",
"| c | 3 | 3.3 |",
"| e | 5 | 5.5 |",
"+--------+--------+---------+",
];
read_table_data_str("./tests/data/basic_partitioned", None, None, expected)?;
Ok(())
}
#[test]
fn column_ordering() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+---------+--------+--------+",
"| a_float | letter | number |",
"+---------+--------+--------+",
"| 6.6 | | 6 |",
"| 4.4 | a | 4 |",
"| 5.5 | e | 5 |",
"| 1.1 | a | 1 |",
"| 2.2 | b | 2 |",
"| 3.3 | c | 3 |",
"+---------+--------+--------+",
];
read_table_data_str(
"./tests/data/basic_partitioned",
Some(&["a_float", "letter", "number"]),
None,
expected,
)?;
Ok(())
}
#[test]
fn column_ordering_and_projection() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+---------+--------+",
"| a_float | number |",
"+---------+--------+",
"| 6.6 | 6 |",
"| 4.4 | 4 |",
"| 5.5 | 5 |",
"| 1.1 | 1 |",
"| 2.2 | 2 |",
"| 3.3 | 3 |",
"+---------+--------+",
];
read_table_data_str(
"./tests/data/basic_partitioned",
Some(&["a_float", "number"]),
None,
expected,
)?;
Ok(())
}
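/// Expected output table for the `(a_float, number)` projection of `basic_partitioned`,
/// restricted to the given numbers.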
fn table_for_numbers(nums: Vec<u32>) -> Vec<String> {
let mut res: Vec<String> = vec![
"+---------+--------+",
"| a_float | number |",
"+---------+--------+",
]
.into_iter()
.map(String::from)
.collect();
for num in nums.iter() {
res.push(format!("| {num}.{num} | {num} |"));
}
res.push("+---------+--------+".to_string());
res
}
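/// Expected output table for the `(letter, number)` projection of `basic_partitioned`,
/// restricted to rows whose letter appears in `letters`.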
fn table_for_letters(letters: &[char]) -> Vec<String> {
let mut res: Vec<String> = vec![
"+--------+--------+",
"| letter | number |",
"+--------+--------+",
]
.into_iter()
.map(String::from)
.collect();
let rows = vec![(1, 'a'), (2, 'b'), (3, 'c'), (4, 'a'), (5, 'e')];
for (num, letter) in rows {
if letters.contains(&letter) {
res.push(format!("| {letter} | {num} |"));
}
}
res.push("+--------+--------+".to_string());
res
}
#[rstest::rstest]
#[case::less_than(
column_expr!("number").lt(Expr::literal(4i64)),
table_for_numbers(vec![1, 2, 3])
)]
#[case::less_than_or_equal(
column_expr!("number").le(Expr::literal(4i64)),
table_for_numbers(vec![1, 2, 3, 4])
)]
#[case::greater_than(
column_expr!("number").gt(Expr::literal(4i64)),
table_for_numbers(vec![5, 6])
)]
#[case::greater_than_or_equal(
column_expr!("number").ge(Expr::literal(4i64)),
table_for_numbers(vec![4, 5, 6])
)]
#[case::equal(
column_expr!("number").eq(Expr::literal(4i64)),
table_for_numbers(vec![4])
)]
#[case::not_equal(
column_expr!("number").ne(Expr::literal(4i64)),
table_for_numbers(vec![1, 2, 3, 5, 6])
)]
fn predicate_on_number(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/basic_partitioned",
Some(&["a_float", "number"]),
Some(pred),
expected,
)?;
Ok(())
}
#[rstest::rstest]
#[case::is_null(
column_expr!("letter").is_null(),
vec![
"+--------+--------+",
"| letter | number |",
"+--------+--------+",
"| | 6 |",
"+--------+--------+",
]
.into_iter()
.map(String::from)
.collect()
)]
#[case::is_not_null(
column_expr!("letter").is_not_null(),
table_for_letters(&['a', 'b', 'c', 'e'])
)]
#[case::less_than(
column_expr!("letter").lt(Expr::literal("c")),
table_for_letters(&['a', 'b'])
)]
#[case::less_than_or_equal(
column_expr!("letter").le(Expr::literal("c")),
table_for_letters(&['a', 'b', 'c'])
)]
#[case::greater_than(
column_expr!("letter").gt(Expr::literal("c")),
table_for_letters(&['e'])
)]
#[case::greater_than_or_equal(
column_expr!("letter").ge(Expr::literal("c")),
table_for_letters(&['c', 'e'])
)]
#[case::equal(
column_expr!("letter").eq(Expr::literal("c")),
table_for_letters(&['c'])
)]
#[case::not_equal(
column_expr!("letter").ne(Expr::literal("c")),
table_for_letters(&['a', 'b', 'e'])
)]
fn predicate_on_letter(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/basic_partitioned",
Some(&["letter", "number"]),
Some(pred),
expected,
)?;
Ok(())
}
#[rstest::rstest]
#[case::or_with_pruning(
Pred::or(
column_expr!("letter").gt(Expr::literal("a")),
column_expr!("number").gt(Expr::literal(3i64)),
),
// Unified data skipping evaluates partition + data predicates in a single pass.
// File a/1 (letter='a', max(number)=1): OR('a'>'a', 1>3) = FALSE -> pruned
vec![
"+--------+--------+",
"| letter | number |",
"+--------+--------+",
"| | 6 |",
"| a | 4 |",
"| b | 2 |",
"| c | 3 |",
"| e | 5 |",
"+--------+--------+",
]
.into_iter()
.map(String::from)
.collect()
)]
#[case::and_with_pruning(
Pred::and(
column_expr!("letter").gt(Expr::literal("a")), // numbers 2, 3, 5
column_expr!("number").gt(Expr::literal(3i64)), // letters a, e
),
table_for_letters(&['e'])
)]
#[case::and_with_nested_or(
Pred::and(
column_expr!("letter").gt(Expr::literal("a")), // numbers 2, 3, 5
Pred::or(
column_expr!("letter").eq(Expr::literal("c")),
column_expr!("number").eq(Expr::literal(3i64)),
),
),
// Unified data skipping evaluates the full expression:
// b/2: AND(TRUE, OR(FALSE, FALSE)) = FALSE -> pruned
// c/3: AND(TRUE, OR(TRUE, TRUE)) = TRUE -> kept
// e/5: AND(TRUE, OR(FALSE, FALSE)) = FALSE -> pruned
table_for_letters(&['c'])
)]
fn predicate_on_letter_and_number(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/basic_partitioned",
Some(&["letter", "number"]),
Some(pred),
expected,
)?;
Ok(())
}
#[rstest::rstest]
#[case::partition_only_prunes_one_partition(
// Partition-only predicate: modified = '2021-02-02' should prune 2021-02-01 files
column_expr!("modified").eq(Expr::literal("2021-02-02")),
vec![
"+----+------------+-------+",
"| id | modified | value |",
"+----+------------+-------+",
"| A | 2021-02-02 | 1 |",
"| A | 2021-02-02 | 1 |",
"| A | 2021-02-02 | 3 |",
"| A | 2021-02-02 | 3 |",
"| B | 2021-02-02 | 2 |",
"| B | 2021-02-02 | 2 |",
"+----+------------+-------+",
]
.into_iter()
.map(String::from)
.collect()
)]
#[case::partition_prunes_other_partition(
// modified = '2021-02-01' should prune 2021-02-02 files, keeping all 2021-02-01 rows
column_expr!("modified").eq(Expr::literal("2021-02-01")),
vec![
"+----+------------+-------+",
"| id | modified | value |",
"+----+------------+-------+",
"| A | 2021-02-01 | 10 |",
"| A | 2021-02-01 | 10 |",
"| A | 2021-02-01 | 11 |",
"| A | 2021-02-01 | 11 |",
"| A | 2021-02-01 | 5 |",
"| A | 2021-02-01 | 5 |",
"| A | 2021-02-01 | 6 |",
"| A | 2021-02-01 | 6 |",
"| A | 2021-02-01 | 7 |",
"| A | 2021-02-01 | 7 |",
"| B | 2021-02-01 | 4 |",
"| B | 2021-02-01 | 4 |",
"| B | 2021-02-01 | 8 |",
"| B | 2021-02-01 | 8 |",
"| B | 2021-02-01 | 9 |",
"| B | 2021-02-01 | 9 |",
"+----+------------+-------+",
]
.into_iter()
.map(String::from)
.collect()
)]
fn partition_pruning_with_checkpoint_parsed_values(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/app-txn-checkpoint",
Some(&["id", "modified", "value"]),
Some(pred),
expected,
)?;
Ok(())
}
#[rstest::rstest]
#[case::and_keeps_partition_matched_files(
// Data skipping keeps 2021-02-01 files (partition matches, max(value)=11 > 9) and
// prunes 2021-02-02 files (partition mismatch). All rows from kept files are returned
    // since the kernel does not apply row-level predicate filtering.
Pred::and(
column_expr!("modified").eq(Expr::literal("2021-02-01")),
column_expr!("value").gt(Expr::literal(9i32)),
),
vec![
"+----+------------+-------+",
"| id | modified | value |",
"+----+------------+-------+",
"| A | 2021-02-01 | 10 |",
"| A | 2021-02-01 | 10 |",
"| A | 2021-02-01 | 11 |",
"| A | 2021-02-01 | 11 |",
"| A | 2021-02-01 | 5 |",
"| A | 2021-02-01 | 5 |",
"| A | 2021-02-01 | 6 |",
"| A | 2021-02-01 | 6 |",
"| A | 2021-02-01 | 7 |",
"| A | 2021-02-01 | 7 |",
"| B | 2021-02-01 | 4 |",
"| B | 2021-02-01 | 4 |",
"| B | 2021-02-01 | 8 |",
"| B | 2021-02-01 | 8 |",
"| B | 2021-02-01 | 9 |",
"| B | 2021-02-01 | 9 |",
"+----+------------+-------+",
]
.into_iter()
.map(String::from)
.collect()
)]
#[case::and_prunes_all_files(
// 2021-02-02: partition matches but data stats fail (max value=3, NOT > 3).
// 2021-02-01: partition mismatch. All 4 files pruned.
Pred::and(
column_expr!("modified").eq(Expr::literal("2021-02-02")),
column_expr!("value").gt(Expr::literal(3i32)),
),
vec![]
)]
#[case::or_prunes_by_both_partition_and_stats(
// 2021-02-01 pruned: partition mismatch AND max(value)=11 NOT > 11.
// 2021-02-02 kept by partition match. Only 2021-02-02 rows returned.
Pred::or(
column_expr!("modified").eq(Expr::literal("2021-02-02")),
column_expr!("value").gt(Expr::literal(11i32)),
),
vec![
"+----+------------+-------+",
"| id | modified | value |",
"+----+------------+-------+",
"| A | 2021-02-02 | 1 |",
"| A | 2021-02-02 | 1 |",
"| A | 2021-02-02 | 3 |",
"| A | 2021-02-02 | 3 |",
"| B | 2021-02-02 | 2 |",
"| B | 2021-02-02 | 2 |",
"+----+------------+-------+",
]
.into_iter()
.map(String::from)
.collect()
)]
#[case::or_keeps_all_files(
// 2021-02-02 kept by partition match, 2021-02-01 kept by data stats (max=11 > 9).
// All rows from all 4 files are returned.
Pred::or(
column_expr!("modified").eq(Expr::literal("2021-02-02")),
column_expr!("value").gt(Expr::literal(9i32)),
),
vec![
"+----+------------+-------+",
"| id | modified | value |",
"+----+------------+-------+",
"| A | 2021-02-01 | 10 |",
"| A | 2021-02-01 | 10 |",
"| A | 2021-02-01 | 11 |",
"| A | 2021-02-01 | 11 |",
"| A | 2021-02-01 | 5 |",
"| A | 2021-02-01 | 5 |",
"| A | 2021-02-01 | 6 |",
"| A | 2021-02-01 | 6 |",
"| A | 2021-02-01 | 7 |",
"| A | 2021-02-01 | 7 |",
"| A | 2021-02-02 | 1 |",
"| A | 2021-02-02 | 1 |",
"| A | 2021-02-02 | 3 |",
"| A | 2021-02-02 | 3 |",
"| B | 2021-02-01 | 4 |",
"| B | 2021-02-01 | 4 |",
"| B | 2021-02-01 | 8 |",
"| B | 2021-02-01 | 8 |",
"| B | 2021-02-01 | 9 |",
"| B | 2021-02-01 | 9 |",
"| B | 2021-02-02 | 2 |",
"| B | 2021-02-02 | 2 |",
"+----+------------+-------+",
]
.into_iter()
.map(String::from)
.collect()
)]
fn mixed_predicate_with_checkpoint_parsed_columns(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/app-txn-checkpoint",
Some(&["id", "modified", "value"]),
Some(pred),
expected,
)?;
Ok(())
}
#[rstest::rstest]
#[case::partition_only(
// Partition-only predicate: category = 'A' prunes the category=B file
Arc::new(Pred::eq(column_expr!("category"), Expr::literal("A"))),
1
)]
#[case::mixed_partition_and_data(
// Mixed predicate: category = 'A' OR val > 'z'. Category=A kept by partition match.
// Category=B: partition mismatch, but max(val)='z' NOT > 'z', so data skipping prunes it.
Arc::new(Pred::or(
Pred::eq(column_expr!("category"), Expr::literal("A")),
Pred::gt(column_expr!("val"), Expr::literal("z")),
)),
1
)]
#[tokio::test]
async fn partition_pruning_with_column_mapping(
#[case] predicate: Arc<Pred>,
#[case] expected_files: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_batch(vec![("phys_val", vec!["x", "y", "z"].into_array())])?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
let schema_str = r#"{"type":"struct","fields":[{"name":"category","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":1,"delta.columnMapping.physicalName":"phys_category"}},{"name":"val","type":"string","nullable":true,"metadata":{"delta.columnMapping.id":2,"delta.columnMapping.physicalName":"phys_val"}}]}"#;
let actions = [
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["columnMapping"],"writerFeatures":["columnMapping"]}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","isBlindAppend":true}}"#.to_string(),
format!(
r#"{{"metaData":{{"id":"test-cm","format":{{"provider":"parquet","options":{{}}}},"schemaString":"{schema}","partitionColumns":["category"],"configuration":{{"delta.columnMapping.mode":"name","delta.columnMapping.maxColumnId":"2"}},"createdTime":1587968585495}}}}"#,
schema = schema_str.replace('"', r#"\""#),
),
format!(
r#"{{"add":{{"path":"phys_category=A/{PARQUET_FILE1}","partitionValues":{{"phys_category":"A"}},"size":0,"modificationTime":1587968586000,"dataChange":true,"stats":"{{\"numRecords\":3,\"nullCount\":{{\"phys_val\":0}},\"minValues\":{{\"phys_val\":\"x\"}},\"maxValues\":{{\"phys_val\":\"z\"}}}}" }}}}"#
),
format!(
r#"{{"add":{{"path":"phys_category=B/{PARQUET_FILE2}","partitionValues":{{"phys_category":"B"}},"size":0,"modificationTime":1587968586000,"dataChange":true,"stats":"{{\"numRecords\":3,\"nullCount\":{{\"phys_val\":0}},\"minValues\":{{\"phys_val\":\"x\"}},\"maxValues\":{{\"phys_val\":\"z\"}}}}" }}}}"#
),
];
add_commit(table_root, storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from("phys_category=A").join(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from("phys_category=B").join(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let scan = snapshot.scan_builder().with_predicate(predicate).build()?;
let stream = scan.execute(engine)?;
let mut files_scanned = 0;
for engine_data in stream {
let result_batch = into_record_batch(engine_data?);
let category_idx = result_batch.schema().index_of("category")?;
let category_col = result_batch.column(category_idx).as_string::<i32>();
for i in 0..result_batch.num_rows() {
assert_eq!(category_col.value(i), "A");
}
files_scanned += 1;
}
assert_eq!(
expected_files, files_scanned,
"Expected partition pruning to return {expected_files} file(s)"
);
Ok(())
}
#[rstest::rstest]
#[case::not_less_than(
Pred::not(column_expr!("number").lt(Expr::literal(4i64))),
table_for_numbers(vec![4, 5, 6])
)]
#[case::not_less_than_or_equal(
Pred::not(column_expr!("number").le(Expr::literal(4i64))),
table_for_numbers(vec![5, 6])
)]
#[case::not_greater_than(
Pred::not(column_expr!("number").gt(Expr::literal(4i64))),
table_for_numbers(vec![1, 2, 3, 4])
)]
#[case::not_greater_than_or_equal(
Pred::not(column_expr!("number").ge(Expr::literal(4i64))),
table_for_numbers(vec![1, 2, 3])
)]
#[case::not_equal(
Pred::not(column_expr!("number").eq(Expr::literal(4i64))),
table_for_numbers(vec![1, 2, 3, 5, 6])
)]
#[case::not_not_equal(
Pred::not(column_expr!("number").ne(Expr::literal(4i64))),
table_for_numbers(vec![4])
)]
fn predicate_on_number_not(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/basic_partitioned",
Some(&["a_float", "number"]),
Some(pred),
expected,
)?;
Ok(())
}
#[test]
fn predicate_on_number_with_not_null() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+---------+--------+",
"| a_float | number |",
"+---------+--------+",
"| 1.1 | 1 |",
"| 2.2 | 2 |",
"+---------+--------+",
];
read_table_data_str(
"./tests/data/basic_partitioned",
Some(&["a_float", "number"]),
Some(Pred::and(
column_expr!("number").is_not_null(),
column_expr!("number").lt(Expr::literal(3i64)),
)),
expected,
)?;
Ok(())
}
#[test]
fn predicate_null() -> Result<(), Box<dyn std::error::Error>> {
    // `number` is never null in this table, so the scan should return no rows.
    let expected = vec![];
    read_table_data_str(
"./tests/data/basic_partitioned",
Some(&["a_float", "number"]),
Some(column_expr!("number").is_null()),
expected,
)?;
Ok(())
}
#[test]
fn mixed_null() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+------+--------------+",
"| part | n |",
"+------+--------------+",
"| 0 | |",
"| 0 | |",
"| 0 | |",
"| 0 | |",
"| 0 | |",
"| 2 | |",
"| 2 | non-null-mix |",
"| 2 | |",
"| 2 | non-null-mix |",
"| 2 | |",
"+------+--------------+",
];
read_table_data_str(
"./tests/data/mixed-nulls",
Some(&["part", "n"]),
Some(column_expr!("n").is_null()),
expected,
)?;
Ok(())
}
#[test]
fn mixed_not_null() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+------+--------------+",
"| part | n |",
"+------+--------------+",
"| 1 | non-null |",
"| 1 | non-null |",
"| 1 | non-null |",
"| 1 | non-null |",
"| 1 | non-null |",
"| 2 | |",
"| 2 | |",
"| 2 | |",
"| 2 | non-null-mix |",
"| 2 | non-null-mix |",
"+------+--------------+",
];
read_table_data_str(
"./tests/data/mixed-nulls",
Some(&["part", "n"]),
Some(column_expr!("n").is_not_null()),
expected,
)?;
Ok(())
}
#[rstest::rstest]
#[case::and_both_conditions(
Pred::and(
column_expr!("number").gt(Expr::literal(4i64)),
column_expr!("a_float").gt(Expr::literal(5.5)),
),
table_for_numbers(vec![6])
)]
#[case::and_with_negation(
Pred::and(
column_expr!("number").gt(Expr::literal(4i64)),
Pred::not(column_expr!("a_float").gt(Expr::literal(5.5))),
),
table_for_numbers(vec![5])
)]
#[case::or_either_condition(
Pred::or(
column_expr!("number").gt(Expr::literal(4i64)),
column_expr!("a_float").gt(Expr::literal(5.5)),
),
table_for_numbers(vec![5, 6])
)]
#[case::or_with_negation(
Pred::or(
column_expr!("number").gt(Expr::literal(4i64)),
Pred::not(column_expr!("a_float").gt(Expr::literal(5.5))),
),
table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
fn and_or_predicates(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/basic_partitioned",
Some(&["a_float", "number"]),
Some(pred),
expected,
)?;
Ok(())
}
#[rstest::rstest]
#[case::not_and_both_conditions(
Pred::not(Pred::and(
column_expr!("number").gt(Expr::literal(4i64)),
column_expr!("a_float").gt(Expr::literal(5.5)),
)),
table_for_numbers(vec![1, 2, 3, 4, 5])
)]
#[case::not_and_with_negation(
Pred::not(Pred::and(
column_expr!("number").gt(Expr::literal(4i64)),
Pred::not(column_expr!("a_float").gt(Expr::literal(5.5))),
)),
table_for_numbers(vec![1, 2, 3, 4, 6])
)]
#[case::not_or_either_condition(
Pred::not(Pred::or(
column_expr!("number").gt(Expr::literal(4i64)),
column_expr!("a_float").gt(Expr::literal(5.5)),
)),
table_for_numbers(vec![1, 2, 3, 4])
)]
#[case::not_or_with_negation(
Pred::not(Pred::or(
column_expr!("number").gt(Expr::literal(4i64)),
Pred::not(column_expr!("a_float").gt(Expr::literal(5.5))),
)),
vec![]
)]
fn not_and_or_predicates(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/basic_partitioned",
Some(&["a_float", "number"]),
Some(pred),
expected,
)?;
Ok(())
}
#[rstest::rstest]
#[case::literal_false(Pred::literal(false), table_for_numbers(vec![]))]
#[case::and_with_literal_false(
Pred::and(column_pred!("number"), Pred::literal(false)),
table_for_numbers(vec![])
)]
#[case::literal_true(
Pred::literal(true),
table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
#[case::from_literal_expr(
Pred::from_expr(Expr::literal(3i64)),
table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
#[case::distinct_value(
column_expr!("number").distinct(Expr::literal(3i64)),
table_for_numbers(vec![1, 2, 4, 5, 6])
)]
#[case::distinct_null(
column_expr!("number").distinct(Expr::null_literal(DataType::LONG)),
table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
#[case::not_distinct_value(
Pred::not(column_expr!("number").distinct(Expr::literal(3i64))),
table_for_numbers(vec![3])
)]
#[case::not_distinct_null(
Pred::not(column_expr!("number").distinct(Expr::null_literal(DataType::LONG))),
table_for_numbers(vec![])
)]
#[case::gt_empty_struct(
column_expr!("number").gt(Expr::struct_from(Vec::<ExpressionRef>::new())),
table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
#[case::not_gt_empty_struct(
Pred::not(column_expr!("number").gt(Expr::struct_from(Vec::<ExpressionRef>::new()))),
table_for_numbers(vec![1, 2, 3, 4, 5, 6])
)]
fn invalid_skips_none_predicates(
#[case] pred: Pred,
#[case] expected: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
read_table_data(
"./tests/data/basic_partitioned",
Some(&["a_float", "number"]),
Some(pred),
expected,
)?;
Ok(())
}
#[test]
fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+-------+",
"| value |",
"+-------+",
"| 1 |",
"| 2 |",
"| 3 |",
"| 4 |",
"| 5 |",
"| 6 |",
"| 7 |",
"| 8 |",
"+-------+",
];
read_table_data_str(
"./tests/data/table-with-dv-small/",
None,
Some(Pred::gt(column_expr!("value"), Expr::literal(3))),
expected,
)?;
Ok(())
}
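// Pruning on a non-nullable partition column must come from partition values rather than file
// stats: only the id=2 file should be scanned.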
#[tokio::test]
async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"id=1/{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"id=2/{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
];
add_commit(table_root, storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from("id=1").join(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from("id=2").join(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let predicate = Pred::eq(column_expr!("id"), Expr::literal(2));
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;
let stream = scan.execute(engine)?;
let mut files_scanned = 0;
for engine_data in stream {
let mut result_batch = into_record_batch(engine_data?);
let _ = result_batch.remove_column(result_batch.schema().index_of("id")?);
assert_eq!(&batch, &result_batch);
files_scanned += 1;
}
assert_eq!(1, files_scanned);
Ok(())
}
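// The second file is written without stats, so it cannot be pruned for val = 'g'; the first
// file (max val 'c') is skipped, leaving exactly one file to scan.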
#[tokio::test]
async fn predicate_on_non_nullable_column_missing_stats() -> Result<(), Box<dyn std::error::Error>>
{
let batch_1 = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
let batch_2 = generate_batch(vec![("val", vec!["d", "e", "f"].into_array())])?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
];
let writer_props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.build();
add_commit(table_root, storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes_with_props(&batch_1, writer_props.clone()).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes_with_props(&batch_2, writer_props).into(),
)
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let predicate = Pred::eq(column_expr!("val"), Expr::literal("g"));
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;
let stream = scan.execute(engine)?;
let mut files_scanned = 0;
for engine_data in stream {
let result_batch = into_record_batch(engine_data?);
assert_eq!(&batch_2, &result_batch);
files_scanned += 1;
}
assert_eq!(1, files_scanned);
Ok(())
}
#[test]
fn short_dv() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----+-------+--------------------------+---------------------+",
"| id | value | timestamp | rand |",
"+----+-------+--------------------------+---------------------+",
"| 3 | 3 | 2023-05-31T18:58:33.633Z | 0.7918174793484931 |",
"| 4 | 4 | 2023-05-31T18:58:33.633Z | 0.9281049271981882 |",
"| 5 | 5 | 2023-05-31T18:58:33.633Z | 0.27796520310701633 |",
"| 6 | 6 | 2023-05-31T18:58:33.633Z | 0.15263801464228832 |",
"| 7 | 7 | 2023-05-31T18:58:33.633Z | 0.1981143710215575 |",
"| 8 | 8 | 2023-05-31T18:58:33.633Z | 0.3069439236599195 |",
"| 9 | 9 | 2023-05-31T18:58:33.633Z | 0.5175919190815845 |",
"+----+-------+--------------------------+---------------------+",
];
read_table_data_str("./tests/data/with-short-dv/", None, None, expected)?;
Ok(())
}
#[test]
fn basic_decimal() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----------------+---------+--------------+------------------------+",
"| part | col1 | col2 | col3 |",
"+----------------+---------+--------------+------------------------+",
"| -2342342.23423 | -999.99 | -99999.99999 | -9999999999.9999999999 |",
"| 0.00004 | 0.00 | 0.00000 | 0.0000000000 |",
"| 234.00000 | 1.00 | 2.00000 | 3.0000000000 |",
"| 2342222.23454 | 111.11 | 22222.22222 | 3333333333.3333333333 |",
"+----------------+---------+--------------+------------------------+",
];
read_table_data_str("./tests/data/basic-decimal-table/", None, None, expected)?;
Ok(())
}
#[test]
fn timestamp_ntz() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----+----------------------------+----------------------------+",
"| id | tsNtz | tsNtzPartition |",
"+----+----------------------------+----------------------------+",
"| 0 | 2021-11-18T02:30:00.123456 | 2021-11-18T02:30:00.123456 |",
"| 1 | 2013-07-05T17:01:00.123456 | 2021-11-18T02:30:00.123456 |",
"| 2 | | 2021-11-18T02:30:00.123456 |",
"| 3 | 2021-11-18T02:30:00.123456 | 2013-07-05T17:01:00.123456 |",
"| 4 | 2013-07-05T17:01:00.123456 | 2013-07-05T17:01:00.123456 |",
"| 5 | | 2013-07-05T17:01:00.123456 |",
"| 6 | 2021-11-18T02:30:00.123456 | |",
"| 7 | 2013-07-05T17:01:00.123456 | |",
"| 8 | | |",
"+----+----------------------------+----------------------------+",
];
read_table_data_str(
"./tests/data/data-reader-timestamp_ntz/",
None,
None,
expected,
)?;
Ok(())
}
#[test]
fn type_widening_basic() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
"| byte_long | int_long | float_double | byte_double | short_double | int_double | date_timestamp_ntz |",
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
"| 1 | 2 | 3.4000000953674316 | 5.0 | 6.0 | 7.0 | 2024-09-09T00:00:00 |",
"| 9223372036854775807 | 9223372036854775807 | 1.234567890123 | 1.234567890123 | 1.234567890123 | 1.234567890123 | 2024-09-09T12:34:56.123456 |",
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
];
let select_cols: Option<&[&str]> = Some(&[
"byte_long",
"int_long",
"float_double",
"byte_double",
"short_double",
"int_double",
"date_timestamp_ntz",
]);
read_table_data_str("./tests/data/type-widening/", select_cols, None, expected)
}
#[test]
fn type_widening_decimal() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
"| decimal_decimal_same_scale | decimal_decimal_greater_scale | byte_decimal | short_decimal | int_decimal | long_decimal |",
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
"| 123.45 | 67.89000 | 1.0 | 2.0 | 3.0 | 4.0 |",
"| 12345678901234.56 | 12345678901.23456 | 123.4 | 12345.6 | 1234567890.1 | 123456789012345678.9 |",
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
];
let select_cols: Option<&[&str]> = Some(&[
"decimal_decimal_same_scale",
"decimal_decimal_greater_scale",
"byte_decimal",
"short_decimal",
"int_decimal",
"long_decimal",
]);
read_table_data_str("./tests/data/type-widening/", select_cols, None, expected)
}
#[test]
fn predicate_references_invalid_missing_column() -> Result<(), Box<dyn std::error::Error>> {
let columns = &["chrono", "missing"];
let expected = vec![
"+-------------------------------------------------------------------------------------------+---------+",
"| chrono | missing |",
"+-------------------------------------------------------------------------------------------+---------+",
"| {date32: 1971-01-01, timestamp: 1970-02-01T08:00:00Z, timestamp_ntz: 1970-01-02T00:00:00} | |",
"| {date32: 1971-01-02, timestamp: 1970-02-01T09:00:00Z, timestamp_ntz: 1970-01-02T00:01:00} | |",
"| {date32: 1971-01-03, timestamp: 1970-02-01T10:00:00Z, timestamp_ntz: 1970-01-02T00:02:00} | |",
"| {date32: 1971-01-04, timestamp: 1970-02-01T11:00:00Z, timestamp_ntz: 1970-01-02T00:03:00} | |",
"| {date32: 1971-01-05, timestamp: 1970-02-01T12:00:00Z, timestamp_ntz: 1970-01-02T00:04:00} | |",
"+-------------------------------------------------------------------------------------------+---------+",
];
let predicate = column_expr!("missing").lt(Expr::literal(10i64));
read_table_data_str(
"./tests/data/parquet_row_group_skipping/",
Some(columns),
Some(predicate),
expected,
)?;
let expected = vec![
"+-------------------------------------------------------------------------------------------+",
"| chrono |",
"+-------------------------------------------------------------------------------------------+",
"| {date32: 1971-01-01, timestamp: 1970-02-01T08:00:00Z, timestamp_ntz: 1970-01-02T00:00:00} |",
"| {date32: 1971-01-02, timestamp: 1970-02-01T09:00:00Z, timestamp_ntz: 1970-01-02T00:01:00} |",
"| {date32: 1971-01-03, timestamp: 1970-02-01T10:00:00Z, timestamp_ntz: 1970-01-02T00:02:00} |",
"| {date32: 1971-01-04, timestamp: 1970-02-01T11:00:00Z, timestamp_ntz: 1970-01-02T00:03:00} |",
"| {date32: 1971-01-05, timestamp: 1970-02-01T12:00:00Z, timestamp_ntz: 1970-01-02T00:04:00} |",
"+-------------------------------------------------------------------------------------------+",
];
let predicate = column_expr!("invalid").lt(Expr::literal(10));
read_table_data_str(
"./tests/data/parquet_row_group_skipping/",
Some(columns),
Some(predicate),
expected,
)
.expect_err("unknown column");
Ok(())
}
#[cfg(not(windows))]
#[test]
fn timestamp_partitioned_table() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----+-----+---+----------------------+",
"| id | x | s | time |",
"+----+-----+---+----------------------+",
"| 1 | 0.5 | | 1971-07-22T03:06:40Z |",
"+----+-----+---+----------------------+",
];
let test_name = "timestamp-partitioned-table";
let test_dir = load_test_data("./tests/data", test_name).unwrap();
let test_path = test_dir.path().join(test_name);
read_table_data_str(test_path.to_str().unwrap(), None, None, expected)
}
#[test]
fn compacted_log_files_table() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----+--------------------+",
"| id | comment |",
"+----+--------------------+",
"| 0 | new |",
"| 1 | after-large-delete |",
"| 2 | |",
"| 10 | merge1-insert |",
"| 12 | merge2-insert |",
"+----+--------------------+",
];
let test_name = "compacted-log-files-table";
let test_dir = load_test_data("./tests/data", test_name).unwrap();
let test_path = test_dir.path().join(test_name);
read_table_data_str(test_path.to_str().unwrap(), None, None, expected)
}
#[test]
fn unshredded_variant_table() -> Result<(), Box<dyn std::error::Error>> {
let expected = include!("data/unshredded-variant.expected.in");
let test_name = "unshredded-variant";
let test_dir = load_test_data("./tests/data", test_name).unwrap();
let test_path = test_dir.path().join(test_name);
read_table_data_str(test_path.to_str().unwrap(), None, None, expected)
}
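// Requesting the RowIndex metadata column should produce a row_index column that counts from 0
// within each scanned file.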
#[tokio::test]
async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Error>> {
let batch1 = generate_batch(vec![
("id", vec![1i32, 2, 3, 4, 5].into_array()),
("value", vec!["a", "b", "c", "d", "e"].into_array()),
])?;
let batch2 = generate_batch(vec![
("id", vec![10i32, 20, 30].into_array()),
("value", vec!["x", "y", "z"].into_array()),
])?;
let batch3 = generate_batch(vec![
("id", vec![100i32, 200, 300, 400].into_array()),
("value", vec!["p", "q", "r", "s"].into_array()),
])?;
let file_size1 = record_batch_to_bytes(&batch1).len() as u64;
let file_size2 = record_batch_to_bytes(&batch2).len() as u64;
let file_size3 = record_batch_to_bytes(&batch3).len() as u64;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size1),
TestAction::AddWithSize(PARQUET_FILE2.to_string(), file_size2),
TestAction::AddWithSize(PARQUET_FILE3.to_string(), file_size3),
]),
)
.await?;
for (parquet_file, batch) in [
(PARQUET_FILE1, &batch1),
(PARQUET_FILE2, &batch2),
(PARQUET_FILE3, &batch3),
] {
storage
.put(
&Path::from(parquet_file),
record_batch_to_bytes(batch).into(),
)
.await?;
}
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let schema = Arc::new(StructType::try_new([
StructField::nullable("id", DataType::INTEGER),
StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
StructField::nullable("value", DataType::STRING),
])?);
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let scan = snapshot.scan_builder().with_schema(schema).build()?;
let mut file_count = 0;
let expected_row_counts = [5, 3, 4];
let stream = scan.execute(engine.clone())?;
for data in stream {
let batch = into_record_batch(data?);
file_count += 1;
assert_eq!(batch.num_columns(), 3, "Expected 3 columns in the batch");
assert_eq!(
batch.schema().field(0).name(),
"id",
"First column should be 'id'"
);
assert_eq!(
batch.schema().field(1).name(),
"row_index",
"Second column should be 'row_index'"
);
assert_eq!(
batch.schema().field(2).name(),
"value",
"Third column should be 'value'"
);
let row_index_array = batch.column(1).as_primitive::<Int64Type>();
let expected_values: Vec<i64> = (0..batch.num_rows() as i64).collect();
assert_eq!(
row_index_array.values().to_vec(),
expected_values,
"Row index values incorrect for file {} (expected {} rows)",
file_count,
expected_row_counts[file_count - 1]
);
}
assert_eq!(file_count, 3, "Expected to scan 3 files");
Ok(())
}
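// Requesting the FilePath metadata column should produce the absolute file URL, repeated for
// every row read from that file.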
#[tokio::test]
async fn test_file_path_metadata_column() -> Result<(), Box<dyn std::error::Error>> {
use delta_kernel::arrow::array::{Array, StringArray};
let batch1 = generate_batch(vec![
("id", vec![1i32, 2, 3].into_array()),
("value", vec!["a", "b", "c"].into_array()),
])?;
let batch2 = generate_batch(vec![
("id", vec![10i32, 20].into_array()),
("value", vec!["x", "y"].into_array()),
])?;
let file_size1 = record_batch_to_bytes(&batch1).len() as u64;
let file_size2 = record_batch_to_bytes(&batch2).len() as u64;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AddWithSize(PARQUET_FILE1.to_string(), file_size1),
TestAction::AddWithSize(PARQUET_FILE2.to_string(), file_size2),
]),
)
.await?;
for (parquet_file, batch) in [(PARQUET_FILE1, &batch1), (PARQUET_FILE2, &batch2)] {
storage
.put(
&Path::from(parquet_file),
record_batch_to_bytes(batch).into(),
)
.await?;
}
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let schema = Arc::new(StructType::try_new([
StructField::nullable("id", DataType::INTEGER),
StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
StructField::nullable("value", DataType::STRING),
])?);
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let scan = snapshot.scan_builder().with_schema(schema).build()?;
let mut file_count = 0;
let expected_files = [PARQUET_FILE1, PARQUET_FILE2];
let expected_row_counts = [3, 2];
let stream = scan.execute(engine.clone())?;
for data in stream {
let batch = into_record_batch(data?);
assert_eq!(batch.num_columns(), 3, "Expected 3 columns in the batch");
assert_eq!(
batch.schema().field(0).name(),
"id",
"First column should be 'id'"
);
assert_eq!(
batch.schema().field(1).name(),
"_file",
"Second column should be '_file'"
);
assert_eq!(
batch.schema().field(2).name(),
"value",
"Third column should be 'value'"
);
let file_path_array = batch.column(1);
let expected_file_name = expected_files[file_count];
let expected_path = format!("{table_root}{expected_file_name}");
let string_array = file_path_array
.as_any()
.downcast_ref::<StringArray>()
.expect("File path column should be a StringArray");
assert_eq!(
string_array.len(),
expected_row_counts[file_count],
"File {} should have {} rows",
expected_file_name,
expected_row_counts[file_count]
);
assert!(
string_array
.iter()
.all(|v| v == Some(expected_path.as_str())),
"All rows should contain file path '{expected_path}'"
);
file_count += 1;
}
assert_eq!(file_count, 2, "Expected to scan 2 files");
Ok(())
}
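// RowId and RowCommitVersion metadata columns are unsupported here; building the scan must fail
// with a descriptive error.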
#[tokio::test]
async fn test_unsupported_metadata_columns() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_simple_batch()?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::Add(PARQUET_FILE1.to_string()),
]),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let test_cases = [
(
"row_id",
MetadataColumnSpec::RowId,
"Row ids are not enabled on this table",
),
(
"row_commit_version",
MetadataColumnSpec::RowCommitVersion,
"Row commit versions not supported",
),
];
for (column_name, metadata_spec, error_text) in test_cases {
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
let schema = Arc::new(StructType::try_new([
StructField::nullable("id", DataType::INTEGER),
StructField::create_metadata_column(column_name, metadata_spec),
])?);
let scan_err = snapshot
.scan_builder()
.with_schema(schema)
.build()
.unwrap_err();
let error_msg = scan_err.to_string();
assert!(
error_msg.contains(error_text),
"Expected {error_msg} to contain {error_text}"
);
}
Ok(())
}
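// Files that do not match Delta log naming conventions, or that live in nested or unrelated
// directories, must be ignored when building the log segment.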
#[tokio::test]
async fn test_invalid_files_are_skipped() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_simple_batch()?;
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
add_commit(
table_root,
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::Add(PARQUET_FILE1.to_string()),
TestAction::Add(PARQUET_FILE2.to_string()),
]),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let invalid_files = [
"_delta_log/0.zip",
"_delta_log/_copy_into_log/0.zip",
"_delta_log/_ignore_me/00000000000000000000.json",
"_delta_log/_and_me/00000000000000000000.checkpoint.parquet",
"_delta_log/02184.json",
"_delta_log/0x000000000000000000.checkpoint.parquet",
"00000000000000000000.json",
"_delta_log/_staged_commits/_staged_commits/00000000000000000000.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
"_delta_log/my_random_dir/_staged_commits/00000000000000000000.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
"_delta_log/my_random_dir/_delta_log/_staged_commits/00000000000000000000.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json",
"_delta_log/_delta_log/00000000000000000000.json",
"_delta_log/_delta_log/00000000000000000000.checkpoint.parquet",
"_delta_log/something/_delta_log/00000000000000000000.crc",
"_delta_log/something/_delta_log/00000000000000000000.json",
"_delta_log/something/_delta_log/00000000000000000000.checkpoint.parquet",
];
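    // Strip the "memory:///" scheme prefix (10 bytes) so listed paths can be compared against
    // the relative entries in `invalid_files`.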
fn get_file_path_for_test(path: &ParsedLogPath) -> &str {
&path.location.location.as_str()[10..]
}
fn ensure_segment_does_not_contain(invalid_files: &[&str], segment: &LogSegment) {
assert!(
!segment.listed.ascending_commit_files.iter().any(|p| {
let test_path = get_file_path_for_test(p);
invalid_files.contains(&test_path)
}),
"ascending_commit_files contained invalid file"
);
assert!(
!segment.listed.ascending_compaction_files.iter().any(|p| {
let test_path = get_file_path_for_test(p);
invalid_files.contains(&test_path)
}),
"ascending_compaction_files contained invalid file"
);
assert!(
!segment.listed.checkpoint_parts.iter().any(|p| {
let test_path = get_file_path_for_test(p);
invalid_files.contains(&test_path)
}),
"checkpoint_parts contained invalid file"
);
if let Some(ref crc) = segment.listed.latest_crc_file {
assert!(
!invalid_files.contains(&get_file_path_for_test(crc)),
"Latest crc contained invalid file"
);
}
if let Some(ref latest_commit) = segment.listed.latest_commit_file {
assert!(
!invalid_files.contains(&get_file_path_for_test(latest_commit)),
"Latest commit contained invalid file"
);
}
}
for invalid_file in invalid_files.iter() {
let invalid_path = Path::from(*invalid_file);
storage.put(&invalid_path, vec![1u8].into()).await?;
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
ensure_segment_does_not_contain(&invalid_files, snapshot.log_segment());
storage.delete(&invalid_path).await?;
}
for invalid_file in invalid_files.iter() {
let invalid_path = Path::from(*invalid_file);
storage.put(&invalid_path, vec![1u8].into()).await?;
}
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
ensure_segment_does_not_contain(&invalid_files, snapshot.log_segment());
Ok(())
}
#[rstest::rstest]
#[test]
fn checkpoint_stats_skipping(
#[values(
"v1-single-part",
"v1-multi-part",
"v2-parquet-sidecars",
"v2-json-sidecars",
"v2-classic-parquet"
)]
checkpoint_type: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let table_path = format!("./tests/data/{checkpoint_type}-struct-stats-only/");
let expected = vec![
"+----+---------+",
"| id | value |",
"+----+---------+",
"| 4 | value_4 |",
"| 5 | value_5 |",
"+----+---------+",
];
let predicate = column_expr!("id").gt(Expr::literal(3i64));
read_table_data_str(&table_path, None, Some(predicate), expected)?;
Ok(())
}
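// Skipping on parsed partition values (`part = 0`) against a partitioned table whose stats
// live only in a multi-part V1 checkpoint.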
#[test]
fn partition_values_parsed_skipping() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----+---------+------+",
"| id | value | part |",
"+----+---------+------+",
"| 3 | value_3 | 0 |",
"+----+---------+------+",
];
let predicate = column_expr!("part").eq(Expr::literal(0i32));
read_table_data_str(
"./tests/data/v1-multi-part-partitioned-struct-stats-only/",
None,
Some(predicate),
expected,
)?;
Ok(())
}
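// Delta JSON stats record timestamps at millisecond precision, so a file's max stat can be
// truncated below its true microsecond max. Skipping must not prune a file whose real max
// falls later in the same millisecond as the recorded max stat.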
#[tokio::test]
async fn timestamp_max_stat_truncation_does_not_over_prune(
) -> Result<(), Box<dyn std::error::Error>> {
let ts_metadata = "{\
\"id\":\"test-ts-table\",\
\"format\":{\"provider\":\"parquet\",\"options\":{}},\
\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[\
{\\\"name\\\":\\\"ts_col\\\",\\\"type\\\":\\\"timestamp\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}\
]}\",\
\"partitionColumns\":[],\
\"configuration\":{},\
\"createdTime\":1700000000000\
}";
let ts_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"ts_col",
delta_kernel::arrow::datatypes::DataType::Timestamp(
TimeUnit::Microsecond,
Some("UTC".into()),
),
true,
)]));
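// file2's recorded max (00:00:04.000) is truncated to the millisecond, but the data written
// below actually contains 00:00:04.000500.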
let file1_stats = r#"{"numRecords":2,"nullCount":{"ts_col":0},"minValues":{"ts_col":"1970-01-01T00:00:01.000Z"},"maxValues":{"ts_col":"1970-01-01T00:00:02.000Z"}}"#;
let file2_stats = r#"{"numRecords":2,"nullCount":{"ts_col":0},"minValues":{"ts_col":"1970-01-01T00:00:03.000Z"},"maxValues":{"ts_col":"1970-01-01T00:00:04.000Z"}}"#;
let file3_stats = r#"{"numRecords":2,"nullCount":{"ts_col":0},"minValues":{"ts_col":"1970-01-01T00:00:07.000Z"},"maxValues":{"ts_col":"1970-01-01T00:00:08.000Z"}}"#;
let batch1 = RecordBatch::try_new(
ts_schema.clone(),
vec![Arc::new(
TimestampMicrosecondArray::from(vec![1_000_000i64, 2_000_000]).with_timezone("UTC"),
)],
)?;
let batch2 = RecordBatch::try_new(
ts_schema.clone(),
vec![Arc::new(
TimestampMicrosecondArray::from(vec![3_000_000i64, 4_000_500]).with_timezone("UTC"),
)],
)?;
let batch3 = RecordBatch::try_new(
ts_schema,
vec![Arc::new(
TimestampMicrosecondArray::from(vec![7_000_000i64, 8_000_000]).with_timezone("UTC"),
)],
)?;
let file1_bytes = record_batch_to_bytes(&batch1);
let file2_bytes = record_batch_to_bytes(&batch2);
let file3_bytes = record_batch_to_bytes(&batch3);
let storage = Arc::new(InMemory::new());
let table_root = "memory:///";
let make_add = |name: &str, size: usize, stats: &str| -> String {
format!(
"{{\"add\":{{\"path\":\"{name}\",\"partitionValues\":{{}},\"size\":{size},\
\"modificationTime\":1700000000000,\"dataChange\":true,\
\"stats\":\"{stats_escaped}\"}}}}",
stats_escaped = stats.replace('"', "\\\""),
)
};
let commit0 = format!(
"{{\"protocol\":{{\"minReaderVersion\":1,\"minWriterVersion\":2}}}}\n\
{{\"metaData\":{ts_metadata}}}\n\
{}",
make_add("file1.parquet", file1_bytes.len(), file1_stats)
);
add_commit(table_root, storage.as_ref(), 0, commit0).await?;
add_commit(
table_root,
storage.as_ref(),
1,
make_add("file2.parquet", file2_bytes.len(), file2_stats),
)
.await?;
add_commit(
table_root,
storage.as_ref(),
2,
make_add("file3.parquet", file3_bytes.len(), file3_stats),
)
.await?;
storage
.put(&Path::from("file1.parquet"), file1_bytes.into())
.await?;
storage
.put(&Path::from("file2.parquet"), file2_bytes.into())
.await?;
storage
.put(&Path::from("file3.parquet"), file3_bytes.into())
.await?;
let engine = Arc::new(DefaultEngineBuilder::new(storage.clone()).build());
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
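// Scan with `ts_col > predicate_us` (microseconds) and return the total number of surviving rows.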
let row_count = |predicate_us: i64| -> Result<usize, Box<dyn std::error::Error>> {
let predicate = Arc::new(Pred::gt(
column_expr!("ts_col"),
Expr::literal(Scalar::Timestamp(predicate_us)),
));
let scan = snapshot
.clone()
.scan_builder()
.with_predicate(predicate)
.build()?;
let batches = read_scan(&scan, engine.clone())?;
Ok(batches.iter().map(|b| b.num_rows()).sum())
};
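// Each file holds 2 rows. A millisecond-truncated max must be treated as covering its whole
// millisecond, so file2 stays eligible until the predicate exceeds 00:00:04.000999.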
assert_eq!(row_count(4_000_400)?, 4, "mid-ms: file2+file3 kept");
assert_eq!(
row_count(4_000_000)?,
4,
"exact ms boundary: file2+file3 kept"
);
assert_eq!(row_count(4_000_001)?, 4, "1us above ms: file2+file3 kept");
assert_eq!(
row_count(4_000_998)?,
4,
"just not prunable: file2+file3 kept"
);
assert_eq!(row_count(4_000_999)?, 2, "just prunable: only file3 kept");
assert_eq!(
row_count(4_001_000)?,
2,
"next ms boundary: only file3 kept"
);
Ok(())
}
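// The same truncated-max scenario replayed against the checked-in table at
// ./tests/data/timestamp-truncation-stats, exercising >, >=, <, <=, and = at a
// mid-millisecond threshold (4_000_400 us).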
#[test]
fn timestamp_truncation_real_table_gt() -> Result<(), Box<dyn std::error::Error>> {
read_table_data_str(
"./tests/data/timestamp-truncation-stats",
None,
Some(Pred::gt(
column_expr!("ts_col"),
Expr::literal(Scalar::Timestamp(4_000_400)),
)),
vec![
"+----+-----------------------------+",
"| id | ts_col |",
"+----+-----------------------------+",
"| 3 | 1970-01-01T00:00:03Z |",
"| 4 | 1970-01-01T00:00:04.000500Z |",
"| 5 | 1970-01-01T00:00:07Z |",
"| 6 | 1970-01-01T00:00:08Z |",
"+----+-----------------------------+",
],
)
}
#[test]
fn timestamp_truncation_real_table_ge() -> Result<(), Box<dyn std::error::Error>> {
read_table_data_str(
"./tests/data/timestamp-truncation-stats",
None,
Some(Pred::ge(
column_expr!("ts_col"),
Expr::literal(Scalar::Timestamp(4_000_400)),
)),
vec![
"+----+-----------------------------+",
"| id | ts_col |",
"+----+-----------------------------+",
"| 3 | 1970-01-01T00:00:03Z |",
"| 4 | 1970-01-01T00:00:04.000500Z |",
"| 5 | 1970-01-01T00:00:07Z |",
"| 6 | 1970-01-01T00:00:08Z |",
"+----+-----------------------------+",
],
)
}
#[test]
fn timestamp_truncation_real_table_lt() -> Result<(), Box<dyn std::error::Error>> {
read_table_data_str(
"./tests/data/timestamp-truncation-stats",
None,
Some(Pred::lt(
column_expr!("ts_col"),
Expr::literal(Scalar::Timestamp(4_000_400)),
)),
vec![
"+----+-----------------------------+",
"| id | ts_col |",
"+----+-----------------------------+",
"| 1 | 1970-01-01T00:00:01Z |",
"| 2 | 1970-01-01T00:00:02Z |",
"| 3 | 1970-01-01T00:00:03Z |",
"| 4 | 1970-01-01T00:00:04.000500Z |",
"+----+-----------------------------+",
],
)
}
#[test]
fn timestamp_truncation_real_table_le() -> Result<(), Box<dyn std::error::Error>> {
read_table_data_str(
"./tests/data/timestamp-truncation-stats",
None,
Some(Pred::le(
column_expr!("ts_col"),
Expr::literal(Scalar::Timestamp(4_000_400)),
)),
vec![
"+----+-----------------------------+",
"| id | ts_col |",
"+----+-----------------------------+",
"| 1 | 1970-01-01T00:00:01Z |",
"| 2 | 1970-01-01T00:00:02Z |",
"| 3 | 1970-01-01T00:00:03Z |",
"| 4 | 1970-01-01T00:00:04.000500Z |",
"+----+-----------------------------+",
],
)
}
#[test]
fn timestamp_truncation_real_table_eq() -> Result<(), Box<dyn std::error::Error>> {
read_table_data_str(
"./tests/data/timestamp-truncation-stats",
None,
Some(Pred::eq(
column_expr!("ts_col"),
Expr::literal(Scalar::Timestamp(4_000_400)),
)),
vec![
"+----+-----------------------------+",
"| id | ts_col |",
"+----+-----------------------------+",
"| 3 | 1970-01-01T00:00:03Z |",
"| 4 | 1970-01-01T00:00:04.000500Z |",
"+----+-----------------------------+",
],
)
}