use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use crate::actions::schemas::GetStructField;
use crate::actions::visitors::{visit_deletion_vector_at, ProtocolVisitor};
use crate::actions::{
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, METADATA_NAME,
PROTOCOL_NAME, REMOVE_NAME,
};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::{column_name, ColumnName};
use crate::path::ParsedLogPath;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::scan::state::DvInfo;
use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, Engine, EngineData, Error, ExpressionRef, RowVisitor};
use itertools::Itertools;
#[cfg(test)]
mod tests;
/// One batch of change-data-feed scan rows, produced from one batch of actions read from a
/// single commit file.
pub(crate) struct TableChangesScanData {
    /// The commit's action rows transformed by `cdf_scan_row_expression` into the
    /// `cdf_scan_row_schema` layout.
    pub(crate) scan_data: Box<dyn EngineData>,
    /// One entry per row of `scan_data`; `true` for rows that survived data skipping and
    /// file-action selection.
    pub(crate) selection_vector: Vec<bool>,
    /// Deletion vectors of data-changing `remove` actions whose path was also added in the same
    /// commit, keyed by path. The same map is shared by every batch of the commit, and it is
    /// empty when the commit contains `cdc` actions.
    pub(crate) remove_dvs: Arc<HashMap<String, DvInfo>>,
}
/// Streams change-data-feed scan batches for a sequence of commit files.
///
/// Commits are processed lazily, in the order yielded by `commit_files`. For each commit:
/// - `LogReplayScanner::try_new` makes a first pass over the file. It collects remove
///   deletion vectors and validates any protocol or metadata actions.
/// - `LogReplayScanner::into_scan_batches` then re-reads the file and yields one
///   `TableChangesScanData` per batch of actions.
///
/// The `DataSkippingFilter` derived from `physical_predicate` (when `DataSkippingFilter::new`
/// produces one) is built once and shared by every commit.
///
/// # Errors
/// The outer `DeltaResult` is always `Ok`. Failures from either per-commit phase, or from any
/// individual batch, are yielded as `Err` items of the returned iterator.
pub(crate) fn table_changes_action_iter(
    engine: Arc<dyn Engine>,
    commit_files: impl IntoIterator<Item = ParsedLogPath>,
    table_schema: SchemaRef,
    physical_predicate: Option<(ExpressionRef, SchemaRef)>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
    let skipping_filter =
        DataSkippingFilter::new(engine.as_ref(), physical_predicate).map(Arc::new);
    let per_commit = commit_files
        .into_iter()
        .map(move |commit_file| -> DeltaResult<_> {
            LogReplayScanner::try_new(engine.as_ref(), commit_file, &table_schema)?
                .into_scan_batches(Arc::clone(&engine), skipping_filter.clone())
        });
    // `flatten_ok` expands each `Ok(batch_iter)` into its items. That yields
    // `Result<DeltaResult<TableChangesScanData>, Error>`, which `and_then` collapses to one layer.
    let batches = per_commit
        .flatten_ok()
        .map(|nested| nested.and_then(|batch| batch));
    Ok(batches)
}
/// Per-commit state gathered by `LogReplayScanner::try_new` (the prepare phase) and consumed
/// by `LogReplayScanner::into_scan_batches` (the scan phase).
struct LogReplayScanner {
    /// Whether the commit contains any `cdc` action. When set, the scan phase selects only
    /// `cdc` rows.
    has_cdc_action: bool,
    /// Deletion vectors of data-changing `remove` actions, keyed by path. Restricted to paths
    /// that were also added (with `dataChange`) in this commit; empty when `has_cdc_action` is
    /// set.
    remove_dvs: HashMap<String, DvInfo>,
    /// The commit file being scanned.
    commit_file: ParsedLogPath,
    /// The commit file's `last_modified` value, passed to `cdf_scan_row_expression`.
    timestamp: i64,
}
impl LogReplayScanner {
fn try_new(
engine: &dyn Engine,
commit_file: ParsedLogPath,
table_schema: &SchemaRef,
) -> DeltaResult<Self> {
let visitor_schema = PreparePhaseVisitor::schema();
let action_iter = engine.get_json_handler().read_json_files(
&[commit_file.location.clone()],
visitor_schema,
None, )?;
let mut remove_dvs = HashMap::default();
let mut add_paths = HashSet::default();
let mut has_cdc_action = false;
for actions in action_iter {
let actions = actions?;
let mut visitor = PreparePhaseVisitor {
add_paths: &mut add_paths,
remove_dvs: &mut remove_dvs,
has_cdc_action: &mut has_cdc_action,
protocol: None,
metadata_info: None,
};
visitor.visit_rows_of(actions.as_ref())?;
if let Some(protocol) = visitor.protocol {
ensure_cdf_read_supported(&protocol)
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
}
if let Some((schema, configuration)) = visitor.metadata_info {
let schema: StructType = serde_json::from_str(&schema)?;
require!(
table_schema.as_ref() == &schema,
Error::change_data_feed_incompatible_schema(table_schema, &schema)
);
let table_properties = TableProperties::from(configuration);
check_cdf_table_properties(&table_properties)
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
}
}
if has_cdc_action {
remove_dvs.clear();
} else {
remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
}
Ok(LogReplayScanner {
timestamp: commit_file.location.last_modified,
commit_file,
has_cdc_action,
remove_dvs,
})
}
fn into_scan_batches(
self,
engine: Arc<dyn Engine>,
filter: Option<Arc<DataSkippingFilter>>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
let Self {
has_cdc_action,
remove_dvs,
commit_file,
timestamp,
} = self;
let remove_dvs = Arc::new(remove_dvs);
let schema = FileActionSelectionVisitor::schema();
let action_iter = engine.get_json_handler().read_json_files(
&[commit_file.location.clone()],
schema,
None,
)?;
let commit_version = commit_file
.version
.try_into()
.map_err(|_| Error::generic("Failed to convert commit version to i64"))?;
let evaluator = engine.get_expression_handler().get_evaluator(
get_log_add_schema().clone(),
cdf_scan_row_expression(timestamp, commit_version),
cdf_scan_row_schema().into(),
);
let result = action_iter.map(move |actions| -> DeltaResult<_> {
let actions = actions?;
let selection_vector = match &filter {
Some(filter) => filter.apply(actions.as_ref())?,
None => vec![true; actions.len()],
};
let mut visitor =
FileActionSelectionVisitor::new(&remove_dvs, selection_vector, has_cdc_action);
visitor.visit_rows_of(actions.as_ref())?;
let scan_data = evaluator.evaluate(actions.as_ref())?;
Ok(TableChangesScanData {
scan_data,
selection_vector: visitor.selection_vector,
remove_dvs: remove_dvs.clone(),
})
});
Ok(result)
}
}
/// Row visitor for the prepare phase of a commit.
///
/// It records the paths of data-changing `add`/`remove` actions, detects `cdc` actions, and
/// captures any `protocol` and `metaData` action so the caller can validate them.
struct PreparePhaseVisitor<'a> {
    /// The last `protocol` action seen in the visited batch, if any.
    protocol: Option<Protocol>,
    /// `(schemaString, configuration)` of the last `metaData` action seen in the visited batch.
    metadata_info: Option<(String, HashMap<String, String>)>,
    /// Set to `true` once any `cdc` action is seen. The caller passes the same flag for every
    /// batch of the commit.
    has_cdc_action: &'a mut bool,
    /// Paths of `add` actions with `dataChange == true`, recorded while no `cdc` action has
    /// been seen.
    add_paths: &'a mut HashSet<String>,
    /// Deletion vectors of `remove` actions with `dataChange == true`, keyed by path, recorded
    /// while no `cdc` action has been seen.
    remove_dvs: &'a mut HashMap<String, DvInfo>,
}
impl PreparePhaseVisitor<'_> {
    /// Schema used to read a commit file during the prepare phase.
    ///
    /// It contains one top-level field per action kind this visitor inspects, in order:
    /// add, remove, cdc, metaData, protocol.
    fn schema() -> Arc<StructType> {
        let action_fields = vec![
            Option::<Add>::get_struct_field(ADD_NAME),
            Option::<Remove>::get_struct_field(REMOVE_NAME),
            Option::<Cdc>::get_struct_field(CDC_NAME),
            Option::<Metadata>::get_struct_field(METADATA_NAME),
            Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
        ];
        StructType::new(action_fields).into()
    }
}
impl RowVisitor for PreparePhaseVisitor<'_> {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
const STRING: DataType = DataType::STRING;
const INTEGER: DataType = DataType::INTEGER;
const LONG: DataType = DataType::LONG;
const BOOLEAN: DataType = DataType::BOOLEAN;
let string_list: DataType = ArrayType::new(STRING, false).into();
let string_string_map = MapType::new(STRING, STRING, false).into();
let types_and_names = vec![
(STRING, column_name!("add.path")),
(BOOLEAN, column_name!("add.dataChange")),
(STRING, column_name!("remove.path")),
(BOOLEAN, column_name!("remove.dataChange")),
(STRING, column_name!("remove.deletionVector.storageType")),
(STRING, column_name!("remove.deletionVector.pathOrInlineDv")),
(INTEGER, column_name!("remove.deletionVector.offset")),
(INTEGER, column_name!("remove.deletionVector.sizeInBytes")),
(LONG, column_name!("remove.deletionVector.cardinality")),
(STRING, column_name!("cdc.path")),
(STRING, column_name!("metaData.schemaString")),
(string_string_map, column_name!("metaData.configuration")),
(INTEGER, column_name!("protocol.minReaderVersion")),
(INTEGER, column_name!("protocol.minWriterVersion")),
(string_list.clone(), column_name!("protocol.readerFeatures")),
(string_list, column_name!("protocol.writerFeatures")),
];
let (types, names) = types_and_names.into_iter().unzip();
(names, types).into()
});
NAMES_AND_TYPES.as_ref()
}
fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
require!(
getters.len() == 16,
Error::InternalError(format!(
"Wrong number of PreparePhaseVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
if let Some(path) = getters[0].get_str(i, "add.path")? {
if !*self.has_cdc_action && getters[1].get(i, "add.dataChange")? {
self.add_paths.insert(path.to_string());
}
} else if let Some(path) = getters[2].get_str(i, "remove.path")? {
if !*self.has_cdc_action && getters[3].get(i, "remove.dataChange")? {
let deletion_vector = visit_deletion_vector_at(i, &getters[4..=8])?;
self.remove_dvs
.insert(path.to_string(), DvInfo { deletion_vector });
}
} else if getters[9].get_str(i, "cdc.path")?.is_some() {
*self.has_cdc_action = true;
} else if let Some(schema) = getters[10].get_str(i, "metaData.schemaString")? {
let configuration_map_opt = getters[11].get_opt(i, "metadata.configuration")?;
let configuration = configuration_map_opt.unwrap_or_else(HashMap::new);
self.metadata_info = Some((schema.to_string(), configuration));
} else if let Some(min_reader_version) =
getters[12].get_int(i, "protocol.min_reader_version")?
{
let protocol =
ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?;
self.protocol = Some(protocol);
}
}
Ok(())
}
}
/// Row visitor for the scan phase.
///
/// It refines an initial per-row selection vector based on each row's file action (`cdc`,
/// `add`, or `remove`).
struct FileActionSelectionVisitor<'a> {
    /// One entry per row. Rows that start as `false` (e.g. rejected by data skipping) are left
    /// untouched; the others are recomputed by `visit`.
    selection_vector: Vec<bool>,
    /// When `true`, only rows holding a `cdc` action stay selected.
    has_cdc_action: bool,
    /// Paths of `remove` actions to deselect, because they pair with an `add` of the same path
    /// in this commit.
    remove_dvs: &'a HashMap<String, DvInfo>,
}
impl<'a> FileActionSelectionVisitor<'a> {
    /// Creates a visitor that refines `selection_vector` in place.
    ///
    /// `remove_dvs` lists the removed paths that must be deselected. `has_cdc_action` switches
    /// selection to `cdc` rows only.
    fn new(
        remove_dvs: &'a HashMap<String, DvInfo>,
        selection_vector: Vec<bool>,
        has_cdc_action: bool,
    ) -> Self {
        Self {
            remove_dvs,
            has_cdc_action,
            selection_vector,
        }
    }

    /// Schema used to read a commit file during the scan phase.
    ///
    /// It has one top-level field per file action kind, in order: cdc, add, remove.
    fn schema() -> Arc<StructType> {
        let file_action_fields = vec![
            Option::<Cdc>::get_struct_field(CDC_NAME),
            Option::<Add>::get_struct_field(ADD_NAME),
            Option::<Remove>::get_struct_field(REMOVE_NAME),
        ];
        Arc::new(StructType::new(file_action_fields))
    }
}
impl RowVisitor for FileActionSelectionVisitor<'_> {
    /// Columns read during the scan phase.
    ///
    /// `visit` relies on positions: `0` cdc path, `1..=2` add (path, dataChange), and `3..=4`
    /// remove (path, dataChange).
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
            let columns = vec![
                (DataType::STRING, column_name!("cdc.path")),
                (DataType::STRING, column_name!("add.path")),
                (DataType::BOOLEAN, column_name!("add.dataChange")),
                (DataType::STRING, column_name!("remove.path")),
                (DataType::BOOLEAN, column_name!("remove.dataChange")),
            ];
            let (types, names) = columns.into_iter().unzip();
            (names, types).into()
        });
        NAMES_AND_TYPES.as_ref()
    }

    /// Recomputes the selection flag of every currently selected row.
    ///
    /// - If the commit has `cdc` actions, a row is kept only if it holds a `cdc` action.
    /// - Otherwise, an `add` row is kept when its `dataChange` is true.
    /// - A `remove` row is kept when its `dataChange` is true and its path is not in
    ///   `remove_dvs`.
    /// - Any other row is dropped.
    ///
    /// # Errors
    /// Returns `Error::InternalError` if there are not exactly 5 getters. Propagates getter
    /// errors.
    fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
        require!(
            getters.len() == 5,
            Error::InternalError(format!(
                "Wrong number of FileActionSelectionVisitor getters: {}",
                getters.len()
            ))
        );
        for row in 0..row_count {
            // Rows already deselected (e.g. by data skipping) are never revived.
            if !self.selection_vector[row] {
                continue;
            }
            let keep = if self.has_cdc_action {
                getters[0].get_str(row, "cdc.path")?.is_some()
            } else if getters[1].get_str(row, "add.path")?.is_some() {
                let add_data_change: bool = getters[2].get(row, "add.dataChange")?;
                add_data_change
            } else if let Some(path) = getters[3].get_str(row, "remove.path")? {
                let remove_data_change: bool = getters[4].get(row, "remove.dataChange")?;
                remove_data_change && !self.remove_dvs.contains_key(path)
            } else {
                false
            };
            self.selection_vector[row] = keep;
        }
        Ok(())
    }
}