use std::collections::{HashMap, HashSet};
use std::slice;
use std::sync::{Arc, LazyLock};
use itertools::Itertools;
use tracing::info;
use crate::actions::visitors::{visit_deletion_vector_at, InCommitTimestampVisitor};
use crate::actions::{
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, COMMIT_INFO_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::{column_expr, column_expr_ref, column_name, ColumnName, Expression};
use crate::path::{AsUrl, ParsedLogPath};
use crate::scan::data_skipping::stats_schema::build_stats_schema;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::scan::state::DvInfo;
use crate::schema::{
ColumnNamesAndTypes, DataType, SchemaRef, StructField, StructType, ToSchema as _,
};
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
use crate::table_configuration::TableConfiguration;
use crate::table_features::{format_features, Operation, TableFeature};
use crate::utils::require;
use crate::{DeltaResult, Engine, EngineData, Error, PredicateRef, RowVisitor};
#[cfg(test)]
mod tests;
/// Change-data-feed scan metadata produced for one batch of actions read from a single commit
/// file.
pub(crate) struct TableChangesScanMetadata {
    /// Result of evaluating the CDF scan-row expression (built from the commit's timestamp and
    /// version) over one batch of the commit's actions.
    pub(crate) scan_metadata: Box<dyn EngineData>,
    /// Per-row selection for the batch; `true` means the row is part of the change feed.
    pub(crate) selection_vector: Vec<bool>,
    /// Deletion vectors of `remove` actions (with `dataChange = true`) whose path also appears in
    /// an `add` action of the same commit, keyed by path. Empty when the commit contains `cdc`
    /// actions. The same `Arc` is shared by every batch of a given commit.
    pub(crate) remove_dvs: Arc<HashMap<String, DvInfo>>,
}
/// Replays `commit_files` in order and yields change-data-feed scan metadata for every batch of
/// actions they contain.
///
/// `start_table_configuration` is cloned and then advanced commit by commit whenever a protocol
/// or metadata update is encountered, so each commit is validated against the configuration in
/// effect at that point in the log. When `physical_predicate` is supplied and a stats schema can
/// be derived from its reference schema, a data-skipping filter over `add.stats` is applied to
/// every batch.
///
/// Commits are read lazily, one at a time, as the returned iterator is advanced. Errors from a
/// commit's prepare pass and from individual batches are both surfaced as iterator items.
pub(crate) fn table_changes_action_iter(
    engine: Arc<dyn Engine>,
    start_table_configuration: &TableConfiguration,
    commit_files: impl IntoIterator<Item = ParsedLogPath>,
    table_schema: SchemaRef,
    physical_predicate: Option<(PredicateRef, SchemaRef)>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanMetadata>>> {
    // Data skipping needs both a predicate and a stats schema for the columns it references;
    // without either, every row starts out selected.
    let data_skipping_filter = match physical_predicate {
        Some((predicate, referenced_schema)) => {
            build_stats_schema(&referenced_schema).and_then(|stats_schema| {
                let parse_stats = Arc::new(Expression::parse_json(
                    column_expr!("add.stats"),
                    stats_schema.clone(),
                ));
                DataSkippingFilter::new(
                    engine.as_ref(),
                    Some(predicate),
                    Some(&stats_schema),
                    parse_stats,
                    None,
                    column_expr_ref!("partitionValues_parsed"),
                    get_log_add_schema().clone(),
                    None,
                )
            })
        }
        None => None,
    }
    .map(Arc::new);

    // The configuration is owned by the closure and mutated as each commit is prepared.
    let mut configuration = start_table_configuration.clone();
    let per_commit = commit_files.into_iter().map(move |commit_file| {
        LogReplayScanner::try_new(engine.as_ref(), &mut configuration, commit_file, &table_schema)
            .and_then(|scanner| {
                scanner.into_scan_batches(engine.clone(), data_skipping_filter.clone())
            })
    });

    // Each commit yields a fallible iterator of fallible batches; collapse both error layers.
    Ok(per_commit
        .flatten_ok()
        .map(|nested| nested.and_then(|batch| batch)))
}
/// Outcome of the first ("prepare") pass over a single commit file, consumed by the second pass
/// (`into_scan_batches`) to select and transform that commit's file actions.
struct LogReplayScanner {
    /// Whether the commit contains at least one `cdc` action. When set, only `cdc` rows are
    /// selected in the second pass.
    has_cdc_action: bool,
    /// Deletion vectors of `remove` actions whose path also appears in an `add` action of the
    /// same commit, keyed by path (empty when `has_cdc_action` is set).
    remove_dvs: HashMap<String, DvInfo>,
    /// The commit file being replayed.
    commit_file: ParsedLogPath,
    /// Commit timestamp: the in-commit timestamp when that feature is enabled, otherwise the
    /// commit file's last-modified value.
    timestamp: i64,
}
impl LogReplayScanner {
    /// Runs the first ("prepare") pass over `commit_file`.
    ///
    /// Every action in the commit is read in order to:
    /// - extract the in-commit timestamp from the first batch (via `InCommitTimestampVisitor`);
    /// - record whether the commit contains any `cdc` action;
    /// - collect `add` paths and `remove` deletion vectors (both with `dataChange = true`) so that
    ///   removes can be paired with adds of the same path;
    /// - validate any `metaData` / `protocol` action and fold it into `table_configuration`.
    ///
    /// `table_configuration` is updated in place, so later commits are validated against the
    /// configuration in effect after this one.
    ///
    /// # Errors
    /// - reading or visiting the commit's actions fails;
    /// - a `metaData` action carries a schema that is not equal to `table_schema`;
    /// - building the updated table configuration fails;
    /// - after a metadata update the `ChangeDataFeed` feature is not enabled, or after a protocol
    ///   update the `Cdf` operation is not supported;
    /// - the in-commit-timestamp feature is enabled but no in-commit timestamp was found.
    fn try_new(
        engine: &dyn Engine,
        table_configuration: &mut TableConfiguration,
        commit_file: ParsedLogPath,
        table_schema: &SchemaRef,
    ) -> DeltaResult<Self> {
        let visitor_schema = PreparePhaseVisitor::schema();
        // NOTE(review): the trailing `None` argument is presumably the read predicate. It is left
        // unset so every add/remove is visited and remove DVs can be paired with adds before any
        // data skipping happens — confirm.
        let mut action_iter = engine
            .json_handler()
            .read_json_files(
                slice::from_ref(&commit_file.location),
                visitor_schema,
                None, )?
            .peekable();
        // Peek (without consuming) at the first batch to extract the in-commit timestamp; the
        // same batch is still visited by the loop below.
        let mut in_commit_timestamp_opt = None;
        if let Some(Ok(actions)) = action_iter.peek() {
            let mut visitor = InCommitTimestampVisitor::default();
            visitor.visit_rows_of(actions.as_ref())?;
            in_commit_timestamp_opt = visitor.in_commit_timestamp;
        }
        let mut remove_dvs = HashMap::default();
        let mut add_paths = HashSet::default();
        let mut has_cdc_action = false;
        for actions in action_iter {
            let actions = actions?;
            let mut visitor = PreparePhaseVisitor {
                add_paths: &mut add_paths,
                remove_dvs: &mut remove_dvs,
                has_cdc_action: &mut has_cdc_action,
            };
            visitor.visit_rows_of(actions.as_ref())?;
            let metadata_opt = Metadata::try_new_from_data(actions.as_ref())?;
            let has_metadata_update = metadata_opt.is_some();
            let protocol_opt = Protocol::try_new_from_data(actions.as_ref())?;
            let has_protocol_update = protocol_opt.is_some();
            // Schema compatibility is currently strict equality with the schema the scan was
            // built for.
            if let Some(ref metadata) = metadata_opt {
                let schema = metadata.parse_schema()?;
                require!(
                    table_schema.as_ref() == &schema,
                    Error::change_data_feed_incompatible_schema(table_schema, &schema)
                );
            }
            // Fold any new protocol/metadata into the running configuration so the checks below
            // (and later commits) see it.
            if has_metadata_update || has_protocol_update {
                *table_configuration = TableConfiguration::try_new_from(
                    table_configuration,
                    metadata_opt,
                    protocol_opt,
                    commit_file.version,
                )?;
                let writer_features_str = table_configuration
                    .protocol()
                    .writer_features()
                    .map(format_features)
                    .unwrap_or_else(|| "[]".to_string());
                info!(
                    version = commit_file.version,
                    id = table_configuration.metadata().id(),
                    writerFeatures = %writer_features_str,
                    minReaderVersion = table_configuration.protocol().min_reader_version(),
                    minWriterVersion = table_configuration.protocol().min_writer_version(),
                    schemaString = %table_configuration.metadata().schema_string(),
                    configuration = ?table_configuration.metadata().configuration(),
                    "Table configuration updated during CDF query"
                );
            }
            // A metadata update must leave the ChangeDataFeed feature enabled.
            if has_metadata_update {
                require!(
                    table_configuration.is_feature_enabled(&TableFeature::ChangeDataFeed),
                    Error::change_data_feed_unsupported(commit_file.version)
                );
            }
            // A protocol update must still allow the CDF operation; the underlying error is
            // replaced with the CDF-specific one.
            if has_protocol_update {
                table_configuration
                    .ensure_operation_supported(Operation::Cdf)
                    .map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
            }
        }
        // With cdc actions present, the second pass selects only cdc rows, so no add/remove
        // pairing is needed. Otherwise keep only remove DVs whose path was also added in this
        // commit.
        if has_cdc_action {
            remove_dvs.clear();
        } else {
            remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
        }
        // When in-commit timestamps are enabled the value is mandatory; otherwise fall back to
        // the commit file's last-modified value.
        let timestamp = if table_configuration.is_feature_enabled(&TableFeature::InCommitTimestamp)
        {
            let Some(in_commit_timestamp) = in_commit_timestamp_opt else {
                return Err(Error::generic(format!(
                    "In-commit timestamp is enabled but not found in commit at version {}",
                    commit_file.version
                )));
            };
            in_commit_timestamp
        } else {
            commit_file.location.last_modified
        };
        info!(
            version = commit_file.version,
            id = table_configuration.metadata().id(),
            remove_dvs_size = remove_dvs.len(),
            has_cdc_action = has_cdc_action,
            file_path = %commit_file.location.as_url(),
            timestamp = timestamp,
            "Phase 1 of CDF query processing completed"
        );
        Ok(LogReplayScanner {
            timestamp,
            commit_file,
            has_cdc_action,
            remove_dvs,
        })
    }
    /// Runs the second pass over the commit file, yielding one [`TableChangesScanMetadata`] per
    /// batch of actions read.
    ///
    /// For each batch, an initial selection vector comes from `filter` (or is all `true` when
    /// there is no filter). It is then narrowed by [`FileActionSelectionVisitor`], and the batch
    /// is transformed with the CDF scan-row expression built from this commit's timestamp and
    /// version.
    ///
    /// # Errors
    /// Fails up front if `read_json_files` errors, if the commit version does not fit in an
    /// `i64`, or if the expression evaluator cannot be created. Per-batch failures are reported
    /// as items of the returned iterator.
    fn into_scan_batches(
        self,
        engine: Arc<dyn Engine>,
        filter: Option<Arc<DataSkippingFilter>>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanMetadata>>> {
        let Self {
            has_cdc_action,
            remove_dvs,
            commit_file,
            timestamp,
        } = self;
        // Shared by every batch of this commit.
        let remove_dvs = Arc::new(remove_dvs);
        let schema = FileActionSelectionVisitor::schema();
        let action_iter = engine.json_handler().read_json_files(
            slice::from_ref(&commit_file.location),
            schema,
            None,
        )?;
        let commit_version = commit_file
            .version
            .try_into()
            .map_err(|_| Error::generic("Failed to convert commit version to i64"))?;
        let evaluator = engine.evaluation_handler().new_expression_evaluator(
            get_log_add_schema().clone(),
            Arc::new(cdf_scan_row_expression(timestamp, commit_version)),
            cdf_scan_row_schema().into(),
        )?;
        let result = action_iter.map(move |actions| -> DeltaResult<_> {
            let actions = actions?;
            // Start from the data-skipping result (if any); the visitor only ever narrows it.
            let selection_vector = match &filter {
                Some(filter) => filter.apply(actions.as_ref())?,
                None => vec![true; actions.len()],
            };
            let mut visitor =
                FileActionSelectionVisitor::new(&remove_dvs, selection_vector, has_cdc_action);
            visitor.visit_rows_of(actions.as_ref())?;
            let scan_metadata = evaluator.evaluate(actions.as_ref())?;
            Ok(TableChangesScanMetadata {
                scan_metadata,
                selection_vector: visitor.selection_vector,
                remove_dvs: remove_dvs.clone(),
            })
        });
        Ok(result)
    }
}
/// Row visitor for the first ("prepare") pass over a commit. It accumulates its results into
/// state borrowed from the caller so that it can be re-created for every batch.
struct PreparePhaseVisitor<'a> {
    /// Set to `true` once any `cdc` action is seen; add/remove tracking stops after that point.
    has_cdc_action: &'a mut bool,
    /// Paths of `add` actions with `dataChange = true` seen while no `cdc` action had been seen.
    add_paths: &'a mut HashSet<String>,
    /// Deletion vectors of `remove` actions with `dataChange = true` seen while no `cdc` action
    /// had been seen, keyed by the remove's path.
    remove_dvs: &'a mut HashMap<String, DvInfo>,
}
impl PreparePhaseVisitor<'_> {
    /// Read schema for the prepare pass.
    ///
    /// It covers every action type the pass inspects (`add`, `remove`, `cdc`, `metaData`,
    /// `protocol`) plus only the nullable `inCommitTimestamp` field of `commitInfo`.
    fn schema() -> Arc<StructType> {
        let commit_info_type = StructType::new_unchecked([StructField::new(
            "inCommitTimestamp",
            DataType::LONG,
            true,
        )]);
        let action_fields = [
            StructField::nullable(ADD_NAME, Add::to_schema()),
            StructField::nullable(REMOVE_NAME, Remove::to_schema()),
            StructField::nullable(CDC_NAME, Cdc::to_schema()),
            StructField::nullable(METADATA_NAME, Metadata::to_schema()),
            StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()),
            StructField::nullable(COMMIT_INFO_NAME, commit_info_type),
        ];
        Arc::new(StructType::new_unchecked(action_fields))
    }
}
impl RowVisitor for PreparePhaseVisitor<'_> {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        // Getter indices relied on by `visit`:
        //   0-1   add.{path, dataChange}
        //   2-3   remove.{path, dataChange}
        //   4-8   remove.deletionVector.* (handed to `visit_deletion_vector_at`)
        //   9     cdc.path
        //   10    commitInfo.inCommitTimestamp (requested, but not read by `visit`)
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
            let (names, types) = [
                (column_name!("add.path"), DataType::STRING),
                (column_name!("add.dataChange"), DataType::BOOLEAN),
                (column_name!("remove.path"), DataType::STRING),
                (column_name!("remove.dataChange"), DataType::BOOLEAN),
                (column_name!("remove.deletionVector.storageType"), DataType::STRING),
                (column_name!("remove.deletionVector.pathOrInlineDv"), DataType::STRING),
                (column_name!("remove.deletionVector.offset"), DataType::INTEGER),
                (column_name!("remove.deletionVector.sizeInBytes"), DataType::INTEGER),
                (column_name!("remove.deletionVector.cardinality"), DataType::LONG),
                (column_name!("cdc.path"), DataType::STRING),
                (column_name!("commitInfo.inCommitTimestamp"), DataType::LONG),
            ]
            .into_iter()
            .unzip();
            (names, types).into()
        });
        NAMES_AND_TYPES.as_ref()
    }

    /// Records data-changing `add` paths and `remove` deletion vectors until the first `cdc`
    /// action is seen, and flags the presence of `cdc` actions.
    fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
        require!(
            getters.len() == 11,
            Error::InternalError(format!(
                "Wrong number of PreparePhaseVisitor getters: {}",
                getters.len()
            ))
        );
        for row in 0..row_count {
            // The cdc flag can only flip in the final arm of this iteration, so sampling it once
            // up front matches re-reading it in each arm.
            let tracking_files = !*self.has_cdc_action;

            if let Some(add_path) = getters[0].get_str(row, "add.path")? {
                // `dataChange` is only read while file actions are still being tracked.
                if tracking_files && getters[1].get(row, "add.dataChange")? {
                    self.add_paths.insert(add_path.to_string());
                }
                continue;
            }

            if let Some(remove_path) = getters[2].get_str(row, "remove.path")? {
                if tracking_files && getters[3].get(row, "remove.dataChange")? {
                    let deletion_vector = visit_deletion_vector_at(row, &getters[4..=8])?;
                    self.remove_dvs
                        .insert(remove_path.to_string(), DvInfo { deletion_vector });
                }
                continue;
            }

            if getters[9].get_str(row, "cdc.path")?.is_some() {
                *self.has_cdc_action = true;
            }
        }
        Ok(())
    }
}
/// Row visitor for the second pass over a commit: narrows an incoming selection vector down to
/// the file actions that belong in the change data feed.
struct FileActionSelectionVisitor<'a> {
    /// Per-row selection. Rows that are already `false` stay `false`; the others are
    /// re-evaluated by `visit`.
    selection_vector: Vec<bool>,
    /// When `true`, only rows holding a `cdc` action are kept.
    has_cdc_action: bool,
    /// Paths of `remove` actions paired with an `add` of the same path; such removes are
    /// deselected.
    remove_dvs: &'a HashMap<String, DvInfo>,
}
impl<'a> FileActionSelectionVisitor<'a> {
    /// Builds a visitor for one batch of actions from the commit-level results of the prepare
    /// pass (`remove_dvs`, `has_cdc_action`) and the batch's initial `selection_vector`.
    fn new(
        remove_dvs: &'a HashMap<String, DvInfo>,
        selection_vector: Vec<bool>,
        has_cdc_action: bool,
    ) -> Self {
        Self {
            remove_dvs,
            selection_vector,
            has_cdc_action,
        }
    }

    /// Read schema for the second pass: only the file actions `cdc`, `add` and `remove`, in
    /// that order.
    fn schema() -> Arc<StructType> {
        let file_action_fields = [
            StructField::nullable(CDC_NAME, Cdc::to_schema()),
            StructField::nullable(ADD_NAME, Add::to_schema()),
            StructField::nullable(REMOVE_NAME, Remove::to_schema()),
        ];
        Arc::new(StructType::new_unchecked(file_action_fields))
    }
}
impl RowVisitor for FileActionSelectionVisitor<'_> {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        // Getter indices relied on by `visit`:
        //   0    cdc.path
        //   1-2  add.{path, dataChange}
        //   3-4  remove.{path, dataChange}
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
            let (names, types) = [
                (column_name!("cdc.path"), DataType::STRING),
                (column_name!("add.path"), DataType::STRING),
                (column_name!("add.dataChange"), DataType::BOOLEAN),
                (column_name!("remove.path"), DataType::STRING),
                (column_name!("remove.dataChange"), DataType::BOOLEAN),
            ]
            .into_iter()
            .unzip();
            (names, types).into()
        });
        NAMES_AND_TYPES.as_ref()
    }

    /// Re-evaluates every still-selected row:
    /// - in a commit with `cdc` actions, keep exactly the `cdc` rows;
    /// - otherwise keep data-changing `add` rows, and data-changing `remove` rows whose path is
    ///   not paired with an `add` (i.e. not a key of `remove_dvs`);
    /// - deselect everything else.
    fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
        require!(
            getters.len() == 5,
            Error::InternalError(format!(
                "Wrong number of FileActionSelectionVisitor getters: {}",
                getters.len()
            ))
        );
        for row in 0..row_count {
            // Rows rejected upstream (e.g. by data skipping) are never reconsidered.
            if !self.selection_vector[row] {
                continue;
            }
            let keep: bool = if self.has_cdc_action {
                getters[0].get_str(row, "cdc.path")?.is_some()
            } else if getters[1].get_str(row, "add.path")?.is_some() {
                getters[2].get(row, "add.dataChange")?
            } else if let Some(path) = getters[3].get_str(row, "remove.path")? {
                // A remove paired with an add of the same path is dropped here; the pairing is
                // exposed through `remove_dvs` on the scan metadata instead.
                let data_change: bool = getters[4].get(row, "remove.dataChange")?;
                data_change && !self.remove_dvs.contains_key(path)
            } else {
                false
            };
            self.selection_vector[row] = keep;
        }
        Ok(())
    }
}