use std::collections::HashMap;
use std::sync::Arc;
use delta_kernel::arrow::array::{Int64Array, RecordBatch};
use delta_kernel::arrow::datatypes::Schema as ArrowSchema;
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::expressions::{
column_expr, Expression as Expr, Predicate as Pred, PredicateRef, Scalar,
};
use delta_kernel::object_store::local::LocalFileSystem;
use delta_kernel::scan::{AfterSequentialScanMetadata, ParallelScanMetadata};
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::transaction::create_table::create_table;
use delta_kernel::transaction::data_layout::DataLayout;
use delta_kernel::{Snapshot, SnapshotRef};
use rstest::rstest;
use test_utils::delta_kernel_default_engine::executor::tokio::TokioMultiThreadExecutor;
use test_utils::delta_kernel_default_engine::DefaultEngine;
use test_utils::{
add_commit, create_table_and_load_snapshot, test_table_setup_mt, write_batch_to_table,
};
use url::Url;
use crate::common::write_utils::set_table_properties;
// Engine type shared by the helpers in this file: the default engine driven by a
// multi-threaded Tokio executor.
type TestEngine = DefaultEngine<TokioMultiThreadExecutor>;
/// Builds the five-column schema (`c0` through `c4`, all nullable LONG) used by the
/// indexed-column-cap tests.
fn capped_schema() -> SchemaRef {
    let fields = (0..5).map(|idx| StructField::nullable(format!("c{idx}"), DataType::LONG));
    let schema = StructType::try_new(fields).unwrap();
    Arc::new(schema)
}
/// Builds a five-column `RecordBatch` of LONG values for `schema`.
///
/// * Column 0 holds `c0_start..c0_start + rows`.
/// * Column 1 holds the same range shifted up by 100.
/// * Columns 2 through 4 each hold `c0_start` repeated `rows` times.
///
/// # Panics
///
/// Panics if `rows` is negative, if `schema` cannot be converted to an Arrow schema, or if
/// the batch cannot be assembled from the generated columns.
fn long_batch(schema: &SchemaRef, c0_start: i64, rows: i64) -> RecordBatch {
    // A plain `rows as usize` would wrap a negative count into an enormous repeat length;
    // fail loudly instead.
    let row_count = usize::try_from(rows).expect("rows must be non-negative");
    let arrow_schema: ArrowSchema = schema.as_ref().try_into_arrow().unwrap();
    let c0 = Int64Array::from_iter_values(c0_start..c0_start + rows);
    let c1 = Int64Array::from_iter_values(c0_start + 100..c0_start + 100 + rows);
    let cn = Int64Array::from_iter_values(std::iter::repeat_n(c0_start, row_count));
    RecordBatch::try_new(
        Arc::new(arrow_schema),
        vec![
            Arc::new(c0),
            Arc::new(c1),
            Arc::new(cn.clone()),
            Arc::new(cn.clone()),
            Arc::new(cn),
        ],
    )
    .unwrap()
}
/// Creates a temporary table with the `capped_schema()` columns, sets
/// `delta.dataSkippingNumIndexedCols` to `2`, writes three ten-row batches (with `c0`
/// starting at 10, 50 and 100) and finally checkpoints the resulting snapshot.
///
/// Returns the temp-dir guard, the table path and the engine. The guard is handed back so
/// the caller decides how long the directory stays alive.
async fn build_capped_table_with_checkpoint(
) -> Result<(tempfile::TempDir, String, Arc<TestEngine>), Box<dyn std::error::Error>> {
    const C0_STARTS: [i64; 3] = [10, 50, 100];

    let (tmp_dir, table_path, engine) = test_table_setup_mt()?;
    let schema = capped_schema();
    let table_url = Url::from_directory_path(&table_path)
        .map_err(|_| "table_path should be a valid file URL")?;

    let created =
        create_table_and_load_snapshot(&table_path, schema.clone(), engine.as_ref(), &[])?;
    let mut latest = set_table_properties(
        &table_path,
        &table_url,
        engine.as_ref(),
        created.version(),
        &[("delta.dataSkippingNumIndexedCols", "2")],
    )?;

    // Each write yields the snapshot that the next write builds on.
    for c0_start in C0_STARTS {
        let batch = long_batch(&schema, c0_start, 10);
        latest = write_batch_to_table(&latest, engine.as_ref(), batch, HashMap::new()).await?;
    }

    latest.checkpoint(engine.as_ref(), None)?;
    Ok((tmp_dir, table_path, engine))
}
/// Scans `snapshot` with `predicate` and returns how many scan files were visited.
///
/// With `use_parallel == false` the files come from `scan_metadata`. Otherwise the
/// sequential phase of `parallel_scan_metadata` is drained first and, if it finishes in
/// the `Parallel` state, every remaining file is processed through its own
/// `ParallelScanMetadata` instance.
fn count_selected_files(
    snapshot: SnapshotRef,
    engine: Arc<TestEngine>,
    predicate: PredicateRef,
    use_parallel: bool,
) -> Result<usize, Box<dyn std::error::Error>> {
    // Visitor callback: remember the path of each scan file handed to us.
    fn record_path(seen: &mut Vec<String>, scan_file: delta_kernel::scan::state::ScanFile) {
        seen.push(scan_file.path);
    }

    let scan = snapshot.scan_builder().with_predicate(predicate).build()?;
    let mut seen: Vec<String> = Vec::new();

    if !use_parallel {
        for batch in scan.scan_metadata(engine.as_ref())? {
            seen = batch?.visit_scan_files(seen, record_path)?;
        }
        return Ok(seen.len());
    }

    let mut phase_one = scan.parallel_scan_metadata(engine.clone())?;
    // `by_ref` keeps `phase_one` usable so `finish` can be called afterwards.
    for batch in phase_one.by_ref() {
        seen = batch?.visit_scan_files(seen, record_path)?;
    }

    if let AfterSequentialScanMetadata::Parallel { state, files } = phase_one.finish()? {
        let shared_state = Arc::from(state);
        for file in files {
            let phase_two = ParallelScanMetadata::try_new(
                engine.clone(),
                Arc::clone(&shared_state),
                vec![file],
            )?;
            for batch in phase_two {
                seen = batch?.visit_scan_files(seen, record_path)?;
            }
        }
    }

    Ok(seen.len())
}
/// Loads a fresh snapshot of the table at `table_path` and returns the number of files
/// that `count_selected_files` reports for `predicate`.
fn surviving_files(
    table_path: &str,
    engine: Arc<TestEngine>,
    predicate: PredicateRef,
    use_parallel: bool,
) -> Result<usize, Box<dyn std::error::Error>> {
    let latest =
        Snapshot::builder_for(delta_kernel::try_parse_uri(table_path)?).build(engine.as_ref())?;
    count_selected_files(latest, engine, predicate, use_parallel)
}
/// `c0 > 60` can only match the batch written with `c0` starting at 100, so a single file
/// is expected to survive.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn all_in_cap_prunes(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c0_above_60 = Arc::new(Pred::gt(column_expr!("c0"), Expr::literal(60i64)));
    let survivors = surviving_files(&table_path, engine, c0_above_60, use_parallel)?;
    assert_eq!(survivors, 1);
    Ok(())
}
/// `c0 > 0` holds for every written row, so all three files are expected to survive.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn all_in_cap_keeps_all(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c0_positive = Arc::new(Pred::gt(column_expr!("c0"), Expr::literal(0i64)));
    let survivors = surviving_files(&table_path, engine, c0_positive, use_parallel)?;
    assert_eq!(survivors, 3);
    Ok(())
}
/// No row has `c4 > 1_000_000`, yet all three files are expected to survive.
// NOTE(review): presumably `c4` lies past the two-column indexing cap set by the table
// builder, so there are no statistics to prune on -- confirm.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn all_past_cap_keeps_all(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c4_huge = Arc::new(Pred::gt(column_expr!("c4"), Expr::literal(1_000_000i64)));
    let survivors = surviving_files(&table_path, engine, c4_huge, use_parallel)?;
    assert_eq!(survivors, 3);
    Ok(())
}
/// `c0 > 60 AND c4 > 50`: the `c0` conjunct alone narrows the table to the file written
/// with `c0` starting at 100, and one surviving file is expected.
// NOTE(review): the `c4` conjunct presumably has no statistics behind it and must not
// prevent the `c0` conjunct from pruning -- confirm.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mixed_and_in_cap_prunes(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c0_above_60 = Pred::gt(column_expr!("c0"), Expr::literal(60i64));
    let c4_above_50 = Pred::gt(column_expr!("c4"), Expr::literal(50i64));
    let conjunction = Arc::new(Pred::and(c0_above_60, c4_above_50));
    let survivors = surviving_files(&table_path, engine, conjunction, use_parallel)?;
    assert_eq!(survivors, 1);
    Ok(())
}
/// `c0 > 1000 OR c4 > 50`: no row has `c0 > 1000`, but all three files are still expected
/// to survive.
// NOTE(review): presumably the `c4` disjunct cannot be evaluated from statistics, so the
// OR as a whole cannot rule any file out -- confirm.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mixed_or_keeps_all(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c0_above_1000 = Pred::gt(column_expr!("c0"), Expr::literal(1000i64));
    let c4_above_50 = Pred::gt(column_expr!("c4"), Expr::literal(50i64));
    let disjunction = Arc::new(Pred::or(c0_above_1000, c4_above_50));
    let survivors = surviving_files(&table_path, engine, disjunction, use_parallel)?;
    assert_eq!(survivors, 3);
    Ok(())
}
/// `c1 > 250 AND c2 > 50`: the largest `c1` written by the table builder is 209, so no
/// file is expected to survive.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn boundary_c1_prunes_all(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c1_above_250 = Pred::gt(column_expr!("c1"), Expr::literal(250i64));
    let c2_above_50 = Pred::gt(column_expr!("c2"), Expr::literal(50i64));
    let conjunction = Arc::new(Pred::and(c1_above_250, c2_above_50));
    let survivors = surviving_files(&table_path, engine, conjunction, use_parallel)?;
    assert_eq!(survivors, 0);
    Ok(())
}
/// Builds the two-column schema used by the timestamp-statistics tests: a nullable
/// TIMESTAMP `EventTime` and a nullable LONG `UserId`.
fn timestamp_stats_schema() -> SchemaRef {
    let fields = vec![
        StructField::nullable("EventTime", DataType::TIMESTAMP),
        StructField::nullable("UserId", DataType::LONG),
    ];
    Arc::new(StructType::try_new(fields).unwrap())
}
/// Renders a per-file statistics JSON object for the `EventTime` / `UserId` columns.
///
/// The timestamp bounds are embedded verbatim as JSON strings and the user-id bounds as
/// JSON numbers. Null counts are fixed at zero and `tightBounds` at `true`.
fn stats_json(
    num_records: i64,
    event_time_min: &str,
    event_time_max: &str,
    user_id_min: i64,
    user_id_max: i64,
) -> String {
    let min_values = format!(r#"{{"EventTime":"{event_time_min}","UserId":{user_id_min}}}"#);
    let max_values = format!(r#"{{"EventTime":"{event_time_max}","UserId":{user_id_max}}}"#);
    format!(
        r#"{{"numRecords":{num_records},"minValues":{min_values},"maxValues":{max_values},"nullCount":{{"EventTime":0,"UserId":0}},"tightBounds":true}}"#
    )
}
/// Renders the newline-delimited JSON body of a commit: one `commitInfo` line followed by
/// one `add` action per `(path, stats)` entry in `adds`.
///
/// Each stats document is embedded as a JSON *string* (escaped through `serde_json`).
/// The path is interpolated as-is.
fn commit_with_adds(version: u64, adds: &[(&str, String)]) -> String {
    let commit_info = format!(
        r#"{{"commitInfo":{{"timestamp":1700000000000,"operation":"WRITE","version":{version}}}}}"#
    );
    let add_actions = adds.iter().map(|(path, stats)| {
        let stats_escaped = serde_json::Value::String(stats.clone()).to_string();
        format!(
            r#"{{"add":{{"path":"{path}","size":1024,"modificationTime":1700000000000,"dataChange":true,"partitionValues":{{}},"stats":{stats_escaped}}}}}"#
        )
    });
    std::iter::once(commit_info)
        .chain(add_actions)
        .collect::<Vec<_>>()
        .join("\n")
}
/// Renders a two-line commit body: a `commitInfo` line for a DELETE operation followed by
/// a `remove` action for `path`. `deletion_timestamp` is used for both the commit
/// timestamp and the action's `deletionTimestamp`.
fn commit_with_remove(version: u64, path: &str, deletion_timestamp: i64) -> String {
    let lines = [
        format!(
            r#"{{"commitInfo":{{"timestamp":{deletion_timestamp},"operation":"DELETE","version":{version}}}}}"#
        ),
        format!(
            r#"{{"remove":{{"path":"{path}","deletionTimestamp":{deletion_timestamp},"dataChange":true,"extendedFileMetadata":false}}}}"#
        ),
    ];
    lines.join("\n")
}
/// Hand-writes three commits whose `add` actions carry `EventTime` statistics, one of
/// which (`file_B`) uses the extended-year value `+48690-07-02T22:50:38.211Z`, and then
/// scans with `EventTime < june_first_2024_us AND UserId > 0`.
///
/// `file_D` and `file_C` have minimum timestamps in September 2024 and 2025, so only two
/// files are expected to survive.
// NOTE(review): presumably the survivors are `file_A` (January 2024) and `file_B`, whose
// extended-year statistics cannot be used to prove it out of range -- confirm.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn extended_year_timestamp_stats_dont_collapse_skipping(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
    let _ = create_table_and_load_snapshot(
        &table_path,
        timestamp_stats_schema(),
        engine.as_ref(),
        &[],
    )?;
    let store: Arc<delta_kernel::object_store::DynObjectStore> = Arc::new(LocalFileSystem::new());
    let table_url = Url::from_directory_path(&table_path)
        .map_err(|_| "table_path should be a valid file URL")?;
    let table_root = table_url.to_string();

    // Version 1: a single file covering the second half of January 2024.
    let stats_a = stats_json(
        10,
        "2024-01-15T00:00:00.000Z",
        "2024-01-31T00:00:00.000Z",
        1,
        100,
    );
    let commit_v1 = commit_with_adds(1, &[("file_A.parquet", stats_a)]);
    add_commit(&table_root, store.as_ref(), 1, commit_v1).await?;

    // Version 2: the extended-year file alongside an ordinary September 2024 file.
    let stats_b = stats_json(
        20,
        "+48690-07-02T22:50:38.211Z",
        "+48690-07-02T22:50:38.211Z",
        5,
        200,
    );
    let stats_d = stats_json(
        40,
        "2024-09-01T00:00:00.000Z",
        "2024-09-30T00:00:00.000Z",
        7,
        70,
    );
    let commit_v2 = commit_with_adds(
        2,
        &[("file_B.parquet", stats_b), ("file_D.parquet", stats_d)],
    );
    add_commit(&table_root, store.as_ref(), 2, commit_v2).await?;

    // Version 3: a file spanning 2025.
    let stats_c = stats_json(
        30,
        "2025-01-01T00:00:00.000Z",
        "2025-12-31T00:00:00.000Z",
        10,
        300,
    );
    let commit_v3 = commit_with_adds(3, &[("file_C.parquet", stats_c)]);
    add_commit(&table_root, store.as_ref(), 3, commit_v3).await?;

    let june_first_2024_us: i64 = 1_717_200_000_000_000;
    let before_june = Pred::lt(
        column_expr!("EventTime"),
        Expr::literal(Scalar::Timestamp(june_first_2024_us)),
    );
    let any_user = Pred::gt(column_expr!("UserId"), Expr::literal(0i64));
    let predicate = Arc::new(Pred::and(before_june, any_user));

    let survivors = surviving_files(&table_path, engine, predicate, use_parallel)?;
    assert_eq!(survivors, 2);
    Ok(())
}
/// Creates a table with `delta.checkpoint.writeStatsAsStruct` set to `true`, commits
/// three files (one with extended-year `EventTime` statistics), checkpoints at version 1,
/// then adds and removes `file_C` in versions 2 and 3.
///
/// The scan uses `EventTime < june_first_2024_us AND UserId > 0` and expects two
/// surviving files: `file_C` was removed and `file_E` only covers 2025.
// NOTE(review): presumably the survivors are `file_A` and the extended-year `file_B`,
// with their statistics now read back from the checkpoint -- confirm.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn extended_year_timestamp_round_trip_via_checkpoint_and_remove(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
    let table_url = Url::from_directory_path(&table_path)
        .map_err(|_| "table_path should be a valid file URL")?;
    let table_root = table_url.to_string();
    let store: Arc<delta_kernel::object_store::DynObjectStore> = Arc::new(LocalFileSystem::new());
    let _initial_snapshot = create_table_and_load_snapshot(
        &table_path,
        timestamp_stats_schema(),
        engine.as_ref(),
        &[("delta.checkpoint.writeStatsAsStruct", "true")],
    )?;

    // Version 1: the three files that end up inside the checkpoint.
    let stats_a = stats_json(
        10,
        "2024-01-15T00:00:00.000Z",
        "2024-01-31T00:00:00.000Z",
        1,
        100,
    );
    let stats_b = stats_json(
        20,
        "+48690-07-02T22:50:38.211Z",
        "+48690-07-02T22:50:38.211Z",
        5,
        200,
    );
    let stats_e = stats_json(
        50,
        "2025-01-01T00:00:00.000Z",
        "2025-12-31T00:00:00.000Z",
        10,
        300,
    );
    let commit_v1 = commit_with_adds(
        1,
        &[
            ("file_A.parquet", stats_a),
            ("file_B.parquet", stats_b),
            ("file_E.parquet", stats_e),
        ],
    );
    add_commit(&table_root, store.as_ref(), 1, commit_v1).await?;

    let checkpointed = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    checkpointed.checkpoint(engine.as_ref(), None)?;

    // Version 2 adds a February 2024 file; version 3 removes it again.
    let stats_c = stats_json(
        15,
        "2024-02-01T00:00:00.000Z",
        "2024-02-29T00:00:00.000Z",
        50,
        80,
    );
    let commit_v2 = commit_with_adds(2, &[("file_C.parquet", stats_c)]);
    add_commit(&table_root, store.as_ref(), 2, commit_v2).await?;

    let commit_v3 = commit_with_remove(3, "file_C.parquet", 1_700_000_001_000);
    add_commit(&table_root, store.as_ref(), 3, commit_v3).await?;

    let june_first_2024_us: i64 = 1_717_200_000_000_000;
    let before_june = Pred::lt(
        column_expr!("EventTime"),
        Expr::literal(Scalar::Timestamp(june_first_2024_us)),
    );
    let any_user = Pred::gt(column_expr!("UserId"), Expr::literal(0i64));
    let predicate = Arc::new(Pred::and(before_june, any_user));

    let survivors = surviving_files(&table_path, engine, predicate, use_parallel)?;
    assert_eq!(survivors, 2);
    Ok(())
}
/// Renders the newline-delimited JSON body of a commit: one `commitInfo` line followed by
/// one stats-less `add` action per `(path, ts)` entry, where `ts` becomes the value of
/// the `ts` partition column.
fn commit_with_ts_partitioned_adds(version: u64, adds: &[(&str, &str)]) -> String {
    let commit_info = format!(
        r#"{{"commitInfo":{{"timestamp":1700000000000,"operation":"WRITE","version":{version}}}}}"#
    );
    let add_actions = adds.iter().map(|(path, ts)| {
        format!(
            r#"{{"add":{{"path":"{path}","size":1024,"modificationTime":1700000000000,"dataChange":true,"partitionValues":{{"ts":"{ts}"}}}}}}"#
        )
    });
    std::iter::once(commit_info)
        .chain(add_actions)
        .collect::<Vec<_>>()
        .join("\n")
}
/// Creates a table partitioned by a TIMESTAMP column `ts` and hand-writes two `add`
/// actions whose partition values share the wall-clock text `2024-06-15T14:30:00` but
/// differ in UTC offset (`+05:00` versus `Z`).
///
/// Equality predicates on the two corresponding UTC instants (09:30 and 14:30) are each
/// expected to select exactly one file.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn partition_pruning_honors_rfc3339_offset_partition_values(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
    let schema = Arc::new(StructType::try_new(vec![
        StructField::nullable("ts", DataType::TIMESTAMP),
        StructField::nullable("v", DataType::LONG),
    ])?);
    create_table(&table_path, schema, "Test/1.0")
        .with_data_layout(DataLayout::partitioned(["ts"]))
        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
        .commit(engine.as_ref())?
        .unwrap_committed();

    let store: Arc<delta_kernel::object_store::DynObjectStore> = Arc::new(LocalFileSystem::new());
    let table_url = Url::from_directory_path(&table_path)
        .map_err(|_| "table_path should be a valid file URL")?;
    let commit_v1 = commit_with_ts_partitioned_adds(
        1,
        &[
            ("file_A.parquet", "2024-06-15T14:30:00+05:00"),
            ("file_B.parquet", "2024-06-15T14:30:00Z"),
        ],
    );
    add_commit(table_url.as_str(), store.as_ref(), 1, commit_v1).await?;

    let nine_thirty_utc_us: i64 = 1_718_443_800_000_000;
    let fourteen_thirty_utc_us: i64 = 1_718_461_800_000_000;

    // 14:30 at +05:00 is 09:30 UTC.
    let at_nine_thirty = Arc::new(Pred::eq(
        column_expr!("ts"),
        Expr::literal(Scalar::Timestamp(nine_thirty_utc_us)),
    ));
    let survivors = surviving_files(&table_path, engine.clone(), at_nine_thirty, use_parallel)?;
    assert_eq!(survivors, 1);

    // 14:30 with the `Z` suffix is 14:30 UTC.
    let at_fourteen_thirty = Arc::new(Pred::eq(
        column_expr!("ts"),
        Expr::literal(Scalar::Timestamp(fourteen_thirty_utc_us)),
    ));
    let survivors = surviving_files(&table_path, engine, at_fourteen_thirty, use_parallel)?;
    assert_eq!(survivors, 1);
    Ok(())
}