use std::time::Duration;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Error};
pub(crate) mod log_replay;
pub use log_replay::{ActionReconciliationIterator, ActionReconciliationIteratorState};
/// Number of seconds in one minute.
const SECONDS_PER_MINUTE: u64 = 60;
/// Number of minutes in one hour.
const MINUTES_PER_HOUR: u64 = 60;
/// Number of hours in one day.
const HOURS_PER_DAY: u64 = 24;
/// Number of seconds in one day, derived from the unit constants above.
const SECONDS_PER_DAY: u64 = HOURS_PER_DAY * MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
/// Length of the default retention window, in days.
const DEFAULT_RETENTION_DAYS: u64 = 7;

/// Default retention window in seconds (seven days, i.e. 604800 seconds).
pub(crate) const DEFAULT_RETENTION_SECS: u64 = DEFAULT_RETENTION_DAYS * SECONDS_PER_DAY;
/// Derives retention cutoff timestamps from a table's [`TableProperties`].
///
/// Implementors only have to supply [`RetentionCalculator::table_properties`]; the two
/// timestamp methods come with default implementations built on top of it.
pub(crate) trait RetentionCalculator {
    /// Returns the table properties from which the retention settings are read.
    fn table_properties(&self) -> &TableProperties;

    /// Computes the cutoff timestamp (in milliseconds) for deleted-file retention.
    ///
    /// The current time is taken from `crate::utils::current_time_duration`, and the
    /// `deleted_file_retention_duration` table property (which may be `None`) is passed
    /// on to [`deleted_file_retention_timestamp_with_time`], which does the arithmetic.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the current time or from
    /// [`deleted_file_retention_timestamp_with_time`].
    fn deleted_file_retention_timestamp(&self) -> DeltaResult<i64> {
        let now = crate::utils::current_time_duration()?;
        let configured_retention = self.table_properties().deleted_file_retention_duration;
        deleted_file_retention_timestamp_with_time(configured_retention, now)
    }

    /// Computes the transaction expiration timestamp for this table, if one applies.
    ///
    /// Delegates to [`calculate_transaction_expiration_timestamp`] using this
    /// calculator's table properties.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`calculate_transaction_expiration_timestamp`].
    fn get_transaction_expiration_timestamp(&self) -> DeltaResult<Option<i64>> {
        let properties = self.table_properties();
        calculate_transaction_expiration_timestamp(properties)
    }
}
pub(crate) fn deleted_file_retention_timestamp_with_time(
retention_duration: Option<Duration>,
now_duration: Duration,
) -> DeltaResult<i64> {
let retention_duration =
retention_duration.unwrap_or_else(|| Duration::from_secs(DEFAULT_RETENTION_SECS));
let now_ms = i64::try_from(now_duration.as_millis())
.map_err(|_| Error::checkpoint_write("Current timestamp exceeds i64 millisecond range"))?;
let retention_ms = i64::try_from(retention_duration.as_millis())
.map_err(|_| Error::checkpoint_write("Retention duration exceeds i64 millisecond range"))?;
Ok(now_ms - retention_ms)
}
/// Computes the transaction expiration timestamp from the table properties.
///
/// Returns `Ok(None)` when `set_transaction_retention_duration` is unset. Otherwise the
/// result is the current time in milliseconds (from `crate::utils::current_time_ms`)
/// minus the configured retention in milliseconds.
///
/// # Errors
///
/// Propagates any error from reading the current time, and returns a generic error
/// when the retention, expressed in milliseconds, does not fit in an `i64`.
pub(crate) fn calculate_transaction_expiration_timestamp(
    table_properties: &TableProperties,
) -> DeltaResult<Option<i64>> {
    // Nothing configured: there is no expiration timestamp and the clock is not read.
    let retention = match table_properties.set_transaction_retention_duration {
        Some(duration) => duration,
        None => return Ok(None),
    };

    let now_ms = crate::utils::current_time_ms()?;
    let retention_ms = i64::try_from(retention.as_millis())
        .map_err(|_| Error::generic("Retention duration exceeds i64 millisecond range"))?;

    Ok(Some(now_ms - retention_ms))
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Asserts that `result` is an error whose rendered message contains `fragment`.
    fn assert_error_contains<T: std::fmt::Debug>(result: DeltaResult<T>, fragment: &str) {
        let message = result.unwrap_err().to_string();
        assert!(message.contains(fragment));
    }

    /// Asserts that `cutoff_ms` lies strictly before `now_ms` and no more than
    /// `retention_ms` plus one second of slack behind it.
    fn assert_recent_cutoff(cutoff_ms: i64, now_ms: i64, retention_ms: i64) {
        assert!(cutoff_ms < now_ms);
        assert!(cutoff_ms > now_ms - retention_ms - 1000);
    }

    /// Minimal [`RetentionCalculator`] implementor backed by a fixed set of properties.
    struct MockRetentionCalculator {
        properties: TableProperties,
    }

    impl MockRetentionCalculator {
        fn new(properties: TableProperties) -> Self {
            Self { properties }
        }
    }

    impl RetentionCalculator for MockRetentionCalculator {
        fn table_properties(&self) -> &TableProperties {
            &self.properties
        }
    }

    #[test]
    fn test_deleted_file_retention_timestamp_with_time() -> DeltaResult<()> {
        let now = Duration::from_secs(1_000_000_000);
        let now_ms: i64 = 1_000_000_000_000;

        // (configured retention, expected retention in milliseconds)
        let cases = [
            // Unset retention falls back to the seven-day default.
            (None, 7 * 24 * 60 * 60 * 1000),
            // Explicit one-day retention.
            (Some(Duration::from_secs(24 * 60 * 60)), 24 * 60 * 60 * 1000),
            // Zero retention leaves the cutoff equal to "now".
            (Some(Duration::from_secs(0)), 0),
        ];

        for (retention, retention_ms) in cases {
            let cutoff = deleted_file_retention_timestamp_with_time(retention, now)?;
            assert_eq!(cutoff, now_ms - retention_ms);
        }
        Ok(())
    }

    #[test]
    fn test_deleted_file_retention_timestamp_edge_cases() {
        let now = Duration::from_secs(1_000_000_000);
        let oversized_retention = Some(Duration::from_secs(u64::MAX));

        assert_error_contains(
            deleted_file_retention_timestamp_with_time(oversized_retention, now),
            "Retention duration exceeds i64 millisecond range",
        );
    }

    #[test]
    fn test_deleted_file_retention_timestamp_with_large_now_time() {
        let oversized_now = Duration::from_secs(u64::MAX);
        let retention = Some(Duration::from_secs(1));

        assert_error_contains(
            deleted_file_retention_timestamp_with_time(retention, oversized_now),
            "Current timestamp exceeds i64 millisecond range",
        );
    }

    #[test]
    fn test_calculate_transaction_expiration_timestamp() -> DeltaResult<()> {
        // Without a configured retention there is no expiration timestamp.
        let unset = TableProperties::default();
        assert_eq!(calculate_transaction_expiration_timestamp(&unset)?, None);

        // With a one-hour retention the timestamp sits roughly one hour in the past.
        let one_hour = TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(3600)),
            ..Default::default()
        };
        let timestamp = calculate_transaction_expiration_timestamp(&one_hour)?.unwrap();
        let now_ms = crate::utils::current_time_ms().unwrap();
        assert_recent_cutoff(timestamp, now_ms, 3600 * 1000);
        Ok(())
    }

    #[test]
    fn test_calculate_transaction_expiration_timestamp_edge_cases() {
        let oversized = TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(u64::MAX)),
            ..Default::default()
        };

        assert_error_contains(
            calculate_transaction_expiration_timestamp(&oversized),
            "Retention duration exceeds i64 millisecond range",
        );
    }

    #[test]
    fn test_retention_calculator_trait_deleted_file_retention_timestamp() -> DeltaResult<()> {
        // Default properties: the seven-day fallback applies.
        let calculator = MockRetentionCalculator::new(TableProperties::default());
        let cutoff = calculator.deleted_file_retention_timestamp()?;
        let now_ms = crate::utils::current_time_ms().unwrap();
        assert_recent_cutoff(cutoff, now_ms, 7 * 24 * 60 * 60 * 1000);

        // Explicit thirty-minute retention, checked against the same `now_ms` as above.
        let calculator = MockRetentionCalculator::new(TableProperties {
            deleted_file_retention_duration: Some(Duration::from_secs(1800)),
            ..Default::default()
        });
        let cutoff = calculator.deleted_file_retention_timestamp()?;
        assert_recent_cutoff(cutoff, now_ms, 30 * 60 * 1000);
        Ok(())
    }

    #[test]
    fn test_retention_calculator_trait_get_transaction_expiration_timestamp() -> DeltaResult<()> {
        // Default properties: no transaction retention, hence no timestamp.
        let calculator = MockRetentionCalculator::new(TableProperties::default());
        assert_eq!(calculator.get_transaction_expiration_timestamp()?, None);

        // Explicit two-hour retention.
        let calculator = MockRetentionCalculator::new(TableProperties {
            set_transaction_retention_duration: Some(Duration::from_secs(7200)),
            ..Default::default()
        });
        let timestamp = calculator.get_transaction_expiration_timestamp()?.unwrap();
        let now_ms = crate::utils::current_time_ms().unwrap();
        assert_recent_cutoff(timestamp, now_ms, 2 * 60 * 60 * 1000);
        Ok(())
    }
}