use std::collections::HashSet;
use tracing::warn;
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::engine_data::{GetData, TypedGetData};
use crate::log_replay::FileActionKey;
use crate::DeltaResult;
/// A single file action pulled out of one row of an action batch.
///
/// Produced by [`Deduplicator::extract_file_action`] and consumed by the log-replay code
/// that decides whether the action has already been seen.
pub(crate) struct FileActionInfo {
    /// Identity of the file action: its path plus the optional deletion-vector unique id.
    pub(crate) key: FileActionKey,
    /// File size taken from the action. Extractors may fall back to 0 when the value is
    /// missing or cannot be represented as `u64`.
    pub(crate) size: u64,
    /// `true` for an add action, `false` otherwise (e.g. a remove action).
    pub(crate) is_add: bool,
}
/// Common interface for deduplicating file actions while replaying Delta log batches.
///
/// Implementations know which getter columns hold the file-action fields for the kind of
/// batch they handle, and how previously-seen actions are tracked.
pub(crate) trait Deduplicator {
    /// Extracts the file action, if any, stored in row `i`.
    ///
    /// `skip_removes` lets callers request that remove actions be ignored; individual
    /// implementations decide whether the flag is relevant to them.
    ///
    /// Returns `Ok(None)` when the row holds no file action this deduplicator handles.
    fn extract_file_action<'a>(
        &self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
        skip_removes: bool,
    ) -> DeltaResult<Option<FileActionInfo>>;

    /// Reports whether `key` has already been seen, and may record it as seen.
    ///
    /// NOTE(review): the exact recording policy is implementation-defined; confirm against
    /// each implementor.
    fn check_and_record_seen(&mut self, key: FileActionKey) -> bool;

    /// Whether the batch being processed comes from a log (commit) file rather than a
    /// checkpoint.
    fn is_log_batch(&self) -> bool;

    /// Builds the deletion-vector unique id for row `i`, if the row carries a deletion vector.
    ///
    /// The deletion-vector fields are read from three consecutive getters starting at
    /// `dv_start_index`, in this order:
    /// `deletionVector.storageType`, `deletionVector.pathOrInlineDv`,
    /// `deletionVector.offset`.
    ///
    /// A null `storageType` is treated as "no deletion vector" and yields `Ok(None)`.
    /// `pathOrInlineDv` is read with the non-optional accessor, so a missing value there is
    /// surfaced by the getter as an error. `offset` is optional.
    fn extract_dv_unique_id<'a>(
        &self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
        dv_start_index: usize,
    ) -> DeltaResult<Option<String>> {
        match getters[dv_start_index].get_opt(i, "deletionVector.storageType")? {
            None => Ok(None),
            Some(storage_type) => {
                let path_or_inline =
                    getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?;
                let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?;
                let unique_id = DeletionVectorDescriptor::unique_id_from_parts(
                    storage_type,
                    path_or_inline,
                    offset,
                );
                Ok(Some(unique_id))
            }
        }
    }
}
/// Deduplicator for checkpoint batches.
///
/// It only reads `seen_file_keys` (an immutable borrow) and never adds to it. The column
/// indices locate the add-action fields inside the getter slice passed to each call.
// NOTE(review): `allow(unused)` silences dead-code warnings; presumably this type is not yet
// used by every build configuration. Confirm before removing.
#[allow(unused)]
pub(crate) struct CheckpointDeduplicator<'a> {
    /// Keys already seen while processing earlier batches.
    seen_file_keys: &'a HashSet<FileActionKey>,
    /// Getter index of the `add.path` column.
    add_path_index: usize,
    /// Getter index of the `add.size` column.
    add_size_index: usize,
    /// Getter index of `add.deletionVector.storageType`. The next two indices hold
    /// `pathOrInlineDv` and `offset`.
    add_dv_start_index: usize,
}
impl<'a> CheckpointDeduplicator<'a> {
#[allow(unused)]
pub(crate) fn try_new(
seen_file_keys: &'a HashSet<FileActionKey>,
add_path_index: usize,
add_size_index: usize,
add_dv_start_index: usize,
) -> DeltaResult<Self> {
Ok(CheckpointDeduplicator {
seen_file_keys,
add_path_index,
add_size_index,
add_dv_start_index,
})
}
}
impl Deduplicator for CheckpointDeduplicator<'_> {
    /// Extracts an add action from row `i` of a checkpoint batch.
    ///
    /// Only add actions are inspected; `_skip_removes` is ignored. A row with a null
    /// `add.path` yields `Ok(None)`.
    ///
    /// The size is read from `add.size`. A missing value, or one that does not fit in `u64`
    /// (i.e. negative), is logged with `warn!` and replaced by 0 rather than failing the
    /// batch.
    fn extract_file_action<'b>(
        &self,
        i: usize,
        getters: &[&'b dyn GetData<'b>],
        _skip_removes: bool,
    ) -> DeltaResult<Option<FileActionInfo>> {
        let path = match getters[self.add_path_index].get_str(i, "add.path")? {
            Some(path) => path,
            None => return Ok(None),
        };

        let dv_unique_id = self.extract_dv_unique_id(i, getters, self.add_dv_start_index)?;

        // Best-effort size: warn and use 0 instead of erroring on malformed input.
        let raw_size = getters[self.add_size_index].get_long(i, "add.size")?;
        let size = if let Some(s) = raw_size {
            match u64::try_from(s) {
                Ok(converted) => converted,
                Err(e) => {
                    warn!("Could not convert add.size {s} to u64: {e}");
                    0
                }
            }
        } else {
            warn!("Add action without required size field");
            0
        };

        let key = FileActionKey::new(path, dv_unique_id);
        Ok(Some(FileActionInfo {
            key,
            size,
            is_add: true,
        }))
    }

    /// Reports whether `key` appears in the borrowed seen-set.
    ///
    /// The set is borrowed immutably, so nothing is recorded here despite the method name.
    fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
        let already_seen = self.seen_file_keys.contains(&key);
        already_seen
    }

    /// Checkpoint batches are never log batches.
    fn is_log_batch(&self) -> bool {
        false
    }
}