use std::collections::HashMap;
use std::sync::LazyLock;
use crate::actions::deletion_vector::deletion_treemap_to_bools;
use crate::scan::get_transform_for_row;
use crate::schema::Schema;
use crate::utils::require;
use crate::ExpressionRef;
use crate::{
actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at},
engine_data::{FilteredRowVisitor, GetData, RowIndexIterator, TypedGetData},
schema::{ColumnName, ColumnNamesAndTypes, DataType, SchemaRef},
DeltaResult, Engine, EngineData, Error,
};
use roaring::RoaringTreemap;
use serde::Deserialize;
use tracing::warn;
use super::log_replay::SCAN_ROW_SCHEMA;
use super::ScanMetadata;
/// Holds the (optional) deletion vector for a single scan file.
///
/// `Default` yields a `DvInfo` with no deletion vector, i.e. no rows are deleted.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct DvInfo {
    /// `None` when the file has no associated deletion vector (see [`DvInfo::has_vector`]).
    pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,
}
impl From<DeletionVectorDescriptor> for DvInfo {
    /// Wraps a concrete deletion-vector descriptor into a `DvInfo`.
    fn from(deletion_vector: DeletionVectorDescriptor) -> Self {
        DvInfo {
            deletion_vector: Some(deletion_vector),
        }
    }
}
/// File-level statistics deserialized from a JSON stats string attached to a scan-file row.
///
/// Field names are mapped from camelCase JSON keys via serde.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
    /// Number of records in the file (JSON key `numRecords`).
    pub num_records: u64,
}
impl DvInfo {
    /// Returns `true` when this file has an associated deletion vector.
    pub fn has_vector(&self) -> bool {
        matches!(self.deletion_vector, Some(_))
    }

    /// Reads the deletion vector (if any) into a [`RoaringTreemap`] of deleted row indexes.
    ///
    /// Returns `Ok(None)` when the file has no deletion vector; propagates any error from
    /// reading the vector through the engine's storage handler.
    pub(crate) fn get_treemap(
        &self,
        engine: &dyn Engine,
        table_root: &url::Url,
    ) -> DeltaResult<Option<RoaringTreemap>> {
        match &self.deletion_vector {
            None => Ok(None),
            Some(descriptor) => {
                let storage = engine.storage_handler();
                let treemap = descriptor.read(storage, table_root)?;
                Ok(Some(treemap))
            }
        }
    }

    /// Materializes the deletion vector (if any) as a boolean selection vector, where deleted
    /// rows are marked according to [`deletion_treemap_to_bools`].
    pub fn get_selection_vector(
        &self,
        engine: &dyn Engine,
        table_root: &url::Url,
    ) -> DeltaResult<Option<Vec<bool>>> {
        let treemap = self.get_treemap(engine, table_root)?;
        Ok(treemap.map(deletion_treemap_to_bools))
    }

    /// Returns the row indexes recorded in the deletion vector, or `Ok(None)` when there is no
    /// deletion vector for this file.
    pub fn get_row_indexes(
        &self,
        engine: &dyn Engine,
        table_root: &url::Url,
    ) -> DeltaResult<Option<Vec<u64>>> {
        match &self.deletion_vector {
            None => Ok(None),
            Some(dv) => {
                let storage = engine.storage_handler();
                let indexes = dv.row_indexes(storage, table_root)?;
                Ok(Some(indexes))
            }
        }
    }
}
/// Applies an optional physical-to-logical transform expression to a batch of engine data.
///
/// When `transform` is `None` the physical data is already in logical shape and is returned
/// unchanged; otherwise an expression evaluator is built from the physical schema, the
/// transform, and the logical schema, and the data is evaluated through it.
pub fn transform_to_logical(
    engine: &dyn Engine,
    physical_data: Box<dyn EngineData>,
    physical_schema: &SchemaRef,
    logical_schema: &Schema,
    transform: Option<ExpressionRef>,
) -> DeltaResult<Box<dyn EngineData>> {
    // No transform: the data passes through untouched.
    let Some(transform) = transform else {
        return Ok(physical_data);
    };
    let evaluator = engine.evaluation_handler().new_expression_evaluator(
        physical_schema.clone(),
        transform,
        logical_schema.clone().into(),
    )?;
    evaluator.evaluate(physical_data.as_ref())
}
/// Everything extracted from one scan-file row: the file's location and metadata, its
/// deletion-vector info, an optional transform expression, and its partition values.
#[derive(Debug, Clone, PartialEq)]
pub struct ScanFile {
    /// Path of the data file as recorded in the scan row.
    pub path: String,
    /// File size in bytes, as recorded in the scan row.
    pub size: i64,
    /// Modification time as recorded in the scan row (units per the table metadata —
    /// the test fixture uses an epoch-milliseconds-scale value).
    pub modification_time: i64,
    /// Parsed stats, or `None` when the stats string was absent or failed to parse.
    pub stats: Option<Stats>,
    /// Deletion vector information for this file (possibly empty).
    pub dv_info: DvInfo,
    /// Optional physical-to-logical transform for this file's data.
    pub transform: Option<ExpressionRef>,
    /// Partition column name -> (string-encoded) partition value for this file.
    pub partition_values: HashMap<String, String>,
}
/// Callback invoked once per scan file by [`ScanMetadata::visit_scan_files`], receiving the
/// caller-supplied mutable context and the materialized [`ScanFile`].
pub type ScanCallback<T> = fn(context: &mut T, scan_file: ScanFile);
impl ScanMetadata {
pub fn visit_scan_files<T>(&self, context: T, callback: ScanCallback<T>) -> DeltaResult<T> {
let mut visitor = ScanFileVisitor {
callback,
transforms: &self.scan_file_transforms,
context,
};
visitor.visit_rows_of(&self.scan_files)?;
Ok(visitor.context)
}
}
/// Row visitor that builds a [`ScanFile`] from each selected row and hands it to `callback`.
struct ScanFileVisitor<'a, T> {
    /// Invoked once per visited scan file.
    callback: ScanCallback<T>,
    /// Per-row optional transforms; looked up by row index via `get_transform_for_row`.
    transforms: &'a [Option<ExpressionRef>],
    /// Caller-owned state threaded through every callback invocation.
    context: T,
}
impl<T> FilteredRowVisitor for ScanFileVisitor<'_, T> {
    /// Columns this visitor reads: the leaves of [`SCAN_ROW_SCHEMA`], computed once lazily.
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
            LazyLock::new(|| SCAN_ROW_SCHEMA.leaves(None));
        NAMES_AND_TYPES.as_ref()
    }

    /// Visits each selected row, building a [`ScanFile`] and invoking the stored callback.
    ///
    /// Rows whose `path` is null are skipped. Invalid `stats` JSON is logged and treated as
    /// absent rather than failing the scan.
    ///
    /// # Errors
    /// Returns an internal error if the getter count does not match the expected leaf count of
    /// [`SCAN_ROW_SCHEMA`], a missing-column error if the schema lacks `deletionVector`, or any
    /// error raised while extracting column values.
    fn visit_filtered<'a>(
        &mut self,
        getters: &[&'a dyn GetData<'a>],
        rows: RowIndexIterator<'_>,
    ) -> DeltaResult<()> {
        require!(
            getters.len() == 14,
            Error::InternalError(format!(
                "Wrong number of ScanFileVisitor getters: {}",
                getters.len()
            ))
        );
        // The deletionVector column position is a property of the (static) scan row schema,
        // so resolve it once instead of re-searching the schema for every row.
        let dv_index = SCAN_ROW_SCHEMA
            .index_of("deletionVector")
            .ok_or_else(|| Error::missing_column("deletionVector"))?;
        for row_index in rows {
            // A null path means there is no scan file in this row; skip it.
            if let Some(path) = getters[0].get_opt(row_index, "scanFile.path")? {
                let size = getters[1].get(row_index, "scanFile.size")?;
                let modification_time: i64 = getters[2].get(row_index, "add.modificationTime")?;
                let stats: Option<String> = getters[3].get_opt(row_index, "scanFile.stats")?;
                // Bad stats JSON is non-fatal: warn and continue without stats.
                let stats: Option<Stats> =
                    stats.and_then(|json| match serde_json::from_str(json.as_str()) {
                        Ok(stats) => Some(stats),
                        Err(e) => {
                            warn!("Invalid stats string in Add file {json}: {}", e);
                            None
                        }
                    });
                let deletion_vector = visit_deletion_vector_at(row_index, &getters[dv_index..])?;
                let dv_info = DvInfo { deletion_vector };
                // NOTE(review): getter index 9 is hard-coded to the partitionValues leaf of
                // SCAN_ROW_SCHEMA — keep in sync with that schema's layout.
                let partition_values =
                    getters[9].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
                let scan_file = ScanFile {
                    path,
                    size,
                    modification_time,
                    stats,
                    dv_info,
                    transform: get_transform_for_row(row_index, self.transforms),
                    partition_values,
                };
                (self.callback)(&mut self.context, scan_file)
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::actions::get_commit_schema;
    use crate::scan::state::ScanFile;
    use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};

    // Minimal context to verify that caller state is threaded through the visitor unchanged.
    #[derive(Clone)]
    struct TestContext {
        id: usize,
    }

    // Callback asserting that the single selected row of the simple add-batch fixture is
    // materialized into the expected ScanFile (path, size, mtime, stats, partition values,
    // deletion vector, no transform) and that the context arrived intact.
    fn validate_visit(context: &mut TestContext, scan_file: ScanFile) {
        assert_eq!(
            scan_file.path,
            "part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
        );
        assert_eq!(scan_file.size, 635);
        assert_eq!(scan_file.modification_time, 1677811178336);
        assert!(scan_file.stats.is_some());
        assert_eq!(scan_file.stats.as_ref().unwrap().num_records, 10);
        assert_eq!(
            scan_file.partition_values.get("date"),
            Some(&"2017-12-10".to_string())
        );
        assert_eq!(scan_file.partition_values.get("non-existent"), None);
        assert!(scan_file.dv_info.deletion_vector.is_some());
        let dv = scan_file.dv_info.deletion_vector.unwrap();
        assert_eq!(dv.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1");
        assert!(scan_file.transform.is_none());
        assert_eq!(context.id, 2);
    }

    #[test]
    fn test_simple_visit_scan_metadata() {
        let context = TestContext { id: 2 };
        // Selection vector [true, false]: only the first row should reach the callback.
        run_with_validate_callback(
            vec![add_batch_simple(get_commit_schema().clone())],
            None, None, &[true, false],
            context,
            validate_visit,
        );
    }
}