use std::path::PathBuf;
use std::sync::Arc;
use bytes::Bytes;
use rstest::rstest;
use crate::arrow::array::{Array, BooleanArray, Int64Array, StringArray, StructArray};
use crate::arrow::compute::filter_record_batch;
use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Fields, Schema as ArrowSchema};
use crate::arrow::record_batch::RecordBatch;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::engine::sync::SyncEngine;
use crate::expressions::{
column_expr, column_name, column_pred, ColumnName, Expression as Expr, Predicate as Pred,
};
use crate::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use crate::parquet::arrow::arrow_writer::ArrowWriter;
use crate::scan::data_skipping::as_checkpoint_skipping_predicate;
use crate::scan::state::ScanFile;
use crate::schema::{ColumnMetadataKey, DataType, StructField, StructType};
use crate::{
Engine, EvaluationHandler, FileDataReadResultIterator, FileMeta, JsonHandler, ParquetFooter,
ParquetHandler, PredicateRef, StorageHandler,
};
use crate::{EngineData, Snapshot};
use super::*;
/// Fetches a column by name from an Arrow container (anything with
/// `column_by_name`, e.g. `RecordBatch` or `StructArray`) and downcasts it to
/// the requested concrete array type.
///
/// Panics with a descriptive message when the column is missing or is not of
/// the expected type, so test failures are self-explanatory.
macro_rules! get_column {
    ($source:expr, $name:expr, $ty:ty) => {
        $source
            .column_by_name($name)
            .unwrap_or_else(|| panic!("should have column '{}'", $name))
            .as_any()
            .downcast_ref::<$ty>()
            .unwrap_or_else(|| panic!("column '{}' should be {}", $name, stringify!($ty)))
    };
}
/// Collects the field names of a struct array into owned `String`s.
fn field_names(s: &StructArray) -> Vec<String> {
    let mut names = Vec::with_capacity(s.fields().len());
    for field in s.fields().iter() {
        names.push(field.name().clone());
    }
    names
}
#[test]
fn test_static_skipping() {
    const NULL: Pred = Pred::null_literal();
    // Pairs of (expected "skip all files" decision, predicate under test).
    let cases = [
        (false, column_pred!("a")),
        (true, Pred::literal(false)),
        (false, Pred::literal(true)),
        (false, NULL),
        (true, Pred::and(column_pred!("a"), Pred::literal(false))),
        (false, Pred::or(column_pred!("a"), Pred::literal(true))),
        (false, Pred::or(column_pred!("a"), Pred::literal(false))),
        (false, Pred::lt(column_expr!("a"), Expr::literal(10))),
        (false, Pred::lt(Expr::literal(10), Expr::literal(100))),
        (true, Pred::gt(Expr::literal(10), Expr::literal(100))),
        (false, Pred::and(NULL, column_pred!("a"))),
    ];
    for (expected_skip, pred) in cases {
        let actual = can_statically_skip_all_files(&pred);
        assert_eq!(actual, expected_skip, "Failed for predicate: {pred:#?}");
    }
}
/// Exercises `PhysicalPredicate::try_new` over a schema that mixes plain
/// columns, column-mapped columns (including a logical name that collides with
/// another column's physical name), and nested structs mapped at one or both
/// levels. `None` as the expected value means `try_new` is expected to error.
#[test]
fn test_physical_predicate() {
    let logical_schema = StructType::new_unchecked(vec![
        StructField::nullable("a", DataType::LONG),
        // Logical "b" maps to physical "phys_b".
        StructField::nullable("b", DataType::LONG).with_metadata([(
            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
            "phys_b",
        )]),
        // Logical "phys_b" deliberately collides with b's physical name; it
        // itself maps to "phys_c".
        StructField::nullable("phys_b", DataType::LONG).with_metadata([(
            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
            "phys_c",
        )]),
        // Nested struct with an unmapped field "x" and a mapped field "y".
        StructField::nullable(
            "nested",
            StructType::new_unchecked(vec![
                StructField::nullable("x", DataType::LONG),
                StructField::nullable("y", DataType::LONG).with_metadata([(
                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                    "phys_y",
                )]),
            ]),
        ),
        // Struct mapped at both levels: mapped -> phys_mapped, n -> phys_n.
        StructField::nullable(
            "mapped",
            StructType::new_unchecked(vec![StructField::nullable("n", DataType::LONG)
                .with_metadata([(
                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                    "phys_n",
                )])]),
        )
        .with_metadata([(
            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
            "phys_mapped",
        )]),
    ]);
    let test_cases = [
        // A trivially-true predicate requires no columns at all.
        (Pred::literal(true), Some(PhysicalPredicate::None)),
        // A trivially-false predicate statically skips every file.
        (Pred::literal(false), Some(PhysicalPredicate::StaticSkipAll)),
        // Reference to a column not in the schema is an error.
        (column_pred!("x"), None),
        // Unmapped column passes through unchanged.
        (
            column_pred!("a"),
            Some(PhysicalPredicate::Some(
                column_pred!("a").into(),
                StructType::new_unchecked(vec![StructField::nullable("a", DataType::LONG)]).into(),
            )),
        ),
        // Mapped column is rewritten to its physical name.
        (
            column_pred!("b"),
            Some(PhysicalPredicate::Some(
                column_pred!("phys_b").into(),
                StructType::new_unchecked(vec![StructField::nullable("phys_b", DataType::LONG)
                    .with_metadata([(
                        ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                        "phys_b",
                    )])])
                .into(),
            )),
        ),
        // Nested unmapped field keeps its logical path.
        (
            column_pred!("nested.x"),
            Some(PhysicalPredicate::Some(
                column_pred!("nested.x").into(),
                StructType::new_unchecked(vec![StructField::nullable(
                    "nested",
                    StructType::new_unchecked(vec![StructField::nullable("x", DataType::LONG)]),
                )])
                .into(),
            )),
        ),
        // Nested mapped field is rewritten at the leaf only.
        (
            column_pred!("nested.y"),
            Some(PhysicalPredicate::Some(
                column_pred!("nested.phys_y").into(),
                StructType::new_unchecked(vec![StructField::nullable(
                    "nested",
                    StructType::new_unchecked(vec![StructField::nullable(
                        "phys_y",
                        DataType::LONG,
                    )
                    .with_metadata([(
                        ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                        "phys_y",
                    )])]),
                )])
                .into(),
            )),
        ),
        // Fully-mapped nested path is rewritten at both levels.
        (
            column_pred!("mapped.n"),
            Some(PhysicalPredicate::Some(
                column_pred!("phys_mapped.phys_n").into(),
                StructType::new_unchecked(vec![StructField::nullable(
                    "phys_mapped",
                    StructType::new_unchecked(vec![StructField::nullable(
                        "phys_n",
                        DataType::LONG,
                    )
                    .with_metadata([(
                        ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                        "phys_n",
                    )])]),
                )
                .with_metadata([(
                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                    "phys_mapped",
                )])])
                .into(),
            )),
        ),
        // AND with literal true: the mapped reference is still rewritten.
        (
            Pred::and(column_pred!("mapped.n"), Pred::literal(true)),
            Some(PhysicalPredicate::Some(
                Pred::and(column_pred!("phys_mapped.phys_n"), Pred::literal(true)).into(),
                StructType::new_unchecked(vec![StructField::nullable(
                    "phys_mapped",
                    StructType::new_unchecked(vec![StructField::nullable(
                        "phys_n",
                        DataType::LONG,
                    )
                    .with_metadata([(
                        ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                        "phys_n",
                    )])]),
                )
                .with_metadata([(
                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                    "phys_mapped",
                )])])
                .into(),
            )),
        ),
        // AND with literal false short-circuits to a static skip-all.
        (
            Pred::and(column_pred!("mapped.n"), Pred::literal(false)),
            Some(PhysicalPredicate::StaticSkipAll),
        ),
    ];
    for (predicate, expected) in test_cases {
        let result =
            PhysicalPredicate::try_new(&predicate, &logical_schema, ColumnMappingMode::Name).ok();
        assert_eq!(
            result, expected,
            "Failed for predicate: {predicate:#?}, expected {expected:#?}, got {result:#?}"
        );
    }
}
/// Case-insensitive column resolution in `PhysicalPredicate::try_new`: column
/// references in the predicate may differ in casing from the schema, and the
/// resolved physical predicate uses the schema's (or the mapping's) casing.
#[rstest]
#[case::without_column_mapping(
    // predicate: createdat > 500 AND value < 100, schema: createdAt, Value
    StructType::new_unchecked(vec![
        StructField::nullable("createdAt", DataType::LONG),
        StructField::nullable("Value", DataType::LONG),
    ]),
    Pred::and(
        Pred::gt(column_expr!("createdat"), Expr::literal(500i64)),
        Pred::lt(column_expr!("value"), Expr::literal(100i64)),
    ),
    ColumnMappingMode::None,
    // Expect references resolved to the schema's canonical casing.
    PhysicalPredicate::Some(
        Arc::new(Pred::and(
            Pred::gt(column_expr!("createdAt"), Expr::literal(500i64)),
            Pred::lt(column_expr!("Value"), Expr::literal(100i64)),
        )),
        StructType::new_unchecked(vec![
            StructField::nullable("createdAt", DataType::LONG),
            StructField::nullable("Value", DataType::LONG),
        ]).into(),
    ),
)]
#[case::with_column_mapping(
    // predicate: createdat > 500 AND value < 100, schema has physical name metadata
    StructType::new_unchecked(vec![
        StructField::nullable("createdAt", DataType::LONG).with_metadata([(
            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
            "phys_created",
        )]),
        StructField::nullable("Value", DataType::LONG).with_metadata([(
            ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
            "phys_value",
        )]),
    ]),
    Pred::and(
        Pred::gt(column_expr!("createdat"), Expr::literal(500i64)),
        Pred::lt(column_expr!("value"), Expr::literal(100i64)),
    ),
    ColumnMappingMode::Name,
    // Expect references resolved case-insensitively, then mapped to physical names.
    PhysicalPredicate::Some(
        Arc::new(Pred::and(
            Pred::gt(column_expr!("phys_created"), Expr::literal(500i64)),
            Pred::lt(column_expr!("phys_value"), Expr::literal(100i64)),
        )),
        StructType::new_unchecked(vec![
            StructField::nullable("phys_created", DataType::LONG).with_metadata([(
                ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                "phys_created",
            )]),
            StructField::nullable("phys_value", DataType::LONG).with_metadata([(
                ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                "phys_value",
            )]),
        ]).into(),
    ),
)]
#[case::duplicate_column_different_casing(
    // predicate references same column with different casings: value > 5 AND VALUE < 10
    StructType::new_unchecked(vec![
        StructField::nullable("Value", DataType::LONG),
    ]),
    Pred::and(
        Pred::gt(column_expr!("value"), Expr::literal(5i64)),
        Pred::lt(column_expr!("VALUE"), Expr::literal(10i64)),
    ),
    ColumnMappingMode::None,
    // Both casings resolve to the single schema column; it appears once in the
    // read schema.
    PhysicalPredicate::Some(
        Arc::new(Pred::and(
            Pred::gt(column_expr!("Value"), Expr::literal(5i64)),
            Pred::lt(column_expr!("Value"), Expr::literal(10i64)),
        )),
        StructType::new_unchecked(vec![StructField::nullable("Value", DataType::LONG)])
            .into(),
    ),
)]
#[case::nested_fields(
    // predicate references nested.fieldname but schema has Nested.FieldName
    StructType::new_unchecked(vec![StructField::nullable(
        "Nested",
        StructType::new_unchecked(vec![StructField::nullable("FieldName", DataType::LONG)]),
    )]),
    column_pred!("nested.fieldname"),
    ColumnMappingMode::None,
    // Both path segments are resolved case-insensitively.
    PhysicalPredicate::Some(
        column_pred!("Nested.FieldName").into(),
        StructType::new_unchecked(vec![StructField::nullable(
            "Nested",
            StructType::new_unchecked(vec![
                StructField::nullable("FieldName", DataType::LONG)
            ]),
        )]).into(),
    ),
)]
fn test_physical_predicate_case_insensitive(
    #[case] logical_schema: StructType,
    #[case] predicate: Predicate,
    #[case] column_mapping_mode: ColumnMappingMode,
    #[case] expected: PhysicalPredicate,
) {
    let result =
        PhysicalPredicate::try_new(&predicate, &logical_schema, column_mapping_mode).unwrap();
    assert_eq!(result, expected);
}
#[test]
fn test_physical_predicate_case_insensitive_unknown_column() {
    // A predicate referencing a column absent from the schema must be
    // rejected even under case-insensitive resolution.
    let schema =
        StructType::new_unchecked(vec![StructField::nullable("createdAt", DataType::LONG)]);
    let predicate = column_pred!("nonexistent");
    let result = PhysicalPredicate::try_new(&predicate, &schema, ColumnMappingMode::None);
    assert!(result.is_err());
}
/// Drives `scan_metadata` on the given scan and collects the paths of all
/// selected files, asserting that none of them carries a deletion vector.
fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult<Vec<String>> {
    // Callback invoked once per selected scan file.
    fn record_path(paths: &mut Vec<String>, scan_file: ScanFile) {
        paths.push(scan_file.path.to_string());
        assert!(scan_file.dv_info.deletion_vector.is_none());
    }
    let mut paths = vec![];
    for metadata in scan.scan_metadata(engine)? {
        paths = metadata?.visit_scan_files(paths, record_path)?;
    }
    Ok(paths)
}
#[test]
fn test_scan_metadata_paths() {
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let scan = snapshot.scan_builder().build().unwrap();
    let files = get_files_for_scan(scan, &engine).unwrap();
    // The table consists of exactly one data file.
    assert_eq!(
        files,
        vec!["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"]
    );
}
#[test_log::test]
fn test_scan_metadata() {
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    let scan = snapshot.scan_builder().build().unwrap();
    let batches: Vec<Box<dyn EngineData>> =
        scan.execute(engine).unwrap().try_collect().unwrap();
    // One batch carrying all ten rows of the table.
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0].as_ref().len(), 10);
}
/// Feeding a scan's own (filtered) metadata batches back through
/// `scan_metadata_from` at the same snapshot version should reproduce the
/// same single metadata batch.
#[test_log::test]
fn test_scan_metadata_from_same_version() {
    let path =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
    let url = url::Url::from_directory_path(path).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
    let version = snapshot.version();
    let scan = snapshot.scan_builder().build().unwrap();
    // Materialize the scan metadata, applying each batch's selection vector so
    // only the selected rows are handed back.
    let files: Vec<_> = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .map_ok(|ScanMetadata { scan_files, .. }| {
            let (underlying_data, selection_vector) = scan_files.into_parts();
            let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data)
                .unwrap()
                .into();
            let filtered_batch =
                filter_record_batch(&batch, &BooleanArray::from(selection_vector)).unwrap();
            Box::new(ArrowEngineData::from(filtered_batch)) as Box<dyn EngineData>
        })
        .try_collect()
        .unwrap();
    // Re-scan starting from the captured state at the same version.
    let new_files: Vec<_> = scan
        .scan_metadata_from(engine.as_ref(), version, files, None)
        .unwrap()
        .try_collect()
        .unwrap();
    assert_eq!(new_files.len(), 1);
}
/// `scan_metadata_from` applied at a newer version: capture version-0 scan
/// state (one 3-row batch), then scan version 1 starting from that state and
/// expect the carried-over batch plus one batch of new files.
#[test_log::test]
fn test_scan_metadata_from_with_update() {
    let path = std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/")).unwrap();
    let url = url::Url::from_directory_path(path).unwrap();
    let engine = Arc::new(SyncEngine::new());
    // First pass: pin the snapshot to version 0.
    let snapshot = Snapshot::builder_for(url.clone())
        .at_version(0)
        .build(engine.as_ref())
        .unwrap();
    let scan = snapshot.scan_builder().build().unwrap();
    // Materialize the version-0 scan metadata with selection vectors applied.
    let files: Vec<_> = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .map_ok(|ScanMetadata { scan_files, .. }| {
            let (underlying_data, selection_vector) = scan_files.into_parts();
            let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data)
                .unwrap()
                .into();
            filter_record_batch(&batch, &BooleanArray::from(selection_vector)).unwrap()
        })
        .try_collect()
        .unwrap();
    assert_eq!(files.len(), 1);
    assert_eq!(files[0].num_rows(), 3);
    let files: Vec<_> = files
        .into_iter()
        .map(|b| Box::new(ArrowEngineData::from(b)) as Box<dyn EngineData>)
        .collect();
    // Second pass: scan version 1, resuming from the version-0 state.
    let snapshot = Snapshot::builder_for(url)
        .at_version(1)
        .build(engine.as_ref())
        .unwrap();
    let scan = snapshot.scan_builder().build().unwrap();
    let new_files: Vec<_> = scan
        .scan_metadata_from(engine.as_ref(), 0, files, None)
        .unwrap()
        .map_ok(|ScanMetadata { scan_files, .. }| {
            let (underlying_data, selection_vector) = scan_files.into_parts();
            let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data)
                .unwrap()
                .into();
            filter_record_batch(&batch, &BooleanArray::from(selection_vector)).unwrap()
        })
        .try_collect()
        .unwrap();
    // One batch carried over from version 0 plus one batch added at version 1.
    assert_eq!(new_files.len(), 2);
    assert_eq!(new_files[0].num_rows(), 3);
    assert_eq!(new_files[1].num_rows(), 3);
}
/// Checks `parse_partition_value_raw` for every primitive partition type,
/// including date (epoch days) and timestamp (microseconds) conversions.
#[test]
fn test_get_partition_value() {
    // (raw partition string, declared primitive type, expected scalar).
    let cases = [
        (
            "string",
            PrimitiveType::String,
            Scalar::String("string".to_string()),
        ),
        ("123", PrimitiveType::Integer, Scalar::Integer(123)),
        ("1234", PrimitiveType::Long, Scalar::Long(1234)),
        ("12", PrimitiveType::Short, Scalar::Short(12)),
        ("1", PrimitiveType::Byte, Scalar::Byte(1)),
        ("1.1", PrimitiveType::Float, Scalar::Float(1.1)),
        ("10.10", PrimitiveType::Double, Scalar::Double(10.1)),
        ("true", PrimitiveType::Boolean, Scalar::Boolean(true)),
        // Dates are days since the Unix epoch; 2024-01-01 is day 19723.
        ("2024-01-01", PrimitiveType::Date, Scalar::Date(19723)),
        ("1970-01-01", PrimitiveType::Date, Scalar::Date(0)),
        (
            "1970-01-01 00:00:00",
            PrimitiveType::Timestamp,
            Scalar::Timestamp(0),
        ),
        // Timestamps are parsed at microsecond precision...
        (
            "1970-01-01 00:00:00.123456",
            PrimitiveType::Timestamp,
            Scalar::Timestamp(123456),
        ),
        // ...and sub-microsecond digits are dropped (same expected value).
        (
            "1970-01-01 00:00:00.123456789",
            PrimitiveType::Timestamp,
            Scalar::Timestamp(123456),
        ),
    ];
    for (raw, data_type, expected) in &cases {
        let value = crate::scan::transform_spec::parse_partition_value_raw(
            Some(&raw.to_string()),
            &DataType::Primitive(data_type.clone()),
        )
        .unwrap();
        assert_eq!(value, *expected);
    }
}
#[test]
fn test_replay_for_scan_metadata() {
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    let scan = snapshot.scan_builder().build().unwrap();
    let replay = scan.replay_for_scan_metadata(&engine).unwrap();
    // Replaying the log for this table yields five action batches.
    let batches: Vec<_> = replay.actions.try_collect().unwrap();
    assert_eq!(batches.len(), 5);
}
/// Scans the row-group-skipping table with and without predicates on
/// `numeric.ints.int32`; each scan still yields a single result batch.
#[test]
fn test_data_row_group_skipping() {
    let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
    let url = url::Url::from_directory_path(path.unwrap()).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
    // Baseline: no predicate — one batch.
    let scan = snapshot.clone().scan_builder().build().unwrap();
    let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
    assert_eq!(data.len(), 1);
    let int_col = column_expr!("numeric.ints.int32");
    let value = Expr::literal(1000i32);
    // Predicate: int32 > 1000 — still one batch.
    let predicate = Arc::new(int_col.clone().gt(value.clone()));
    let scan = snapshot
        .clone()
        .scan_builder()
        .with_predicate(predicate)
        .build()
        .unwrap();
    let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
    assert_eq!(data.len(), 1);
    // Predicate: int32 < 1000 — still one batch.
    let predicate = Arc::new(int_col.lt(value));
    let scan = snapshot
        .scan_builder()
        .with_predicate(predicate)
        .build()
        .unwrap();
    let data: Vec<_> = scan.execute(engine).unwrap().try_collect().unwrap();
    assert_eq!(data.len(), 1);
}
/// A predicate on a column named "missing" still builds and returns data
/// (nothing gets skipped for it — presumably the column lacks parquet stats;
/// TODO confirm against the test table), while a predicate on a column that
/// is not in the table schema at all fails at scan-build time.
#[test]
fn test_missing_column_row_group_skipping() {
    let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
    let url = url::Url::from_directory_path(path.unwrap()).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
    // Predicate on "missing": the scan builds and one batch is returned.
    let predicate = Arc::new(column_expr!("missing").lt(Expr::literal(1000i64)));
    let scan = snapshot
        .clone()
        .scan_builder()
        .with_predicate(predicate)
        .build()
        .unwrap();
    let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
    assert_eq!(data.len(), 1);
    // Predicate on a column absent from the schema: build must fail.
    let predicate = Arc::new(column_expr!("numeric.ints.invalid").lt(Expr::literal(1000)));
    snapshot
        .scan_builder()
        .with_predicate(predicate)
        .build()
        .expect_err("unknown column");
}
#[test_log::test]
fn test_scan_with_checkpoint() -> DeltaResult<()> {
    let table_dir = std::fs::canonicalize(PathBuf::from(
        "./tests/data/with_checkpoint_no_last_checkpoint/",
    ))?;
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    // Only the single surviving data file should be surfaced by the scan.
    let files = get_files_for_scan(snapshot.scan_builder().build()?, &engine)?;
    assert_eq!(
        files,
        vec!["part-00000-70b1dcdf-0236-4f63-a072-124cdbafd8a0-c000.snappy.parquet"]
    );
    Ok(())
}
/// Cross-checks one row of a parsed-stats struct column against the matching
/// fields of a JSON stats object. Entries missing from the struct, null at
/// the row, or of a non-Int64 type are ignored — only Int64 stats are
/// compared. `field_name` is only used in the failure message.
fn assert_stats_struct_matches_json(
    struct_array: &StructArray,
    json_object: &serde_json::Map<String, serde_json::Value>,
    row_idx: usize,
    field_name: &str,
) {
    for (col_name, json_val) in json_object {
        // Skip JSON keys with no corresponding non-null struct entry.
        let col = match struct_array.column_by_name(col_name) {
            Some(c) if !c.is_null(row_idx) => c,
            _ => continue,
        };
        let Some(int_col) = col.as_any().downcast_ref::<Int64Array>() else {
            continue;
        };
        assert_eq!(
            json_val.as_i64().unwrap(),
            int_col.value(row_idx),
            "{field_name}.{col_name} mismatch at row {row_idx}"
        );
    }
}
/// With `include_all_stats_columns`, scan metadata must carry a struct-typed
/// `stats_parsed` column whose values agree with the raw JSON `stats` column
/// (numRecords plus Int64 min/max/nullCount entries).
#[test]
fn test_scan_metadata_with_stats_columns() {
    const STATS_PARSED_COL: &str = "stats_parsed";
    let path = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let url = url::Url::from_directory_path(path).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
    let scan = snapshot
        .scan_builder()
        .include_all_stats_columns()
        .build()
        .unwrap();
    let scan_metadata_results: Vec<_> = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert!(
        !scan_metadata_results.is_empty(),
        "Should have scan metadata"
    );
    let mut total_num_records: i64 = 0;
    let mut file_count = 0;
    for scan_metadata in scan_metadata_results {
        // Apply the selection vector before inspecting the batch.
        let (underlying_data, selection_vector) = scan_metadata.scan_files.into_parts();
        let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data)
            .unwrap()
            .into();
        let filtered_batch =
            filter_record_batch(&batch, &BooleanArray::from(selection_vector)).unwrap();
        // stats_parsed must exist and be a struct column.
        let schema = filtered_batch.schema();
        let field = schema
            .field_with_name(STATS_PARSED_COL)
            .expect("Schema should contain stats_parsed column");
        assert!(
            matches!(field.data_type(), ArrowDataType::Struct(_)),
            "stats_parsed should be a struct type, got: {:?}",
            field.data_type()
        );
        let stats_parsed = get_column!(filtered_batch, STATS_PARSED_COL, StructArray);
        let num_records = get_column!(stats_parsed, "numRecords", Int64Array);
        let min_values = get_column!(stats_parsed, "minValues", StructArray);
        let max_values = get_column!(stats_parsed, "maxValues", StructArray);
        let null_count = get_column!(stats_parsed, "nullCount", StructArray);
        let stats_json = get_column!(filtered_batch, "stats", StringArray);
        for i in 0..stats_json.len() {
            // Only rows where both parsed and JSON stats exist can be compared.
            if stats_parsed.is_null(i) || stats_json.is_null(i) {
                continue;
            }
            let json_stats: serde_json::Value =
                serde_json::from_str(stats_json.value(i)).expect("stats JSON should be valid");
            if let Some(json_num) = json_stats.get("numRecords").and_then(|v| v.as_i64()) {
                assert_eq!(
                    json_num,
                    num_records.value(i),
                    "numRecords mismatch at row {i}"
                );
            }
            if let Some(obj) = json_stats.get("minValues").and_then(|v| v.as_object()) {
                assert_stats_struct_matches_json(min_values, obj, i, "minValues");
            }
            if let Some(obj) = json_stats.get("maxValues").and_then(|v| v.as_object()) {
                assert_stats_struct_matches_json(max_values, obj, i, "maxValues");
            }
            if let Some(obj) = json_stats.get("nullCount").and_then(|v| v.as_object()) {
                assert_stats_struct_matches_json(null_count, obj, i, "nullCount");
            }
            total_num_records += num_records.value(i);
            file_count += 1;
        }
    }
    assert!(file_count > 0, "Should have processed at least one file");
    assert!(total_num_records > 0, "Should have non-zero numRecords");
    println!(
        "Verified {file_count} files with total {total_num_records} records from stats_parsed"
    );
}
/// Combining a data-skipping predicate with `include_all_stats_columns` must
/// work: the scan keeps a physical predicate AND surfaces the struct-typed
/// `stats_parsed` column with positive numRecords for non-null rows.
#[test]
fn test_scan_metadata_stats_columns_with_predicate() {
    const STATS_PARSED_COL: &str = "stats_parsed";
    let path = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let url = url::Url::from_directory_path(path).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
    let predicate = Arc::new(column_expr!("id").gt(Expr::literal(0i64)));
    let scan = snapshot
        .scan_builder()
        .with_predicate(predicate)
        .include_all_stats_columns()
        .build()
        .expect("Should succeed when using both predicate and stats_columns");
    assert!(
        scan.physical_predicate().is_some(),
        "Scan should have a physical predicate for data skipping"
    );
    let scan_metadata_results: Vec<_> = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert!(
        !scan_metadata_results.is_empty(),
        "Should have scan metadata results"
    );
    let mut file_count = 0;
    for scan_metadata in scan_metadata_results {
        // Apply the selection vector before inspecting the batch.
        let (underlying_data, selection_vector) = scan_metadata.scan_files.into_parts();
        let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data)
            .unwrap()
            .into();
        let filtered_batch =
            filter_record_batch(&batch, &BooleanArray::from(selection_vector)).unwrap();
        let schema = filtered_batch.schema();
        let field = schema
            .field_with_name(STATS_PARSED_COL)
            .expect("Schema should contain stats_parsed column");
        assert!(
            matches!(field.data_type(), ArrowDataType::Struct(_)),
            "stats_parsed should be a struct type"
        );
        let stats_parsed = get_column!(filtered_batch, STATS_PARSED_COL, StructArray);
        let num_records = get_column!(stats_parsed, "numRecords", Int64Array);
        for i in 0..filtered_batch.num_rows() {
            if !stats_parsed.is_null(i) {
                assert!(num_records.value(i) > 0, "numRecords should be positive");
                file_count += 1;
            }
        }
    }
    assert!(
        file_count > 0,
        "Should have processed at least one file with stats"
    );
}
#[test]
fn test_prefix_columns_simple() {
    // Rewriting `x > 100` should leave exactly one column reference:
    // add.stats_parsed.x
    let mut prefixer = PrefixColumns {
        prefix: ColumnName::new(["add", "stats_parsed"]),
    };
    let input = Pred::gt(column_expr!("x"), Expr::literal(100i64));
    let rewritten = prefixer.transform_pred(&input).unwrap().into_owned();
    let references: Vec<_> = rewritten.references().into_iter().collect();
    assert_eq!(references.len(), 1);
    assert_eq!(*references[0], column_name!("add.stats_parsed.x"));
}
/// A data-skipping-eligible predicate must produce an actions meta predicate
/// whose every column reference is rooted at `add.stats_parsed`.
#[test]
fn test_build_actions_meta_predicate_with_predicate() {
    let path = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let url = url::Url::from_directory_path(path).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
    let predicate = Arc::new(Pred::gt(column_expr!("id"), Expr::literal(400i64)));
    let scan = snapshot
        .scan_builder()
        .with_predicate(predicate)
        .build()
        .unwrap();
    let meta_pred = scan.build_actions_meta_predicate();
    assert!(
        meta_pred.is_some(),
        "Should produce an actions meta predicate for a data-skipping-eligible predicate"
    );
    // Every reference must be of the form add.stats_parsed.<...>.
    let pred = meta_pred.unwrap();
    for col_ref in pred.references() {
        let path: Vec<_> = col_ref.iter().collect();
        assert_eq!(
            path[0], "add",
            "Column reference should start with 'add': {col_ref}"
        );
        assert_eq!(
            path[1], "stats_parsed",
            "Column reference should have 'stats_parsed' as second element: {col_ref}"
        );
    }
}
#[test]
fn test_build_actions_meta_predicate_no_predicate() {
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    // No scan predicate at all => no actions meta predicate.
    let scan = snapshot.scan_builder().build().unwrap();
    let meta_pred = scan.build_actions_meta_predicate();
    assert!(
        meta_pred.is_none(),
        "Should return None when there is no predicate"
    );
}
#[test]
fn test_build_actions_meta_predicate_static_skip_all() {
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    // A literal-false predicate skips everything statically, so no per-file
    // meta predicate is needed.
    let always_false = Arc::new(Pred::literal(false));
    let scan = snapshot
        .scan_builder()
        .with_predicate(always_false)
        .build()
        .unwrap();
    assert!(
        scan.build_actions_meta_predicate().is_none(),
        "StaticSkipAll predicate should return None"
    );
}
/// Builds an in-memory checkpoint-style parquet file whose only content is an
/// `add.stats_parsed` struct column; each `write_row_group` call produces one
/// parquet row group (the unit of skipping under test).
struct CheckpointParquetBuilder {
    arrow_schema: Arc<ArrowSchema>, // schema of the batches fed to the writer
    id_fields: Fields,              // fields of the `{id: Int64}` min/max/nullCount structs
    stats_fields: Fields,           // fields of the `stats_parsed` struct
    buffer: Vec<u8>,                // populated only in `finish`, from the writer's output
    writer: Option<ArrowWriter<Vec<u8>>>, // `Option` so `finish` can take ownership
}
impl CheckpointParquetBuilder {
    /// Creates a builder whose parquet schema is one nullable `add` struct
    /// column holding `stats_parsed` with `maxValues`/`minValues`/`nullCount`
    /// (each a `{id: Int64}` struct) plus an Int64 `numRecords`.
    fn new() -> Self {
        let id_fields = Fields::from(vec![Field::new("id", ArrowDataType::Int64, true)]);
        let stats_fields = Fields::from(vec![
            Field::new("maxValues", ArrowDataType::Struct(id_fields.clone()), true),
            Field::new("minValues", ArrowDataType::Struct(id_fields.clone()), true),
            Field::new("nullCount", ArrowDataType::Struct(id_fields.clone()), true),
            Field::new("numRecords", ArrowDataType::Int64, true),
        ]);
        let add_fields = Fields::from(vec![Field::new(
            "stats_parsed",
            ArrowDataType::Struct(stats_fields.clone()),
            true,
        )]);
        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "add",
            ArrowDataType::Struct(add_fields.clone()),
            true,
        )]));
        // The writer owns this output Vec; the struct's `buffer` field stays
        // empty until `finish` recovers the bytes from the writer.
        let buffer = Vec::new();
        let writer = ArrowWriter::try_new(buffer, arrow_schema.clone(), None).unwrap();
        Self {
            arrow_schema,
            id_fields,
            stats_fields,
            buffer: Vec::new(),
            writer: Some(writer),
        }
    }
    /// Writes one row group. The slices give per-row `id` stats (None => null
    /// entry) and per-row `numRecords`; callers pass slices of equal length.
    fn write_row_group(
        &mut self,
        max_ids: &[Option<i64>],
        min_ids: &[Option<i64>],
        null_counts: &[Option<i64>],
        num_records: &[i64],
    ) {
        // Wraps a list of optional ids into a `{id: Int64}` struct array.
        let make_id_struct = |vals: &[Option<i64>]| -> StructArray {
            StructArray::from(vec![(
                Arc::new(Field::new("id", ArrowDataType::Int64, true)),
                Arc::new(Int64Array::from(vals.to_vec())) as Arc<dyn Array>,
            )])
        };
        let stats_parsed = StructArray::from(vec![
            (
                Arc::new(Field::new(
                    "maxValues",
                    ArrowDataType::Struct(self.id_fields.clone()),
                    true,
                )),
                Arc::new(make_id_struct(max_ids)) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new(
                    "minValues",
                    ArrowDataType::Struct(self.id_fields.clone()),
                    true,
                )),
                Arc::new(make_id_struct(min_ids)) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new(
                    "nullCount",
                    ArrowDataType::Struct(self.id_fields.clone()),
                    true,
                )),
                Arc::new(make_id_struct(null_counts)) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new("numRecords", ArrowDataType::Int64, true)),
                Arc::new(Int64Array::from(num_records.to_vec())) as Arc<dyn Array>,
            ),
        ]);
        let add = StructArray::from(vec![(
            Arc::new(Field::new(
                "stats_parsed",
                ArrowDataType::Struct(self.stats_fields.clone()),
                true,
            )),
            Arc::new(stats_parsed) as Arc<dyn Array>,
        )]);
        let batch = RecordBatch::try_new(self.arrow_schema.clone(), vec![Arc::new(add)]).unwrap();
        let writer = self.writer.as_mut().unwrap();
        writer.write(&batch).unwrap();
        // Flush closes the current row group so each call maps 1:1 to a
        // parquet row group.
        writer.flush().unwrap();
    }
    /// Closes the writer and returns the finished parquet file as bytes.
    fn finish(mut self) -> Bytes {
        let writer = self.writer.take().unwrap();
        self.buffer = writer.into_inner().unwrap();
        Bytes::from(self.buffer)
    }
}
/// Turns a logical predicate into a checkpoint data-skipping predicate with
/// its column references rewritten under the `add.stats_parsed` prefix.
/// Returns `None` when no skipping predicate can be derived.
fn build_prefixed_checkpoint_predicate(pred: &Pred) -> Option<Pred> {
    let skipping = as_checkpoint_skipping_predicate(pred, &[])?;
    let mut prefixer = PrefixColumns {
        prefix: ColumnName::new(["add", "stats_parsed"]),
    };
    let prefixed = prefixer.transform_pred(&skipping).unwrap().into_owned();
    Some(prefixed)
}
/// Reads the parquet bytes with the given row-group skipping predicate
/// applied and returns the total number of rows that survive.
fn apply_row_group_filter(parquet_bytes: Bytes, meta_predicate: &Pred) -> usize {
    let reader = ParquetRecordBatchReaderBuilder::try_new(parquet_bytes)
        .unwrap()
        .with_row_group_filter(meta_predicate, None)
        .build()
        .unwrap();
    let mut total_rows = 0;
    for batch in reader {
        total_rows += batch.unwrap().num_rows();
    }
    total_rows
}
/// End-to-end checkpoint row-group skipping: build a 4-row-group parquet file
/// of `add.stats_parsed` data (6 rows total), derive a prefixed skipping
/// predicate, and count surviving rows. `None` expected rows means the
/// predicate yields no skipping predicate and all rows must be read.
#[rstest]
#[case::comparison(
    Pred::gt(column_expr!("id"), Expr::literal(200i64)),
    // Should skip RG2 and RG3, but https://github.com/apache/arrow-rs/issues/9451
    Some(6), // Some(3),
    "keep RG 0 (null stats) + RG 1 (max>200), skip RG 2 + RG 3 (max<200)"
)]
#[case::is_null(
    Pred::is_null(column_expr!("id")),
    // Should skip RG 1 (nullCount=0), but https://github.com/apache/arrow-rs/issues/9451
    Some(6), // Some(5),
    "keep RG 0 (nullCount>0) + RG 2 (nullCount>0) + RG 3 (null nullCount), skip RG 1 (nullCount=0)"
)]
#[case::is_not_null(
    Pred::not(Pred::is_null(column_expr!("id"))),
    None,
    "IS NOT NULL produces no skipping predicate — column vs column (#1873)"
)]
fn test_checkpoint_row_group_skipping(
    #[case] pred: Pred,
    #[case] expected_rows: Option<usize>,
    #[case] description: &str,
) {
    let mut builder = CheckpointParquetBuilder::new();
    // RG 0: two rows, second with entirely null stats.
    builder.write_row_group(
        &[Some(100), None],
        &[Some(1), None],
        &[Some(5), None],
        &[100, 50],
    );
    // RG 1: max=300, min=201, nullCount=0.
    builder.write_row_group(&[Some(300)], &[Some(201)], &[Some(0)], &[100]);
    // RG 2: max=50, min=1, nullCount=10.
    builder.write_row_group(&[Some(50)], &[Some(1)], &[Some(10)], &[100]);
    // RG 3: two rows, second with a null nullCount.
    builder.write_row_group(
        &[Some(150), Some(40)],
        &[Some(1), Some(1)],
        &[Some(0), None],
        &[100, 50],
    );
    let parquet_bytes = builder.finish();
    let meta_predicate = build_prefixed_checkpoint_predicate(&pred);
    match expected_rows {
        Some(expected) => {
            let meta_predicate =
                meta_predicate.expect("predicate should produce a checkpoint skipping predicate");
            let total_rows = apply_row_group_filter(parquet_bytes, &meta_predicate);
            assert_eq!(total_rows, expected, "{description}");
        }
        None => {
            // No skipping predicate: reading without one must return all rows.
            assert!(meta_predicate.is_none(), "{description}");
            let total_rows: usize = ParquetRecordBatchReaderBuilder::try_new(parquet_bytes)
                .unwrap()
                .build()
                .unwrap()
                .map(|b| b.unwrap().num_rows())
                .sum();
            assert_eq!(total_rows, 6, "all rows should be read without a predicate");
        }
    }
}
#[test]
fn test_skip_stats_disables_data_skipping() {
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    // With stats skipped, this predicate cannot prune any files.
    let predicate = Arc::new(Pred::gt(column_expr!("id"), Expr::literal(400i64)));
    let scan = snapshot
        .scan_builder()
        .with_predicate(predicate)
        .with_skip_stats(true)
        .build()
        .unwrap();
    let results: Vec<_> = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    let selected_file_count: usize = results
        .iter()
        .map(|metadata| {
            metadata
                .scan_files
                .selection_vector()
                .iter()
                .filter(|&&selected| selected)
                .count()
        })
        .sum();
    // All six files remain selected because data skipping was disabled.
    assert_eq!(selected_file_count, 6);
}
#[test]
fn test_skip_stats_after_include_all_stats_columns_wins() {
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    let predicate = Arc::new(Pred::gt(column_expr!("id"), Expr::literal(400i64)));
    // `with_skip_stats(true)` comes after `include_all_stats_columns`, so it
    // takes precedence and data skipping stays disabled.
    let scan = snapshot
        .scan_builder()
        .include_all_stats_columns()
        .with_skip_stats(true)
        .with_predicate(predicate)
        .build()
        .unwrap();
    let mut selected_file_count = 0usize;
    for metadata in scan.scan_metadata(engine.as_ref()).unwrap() {
        let metadata = metadata.unwrap();
        let selection_vector = metadata.scan_files.selection_vector();
        for &selected in selection_vector.iter() {
            if selected {
                selected_file_count += 1;
            }
        }
    }
    // All six files survive since no stats-based pruning happened.
    assert_eq!(selected_file_count, 6);
}
#[test]
fn test_with_stats_columns_empty_no_stats_output() {
    // An explicitly empty stats-column list should suppress the `stats_parsed`
    // column entirely in the emitted scan metadata.
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    let scan = snapshot
        .scan_builder()
        .with_stats_columns(vec![])
        .build()
        .unwrap();
    let metadata_batches = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert!(
        !metadata_batches.is_empty(),
        "Should have scan metadata"
    );
    for meta in metadata_batches {
        let (data, _mask) = meta.scan_files.into_parts();
        let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data).unwrap().into();
        assert!(
            batch.column_by_name("stats_parsed").is_none(),
            "stats_parsed should not be present with empty stats columns"
        );
    }
}
#[test]
fn test_scan_metadata_with_specific_stats_columns() {
    // Requesting only "id" should restrict minValues/maxValues/nullCount in
    // `stats_parsed` to that single field.
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    let scan = snapshot
        .scan_builder()
        .with_stats_columns(vec![column_name!("id")])
        .build()
        .unwrap();
    let metadata_batches = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert!(
        !metadata_batches.is_empty(),
        "Should have scan metadata"
    );
    for meta in metadata_batches {
        let (data, mask) = meta.scan_files.into_parts();
        let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data).unwrap().into();
        let kept = filter_record_batch(&batch, &BooleanArray::from(mask)).unwrap();
        let stats = get_column!(kept, "stats_parsed", StructArray);
        // Each of the three stats sub-structs must carry exactly the "id" field.
        for (child, message) in [
            ("minValues", "minValues should only contain 'id'"),
            ("maxValues", "maxValues should only contain 'id'"),
            ("nullCount", "nullCount should only contain 'id'"),
        ] {
            let sub = get_column!(stats, child, StructArray);
            assert_eq!(field_names(sub), vec!["id"], "{message}");
        }
    }
}
#[test]
fn test_scan_metadata_with_multiple_stats_columns() {
    // Requesting "id" and "name" should limit the stats sub-structs to exactly
    // those two fields and exclude all others (e.g. "age", "salary").
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    let scan = snapshot
        .scan_builder()
        .with_stats_columns(vec![column_name!("id"), column_name!("name")])
        .build()
        .unwrap();
    let metadata_batches = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert!(
        !metadata_batches.is_empty(),
        "Should have scan metadata"
    );
    let expected = vec!["id", "name"];
    for meta in metadata_batches {
        let (data, mask) = meta.scan_files.into_parts();
        let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data).unwrap().into();
        let kept = filter_record_batch(&batch, &BooleanArray::from(mask)).unwrap();
        let stats = get_column!(kept, "stats_parsed", StructArray);
        let mins = get_column!(stats, "minValues", StructArray);
        let maxes = get_column!(stats, "maxValues", StructArray);
        let nulls = get_column!(stats, "nullCount", StructArray);
        assert_eq!(
            field_names(mins),
            expected,
            "minValues should contain 'id' and 'name'"
        );
        assert_eq!(
            field_names(maxes),
            expected,
            "maxValues should contain 'id' and 'name'"
        );
        assert_eq!(
            field_names(nulls),
            expected,
            "nullCount should contain 'id' and 'name'"
        );
        // Fields that were not requested must be absent.
        assert!(
            mins.column_by_name("age").is_none(),
            "minValues should NOT contain 'age'"
        );
        assert!(
            mins.column_by_name("salary").is_none(),
            "minValues should NOT contain 'salary'"
        );
    }
}
#[test]
fn test_scan_metadata_with_nonexistent_stats_columns() {
    // Even when the requested stats column does not exist in the table schema,
    // `stats_parsed` should still be emitted with its `numRecords` field.
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(SyncEngine::new());
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    let scan = snapshot
        .scan_builder()
        .with_stats_columns(vec![column_name!("nonexistent_column")])
        .build()
        .unwrap();
    let metadata_batches = scan
        .scan_metadata(engine.as_ref())
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert!(
        !metadata_batches.is_empty(),
        "Should have scan metadata"
    );
    for meta in metadata_batches {
        let (data, mask) = meta.scan_files.into_parts();
        let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data).unwrap().into();
        let kept = filter_record_batch(&batch, &BooleanArray::from(mask)).unwrap();
        let stats = get_column!(kept, "stats_parsed", StructArray);
        assert!(
            stats.column_by_name("numRecords").is_some(),
            "Should still have numRecords"
        );
    }
}
/// A stub [`ParquetHandler`] used by the `execute_*` tests below to simulate an
/// engine whose parquet reads yield no data for any file.
struct EmptyParquetHandler;
impl ParquetHandler for EmptyParquetHandler {
// Always returns an empty iterator, regardless of the files or predicate requested.
fn read_parquet_files(
&self,
_files: &[FileMeta],
_schema: crate::schema::SchemaRef,
_predicate: Option<PredicateRef>,
) -> crate::DeltaResult<FileDataReadResultIterator> {
Ok(Box::new(std::iter::empty()))
}
// Not exercised by these tests; panics if called.
fn read_parquet_footer(&self, _file: &FileMeta) -> crate::DeltaResult<ParquetFooter> {
unimplemented!()
}
// Not exercised by these tests; panics if called.
fn write_parquet_file(
&self,
_location: url::Url,
_data: Box<dyn Iterator<Item = crate::DeltaResult<Box<dyn EngineData>>> + Send>,
) -> crate::DeltaResult<()> {
unimplemented!()
}
}
/// An [`Engine`] wrapper that delegates everything to the inner [`SyncEngine`]
/// except the parquet handler, which is replaced with [`EmptyParquetHandler`].
struct EmptyParquetEngine(Arc<SyncEngine>);
impl Engine for EmptyParquetEngine {
fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
self.0.evaluation_handler()
}
fn json_handler(&self) -> Arc<dyn JsonHandler> {
self.0.json_handler()
}
// The one divergence from SyncEngine: parquet reads return no data.
fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
Arc::new(EmptyParquetHandler)
}
fn storage_handler(&self) -> Arc<dyn StorageHandler> {
self.0.storage_handler()
}
}
#[test]
fn execute_errors_when_parquet_returns_empty_for_file_with_positive_stats() {
    // The parquet handler yields nothing for a file the log says has rows, so
    // execute must surface exactly one error mentioning the empty read.
    let table_dir =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(EmptyParquetEngine(Arc::new(SyncEngine::new())));
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    let scan = snapshot.scan_builder().build().unwrap();
    let outcomes: Vec<_> = scan.execute(engine).unwrap().collect();
    assert_eq!(outcomes.len(), 1, "should emit exactly one error item");
    let first = &outcomes[0];
    assert!(first.is_err(), "the result should be an error, got Ok");
    let err = first.as_ref().err().unwrap().to_string();
    assert!(
        err.contains("ParquetHandler returned no data"),
        "unexpected error message: {err}"
    );
}
#[test]
fn execute_does_not_error_when_parquet_returns_empty_and_stats_absent() {
    // Empty parquet reads are tolerated when the log has no stats asserting
    // that rows must exist: no item in the result stream may be an error.
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-cdf/")).unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let engine = Arc::new(EmptyParquetEngine(Arc::new(SyncEngine::new())));
    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref()).unwrap();
    let scan = snapshot.scan_builder().build().unwrap();
    let mut outcomes = scan.execute(engine).unwrap();
    assert!(
        outcomes.all(|r| r.is_ok()),
        "expected no errors for stats-absent files"
    );
}
/// Tests asserting that the default engine emits a `ScanMetadataCompleted`
/// metric event with the expected counters after a `scan_metadata` pass — and
/// that no such event fires when the iterator is dropped early.
mod scan_metadata_completed_tests {
    use std::path::PathBuf;
    use std::sync::Arc;
    use std::time::Duration;
    use rstest::rstest;
    use crate::engine::default::DefaultEngineBuilder;
    use crate::expressions::{column_expr, Expression as Expr, Predicate as Pred};
    use crate::metrics::MetricEvent;
    use crate::object_store::local::LocalFileSystem;
    use crate::utils::test_utils::CapturingReporter;
    use crate::Snapshot;

    /// Runs a full `scan_metadata` pass over `table` (optionally filtered by
    /// `predicate`) with an engine wired to a capturing metrics reporter.
    /// Returns the reporter and the number of metadata batches produced.
    fn run_scan(table: &str, predicate: Option<Arc<Pred>>) -> (Arc<CapturingReporter>, usize) {
        let path = std::fs::canonicalize(PathBuf::from(table)).unwrap();
        let url = url::Url::from_directory_path(&path).unwrap();
        let reporter = Arc::new(CapturingReporter::default());
        let engine = Arc::new(
            DefaultEngineBuilder::new(Arc::new(LocalFileSystem::new()))
                .with_metrics_reporter(reporter.clone())
                .build(),
        );
        let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
        let mut builder = snapshot.scan_builder();
        if let Some(pred) = predicate {
            builder = builder.with_predicate(pred);
        }
        let scan = builder.build().unwrap();
        // Fully drain the iterator so the completion event is emitted.
        let results: Vec<_> = scan
            .scan_metadata(engine.as_ref())
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        (reporter, results.len())
    }

    /// Returns the first captured `ScanMetadataCompleted` event; panics if the
    /// reporter never saw one.
    fn get_scan_event(reporter: &CapturingReporter) -> MetricEvent {
        reporter
            .events()
            .into_iter()
            .find(|e| matches!(e, MetricEvent::ScanMetadataCompleted { .. }))
            .expect("expected ScanMetadataCompleted event")
    }

    // Case columns: table, predicate, adds seen, active adds, removes seen,
    // predicate-filtered adds.
    #[rstest]
    #[case::basic_scan("./tests/data/parsed-stats/", None, 6, 6, 0, 0)]
    #[case::static_skip_all(
        "./tests/data/parsed-stats/",
        Some(Arc::new(Pred::literal(false))),
        0,
        0,
        0,
        0
    )]
    #[case::with_removes("./tests/data/table-with-cdf/", None, 1, 0, 2, 0)]
    // Renamed from a duplicated `with_removes` label so the generated test name
    // describes what this case actually exercises: a checkpointed table.
    #[case::with_checkpoint("./tests/data/with_checkpoint_no_last_checkpoint/", None, 2, 1, 1, 0)]
    #[case::partition_filter(
        "./tests/data/basic_partitioned/",
        Some(Arc::new(Expr::eq(column_expr!("letter"), Expr::literal("a")))),
        2, 2, 0, 4
    )]
    fn test_scan_metrics(
        #[case] table: &str,
        #[case] predicate: Option<Arc<Pred>>,
        #[case] expected_add_seen: u64,
        #[case] expected_active: u64,
        #[case] expected_removes: u64,
        #[case] expected_filtered: u64,
    ) {
        let (reporter, _) = run_scan(table, predicate);
        let MetricEvent::ScanMetadataCompleted {
            total_duration,
            num_add_files_seen,
            num_active_add_files,
            num_remove_files_seen,
            num_predicate_filtered,
            ..
        } = get_scan_event(&reporter)
        else {
            panic!("expected ScanMetadataCompleted");
        };
        assert!(total_duration > Duration::ZERO);
        assert_eq!(num_add_files_seen, expected_add_seen);
        assert_eq!(num_active_add_files, expected_active);
        assert_eq!(num_remove_files_seen, expected_removes);
        assert_eq!(num_predicate_filtered, expected_filtered);
    }

    /// Dropping the metadata iterator after a single `next()` must not produce
    /// a `ScanMetadataCompleted` event.
    #[test]
    fn test_no_metrics_on_early_drop() {
        let path = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/")).unwrap();
        let url = url::Url::from_directory_path(&path).unwrap();
        let reporter = Arc::new(CapturingReporter::default());
        let engine = Arc::new(
            DefaultEngineBuilder::new(Arc::new(LocalFileSystem::new()))
                .with_metrics_reporter(reporter.clone())
                .build(),
        );
        let snapshot = Snapshot::builder_for(url).build(engine.as_ref()).unwrap();
        let scan = snapshot.scan_builder().build().unwrap();
        {
            let mut iter = scan.scan_metadata(engine.as_ref()).unwrap();
            let _ = iter.next();
        } // iterator dropped here, before exhaustion
        assert!(reporter
            .events()
            .iter()
            .all(|e| !matches!(e, MetricEvent::ScanMetadataCompleted { .. })));
    }
}