use std::collections::HashSet;
use std::sync::Arc;
use delta_kernel_derive::internal_api;
use tracing::debug;
use crate::engine_data::GetData;
use crate::log_replay::deduplicator::Deduplicator;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::{DeltaResult, EngineData};
pub(crate) mod deduplicator;
/// Uniquely identifies a file action within the Delta log: the data-file path
/// plus the unique id of its deletion vector, if any. Used as the dedup key
/// during log replay.
#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
pub struct FileActionKey {
    // Path of the data file this action refers to.
    pub(crate) path: String,
    // Unique id of the attached deletion vector; `None` when the action
    // carries no deletion vector (see FileActionKey::new callers in tests).
    pub(crate) dv_unique_id: Option<String>,
}
impl FileActionKey {
    /// Builds a key from anything convertible into a path `String` and an
    /// optional deletion-vector unique id.
    pub(crate) fn new(path: impl Into<String>, dv_unique_id: Option<String>) -> Self {
        Self {
            path: path.into(),
            dv_unique_id,
        }
    }
}
/// Tracks which file-action keys have already been seen during log replay so
/// that duplicate adds/removes can be skipped.
pub(crate) struct FileActionDeduplicator<'seen> {
    // Keys already encountered; borrowed mutably so the set persists across batches.
    seen_file_keys: &'seen mut HashSet<FileActionKey>,
    // True when the current batch comes from a commit (log) file; false for
    // checkpoint batches (see test_is_log_batch below).
    is_log_batch: bool,
    // Getter index of the `add.path` column.
    add_path_index: usize,
    // Getter index of the `remove.path` column.
    remove_path_index: usize,
    // Getter index where the add action's deletion-vector columns start.
    add_dv_start_index: usize,
    // Getter index where the remove action's deletion-vector columns start.
    remove_dv_start_index: usize,
}
impl<'seen> FileActionDeduplicator<'seen> {
    /// Creates a deduplicator over `seen_file_keys`.
    ///
    /// The four index parameters locate the `add.path` / `remove.path`
    /// columns and the start of each action's deletion-vector columns within
    /// the getter slice later passed to `extract_file_action`.
    pub(crate) fn new(
        seen_file_keys: &'seen mut HashSet<FileActionKey>,
        is_log_batch: bool,
        add_path_index: usize,
        remove_path_index: usize,
        add_dv_start_index: usize,
        remove_dv_start_index: usize,
    ) -> Self {
        Self {
            seen_file_keys,
            is_log_batch,
            add_path_index,
            remove_path_index,
            add_dv_start_index,
            remove_dv_start_index,
        }
    }
}
impl Deduplicator for FileActionDeduplicator<'_> {
    /// Returns `true` when `key` has been seen before (caller should skip it).
    /// On a first sighting the key is recorded — but only for log (commit)
    /// batches; checkpoint batches never add to the seen set.
    fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
        let duplicate = self.seen_file_keys.contains(&key);
        if duplicate {
            debug!(
                "Ignoring duplicate ({}, {:?}) in scan, is log {}",
                key.path, key.dv_unique_id, self.is_log_batch
            );
        } else {
            debug!(
                "Including ({}, {:?}) in scan, is log {}",
                key.path, key.dv_unique_id, self.is_log_batch
            );
            // Only commit batches record their keys; `key` is moved here.
            if self.is_log_batch {
                self.seen_file_keys.insert(key);
            }
        }
        duplicate
    }

    /// Extracts the file action at row `i`, preferring adds over removes.
    /// Returns `(key, is_add)` or `None` when no path is present (or the row
    /// is a remove and `skip_removes` is set).
    fn extract_file_action<'a>(
        &self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
        skip_removes: bool,
    ) -> DeltaResult<Option<(FileActionKey, bool)>> {
        match getters[self.add_path_index].get_str(i, "add.path")? {
            Some(path) => {
                let dv_unique_id = self.extract_dv_unique_id(i, getters, self.add_dv_start_index)?;
                Ok(Some((FileActionKey::new(path, dv_unique_id), true)))
            }
            // Not an add, and the caller asked to ignore removes entirely.
            None if skip_removes => Ok(None),
            None => match getters[self.remove_path_index].get_str(i, "remove.path")? {
                Some(path) => {
                    let dv_unique_id =
                        self.extract_dv_unique_id(i, getters, self.remove_dv_start_index)?;
                    Ok(Some((FileActionKey::new(path, dv_unique_id), false)))
                }
                // Neither an add nor a remove at this row.
                None => Ok(None),
            },
        }
    }

    /// True when this deduplicator is processing a commit (log) batch.
    fn is_log_batch(&self) -> bool {
        self.is_log_batch
    }
}
/// A batch of raw log actions tagged with its provenance.
#[internal_api]
pub(crate) struct ActionsBatch {
    // The engine data containing the action rows.
    pub actions: Box<dyn EngineData>,
    // True when the batch was read from a commit (log) file; false for
    // checkpoint batches.
    pub is_log_batch: bool,
}
impl ActionsBatch {
pub(crate) fn new(actions: Box<dyn EngineData>, is_log_batch: bool) -> Self {
Self {
actions,
is_log_batch,
}
}
#[allow(unused)]
#[internal_api]
pub(crate) fn actions(&self) -> &dyn EngineData {
self.actions.as_ref()
}
}
/// A log-replay processor whose batch method takes `&self`, so a single
/// instance can be shared (e.g. via `Arc`, see the blanket impl in this file)
/// across workers.
#[internal_api]
pub(crate) trait ParallelLogReplayProcessor {
    // The per-batch result type produced by this processor.
    type Output;
    /// Processes one batch of actions, producing `Self::Output`.
    fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output>;
}
/// Blanket impl: an `Arc`-wrapped processor is itself a processor, simply
/// delegating each batch to the inner value.
impl<T: ParallelLogReplayProcessor> ParallelLogReplayProcessor for Arc<T> {
    type Output = T::Output;

    fn process_actions_batch(&self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output> {
        // Deref through the Arc and forward to the wrapped processor.
        self.as_ref().process_actions_batch(actions_batch)
    }
}
#[allow(rustdoc::broken_intra_doc_links, rustdoc::private_intra_doc_links)]
#[internal_api]
pub(crate) trait LogReplayProcessor: Sized {
    /// Per-batch output; must be able to report whether any rows survived.
    type Output: HasSelectionVector;

    /// Processes one batch of actions, producing `Self::Output`.
    fn process_actions_batch(&mut self, actions_batch: ActionsBatch) -> DeltaResult<Self::Output>;

    /// Runs every batch through `process_actions_batch`, dropping successful
    /// outputs that selected no rows. Errors are always passed through.
    fn process_actions_iter(
        mut self,
        action_iter: impl Iterator<Item = DeltaResult<ActionsBatch>>,
    ) -> impl Iterator<Item = DeltaResult<Self::Output>> {
        action_iter
            .map(move |actions_batch| self.process_actions_batch(actions_batch?))
            .filter(|res| match res {
                // Keep only outputs with at least one selected row.
                Ok(output) => output.has_selected_rows(),
                // Keep errors so the caller sees them.
                Err(_) => true,
            })
    }

    /// Builds an initial selection vector for `batch`: the data-skipping
    /// filter's verdict when one is configured, otherwise all-true.
    fn build_selection_vector(&self, batch: &dyn EngineData) -> DeltaResult<Vec<bool>> {
        if let Some(filter) = self.data_skipping_filter() {
            filter.apply(batch)
        } else {
            Ok(vec![true; batch.len()])
        }
    }

    /// The data-skipping filter to apply, if any.
    fn data_skipping_filter(&self) -> Option<&DataSkippingFilter>;
}
/// Implemented by processor outputs that can report whether any row was
/// selected; `process_actions_iter` uses this to drop all-false batches.
#[internal_api]
pub(crate) trait HasSelectionVector {
    /// Returns `true` if at least one row of this output is selected.
    fn has_selected_rows(&self) -> bool;
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use super::deduplicator::CheckpointDeduplicator;
use super::*;
use crate::engine_data::GetData;
use crate::DeltaResult;
/// In-memory stand-in for a column getter: serves pre-registered string/int
/// values keyed by `(row, field)`, or a pre-registered error for that key.
struct MockGetData {
    string_values: HashMap<(usize, String), String>,
    int_values: HashMap<(usize, String), i32>,
    errors: HashMap<(usize, String), String>,
}

impl MockGetData {
    /// Creates a mock with no registered values or errors.
    fn new() -> Self {
        MockGetData {
            string_values: HashMap::new(),
            int_values: HashMap::new(),
            errors: HashMap::new(),
        }
    }

    /// Registers a string value for `(row, field)`.
    fn add_string(&mut self, row: usize, field: &str, value: &str) {
        let key = (row, field.to_owned());
        self.string_values.insert(key, value.to_owned());
    }

    /// Registers an int value for `(row, field)`.
    fn add_int(&mut self, row: usize, field: &str, value: i32) {
        let key = (row, field.to_owned());
        self.int_values.insert(key, value);
    }
}
impl<'a> GetData<'a> for MockGetData {
    /// Looks up a string for `(row_index, field_name)`. A registered error for
    /// that key takes precedence over any registered value.
    fn get_str(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<&'a str>> {
        let key = (row_index, field_name.to_string());
        if let Some(error_msg) = self.errors.get(&key) {
            return Err(crate::Error::Generic(error_msg.clone()));
        }
        Ok(self.string_values.get(&key).map(String::as_str))
    }

    /// Looks up an int for `(row_index, field_name)`, with the same
    /// error-takes-precedence behavior as `get_str`.
    fn get_int(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<i32>> {
        let key = (row_index, field_name.to_string());
        if let Some(error_msg) = self.errors.get(&key) {
            return Err(crate::Error::Generic(error_msg.clone()));
        }
        Ok(self.int_values.get(&key).copied())
    }
}
/// Builds a `FileActionDeduplicator` over `seen` using the getter layout
/// produced by `create_getters_with_mocks`: `add.path` at index 0,
/// `remove.path` at 5, add DV columns starting at 2, remove DV columns
/// starting at 6.
fn create_deduplicator(
    seen: &mut HashSet<FileActionKey>,
    is_log_batch: bool,
) -> FileActionDeduplicator<'_> {
    FileActionDeduplicator::new(
        seen,
        is_log_batch,
        0, // add_path_index
        5, // remove_path_index
        2, // add_dv_start_index
        6, // remove_dv_start_index
    )
}
/// Assembles the nine-getter slice expected by `create_deduplicator`:
/// index 0 = add.path, 1 = unused, 2-4 = add DV columns,
/// 5 = remove.path, 6-8 = remove DV columns. Missing mocks fall back to a
/// shared empty mock that returns `None` for everything.
fn create_getters_with_mocks<'a>(
    add_mock: Option<&'a MockGetData>,
    remove_mock: Option<&'a MockGetData>,
) -> Vec<&'a dyn GetData<'a>> {
    use std::sync::LazyLock;
    static EMPTY: LazyLock<MockGetData> = LazyLock::new(MockGetData::new);
    let empty: &MockGetData = &EMPTY;
    let add = add_mock.unwrap_or(empty);
    let remove = remove_mock.unwrap_or(empty);
    vec![
        add,    // 0: add.path
        empty,  // 1: unused
        add,    // 2-4: add DV columns
        add, add,
        remove, // 5: remove.path
        remove, // 6-8: remove DV columns
        remove, remove,
    ]
}
/// An `add.path` value at row 0 yields an add-action key with no DV id.
#[test]
fn test_extract_file_action_add() -> DeltaResult<()> {
    let mut seen = HashSet::new();
    let deduplicator = create_deduplicator(&mut seen, true);

    let mut mock_add = MockGetData::new();
    mock_add.add_string(0, "add.path", "file1.parquet");
    let getters = create_getters_with_mocks(Some(&mock_add), None);

    let (key, is_add) = deduplicator
        .extract_file_action(0, &getters, false)?
        .expect("expected an add action");
    assert_eq!(key.path, "file1.parquet");
    assert_eq!(key.dv_unique_id, None);
    assert!(is_add);
    Ok(())
}
/// A `remove.path` value (with no add) yields a remove-action key.
#[test]
fn test_extract_file_action_remove() -> DeltaResult<()> {
    let mut seen = HashSet::new();
    let deduplicator = create_deduplicator(&mut seen, true);

    let mut mock_remove = MockGetData::new();
    mock_remove.add_string(0, "remove.path", "file2.parquet");
    let getters = create_getters_with_mocks(None, Some(&mock_remove));

    let (key, is_add) = deduplicator
        .extract_file_action(0, &getters, false)?
        .expect("expected a remove action");
    assert_eq!(key.path, "file2.parquet");
    assert!(!is_add);
    Ok(())
}
/// An add action with DV columns produces the expected composite DV unique id.
#[test]
fn test_extract_file_action_with_deletion_vector() -> DeltaResult<()> {
    let mut seen = HashSet::new();
    let deduplicator = create_deduplicator(&mut seen, true);

    // Register the add path plus the three DV columns read from index 2..=4.
    let mut mock_dv = MockGetData::new();
    mock_dv.add_string(0, "add.path", "file_with_dv.parquet");
    mock_dv.add_string(0, "deletionVector.storageType", "s3");
    mock_dv.add_string(0, "deletionVector.pathOrInlineDv", "path/to/dv");
    mock_dv.add_int(0, "deletionVector.offset", 100);
    let getters = create_getters_with_mocks(Some(&mock_dv), None);

    let (key, is_add) = deduplicator
        .extract_file_action(0, &getters, false)?
        .expect("expected an add action");
    // Expected id: storage type + path, then "@" + offset.
    assert_eq!(key.dv_unique_id.as_deref(), Some("s3path/to/dv@100"));
    assert!(is_add);
    Ok(())
}
/// `skip_removes = true` hides remove actions; the same row is visible again
/// when removes are not skipped.
#[test]
fn test_extract_file_action_skip_removes() -> DeltaResult<()> {
    let mut seen = HashSet::new();
    let deduplicator = create_deduplicator(&mut seen, true);

    let mut mock_remove = MockGetData::new();
    mock_remove.add_string(0, "remove.path", "file2.parquet");
    let getters = create_getters_with_mocks(None, Some(&mock_remove));

    let skipped = deduplicator.extract_file_action(0, &getters, true)?;
    assert!(skipped.is_none());

    let found = deduplicator.extract_file_action(0, &getters, false)?;
    assert!(found.is_some());
    Ok(())
}
/// With neither an add nor a remove path registered, extraction yields `None`.
#[test]
fn test_extract_file_action_no_action_found() -> DeltaResult<()> {
    let mut seen = HashSet::new();
    let deduplicator = create_deduplicator(&mut seen, true);

    let getters = create_getters_with_mocks(None, None);
    let result = deduplicator.extract_file_action(0, &getters, false)?;
    assert!(result.is_none());
    Ok(())
}
/// Exercises `check_and_record_seen` for log batches (which record new keys)
/// and checkpoint batches (which only consult, never record).
#[test]
fn test_check_and_record_seen() {
    let mut seen = HashSet::new();
    // A key already present before this deduplicator runs.
    let pre_existing_key = FileActionKey::new("existing.parquet", None);
    seen.insert(pre_existing_key.clone());
    let key1 = FileActionKey::new("file1.parquet", None);
    let key2 = FileActionKey::new("file2.parquet", None);
    // Same path as key1 but with a DV id — must count as a distinct key.
    let key_with_dv = FileActionKey::new("file1.parquet", Some("dv1".to_string()));
    {
        // Log batch: first sighting returns false and records; repeats return true.
        let mut deduplicator = create_deduplicator(&mut seen, true);
        assert!(deduplicator.check_and_record_seen(pre_existing_key.clone()));
        assert!(!deduplicator.check_and_record_seen(key1.clone()));
        assert!(!deduplicator.check_and_record_seen(key2.clone()));
        assert!(!deduplicator.check_and_record_seen(key_with_dv.clone()));
        assert!(deduplicator.check_and_record_seen(key1.clone()));
        assert!(deduplicator.check_and_record_seen(key_with_dv.clone()));
    }
    // The log batch persisted its new keys into `seen`.
    assert!(seen.contains(&key1));
    assert!(seen.contains(&key2));
    assert!(seen.contains(&key_with_dv));
    {
        // Checkpoint batch: duplicates against `seen` are detected, but new
        // keys are NOT recorded — a second sighting is still "not seen".
        let mut deduplicator = create_deduplicator(&mut seen, false);
        let new_key = FileActionKey::new("new.parquet", None);
        assert!(!deduplicator.check_and_record_seen(new_key.clone()));
        assert!(!deduplicator.check_and_record_seen(new_key.clone()));
        assert!(deduplicator.check_and_record_seen(key1.clone()));
    }
}
/// `is_log_batch()` reflects the flag the deduplicator was constructed with.
#[test]
fn test_is_log_batch() {
    let mut seen = HashSet::new();
    assert!(create_deduplicator(&mut seen, true).is_log_batch());
    assert!(!create_deduplicator(&mut seen, false).is_log_batch());
}
/// The checkpoint deduplicator extracts an add action with no DV id.
#[test]
fn test_checkpoint_extract_file_action_add() -> DeltaResult<()> {
    let seen = HashSet::new();
    let deduplicator = CheckpointDeduplicator::try_new(&seen, 0, 2)?;

    let mut mock_add = MockGetData::new();
    mock_add.add_string(0, "add.path", "checkpoint_file.parquet");
    let getters = create_getters_with_mocks(Some(&mock_add), None);

    let (key, is_add) = deduplicator
        .extract_file_action(0, &getters, false)?
        .expect("expected an add action");
    assert_eq!(key.path, "checkpoint_file.parquet");
    assert_eq!(key.dv_unique_id, None);
    assert!(is_add);
    Ok(())
}
/// The checkpoint deduplicator builds the same composite DV unique id as the
/// file-action deduplicator.
#[test]
fn test_checkpoint_extract_file_action_with_deletion_vector() -> DeltaResult<()> {
    let seen = HashSet::new();
    let deduplicator = CheckpointDeduplicator::try_new(&seen, 0, 2)?;

    let mut mock_dv = MockGetData::new();
    mock_dv.add_string(0, "add.path", "file_with_dv.parquet");
    mock_dv.add_string(0, "deletionVector.storageType", "s3");
    mock_dv.add_string(0, "deletionVector.pathOrInlineDv", "path/to/dv");
    mock_dv.add_int(0, "deletionVector.offset", 100);
    let getters = create_getters_with_mocks(Some(&mock_dv), None);

    let (key, is_add) = deduplicator
        .extract_file_action(0, &getters, false)?
        .expect("expected an add action");
    assert_eq!(key.path, "file_with_dv.parquet");
    assert_eq!(key.dv_unique_id.as_deref(), Some("s3path/to/dv@100"));
    assert!(is_add);
    Ok(())
}
/// Keys collected while replaying commits must filter matching actions out of
/// checkpoint batches, while checkpoint-only actions pass through.
#[test]
fn test_checkpoint_deduplicator_filters_commit_duplicates() -> DeltaResult<()> {
    let mut seen = HashSet::new();
    // Keys recorded during commit replay: one plain, one with a DV id.
    seen.insert(FileActionKey::new("modified_in_commit.parquet", None));
    seen.insert(FileActionKey::new(
        "modified_with_dv.parquet",
        Some("dv123".to_string()),
    ));
    let mut deduplicator = CheckpointDeduplicator::try_new(&seen, 0, 2)?;
    let commit_modified = FileActionKey::new("modified_in_commit.parquet", None);
    assert!(
        deduplicator.check_and_record_seen(commit_modified),
        "Files seen in commits should be filtered from checkpoint"
    );
    let commit_modified_dv =
        FileActionKey::new("modified_with_dv.parquet", Some("dv123".to_string()));
    assert!(
        deduplicator.check_and_record_seen(commit_modified_dv),
        "Files with DVs seen in commits should be filtered from checkpoint"
    );
    // A key absent from `seen` must not be filtered.
    let checkpoint_only = FileActionKey::new("checkpoint_only.parquet", None);
    assert!(
        !deduplicator.check_and_record_seen(checkpoint_only),
        "Files only in checkpoint should not be filtered"
    );
    Ok(())
}
}