use std::collections::HashMap;
use std::sync::Arc;
use buoyant_kernel as delta_kernel;
use delta_kernel::arrow::array::{Int64Array, RecordBatch};
use delta_kernel::arrow::datatypes::Schema as ArrowSchema;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::default::executor::tokio::TokioMultiThreadExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{column_expr, Expression as Expr, Predicate as Pred, PredicateRef};
use delta_kernel::scan::{AfterSequentialScanMetadata, ParallelScanMetadata};
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::{Snapshot, SnapshotRef};
use rstest::rstest;
use test_utils::{create_table_and_load_snapshot, test_table_setup_mt, write_batch_to_table};
use url::Url;
use crate::common::write_utils::set_table_properties;
/// Engine type used by every helper in this file: the default engine driven by the
/// multi-threaded Tokio executor.
type TestEngine = DefaultEngine<TokioMultiThreadExecutor>;
fn capped_schema() -> SchemaRef {
Arc::new(
StructType::try_new((0..5).map(|i| StructField::nullable(format!("c{i}"), DataType::LONG)))
.unwrap(),
)
}
/// Builds a `rows`-row Arrow batch of five `Int64` columns for `schema`.
///
/// Column contents:
/// - `c0`: `c0_start .. c0_start + rows`
/// - `c1`: the same range shifted up by 100
/// - `c2`, `c3`, `c4`: the constant `c0_start` repeated `rows` times
///
/// `schema` is converted to an Arrow schema and must be compatible with those five
/// columns.
///
/// # Panics
///
/// Panics if `rows` is negative, if `schema` cannot be converted to an Arrow schema,
/// or if `RecordBatch::try_new` rejects the columns.
fn long_batch(schema: &SchemaRef, c0_start: i64, rows: i64) -> RecordBatch {
    // A plain `rows as usize` would wrap a negative count into an enormous length;
    // fail with a clear message instead.
    let row_count = usize::try_from(rows).expect("rows must be non-negative");
    let arrow_schema: ArrowSchema = schema.as_ref().try_into_arrow().unwrap();
    let c0 = Int64Array::from_iter_values(c0_start..c0_start + rows);
    let c1 = Int64Array::from_iter_values(c0_start + 100..c0_start + 100 + rows);
    // One constant column, reused for c2..c4.
    let cn = Int64Array::from_iter_values(std::iter::repeat_n(c0_start, row_count));
    RecordBatch::try_new(
        Arc::new(arrow_schema),
        vec![
            Arc::new(c0),
            Arc::new(c1),
            Arc::new(cn.clone()),
            Arc::new(cn.clone()),
            Arc::new(cn),
        ],
    )
    .unwrap()
}
/// Creates a fresh test table, caps stats collection, writes three batches and
/// checkpoints it.
///
/// Steps, in order:
/// 1. create the table with [`capped_schema`];
/// 2. set `delta.dataSkippingNumIndexedCols` to `2`;
/// 3. write three 10-row batches from [`long_batch`] with `c0_start` of 10, 50 and 100;
/// 4. checkpoint the resulting snapshot.
///
/// Returns the temp directory guard, the table path, and the engine. The guard is
/// returned so the caller decides how long it is held.
///
/// # Errors
///
/// Propagates any failure from table setup, property update, writes, or the
/// checkpoint, and fails if `table_path` cannot be turned into a directory URL.
async fn build_capped_table_with_checkpoint(
) -> Result<(tempfile::TempDir, String, Arc<TestEngine>), Box<dyn std::error::Error>> {
    let schema = capped_schema();
    let (tmp_dir, table_path, engine) = test_table_setup_mt()?;
    let table_url = Url::from_directory_path(&table_path)
        .map_err(|_| "table_path should be a valid file URL")?;

    let created =
        create_table_and_load_snapshot(&table_path, schema.clone(), engine.as_ref(), &[])?;
    let created_version = created.version();
    let mut latest = set_table_properties(
        &table_path,
        &table_url,
        engine.as_ref(),
        created_version,
        &[("delta.dataSkippingNumIndexedCols", "2")],
    )?;

    // Each write starts from the snapshot produced by the previous one.
    for c0_start in [10, 50, 100] {
        let batch = long_batch(&schema, c0_start, 10);
        latest = write_batch_to_table(&latest, engine.as_ref(), batch, HashMap::new()).await?;
    }

    latest.checkpoint(engine.as_ref(), None)?;
    Ok((tmp_dir, table_path, engine))
}
/// Scans `snapshot` with `predicate` and returns how many scan files were visited.
///
/// With `use_parallel == false` the files come from `Scan::scan_metadata`.
/// With `use_parallel == true` the iterator from `Scan::parallel_scan_metadata` is
/// drained first; if its `finish()` then yields
/// `AfterSequentialScanMetadata::Parallel`, every returned file is run through its
/// own `ParallelScanMetadata` and those scan files are counted as well.
///
/// # Errors
///
/// Propagates any error from building the scan, iterating scan metadata, or
/// visiting scan files.
fn count_selected_files(
    snapshot: SnapshotRef,
    engine: Arc<TestEngine>,
    predicate: PredicateRef,
    use_parallel: bool,
) -> Result<usize, Box<dyn std::error::Error>> {
    /// Visitor callback: remembers the path of each visited scan file.
    fn record_path(seen: &mut Vec<String>, scan_file: delta_kernel::scan::state::ScanFile) {
        seen.push(scan_file.path);
    }

    let scan = snapshot.scan_builder().with_predicate(predicate).build()?;
    let mut selected: Vec<String> = Vec::new();

    if !use_parallel {
        for metadata in scan.scan_metadata(engine.as_ref())? {
            selected = metadata?.visit_scan_files(selected, record_path)?;
        }
        return Ok(selected.len());
    }

    // `by_ref` keeps ownership of the iterator so `finish()` can consume it afterwards.
    let mut first_phase = scan.parallel_scan_metadata(engine.clone())?;
    for metadata in first_phase.by_ref() {
        selected = metadata?.visit_scan_files(selected, record_path)?;
    }

    // Any outcome other than `Parallel` leaves nothing further to visit.
    let AfterSequentialScanMetadata::Parallel { state, files } = first_phase.finish()? else {
        return Ok(selected.len());
    };

    let shared_state = Arc::from(state);
    for file in files {
        let second_phase =
            ParallelScanMetadata::try_new(engine.clone(), Arc::clone(&shared_state), vec![file])?;
        for metadata in second_phase {
            selected = metadata?.visit_scan_files(selected, record_path)?;
        }
    }
    Ok(selected.len())
}
/// Loads a snapshot of the table at `table_path` and returns the number of scan
/// files that `predicate` leaves selected, as reported by [`count_selected_files`].
///
/// # Errors
///
/// Fails if `table_path` cannot be parsed into a URL, if the snapshot cannot be
/// built, or if the scan itself fails.
fn surviving_files(
    table_path: &str,
    engine: Arc<TestEngine>,
    predicate: PredicateRef,
    use_parallel: bool,
) -> Result<usize, Box<dyn std::error::Error>> {
    let table_url = delta_kernel::try_parse_uri(table_path)?;
    let loaded = Snapshot::builder_for(table_url).build(engine.as_ref())?;
    count_selected_files(loaded, engine, predicate, use_parallel)
}
/// A predicate on `c0`, one of the first two columns, prunes files.
///
/// The three writes give `c0` the ranges 10..=19, 50..=59 and 100..=109, so
/// `c0 > 60` can only match the last file.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn all_in_cap_prunes(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c0_above_60 = Pred::gt(column_expr!("c0"), Expr::literal(60i64));
    let kept = surviving_files(&table_path, engine, Arc::new(c0_above_60), use_parallel)?;
    assert_eq!(kept, 1);
    Ok(())
}
/// A predicate on `c0` that every file satisfies keeps all three files.
///
/// The smallest `c0` value written is 10, so `c0 > 0` matches every file.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn all_in_cap_keeps_all(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c0_positive = Pred::gt(column_expr!("c0"), Expr::literal(0i64));
    let kept = surviving_files(&table_path, engine, Arc::new(c0_positive), use_parallel)?;
    assert_eq!(kept, 3);
    Ok(())
}
/// A predicate on `c4`, which lies beyond the two-column cap, must not prune.
///
/// No row has `c4 > 1_000_000` (the written values are 10, 50 and 100), yet all
/// three files are expected to survive, since skipping may only rely on columns
/// inside the cap.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn all_past_cap_keeps_all(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c4_huge = Pred::gt(column_expr!("c4"), Expr::literal(1_000_000i64));
    let kept = surviving_files(&table_path, engine, Arc::new(c4_huge), use_parallel)?;
    assert_eq!(kept, 3);
    Ok(())
}
/// In an `AND`, the `c0` conjunct still prunes even though `c4` is past the cap.
///
/// `c0 > 60` alone rules out the first two files (ranges 10..=19 and 50..=59), so
/// only the third file is expected to survive whatever is known about `c4`.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mixed_and_in_cap_prunes(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c0_above_60 = Pred::gt(column_expr!("c0"), Expr::literal(60i64));
    let c4_above_50 = Pred::gt(column_expr!("c4"), Expr::literal(50i64));
    let conjunction = Arc::new(Pred::and(c0_above_60, c4_above_50));
    let kept = surviving_files(&table_path, engine, conjunction, use_parallel)?;
    assert_eq!(kept, 1);
    Ok(())
}
/// In an `OR`, a disjunct on a column past the cap prevents any pruning.
///
/// `c0 > 1000` is false for every file, but `c4 > 50` refers to a column outside
/// the two-column cap, so all three files are expected to survive.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mixed_or_keeps_all(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c0_above_1000 = Pred::gt(column_expr!("c0"), Expr::literal(1000i64));
    let c4_above_50 = Pred::gt(column_expr!("c4"), Expr::literal(50i64));
    let disjunction = Arc::new(Pred::or(c0_above_1000, c4_above_50));
    let kept = surviving_files(&table_path, engine, disjunction, use_parallel)?;
    assert_eq!(kept, 3);
    Ok(())
}
/// Boundary case: `c1` is the last column inside the cap, `c2` the first outside.
///
/// The largest `c1` value written is 209 (`c0_start + 100 + 9` for the last batch),
/// so `c1 > 250` excludes every file and the `AND` leaves none, regardless of `c2`.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn boundary_c1_prunes_all(
    #[values(false, true)] use_parallel: bool,
) -> Result<(), Box<dyn std::error::Error>> {
    let (_tmp_dir, table_path, engine) = build_capped_table_with_checkpoint().await?;
    let c1_above_250 = Pred::gt(column_expr!("c1"), Expr::literal(250i64));
    let c2_above_50 = Pred::gt(column_expr!("c2"), Expr::literal(50i64));
    let conjunction = Arc::new(Pred::and(c1_above_250, c2_above_50));
    let kept = surviving_files(&table_path, engine, conjunction, use_parallel)?;
    assert_eq!(kept, 0);
    Ok(())
}