#![allow(dead_code, unused_imports)]
use url::Url;
use super::COMPACTION_ACTIONS_SCHEMA;
use crate::action_reconciliation::log_replay::ActionReconciliationProcessor;
use crate::action_reconciliation::{ActionReconciliationIterator, RetentionCalculator};
use crate::log_replay::LogReplayProcessor;
use crate::log_segment::LogSegment;
use crate::path::ParsedLogPath;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Engine, Error, SnapshotRef, Version};
pub fn should_compact(_commit_version: Version, _compaction_interval: Version) -> bool {
false
}
#[derive(Debug)]
pub struct LogCompactionWriter {
snapshot: SnapshotRef,
#[allow(dead_code)]
start_version: Version,
#[allow(dead_code)]
end_version: Version,
compaction_path: Url,
}
impl RetentionCalculator for LogCompactionWriter {
fn table_properties(&self) -> &TableProperties {
self.snapshot.table_properties()
}
}
impl LogCompactionWriter {
pub(crate) fn try_new(
_snapshot: SnapshotRef,
_start_version: Version,
_end_version: Version,
) -> DeltaResult<Self> {
Err(Error::unsupported(
"Log compaction is not currently supported",
))
}
pub fn compaction_path(&self) -> &Url {
&self.compaction_path
}
pub fn compaction_data(
&mut self,
engine: &dyn Engine,
) -> DeltaResult<ActionReconciliationIterator> {
let snapshot_end_version = self.snapshot.version();
if self.end_version > snapshot_end_version {
return Err(Error::generic(format!(
"End version {} exceeds snapshot version {}",
self.end_version, snapshot_end_version
)));
}
let compaction_log_segment = LogSegment::for_table_changes(
engine.storage_handler().as_ref(),
self.snapshot.log_segment().log_root.clone(),
self.start_version,
Some(self.end_version),
)?;
let actions_iter =
compaction_log_segment.read_actions(engine, COMPACTION_ACTIONS_SCHEMA.clone())?;
let min_file_retention_timestamp_millis = self.deleted_file_retention_timestamp()?;
let processor = ActionReconciliationProcessor::new(
min_file_retention_timestamp_millis,
self.get_transaction_expiration_timestamp()?,
);
let result_iter = processor.process_actions_iter(actions_iter);
Ok(ActionReconciliationIterator::new(Box::new(result_iter)))
}
}