use url::Url;
use super::COMPACTION_ACTIONS_SCHEMA;
use crate::action_reconciliation::log_replay::ActionReconciliationProcessor;
use crate::action_reconciliation::{ActionReconciliationIterator, RetentionCalculator};
use crate::log_replay::LogReplayProcessor;
use crate::log_segment::LogSegment;
use crate::path::ParsedLogPath;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Engine, Error, SnapshotRef, Version};
/// Reports whether a log compaction is due after the commit at `commit_version`.
///
/// Returns `true` only when all of the following hold:
/// - `compaction_interval` is non-zero (an interval of `0` never triggers),
/// - `commit_version` is non-zero,
/// - `commit_version + 1` is an exact multiple of `compaction_interval`.
///
/// The divisibility test is evaluated as `commit_version % interval == interval - 1`. For a
/// non-zero interval this is equivalent to `(commit_version + 1) % interval == 0`, but it never
/// computes `commit_version + 1`, so it cannot overflow when `commit_version` is the largest
/// representable `Version`.
pub fn should_compact(commit_version: Version, compaction_interval: Version) -> bool {
    // The `> 0` guard short-circuits first, so neither the `%` nor the `- 1` below can
    // divide by zero or underflow.
    compaction_interval > 0
        && commit_version > 0
        && commit_version % compaction_interval == compaction_interval - 1
}
/// Holds the state needed to produce a log compaction for a range of commit versions.
///
/// Instances are created through `LogCompactionWriter::try_new`, which rejects ranges where
/// `start_version >= end_version`.
#[derive(Debug)]
pub struct LogCompactionWriter {
/// Snapshot the compaction is derived from; supplies the table properties, the table root,
/// the log root and the version that `end_version` is checked against.
snapshot: SnapshotRef,
/// First commit version of the range to compact.
start_version: Version,
/// Last commit version of the range to compact; always greater than `start_version`.
end_version: Version,
/// Location of the compaction file, as computed by `ParsedLogPath::new_log_compaction`
/// from the table root and the version range.
compaction_path: Url,
}
/// Lets the writer act as a [`RetentionCalculator`] by exposing the table properties of the
/// snapshot it was built from.
impl RetentionCalculator for LogCompactionWriter {
    /// Returns the table properties of the underlying snapshot.
    fn table_properties(&self) -> &TableProperties {
        let snapshot = &self.snapshot;
        snapshot.table_properties()
    }
}
impl LogCompactionWriter {
pub(crate) fn try_new(
snapshot: SnapshotRef,
start_version: Version,
end_version: Version,
) -> DeltaResult<Self> {
if start_version >= end_version {
return Err(Error::generic(format!(
"Invalid version range: end_version {end_version} must be greater than start_version {start_version}"
)));
}
snapshot.log_segment().validate_published()?;
let compaction_path =
ParsedLogPath::new_log_compaction(snapshot.table_root(), start_version, end_version)?;
Ok(Self {
snapshot,
start_version,
end_version,
compaction_path: compaction_path.location,
})
}
pub fn compaction_path(&self) -> &Url {
&self.compaction_path
}
pub fn compaction_data(
&mut self,
engine: &dyn Engine,
) -> DeltaResult<ActionReconciliationIterator> {
let snapshot_end_version = self.snapshot.version();
if self.end_version > snapshot_end_version {
return Err(Error::generic(format!(
"End version {} exceeds snapshot version {}",
self.end_version, snapshot_end_version
)));
}
let compaction_log_segment = LogSegment::for_table_changes(
engine.storage_handler().as_ref(),
self.snapshot.log_segment().log_root.clone(),
self.start_version,
Some(self.end_version),
)?;
let actions_iter =
compaction_log_segment.read_actions(engine, COMPACTION_ACTIONS_SCHEMA.clone())?;
let min_file_retention_timestamp_millis = self.deleted_file_retention_timestamp()?;
let processor = ActionReconciliationProcessor::new(
min_file_retention_timestamp_millis,
self.get_transaction_expiration_timestamp()?,
);
let result_iter = processor.process_actions_iter(actions_iter);
Ok(ActionReconciliationIterator::new(Box::new(result_iter)))
}
}