use std::collections::HashSet;
use std::sync::Arc;
use either::Either;
use lazy_static::lazy_static;
use tracing::debug;
use super::data_skipping::DataSkippingFilter;
use super::ScanData;
use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::actions::{visitors::AddVisitor, visitors::RemoveVisitor, Add, Remove};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::Expression;
use crate::schema::{DataType, MapType, SchemaRef, StructField, StructType};
use crate::{DataVisitor, DeltaResult, Engine, EngineData, ExpressionHandler};
/// State carried across batches during log replay: an optional data-skipping
/// filter and the set of file keys already resolved (returned or removed).
struct LogReplayScanner {
// Built from the scan predicate; `None` when no data skipping is possible.
filter: Option<DataSkippingFilter>,
// Keys of files already seen: (path, optional deletion-vector unique id).
seen: HashSet<(String, Option<String>)>,
}
// Visitor that collects Add and Remove actions from one batch of log data.
#[derive(Default)]
struct AddRemoveVisitor {
// Adds that passed the selection vector, paired with their row index.
adds: Vec<(Add, usize)>,
// Removes found in this batch (only gathered when `is_log_batch` is true).
removes: Vec<Remove>,
// Optional data-skipping result; `false` at index i filters out row i's add.
selection_vector: Option<Vec<bool>>,
// True when visiting a commit (log) batch, which may contain removes;
// non-log batches are projected to adds only (see `setup_batch_process`).
is_log_batch: bool,
}
// Number of leaf columns in the projected `add` action schema. In `visit`,
// getters[..ADD_FIELD_COUNT] are the Add fields and getters[ADD_FIELD_COUNT..]
// the Remove fields. NOTE(review): must stay in sync with the Add schema in
// `crate::actions` — confirm when that schema changes.
const ADD_FIELD_COUNT: usize = 15;
impl AddRemoveVisitor {
fn new(selection_vector: Option<Vec<bool>>, is_log_batch: bool) -> Self {
AddRemoveVisitor {
selection_vector,
is_log_batch,
..Default::default()
}
}
}
impl DataVisitor for AddRemoveVisitor {
    /// Walk every row of the batch. A non-null `add.path` in column 0 marks an
    /// Add row; otherwise (for log batches only) a non-null `remove.path` marks
    /// a Remove row. Adds filtered out by the selection vector are skipped.
    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        for row in 0..row_count {
            match getters[0].get_opt(row, "add.path")? {
                Some(path) => {
                    // Keep the row unless the data-skipping vector says to drop it.
                    let keep = self
                        .selection_vector
                        .as_ref()
                        .map_or(true, |selection| selection[row]);
                    if keep {
                        let add = AddVisitor::visit_add(row, path, &getters[..ADD_FIELD_COUNT])?;
                        self.adds.push((add, row));
                    }
                }
                None if self.is_log_batch => {
                    // Remove getters start right after the Add columns.
                    if let Some(path) = getters[ADD_FIELD_COUNT].get_opt(row, "remove.path")? {
                        let remove_getters = &getters[ADD_FIELD_COUNT..];
                        let remove = RemoveVisitor::visit_remove(row, path, remove_getters)?;
                        self.removes.push(remove);
                    }
                }
                None => {}
            }
        }
        Ok(())
    }
}
lazy_static! {
// Schema of the engine-facing rows produced by `process_scan_batch`: per-file
// path/size/modificationTime/stats, an optional deletionVector descriptor, and
// per-file constant values (currently just the partition values map).
pub(crate) static ref SCAN_ROW_SCHEMA: Arc<StructType> = Arc::new(StructType::new(vec!(
StructField::new("path", DataType::STRING, false),
StructField::new("size", DataType::LONG, true),
StructField::new("modificationTime", DataType::LONG, true),
StructField::new("stats", DataType::STRING, true),
StructField::new(
"deletionVector",
StructType::new(vec![
StructField::new("storageType", DataType::STRING, false),
StructField::new("pathOrInlineDv", DataType::STRING, false),
StructField::new("offset", DataType::INTEGER, true),
StructField::new("sizeInBytes", DataType::INTEGER, false),
StructField::new("cardinality", DataType::LONG, false),
]),
true
),
StructField::new(
"fileConstantValues",
StructType::new(vec![StructField::new(
"partitionValues",
MapType::new(DataType::STRING, DataType::STRING, false),
true,
)]),
true
),
)));
// `SCAN_ROW_SCHEMA` as a `DataType`, used as the evaluator's output type.
static ref SCAN_ROW_DATATYPE: DataType = SCAN_ROW_SCHEMA.as_ref().clone().into();
}
impl LogReplayScanner {
fn new(engine: &dyn Engine, table_schema: &SchemaRef, predicate: &Option<Expression>) -> Self {
Self {
filter: DataSkippingFilter::new(engine, table_schema, predicate),
seen: Default::default(),
}
}
/// Replay one batch of actions, returning the `Add`s that belong to the scan:
/// those that pass data skipping (when configured) and have not already been
/// seen (returned earlier or covered by a remove). For log batches, returned
/// files are recorded in `self.seen` so later batches can skip them.
fn process_batch(
    &mut self,
    actions: &dyn EngineData,
    is_log_batch: bool,
) -> DeltaResult<Vec<Add>> {
    // Optional data-skipping selection over the rows of this batch.
    let selection_vector = self
        .filter
        .as_ref()
        .map(|filter| filter.apply(actions))
        .transpose()?;
    let adds = self.setup_batch_process(selection_vector, actions, is_log_batch)?;
    let mut kept = Vec::with_capacity(adds.len());
    for (add, _) in adds {
        // Build the dedup key once; the original cloned `add.path` twice
        // (once for `contains`, once more for `insert`).
        let key = (add.path.clone(), add.dv_unique_id());
        if !self.seen.contains(&key) {
            debug!("Found file: {}, is log {}", &add.path, is_log_batch);
            if is_log_batch {
                // Only log-batch files need suppressing in later batches.
                self.seen.insert(key);
            }
            kept.push(add);
        }
    }
    Ok(kept)
}
/// Expression that reshapes a raw `add` action row into the layout of
/// [`SCAN_ROW_SCHEMA`]: path, size, modificationTime, stats, deletionVector,
/// and a nested fileConstantValues struct holding the partition values.
fn get_add_transform_expr(&self) -> Expression {
    let file_constant_values =
        Expression::Struct(vec![Expression::column("add.partitionValues")]);
    Expression::Struct(vec![
        Expression::column("add.path"),
        Expression::column("add.size"),
        Expression::column("add.modificationTime"),
        Expression::column("add.stats"),
        Expression::column("add.deletionVector"),
        file_constant_values,
    ])
}
/// Process one batch of actions for a scan, producing `ScanData`: the batch
/// transformed into scan rows plus a selection vector marking rows to read.
///
/// A row ends up selected when it is an add that passes data skipping (if
/// any) and has not been seen before; adds that were removed or already
/// returned are explicitly deselected. `self.seen` is only updated for log
/// batches (mirroring `process_batch`).
fn process_scan_batch(
    &mut self,
    expression_handler: &dyn ExpressionHandler,
    actions: &dyn EngineData,
    is_log_batch: bool,
) -> DeltaResult<ScanData> {
    // Apply data skipping when a filter exists; `None` means no skipping info.
    let filter_vector = self
        .filter
        .as_ref()
        .map(|filter| filter.apply(actions))
        .transpose()?;
    // Start from the filter's verdict, or all-false when there is no filter;
    // rows are switched on below only when an add is actually included.
    let mut selection_vector = match filter_vector {
        Some(ref filter_vector) => filter_vector.clone(),
        None => vec![false; actions.length()],
    };
    let adds = self.setup_batch_process(filter_vector, actions, is_log_batch)?;
    for (add, index) in adds.into_iter() {
        // Build the dedup key once, moving `path` rather than cloning it for
        // both `contains` and `insert` as the original did (`add` is not used
        // again afterwards, so the move is safe).
        let dv_id = add.dv_unique_id();
        let key = (add.path, dv_id);
        if !self.seen.contains(&key) {
            debug!(
                "Including file in scan: ({}, {:?}), is log {is_log_batch}",
                key.0, key.1,
            );
            if is_log_batch {
                // Only log-batch files need suppressing in later batches.
                self.seen.insert(key);
            }
            selection_vector[index] = true;
        } else {
            debug!(
                "Filtering out Add due to it being removed {}, is log {is_log_batch}",
                key.0
            );
            // The filter may have selected this row; explicitly deselect it.
            selection_vector[index] = false;
        }
    }
    // Project the raw add actions into the engine-facing scan-row shape.
    let result = expression_handler
        .get_evaluator(
            get_log_schema().project(&[ADD_NAME])?,
            self.get_add_transform_expr(),
            SCAN_ROW_DATATYPE.clone(),
        )
        .evaluate(actions)?;
    Ok((result, selection_vector))
}
/// Extract the Add (and, for log batches, Remove) actions from `actions`.
/// Every remove's (path, dv id) key is recorded in `self.seen` so a matching
/// add encountered later is skipped; the surviving adds are returned together
/// with the row index they came from.
fn setup_batch_process(
    &mut self,
    selection_vector: Option<Vec<bool>>,
    actions: &dyn EngineData,
    is_log_batch: bool,
) -> DeltaResult<Vec<(Add, usize)>> {
    // Non-log batches are projected to adds only; removes are irrelevant there.
    let schema_to_use = if is_log_batch {
        get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?
    } else {
        get_log_schema().project(&[ADD_NAME])?
    };
    let mut visitor = AddRemoveVisitor::new(selection_vector, is_log_batch);
    actions.extract(schema_to_use, &mut visitor)?;
    // Fold all removes into the seen-set in one pass.
    self.seen.extend(visitor.removes.into_iter().map(|remove| {
        let dv_id = remove.dv_unique_id();
        (remove.path, dv_id)
    }));
    Ok(visitor.adds)
}
}
/// Replay the given batches of actions, yielding every `Add` that survives
/// deduplication and data skipping as a flat iterator of results. Errors from
/// the input iterator or from batch processing surface as single `Err` items.
pub fn log_replay_iter(
    engine: &dyn Engine,
    action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send,
    table_schema: &SchemaRef,
    predicate: &Option<Expression>,
) -> impl Iterator<Item = DeltaResult<Add>> {
    let mut log_scanner = LogReplayScanner::new(engine, table_schema, predicate);
    action_iter.flat_map(move |actions| {
        // Chain the upstream result into batch processing, then expand a
        // successful batch into its adds or an error into a one-item stream.
        let processed = actions.and_then(|(batch, is_log_batch)| {
            log_scanner.process_batch(batch.as_ref(), is_log_batch)
        });
        match processed {
            Ok(adds) => Either::Left(adds.into_iter().map(Ok)),
            Err(err) => Either::Right(std::iter::once(Err(err))),
        }
    })
}
/// Turn batches of actions into `ScanData` results, one per input batch.
/// Batches whose selection vector selects no rows are dropped; errors are
/// always passed through so the caller can observe them.
pub fn scan_action_iter(
    engine: &dyn Engine,
    action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
    table_schema: &SchemaRef,
    predicate: &Option<Expression>,
) -> impl Iterator<Item = DeltaResult<ScanData>> {
    let mut log_scanner = LogReplayScanner::new(engine, table_schema, predicate);
    let expression_handler = engine.get_expression_handler();
    action_iter
        .map(move |action_res| {
            let (batch, is_log_batch) = action_res?;
            log_scanner.process_scan_batch(
                expression_handler.as_ref(),
                batch.as_ref(),
                is_log_batch,
            )
        })
        .filter(|action_res| {
            // Keep errors (so they reach the caller) and any batch with at
            // least one selected row.
            action_res
                .as_ref()
                .map_or(true, |(_, selection)| selection.contains(&true))
        })
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use crate::scan::{
state::{DvInfo, Stats},
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
};
// Callback handed to the test harness; asserts the per-file values produced
// for the single expected Add in the shared test batches.
fn validate_simple(
_: &mut (),
path: &str,
size: i64,
stats: Option<Stats>,
_: DvInfo,
part_vals: HashMap<String, String>,
) {
assert_eq!(
path,
"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
);
assert_eq!(size, 635);
assert!(stats.is_some());
assert_eq!(stats.as_ref().unwrap().num_records, 10);
assert_eq!(part_vals.get("date"), Some(&"2017-12-10".to_string()));
assert_eq!(part_vals.get("non-existent"), None);
}
#[test]
fn test_scan_action_iter() {
// One add row: expect selection vector [true, false] per the fixture.
run_with_validate_callback(
vec![add_batch_simple()],
&[true, false],
(),
validate_simple,
);
}
#[test]
fn test_scan_action_iter_with_remove() {
// A remove suppresses its matching add; only row 2 stays selected.
run_with_validate_callback(
vec![add_batch_with_remove()],
&[false, false, true, false],
(),
validate_simple,
);
}
}