use std::num::{NonZero, NonZeroU64};
use std::str::FromStr;
use std::sync::LazyLock;
use std::time::Duration;
use delta_kernel::table_properties::{DataSkippingNumIndexedCols, IsolationLevel, TableProperties};
use super::Constraint;
use crate::errors::DeltaTableError;
/// Table configuration properties recognized by this module.
///
/// Each variant maps to one configuration key string; the comment on each variant
/// names that key (see the `AsRef<str>` implementation for the authoritative mapping).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum TableProperty {
    /// `delta.appendOnly`
    AppendOnly,
    /// `delta.autoOptimize.autoCompact`
    AutoOptimizeAutoCompact,
    /// `delta.autoOptimize.optimizeWrite`
    AutoOptimizeOptimizeWrite,
    /// `delta.checkpointInterval`
    CheckpointInterval,
    /// `delta.checkpoint.writeStatsAsJson`
    CheckpointWriteStatsAsJson,
    /// `delta.checkpoint.writeStatsAsStruct`
    CheckpointWriteStatsAsStruct,
    /// `delta-rs.checkpoint.useRunLengthEncoding` (note the `delta-rs.` prefix)
    CheckpointUseRunLengthEncoding,
    /// `delta.columnMapping.mode`
    ColumnMappingMode,
    /// `delta.dataSkippingNumIndexedCols`
    DataSkippingNumIndexedCols,
    /// `delta.dataSkippingStatsColumns`
    DataSkippingStatsColumns,
    /// `delta.deletedFileRetentionDuration`
    DeletedFileRetentionDuration,
    /// `delta.enableChangeDataFeed`
    EnableChangeDataFeed,
    /// `delta.enableDeletionVectors`
    EnableDeletionVectors,
    /// `delta.isolationLevel`
    IsolationLevel,
    /// `delta.logRetentionDuration`
    LogRetentionDuration,
    /// `delta.enableExpiredLogCleanup`
    EnableExpiredLogCleanup,
    /// `delta.minReaderVersion`
    MinReaderVersion,
    /// `delta.minWriterVersion`
    MinWriterVersion,
    /// `delta.randomizeFilePrefixes`
    RandomizeFilePrefixes,
    /// `delta.randomPrefixLength`
    RandomPrefixLength,
    /// `delta.setTransactionRetentionDuration`
    SetTransactionRetentionDuration,
    /// `delta.targetFileSize`
    TargetFileSize,
    /// `delta.tuneFileSizesForRewrites`
    TuneFileSizesForRewrites,
    /// `delta.checkpointPolicy`
    CheckpointPolicy,
}
impl AsRef<str> for TableProperty {
fn as_ref(&self) -> &str {
match self {
Self::AppendOnly => "delta.appendOnly",
Self::CheckpointInterval => "delta.checkpointInterval",
Self::AutoOptimizeAutoCompact => "delta.autoOptimize.autoCompact",
Self::AutoOptimizeOptimizeWrite => "delta.autoOptimize.optimizeWrite",
Self::CheckpointWriteStatsAsJson => "delta.checkpoint.writeStatsAsJson",
Self::CheckpointWriteStatsAsStruct => "delta.checkpoint.writeStatsAsStruct",
Self::CheckpointUseRunLengthEncoding => "delta-rs.checkpoint.useRunLengthEncoding",
Self::CheckpointPolicy => "delta.checkpointPolicy",
Self::ColumnMappingMode => "delta.columnMapping.mode",
Self::DataSkippingNumIndexedCols => "delta.dataSkippingNumIndexedCols",
Self::DataSkippingStatsColumns => "delta.dataSkippingStatsColumns",
Self::DeletedFileRetentionDuration => "delta.deletedFileRetentionDuration",
Self::EnableChangeDataFeed => "delta.enableChangeDataFeed",
Self::EnableDeletionVectors => "delta.enableDeletionVectors",
Self::IsolationLevel => "delta.isolationLevel",
Self::LogRetentionDuration => "delta.logRetentionDuration",
Self::EnableExpiredLogCleanup => "delta.enableExpiredLogCleanup",
Self::MinReaderVersion => "delta.minReaderVersion",
Self::MinWriterVersion => "delta.minWriterVersion",
Self::RandomizeFilePrefixes => "delta.randomizeFilePrefixes",
Self::RandomPrefixLength => "delta.randomPrefixLength",
Self::SetTransactionRetentionDuration => "delta.setTransactionRetentionDuration",
Self::TargetFileSize => "delta.targetFileSize",
Self::TuneFileSizesForRewrites => "delta.tuneFileSizesForRewrites",
}
}
}
impl FromStr for TableProperty {
    type Err = DeltaTableError;

    /// Resolves a configuration key string into a [`TableProperty`].
    ///
    /// Besides the canonical keys, three legacy spellings without the `delta.` prefix are
    /// accepted: `deletedFileRetentionDuration`, `logRetentionDuration` and
    /// `enableExpiredLogCleanup`.
    ///
    /// # Errors
    ///
    /// Returns [`DeltaTableError::Generic`] when the key is not recognized.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Fold the unprefixed legacy aliases onto their canonical keys first, so the
        // main table below only has to list each property once.
        let canonical = match s {
            "deletedFileRetentionDuration" => "delta.deletedFileRetentionDuration",
            "logRetentionDuration" => "delta.logRetentionDuration",
            "enableExpiredLogCleanup" => "delta.enableExpiredLogCleanup",
            other => other,
        };

        let property = match canonical {
            "delta.appendOnly" => Self::AppendOnly,
            "delta.autoOptimize.autoCompact" => Self::AutoOptimizeAutoCompact,
            "delta.autoOptimize.optimizeWrite" => Self::AutoOptimizeOptimizeWrite,
            "delta.checkpointInterval" => Self::CheckpointInterval,
            "delta.checkpoint.writeStatsAsJson" => Self::CheckpointWriteStatsAsJson,
            "delta.checkpoint.writeStatsAsStruct" => Self::CheckpointWriteStatsAsStruct,
            "delta-rs.checkpoint.useRunLengthEncoding" => Self::CheckpointUseRunLengthEncoding,
            "delta.columnMapping.mode" => Self::ColumnMappingMode,
            "delta.dataSkippingNumIndexedCols" => Self::DataSkippingNumIndexedCols,
            "delta.dataSkippingStatsColumns" => Self::DataSkippingStatsColumns,
            "delta.deletedFileRetentionDuration" => Self::DeletedFileRetentionDuration,
            "delta.enableChangeDataFeed" => Self::EnableChangeDataFeed,
            "delta.enableDeletionVectors" => Self::EnableDeletionVectors,
            "delta.isolationLevel" => Self::IsolationLevel,
            "delta.logRetentionDuration" => Self::LogRetentionDuration,
            "delta.enableExpiredLogCleanup" => Self::EnableExpiredLogCleanup,
            "delta.minReaderVersion" => Self::MinReaderVersion,
            "delta.minWriterVersion" => Self::MinWriterVersion,
            "delta.randomizeFilePrefixes" => Self::RandomizeFilePrefixes,
            "delta.randomPrefixLength" => Self::RandomPrefixLength,
            "delta.setTransactionRetentionDuration" => Self::SetTransactionRetentionDuration,
            "delta.targetFileSize" => Self::TargetFileSize,
            "delta.tuneFileSizesForRewrites" => Self::TuneFileSizesForRewrites,
            "delta.checkpointPolicy" => Self::CheckpointPolicy,
            _ => return Err(DeltaTableError::Generic("unknown config key".into())),
        };
        Ok(property)
    }
}
/// Error produced when a configuration value in this module fails to parse or validate
/// (for example by `parse_interval` / `parse_int`).
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum DeltaConfigError {
    /// The value was rejected; the payload is a human-readable reason.
    #[error("Validation failed - {0}")]
    Validation(String),
}
/// Default number of indexed columns (32).
pub const DEFAULT_NUM_INDEX_COLS: u64 = 32;

/// Default target file size in bytes (100 MiB), used when no target size is configured.
pub const DEFAULT_TARGET_FILE_SIZE: NonZeroU64 = match NonZeroU64::new(100 << 20) {
    Some(size) => size,
    // Unreachable: the literal above is non-zero. The match keeps this a `const` item.
    None => panic!("default target file size must be non-zero"),
};
/// Accessors over table properties that substitute a default value when a property
/// is unset. The defaults below are those used by the `TableProperties` implementation
/// in this module.
pub trait TablePropertiesExt {
    /// Append-only flag; `false` when unset.
    fn append_only(&self) -> bool;
    /// Log retention duration; 30 days when unset.
    fn log_retention_duration(&self) -> Duration;
    /// Whether expired log files should be cleaned up; `true` when unset.
    fn enable_expired_log_cleanup(&self) -> bool;
    /// Checkpoint interval; 100 when unset.
    fn checkpoint_interval(&self) -> NonZero<u64>;
    /// Number of indexed columns for data skipping; 32 columns when unset.
    fn num_indexed_cols(&self) -> DataSkippingNumIndexedCols;
    /// Target file size in bytes; `DEFAULT_TARGET_FILE_SIZE` when unset.
    fn target_file_size(&self) -> NonZero<u64>;
    /// Change-data-feed flag; `false` when unset.
    fn enable_change_data_feed(&self) -> bool;
    /// Deleted-file retention duration; 1 week when unset.
    fn deleted_file_retention_duration(&self) -> Duration;
    /// Isolation level; `IsolationLevel::default()` when unset.
    fn isolation_level(&self) -> IsolationLevel;
    /// Constraints stored among the unrecognized properties under the
    /// `delta.constraints.` key prefix; the rest of the key is the constraint name.
    fn get_constraints(&self) -> Vec<Constraint>;
}
impl TablePropertiesExt for TableProperties {
fn append_only(&self) -> bool {
self.append_only.unwrap_or(false)
}
fn log_retention_duration(&self) -> Duration {
static DEFAULT_DURATION: LazyLock<Duration> =
LazyLock::new(|| parse_interval("interval 30 days").unwrap());
self.log_retention_duration
.unwrap_or(DEFAULT_DURATION.to_owned())
}
fn enable_expired_log_cleanup(&self) -> bool {
self.enable_expired_log_cleanup.unwrap_or(true)
}
fn checkpoint_interval(&self) -> NonZero<u64> {
static DEFAULT_INTERVAL: LazyLock<NonZero<u64>> =
LazyLock::new(|| NonZero::new(100).unwrap());
self.checkpoint_interval
.unwrap_or(DEFAULT_INTERVAL.to_owned())
}
fn num_indexed_cols(&self) -> DataSkippingNumIndexedCols {
self.data_skipping_num_indexed_cols
.unwrap_or(DataSkippingNumIndexedCols::NumColumns(32))
}
fn target_file_size(&self) -> NonZeroU64 {
self.target_file_size.unwrap_or(DEFAULT_TARGET_FILE_SIZE)
}
fn enable_change_data_feed(&self) -> bool {
self.enable_change_data_feed.unwrap_or(false)
}
fn deleted_file_retention_duration(&self) -> Duration {
static DEFAULT_DURATION: LazyLock<Duration> =
LazyLock::new(|| parse_interval("interval 1 weeks").unwrap());
self.deleted_file_retention_duration
.unwrap_or(DEFAULT_DURATION.to_owned())
}
fn isolation_level(&self) -> IsolationLevel {
self.isolation_level.unwrap_or_default()
}
fn get_constraints(&self) -> Vec<Constraint> {
self.unknown_properties
.iter()
.filter_map(|(field, value)| {
if field.starts_with("delta.constraints") {
let constraint_name = field.replace("delta.constraints.", "");
Some(Constraint::new(&constraint_name, value))
} else {
None
}
})
.collect()
}
}
/// Seconds in one minute.
const SECONDS_PER_MINUTE: u64 = 60;
/// Seconds in one hour.
const SECONDS_PER_HOUR: u64 = SECONDS_PER_MINUTE * 60;
/// Seconds in one day.
const SECONDS_PER_DAY: u64 = SECONDS_PER_HOUR * 24;
/// Seconds in one week.
const SECONDS_PER_WEEK: u64 = SECONDS_PER_DAY * 7;
/// Parses an interval string such as `"interval 30 days"` into a [`Duration`].
///
/// The input must start with `"interval "`, followed by a non-negative integer and a unit.
/// Accepted units (singular or plural) are nanosecond, microsecond, millisecond, second,
/// minute, hour, day and week.
///
/// # Errors
///
/// Returns [`DeltaConfigError::Validation`] when:
/// - the input is not an interval;
/// - the number cannot be parsed or is negative;
/// - the unit is unknown;
/// - the resulting number of seconds does not fit in a `u64`.
fn parse_interval(value: &str) -> Result<Duration, DeltaConfigError> {
    let not_an_interval = || DeltaConfigError::Validation(format!("'{value}' is not an interval"));

    if !value.starts_with("interval ") {
        return Err(not_an_interval());
    }

    let mut it = value.split_whitespace();
    // Skip the leading `interval` keyword; its presence was checked above.
    let _ = it.next();
    let number = parse_int(it.next().ok_or_else(not_an_interval)?)?;
    // `try_from` fails exactly when the parsed value is negative.
    let number = u64::try_from(number).map_err(|_| {
        DeltaConfigError::Validation(format!("interval '{value}' cannot be negative"))
    })?;

    // Scaling to seconds can overflow `u64` for very large magnitudes. Report that as a
    // validation error instead of panicking (debug builds) or wrapping (release builds).
    let scaled_secs = |secs_per_unit: u64| {
        number
            .checked_mul(secs_per_unit)
            .map(Duration::from_secs)
            .ok_or_else(|| DeltaConfigError::Validation(format!("interval '{value}' is too large")))
    };

    // NOTE(review): any tokens after the unit (e.g. `interval 1 day 2 hours`) are ignored.
    let duration = match it.next().ok_or_else(not_an_interval)? {
        "nanosecond" | "nanoseconds" => Duration::from_nanos(number),
        "microsecond" | "microseconds" => Duration::from_micros(number),
        "millisecond" | "milliseconds" => Duration::from_millis(number),
        "second" | "seconds" => Duration::from_secs(number),
        "minute" | "minutes" => scaled_secs(SECONDS_PER_MINUTE)?,
        "hour" | "hours" => scaled_secs(SECONDS_PER_HOUR)?,
        "day" | "days" => scaled_secs(SECONDS_PER_DAY)?,
        "week" | "weeks" => scaled_secs(SECONDS_PER_WEEK)?,
        unit => {
            return Err(DeltaConfigError::Validation(format!(
                "Unknown unit '{unit}'"
            )));
        }
    };
    Ok(duration)
}
/// Parses `value` as a signed 64-bit integer.
///
/// # Errors
///
/// Returns [`DeltaConfigError::Validation`] containing the offending text and the
/// underlying parse error when `value` is not a valid `i64`.
fn parse_int(value: &str) -> Result<i64, DeltaConfigError> {
    match value.parse::<i64>() {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(DeltaConfigError::Validation(format!(
            "Cannot parse '{value}' as integer: {err}"
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_interval_test() {
        // Every accepted unit spelling, paired with the expected duration for a magnitude
        // of 123. Table-driven so each spelling (including the plural "weeks") is covered
        // exactly once.
        let cases = [
            ("nanosecond", Duration::from_nanos(123)),
            ("nanoseconds", Duration::from_nanos(123)),
            ("microsecond", Duration::from_micros(123)),
            ("microseconds", Duration::from_micros(123)),
            ("millisecond", Duration::from_millis(123)),
            ("milliseconds", Duration::from_millis(123)),
            ("second", Duration::from_secs(123)),
            ("seconds", Duration::from_secs(123)),
            ("minute", Duration::from_secs(123 * 60)),
            ("minutes", Duration::from_secs(123 * 60)),
            ("hour", Duration::from_secs(123 * 3600)),
            ("hours", Duration::from_secs(123 * 3600)),
            ("day", Duration::from_secs(123 * 86400)),
            ("days", Duration::from_secs(123 * 86400)),
            ("week", Duration::from_secs(123 * 604800)),
            ("weeks", Duration::from_secs(123 * 604800)),
        ];
        for (unit, expected) in cases {
            let input = format!("interval 123 {unit}");
            assert_eq!(parse_interval(&input).unwrap(), expected, "input: {input}");
        }
    }

    #[test]
    fn parse_interval_invalid_test() {
        assert_eq!(
            parse_interval("whatever").err().unwrap(),
            DeltaConfigError::Validation("'whatever' is not an interval".to_string())
        );
        assert_eq!(
            parse_interval("interval").err().unwrap(),
            DeltaConfigError::Validation("'interval' is not an interval".to_string())
        );
        assert_eq!(
            parse_interval("interval 2").err().unwrap(),
            DeltaConfigError::Validation("'interval 2' is not an interval".to_string())
        );
        assert_eq!(
            parse_interval("interval 2 years").err().unwrap(),
            DeltaConfigError::Validation("Unknown unit 'years'".to_string())
        );
        assert_eq!(
            parse_interval("interval two years").err().unwrap(),
            DeltaConfigError::Validation(
                "Cannot parse 'two' as integer: invalid digit found in string".to_string()
            )
        );
        assert_eq!(
            parse_interval("interval -25 hours").err().unwrap(),
            DeltaConfigError::Validation(
                "interval 'interval -25 hours' cannot be negative".to_string()
            )
        );
    }
}