use std::collections::HashSet;
use std::sync::{Arc, LazyLock};
use crate::actions::{Add, Remove, ADD_NAME, REMOVE_NAME};
use crate::engine_data::{FilteredEngineData, GetData, RowVisitor};
use crate::expressions::{column_name, ColumnName};
use crate::log_replay::deduplicator::Deduplicator;
use crate::log_replay::{FileActionDeduplicator, FileActionKey};
use crate::schema::{ColumnNamesAndTypes, DataType, SchemaRef, StructField, StructType, ToSchema};
use crate::snapshot::SnapshotRef;
use crate::table_features::Operation;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, FileDataReadResultIterator, FileMeta, Version,
};
/// Schema used to read commit JSON files during an incremental scan.
///
/// Only the `add` and `remove` action columns are projected; both are nullable so rows that
/// carry neither (or only one) of them still read successfully.
static INCREMENTAL_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let fields = [
        StructField::nullable(ADD_NAME, Add::to_schema()),
        StructField::nullable(REMOVE_NAME, Remove::to_schema()),
    ];
    Arc::new(StructType::new_unchecked(fields))
});
/// Builder for an [`IncrementalScanStream`] that replays the commits in
/// `(base_version, target_version]`, where `target_version` is the version of the target
/// snapshot.
#[derive(Debug)]
pub struct IncrementalScanBuilder {
    target_snapshot: SnapshotRef,
    base_version: Version,
}

impl IncrementalScanBuilder {
    /// Creates a builder replaying the commits after `base_version` (exclusive) up to the
    /// version of `target_snapshot` (inclusive).
    pub(crate) fn new(target_snapshot: impl Into<SnapshotRef>, base_version: Version) -> Self {
        Self {
            target_snapshot: target_snapshot.into(),
            base_version,
        }
    }

    /// Starts reading the commit files in `(base_version, target_version]`.
    ///
    /// The files are requested in descending version order (newest first), which is what lets
    /// the stream's deduplication keep the newest action per file key (assuming the engine's
    /// JSON handler yields batches in the order the files were given — TODO confirm).
    ///
    /// Returns `Ok(None)` when the target snapshot's log segment lists no commit files, or
    /// when its first listed commit is newer than `base_version + 1`. In both cases the
    /// required commits cannot be replayed from this snapshot.
    ///
    /// # Errors
    ///
    /// - `base_version` is not strictly less than the snapshot's version.
    /// - `base_version + 1` overflows `u64`.
    /// - The table configuration does not support [`Operation::Scan`].
    /// - The engine's JSON handler fails to start reading the commit files.
    pub fn build(self, engine: &dyn Engine) -> DeltaResult<Option<IncrementalScanStream>> {
        let target_version = self.target_snapshot.version();
        require!(
            self.base_version < target_version,
            Error::generic(format!(
                "IncrementalScanBuilder: base_version ({}) must be less than target_version ({})",
                self.base_version, target_version
            ))
        );
        let start_version = self.base_version.checked_add(1).ok_or_else(|| {
            Error::generic("IncrementalScanBuilder: base_version + 1 overflowed u64")
        })?;
        self.target_snapshot
            .table_configuration()
            .ensure_operation_supported(Operation::Scan)?;

        let snapshot_commits = &self
            .target_snapshot
            .log_segment()
            .listed
            .ascending_commit_files;
        // The segment must still list the commit right after `base_version`; if it starts later
        // (or lists nothing) we cannot reconstruct the delta from commits alone.
        let snapshot_first_version = snapshot_commits.first().map(|c| c.version);
        if snapshot_first_version.is_none_or(|v| v > start_version) {
            return Ok(None);
        }

        // Newest first: the deduplicator keeps the first action it sees for each file key.
        let commit_locations: Vec<FileMeta> = snapshot_commits
            .iter()
            .filter(|c| c.version >= start_version && c.version <= target_version)
            .rev()
            .map(|c| c.location.clone())
            .collect();
        let actions = engine.json_handler().read_json_files(
            &commit_locations,
            INCREMENTAL_READ_SCHEMA.clone(),
            None,
        )?;

        Ok(Some(IncrementalScanStream {
            base_version: self.base_version,
            target_version,
            // `into_listing*` drain the stream and then call `into_summary`, which polls it
            // again. `Iterator::next` makes no promise to keep returning `None` after exhaustion
            // for arbitrary engine-provided iterators, so fuse the reader to make that re-poll
            // well-defined.
            actions: Box::new(actions.fuse()),
            seen_file_keys: HashSet::new(),
            live_adds: HashSet::new(),
            removes: HashSet::new(),
            errored: false,
        }))
    }
}
/// Streaming replay of the commits selected by [`IncrementalScanBuilder::build`].
///
/// Iterating yields one [`FilteredEngineData`] per read batch that contains at least one add row
/// not shadowed by an earlier-seen action for the same file key. Removes, and the full key sets
/// of adds and removes, are accumulated internally; they are exposed by the `into_*` consumers.
pub struct IncrementalScanStream {
    /// Exclusive lower bound of the replayed version range.
    base_version: Version,
    /// Inclusive upper bound of the replayed version range.
    target_version: Version,
    /// Batches read from the commit JSON files.
    actions: FileDataReadResultIterator,
    /// Every file key already handled; a later action for the same key is skipped.
    seen_file_keys: HashSet<FileActionKey>,
    /// Keys of adds that were selected (not shadowed).
    live_adds: HashSet<FileActionKey>,
    /// Keys of removes that were recorded (not shadowed).
    removes: HashSet<FileActionKey>,
    /// Set once an error has been yielded; the stream then stops producing items.
    errored: bool,
}

impl Iterator for IncrementalScanStream {
    type Item = DeltaResult<FilteredEngineData>;

    /// Returns the next batch that still selects at least one add row.
    ///
    /// Batches with nothing selected are consumed silently (their removes are still recorded).
    /// After the first error is returned, every later call returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        while !self.errored {
            let outcome = self.actions.next()?.and_then(|batch| {
                process_batch(
                    batch,
                    &mut self.seen_file_keys,
                    &mut self.live_adds,
                    &mut self.removes,
                )
            });
            match outcome {
                Ok(None) => {}
                Ok(Some(filtered)) => return Some(Ok(filtered)),
                Err(e) => {
                    self.errored = true;
                    return Some(Err(e));
                }
            }
        }
        None
    }
}
impl IncrementalScanStream {
pub fn into_summary(mut self) -> DeltaResult<IncrementalScanSummary> {
if self.errored {
return Err(Error::generic(
"IncrementalScanStream: cannot finish a stream that previously errored",
));
}
for item in self.by_ref() {
item?;
}
Ok(IncrementalScanSummary {
base_version: self.base_version,
target_version: self.target_version,
live_adds: self.live_adds,
removes: self.removes,
})
}
pub fn into_listing(mut self) -> DeltaResult<IncrementalListing> {
let mut add_files: Vec<FilteredEngineData> = Vec::new();
for item in self.by_ref() {
add_files.push(item?);
}
let summary = self.into_summary()?;
Ok(IncrementalListing { summary, add_files })
}
pub fn into_summary_against_base_iter<'a>(
self,
base_keys: impl IntoIterator<Item = &'a FileActionKey>,
) -> DeltaResult<IncrementalScanSummaryAgainstBase> {
let summary = self.into_summary()?;
let duplicate_adds: HashSet<FileActionKey> = base_keys
.into_iter()
.filter(|k| summary.live_adds.contains(*k))
.cloned()
.collect();
Ok(IncrementalScanSummaryAgainstBase {
base_version: summary.base_version,
target_version: summary.target_version,
duplicate_adds,
removes: summary.removes,
})
}
pub fn into_summary_against_base_closure(
self,
base_contains: impl Fn(&FileActionKey) -> bool,
) -> DeltaResult<IncrementalScanSummaryAgainstBase> {
let summary = self.into_summary()?;
let duplicate_adds: HashSet<FileActionKey> = summary
.live_adds
.iter()
.filter(|k| base_contains(k))
.cloned()
.collect();
Ok(IncrementalScanSummaryAgainstBase {
base_version: summary.base_version,
target_version: summary.target_version,
duplicate_adds,
removes: summary.removes,
})
}
pub fn into_listing_against_base_iter<'a>(
mut self,
base_keys: impl IntoIterator<Item = &'a FileActionKey>,
) -> DeltaResult<IncrementalListingAgainstBase> {
let mut add_files: Vec<FilteredEngineData> = Vec::new();
for item in self.by_ref() {
add_files.push(item?);
}
let summary = self.into_summary_against_base_iter(base_keys)?;
Ok(IncrementalListingAgainstBase { summary, add_files })
}
pub fn into_listing_against_base_closure(
mut self,
base_contains: impl Fn(&FileActionKey) -> bool,
) -> DeltaResult<IncrementalListingAgainstBase> {
let mut add_files: Vec<FilteredEngineData> = Vec::new();
for item in self.by_ref() {
add_files.push(item?);
}
let summary = self.into_summary_against_base_closure(base_contains)?;
Ok(IncrementalListingAgainstBase { summary, add_files })
}
}
impl std::fmt::Debug for IncrementalScanStream {
    /// Shows the version range, the sizes (not contents) of the accumulated key sets, and
    /// whether an error has been yielded; the reader and the seen-key set are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            base_version,
            target_version,
            live_adds,
            removes,
            errored,
            ..
        } = self;
        f.debug_struct("IncrementalScanStream")
            .field("base_version", base_version)
            .field("target_version", target_version)
            .field("live_adds", &live_adds.len())
            .field("removes", &removes.len())
            .field("errored", errored)
            .finish()
    }
}
/// Net file-level effect of replaying the commits in `(base_version, target_version]`.
///
/// Each file key appears in at most one of the two sets: whichever action for that key the
/// replay encountered first (commits are requested newest-first) wins.
#[non_exhaustive]
#[derive(Debug)]
pub struct IncrementalScanSummary {
    /// Exclusive lower bound of the replayed version range.
    pub base_version: Version,
    /// Inclusive upper bound of the replayed version range.
    pub target_version: Version,
    /// Keys whose winning action is an add.
    pub live_adds: HashSet<FileActionKey>,
    /// Keys whose winning action is a remove.
    pub removes: HashSet<FileActionKey>,
}
/// An [`IncrementalScanSummary`] together with the add batches the stream yielded.
#[non_exhaustive]
pub struct IncrementalListing {
    /// Key-level summary of the replayed range.
    pub summary: IncrementalScanSummary,
    /// Batches whose selection vectors mark the live add rows.
    pub add_files: Vec<FilteredEngineData>,
}

impl std::fmt::Debug for IncrementalListing {
    /// Prints the summary and only the number of add batches (not their contents).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let batch_count = self.add_files.len();
        let mut builder = f.debug_struct("IncrementalListing");
        builder
            .field("summary", &self.summary)
            .field("add_files_batch_count", &batch_count);
        builder.finish()
    }
}
/// Summary of an incremental replay compared against the caller's view of the base version.
#[non_exhaustive]
#[derive(Debug)]
pub struct IncrementalScanSummaryAgainstBase {
    /// Exclusive lower bound of the replayed version range.
    pub base_version: Version,
    /// Inclusive upper bound of the replayed version range.
    pub target_version: Version,
    /// Live-add keys that the caller reported as already present in the base.
    pub duplicate_adds: HashSet<FileActionKey>,
    /// Keys whose winning action in the replayed range is a remove.
    pub removes: HashSet<FileActionKey>,
}
/// An [`IncrementalScanSummaryAgainstBase`] together with the add batches the stream yielded.
#[non_exhaustive]
pub struct IncrementalListingAgainstBase {
    /// Key-level summary compared against the base.
    pub summary: IncrementalScanSummaryAgainstBase,
    /// Batches whose selection vectors mark the live add rows.
    pub add_files: Vec<FilteredEngineData>,
}

impl std::fmt::Debug for IncrementalListingAgainstBase {
    /// Prints the summary and only the number of add batches (not their contents).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let batch_count = self.add_files.len();
        let mut builder = f.debug_struct("IncrementalListingAgainstBase");
        builder
            .field("summary", &self.summary)
            .field("add_files_batch_count", &batch_count);
        builder.finish()
    }
}
/// Runs the dedup visitor over one batch of commit actions.
///
/// Unseen adds are selected in the returned selection vector and recorded in `live_adds`;
/// unseen removes are recorded in `removes`; every newly handled key is remembered in
/// `seen_file_keys`. Returns `Ok(None)` when no add row in the batch was selected.
fn process_batch(
    batch: Box<dyn EngineData>,
    seen_file_keys: &mut HashSet<FileActionKey>,
    live_adds: &mut HashSet<FileActionKey>,
    removes: &mut HashSet<FileActionKey>,
) -> DeltaResult<Option<FilteredEngineData>> {
    let mut selection = vec![false; batch.len()];
    {
        let mut visitor = IncrementalDedupVisitor {
            deduplicator: FileActionDeduplicator::new(
                seen_file_keys,
                // NOTE(review): presumably `is_log_batch` — makes the deduplicator record
                // newly seen keys; confirm against `FileActionDeduplicator::new`.
                true,
                ADD_PATH_INDEX,
                REMOVE_PATH_INDEX,
                ADD_DV_START_INDEX,
                REMOVE_DV_START_INDEX,
            ),
            adds_sel: &mut selection,
            live_adds,
            removes,
        };
        visitor.visit_rows_of(batch.as_ref())?;
    }
    if selection.contains(&true) {
        Ok(Some(FilteredEngineData::try_new(batch, selection)?))
    } else {
        Ok(None)
    }
}
// Getter layout for `IncrementalDedupVisitor` (and the indices handed to
// `FileActionDeduplicator::new`): a path column followed by the deletion-vector columns
// (storageType, pathOrInlineDv, offset), first for `add`, then for `remove`.
const DV_COLUMN_COUNT: usize = 3;
const ADD_PATH_INDEX: usize = 0;
const ADD_DV_START_INDEX: usize = ADD_PATH_INDEX + 1;
const REMOVE_PATH_INDEX: usize = ADD_DV_START_INDEX + DV_COLUMN_COUNT;
const REMOVE_DV_START_INDEX: usize = REMOVE_PATH_INDEX + 1;
const NUM_GETTERS: usize = REMOVE_DV_START_INDEX + DV_COLUMN_COUNT;
/// Row visitor that keeps only the first action encountered for each file key.
///
/// For every add not already seen it marks the row in `adds_sel` and records the key in
/// `live_adds`; for every remove not already seen it records the key in `removes`.
struct IncrementalDedupVisitor<'a, 'seen> {
    /// Extracts file-action keys from the getters and tracks which keys were already handled.
    deduplicator: FileActionDeduplicator<'seen>,
    /// Per-row selection vector for the current batch; `true` marks a selected add row.
    adds_sel: &'a mut [bool],
    /// Accumulates keys of selected adds.
    live_adds: &'a mut HashSet<FileActionKey>,
    /// Accumulates keys of recorded removes.
    removes: &'a mut HashSet<FileActionKey>,
}

impl RowVisitor for IncrementalDedupVisitor<'_, '_> {
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        // Column order must match the *_INDEX constants: add path + DV columns, then remove.
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
            const STRING: DataType = DataType::STRING;
            const INTEGER: DataType = DataType::INTEGER;
            let (names, types): (Vec<ColumnName>, Vec<DataType>) = vec![
                (column_name!("add.path"), STRING),
                (column_name!("add.deletionVector.storageType"), STRING),
                (column_name!("add.deletionVector.pathOrInlineDv"), STRING),
                (column_name!("add.deletionVector.offset"), INTEGER),
                (column_name!("remove.path"), STRING),
                (column_name!("remove.deletionVector.storageType"), STRING),
                (column_name!("remove.deletionVector.pathOrInlineDv"), STRING),
                (column_name!("remove.deletionVector.offset"), INTEGER),
            ]
            .into_iter()
            .unzip();
            (names, types).into()
        });
        let (names, types) = NAMES_AND_TYPES.as_ref();
        (names, types)
    }

    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        require!(
            getters.len() == NUM_GETTERS,
            Error::InternalError(format!(
                "IncrementalDedupVisitor expected {NUM_GETTERS} getters, got {}",
                getters.len()
            ))
        );
        for row in 0..row_count {
            // NOTE(review): the `false` argument presumably means "do not skip removes" —
            // confirm against `FileActionDeduplicator::extract_file_action`.
            let Some((key, is_add)) = self.deduplicator.extract_file_action(row, getters, false)?
            else {
                continue;
            };
            // A `true` result means this key was already handled by an earlier row/batch,
            // so this action is shadowed and ignored.
            let shadowed = self.deduplicator.check_and_record_seen(key.clone());
            match (shadowed, is_add) {
                (true, _) => {}
                (false, true) => {
                    self.adds_sel[row] = true;
                    self.live_adds.insert(key);
                }
                (false, false) => {
                    self.removes.insert(key);
                }
            }
        }
        Ok(())
    }
}