use std::collections::HashMap;
use std::num::NonZero;
use std::time::Duration;
use crate::expressions::ColumnName;
use crate::table_features::ColumnMappingMode;
use crate::{Error, Version};
use strum::EnumString;
mod deserialize;
pub use deserialize::ParseIntervalError;
/// Prefix shared by every Delta table property key declared in this module.
pub const DELTA_PROPERTY_PREFIX: &str = "delta.";
// Raw keys of the table properties this module understands. Except where noted, each key is
// parsed into the `TableProperties` field whose name is the constant's name in snake_case
// (e.g. `APPEND_ONLY` -> `TableProperties::append_only`).
/// Key for [`TableProperties::append_only`] (boolean).
pub(crate) const APPEND_ONLY: &str = "delta.appendOnly";
/// Key for [`TableProperties::auto_compact`] (boolean).
pub(crate) const AUTO_COMPACT: &str = "delta.autoOptimize.autoCompact";
/// Key for [`TableProperties::optimize_write`] (boolean).
pub(crate) const OPTIMIZE_WRITE: &str = "delta.autoOptimize.optimizeWrite";
/// Key for [`TableProperties::checkpoint_interval`] (non-zero integer).
pub(crate) const CHECKPOINT_INTERVAL: &str = "delta.checkpointInterval";
/// Key for [`TableProperties::checkpoint_write_stats_as_json`] (boolean).
pub(crate) const CHECKPOINT_WRITE_STATS_AS_JSON: &str = "delta.checkpoint.writeStatsAsJson";
/// Key for [`TableProperties::checkpoint_write_stats_as_struct`] (boolean).
pub(crate) const CHECKPOINT_WRITE_STATS_AS_STRUCT: &str = "delta.checkpoint.writeStatsAsStruct";
/// Key for [`TableProperties::column_mapping_mode`] (e.g. `id`).
pub(crate) const COLUMN_MAPPING_MODE: &str = "delta.columnMapping.mode";
/// Column-mapping max column id key. Unlike the other keys here, `TableProperties` has no
/// dedicated field for it. NOTE(review): presumably consumed elsewhere in the crate — confirm.
pub(crate) const COLUMN_MAPPING_MAX_COLUMN_ID: &str = "delta.columnMapping.maxColumnId";
/// Key for [`TableProperties::data_skipping_num_indexed_cols`] (integer; `-1` = all columns).
pub(crate) const DATA_SKIPPING_NUM_INDEXED_COLS: &str = "delta.dataSkippingNumIndexedCols";
/// Key for [`TableProperties::data_skipping_stats_columns`] (comma-separated column names).
pub(crate) const DATA_SKIPPING_STATS_COLUMNS: &str = "delta.dataSkippingStatsColumns";
/// Key for [`TableProperties::deleted_file_retention_duration`]
/// (interval string such as `interval 1 second`).
pub(crate) const DELETED_FILE_RETENTION_DURATION: &str = "delta.deletedFileRetentionDuration";
/// Key for [`TableProperties::enable_change_data_feed`] (boolean).
pub(crate) const ENABLE_CHANGE_DATA_FEED: &str = "delta.enableChangeDataFeed";
/// Key for [`TableProperties::enable_deletion_vectors`] (boolean).
pub(crate) const ENABLE_DELETION_VECTORS: &str = "delta.enableDeletionVectors";
/// Key for [`TableProperties::enable_type_widening`] (boolean).
pub(crate) const ENABLE_TYPE_WIDENING: &str = "delta.enableTypeWidening";
/// Key for [`TableProperties::enable_iceberg_compat_v1`] (boolean).
pub(crate) const ENABLE_ICEBERG_COMPAT_V1: &str = "delta.enableIcebergCompatV1";
/// Key for [`TableProperties::enable_iceberg_compat_v2`] (boolean).
pub(crate) const ENABLE_ICEBERG_COMPAT_V2: &str = "delta.enableIcebergCompatV2";
/// Key for [`TableProperties::isolation_level`] (camelCase [`IsolationLevel`] variant name).
pub(crate) const ISOLATION_LEVEL: &str = "delta.isolationLevel";
/// Key for [`TableProperties::log_retention_duration`]
/// (interval string such as `interval 2 seconds`).
pub(crate) const LOG_RETENTION_DURATION: &str = "delta.logRetentionDuration";
/// Key for [`TableProperties::enable_expired_log_cleanup`] (boolean).
pub(crate) const ENABLE_EXPIRED_LOG_CLEANUP: &str = "delta.enableExpiredLogCleanup";
/// Key for [`TableProperties::randomize_file_prefixes`] (boolean).
pub(crate) const RANDOMIZE_FILE_PREFIXES: &str = "delta.randomizeFilePrefixes";
/// Key for [`TableProperties::random_prefix_length`] (non-zero integer).
pub(crate) const RANDOM_PREFIX_LENGTH: &str = "delta.randomPrefixLength";
/// Key for [`TableProperties::set_transaction_retention_duration`]
/// (interval string such as `interval 60 seconds`).
pub(crate) const SET_TRANSACTION_RETENTION_DURATION: &str = "delta.setTransactionRetentionDuration";
/// Key for [`TableProperties::target_file_size`] (non-zero integer).
pub(crate) const TARGET_FILE_SIZE: &str = "delta.targetFileSize";
/// Key for [`TableProperties::tune_file_sizes_for_rewrites`] (boolean).
pub(crate) const TUNE_FILE_SIZES_FOR_REWRITES: &str = "delta.tuneFileSizesForRewrites";
/// Key for [`TableProperties::checkpoint_policy`] (`classic` or `v2`).
pub(crate) const CHECKPOINT_POLICY: &str = "delta.checkpointPolicy";
/// Key for [`TableProperties::enable_row_tracking`] (boolean).
pub(crate) const ENABLE_ROW_TRACKING: &str = "delta.enableRowTracking";
/// Key for [`TableProperties::materialized_row_id_column_name`] (kept as a raw string).
pub(crate) const MATERIALIZED_ROW_ID_COLUMN_NAME: &str =
"delta.rowTracking.materializedRowIdColumnName";
/// Key for [`TableProperties::materialized_row_commit_version_column_name`]
/// (kept as a raw string).
pub(crate) const MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME: &str =
"delta.rowTracking.materializedRowCommitVersionColumnName";
/// Key for [`TableProperties::row_tracking_suspended`] (boolean).
pub(crate) const ROW_TRACKING_SUSPENDED: &str = "delta.rowTrackingSuspended";
/// Key for [`TableProperties::parquet_format_version`] (kept as a raw string, e.g. `2.12.0`).
pub(crate) const PARQUET_FORMAT_VERSION: &str = "delta.parquet.format.version";
/// Key for [`TableProperties::enable_in_commit_timestamps`] (boolean).
pub(crate) const ENABLE_IN_COMMIT_TIMESTAMPS: &str = "delta.enableInCommitTimestamps";
/// Key for [`TableProperties::in_commit_timestamp_enablement_version`] (table version number).
pub(crate) const IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION: &str =
"delta.inCommitTimestampEnablementVersion";
/// Key for [`TableProperties::in_commit_timestamp_enablement_timestamp`] (`i64`).
pub(crate) const IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP: &str =
"delta.inCommitTimestampEnablementTimestamp";
/// Typed view of a Delta table's `delta.*` properties.
///
/// Every recognized property is optional: a field is `None` when its key is absent, so
/// [`TableProperties::default`] (all `None`, empty `unknown_properties`) is what an empty
/// property map produces. Instances are built from key/value pairs via `TableProperties::from`
/// (NOTE(review): that conversion is not in this file — presumably the `deserialize` submodule).
///
/// Keys that are not recognized, and recognized keys whose value fails to parse
/// (e.g. `delta.appendOnly` set to `"wack"`), are preserved verbatim in
/// [`unknown_properties`](TableProperties::unknown_properties) instead of populating a field.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct TableProperties {
/// `delta.appendOnly`.
pub append_only: Option<bool>,
/// `delta.autoOptimize.autoCompact`.
pub auto_compact: Option<bool>,
/// `delta.autoOptimize.optimizeWrite`.
pub optimize_write: Option<bool>,
/// `delta.checkpointInterval`; the type rules out zero.
pub checkpoint_interval: Option<NonZero<u64>>,
/// `delta.checkpoint.writeStatsAsJson`; see [`TableProperties::should_write_stats_as_json`]
/// for the value used when unset.
pub checkpoint_write_stats_as_json: Option<bool>,
/// `delta.checkpoint.writeStatsAsStruct`; see
/// [`TableProperties::should_write_stats_as_struct`] for the value used when unset.
pub checkpoint_write_stats_as_struct: Option<bool>,
/// `delta.columnMapping.mode` (e.g. `id` -> `ColumnMappingMode::Id`).
pub column_mapping_mode: Option<ColumnMappingMode>,
/// `delta.dataSkippingNumIndexedCols`; `-1` parses to
/// [`DataSkippingNumIndexedCols::AllColumns`].
pub data_skipping_num_indexed_cols: Option<DataSkippingNumIndexedCols>,
/// `delta.dataSkippingStatsColumns`, a comma-separated list of column names.
pub data_skipping_stats_columns: Option<Vec<ColumnName>>,
/// `delta.deletedFileRetentionDuration`, parsed from an interval string
/// such as `interval 1 second`.
pub deleted_file_retention_duration: Option<Duration>,
/// `delta.enableChangeDataFeed`.
pub enable_change_data_feed: Option<bool>,
/// `delta.enableDeletionVectors`.
pub enable_deletion_vectors: Option<bool>,
/// `delta.enableTypeWidening`.
pub enable_type_widening: Option<bool>,
/// `delta.enableIcebergCompatV1`.
pub enable_iceberg_compat_v1: Option<bool>,
/// `delta.enableIcebergCompatV2`.
pub enable_iceberg_compat_v2: Option<bool>,
/// `delta.isolationLevel`, given as a camelCase variant name (e.g. `snapshotIsolation`).
pub isolation_level: Option<IsolationLevel>,
/// `delta.logRetentionDuration`, parsed from an interval string such as `interval 2 seconds`.
pub log_retention_duration: Option<Duration>,
/// `delta.enableExpiredLogCleanup`.
pub enable_expired_log_cleanup: Option<bool>,
/// `delta.randomizeFilePrefixes`.
pub randomize_file_prefixes: Option<bool>,
/// `delta.randomPrefixLength`; the type rules out zero.
pub random_prefix_length: Option<NonZero<u64>>,
/// `delta.setTransactionRetentionDuration`, parsed from an interval string
/// such as `interval 60 seconds`.
pub set_transaction_retention_duration: Option<Duration>,
/// `delta.targetFileSize`; the type rules out zero. NOTE(review): units not established here.
pub target_file_size: Option<NonZero<u64>>,
/// `delta.tuneFileSizesForRewrites`.
pub tune_file_sizes_for_rewrites: Option<bool>,
/// `delta.checkpointPolicy` (`classic` or `v2`).
pub checkpoint_policy: Option<CheckpointPolicy>,
/// `delta.enableRowTracking`.
pub enable_row_tracking: Option<bool>,
/// `delta.rowTrackingSuspended`.
pub row_tracking_suspended: Option<bool>,
/// `delta.rowTracking.materializedRowIdColumnName`, kept as the raw string.
pub materialized_row_id_column_name: Option<String>,
/// `delta.rowTracking.materializedRowCommitVersionColumnName`, kept as the raw string.
pub materialized_row_commit_version_column_name: Option<String>,
/// `delta.parquet.format.version`, kept as the raw string (e.g. `2.12.0`).
pub parquet_format_version: Option<String>,
/// `delta.enableInCommitTimestamps`.
pub enable_in_commit_timestamps: Option<bool>,
/// `delta.inCommitTimestampEnablementVersion`.
pub in_commit_timestamp_enablement_version: Option<Version>,
/// `delta.inCommitTimestampEnablementTimestamp`.
/// NOTE(review): time unit/epoch is not established in this file — confirm before relying on it.
pub in_commit_timestamp_enablement_timestamp: Option<i64>,
/// Unrecognized keys, plus recognized keys whose values failed to parse, stored verbatim.
pub unknown_properties: HashMap<String, String>,
}
impl TableProperties {
    /// Reports whether checkpoints should include file statistics in JSON form.
    ///
    /// Driven by `delta.checkpoint.writeStatsAsJson`; when that property is unset this
    /// returns `true`.
    pub fn should_write_stats_as_json(&self) -> bool {
        // JSON stats stay on unless the property explicitly says `false`.
        !matches!(self.checkpoint_write_stats_as_json, Some(false))
    }

    /// Reports whether checkpoints should include file statistics in struct form.
    ///
    /// Driven by `delta.checkpoint.writeStatsAsStruct`; when that property is unset this
    /// returns `false`.
    pub fn should_write_stats_as_struct(&self) -> bool {
        // Struct stats stay off unless the property explicitly says `true`.
        matches!(self.checkpoint_write_stats_as_struct, Some(true))
    }
}
/// Column count carried by [`DataSkippingNumIndexedCols::default`].
pub const DEFAULT_NUM_INDEXED_COLS: u64 = 32;

/// Parsed value of the `delta.dataSkippingNumIndexedCols` table property.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DataSkippingNumIndexedCols {
    /// Every column; written as `-1` in the property value.
    AllColumns,
    /// An explicit, non-negative number of columns.
    NumColumns(u64),
}

impl Default for DataSkippingNumIndexedCols {
    /// Yields `NumColumns(DEFAULT_NUM_INDEXED_COLS)`.
    fn default() -> Self {
        Self::NumColumns(DEFAULT_NUM_INDEXED_COLS)
    }
}
impl TryFrom<&str> for DataSkippingNumIndexedCols {
type Error = Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let num: i64 = value.parse().map_err(|_| {
Error::generic("couldn't parse DataSkippingNumIndexedCols to an integer")
})?;
match num {
-1 => Ok(DataSkippingNumIndexedCols::AllColumns),
x => Ok(DataSkippingNumIndexedCols::NumColumns(
x.try_into().map_err(|_| {
Error::generic("couldn't parse DataSkippingNumIndexedCols to positive integer")
})?,
)),
}
}
}
/// Parsed value of the `delta.isolationLevel` table property.
///
/// Parsing goes through `strum::EnumString` with `serialize_all = "camelCase"`, so the accepted
/// strings are the camelCase variant names: `serializable`, `writeSerializable`,
/// `snapshotIsolation`. The `Default` value is [`IsolationLevel::Serializable`].
#[derive(Debug, EnumString, Default, Copy, Clone, PartialEq, Eq)]
#[strum(serialize_all = "camelCase")]
pub enum IsolationLevel {
/// Parsed from `serializable`; the default variant.
#[default]
Serializable,
/// Parsed from `writeSerializable`.
WriteSerializable,
/// Parsed from `snapshotIsolation`.
SnapshotIsolation,
}
/// Parsed value of the `delta.checkpointPolicy` table property.
///
/// Parsing goes through `strum::EnumString` with `serialize_all = "camelCase"`, so the accepted
/// strings are `classic` and `v2`. The `Default` value is [`CheckpointPolicy::Classic`].
#[derive(Debug, EnumString, Default, Clone, PartialEq, Eq)]
#[strum(serialize_all = "camelCase")]
pub enum CheckpointPolicy {
/// Parsed from `classic`; the default variant.
#[default]
Classic,
/// Parsed from `v2`.
V2,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::column_name;
    use std::collections::HashMap;

    /// Pins every property key constant (and the shared prefix) to its exact string so that an
    /// accidental edit to a key is caught.
    #[test]
    fn test_property_key_constants() {
        assert_eq!(DELTA_PROPERTY_PREFIX, "delta.");
        assert_eq!(APPEND_ONLY, "delta.appendOnly");
        assert_eq!(AUTO_COMPACT, "delta.autoOptimize.autoCompact");
        assert_eq!(OPTIMIZE_WRITE, "delta.autoOptimize.optimizeWrite");
        assert_eq!(CHECKPOINT_INTERVAL, "delta.checkpointInterval");
        assert_eq!(
            CHECKPOINT_WRITE_STATS_AS_JSON,
            "delta.checkpoint.writeStatsAsJson"
        );
        assert_eq!(
            CHECKPOINT_WRITE_STATS_AS_STRUCT,
            "delta.checkpoint.writeStatsAsStruct"
        );
        assert_eq!(COLUMN_MAPPING_MODE, "delta.columnMapping.mode");
        // Previously unchecked: this key has no TableProperties field but is still a constant.
        assert_eq!(
            COLUMN_MAPPING_MAX_COLUMN_ID,
            "delta.columnMapping.maxColumnId"
        );
        assert_eq!(
            DATA_SKIPPING_NUM_INDEXED_COLS,
            "delta.dataSkippingNumIndexedCols"
        );
        assert_eq!(
            DATA_SKIPPING_STATS_COLUMNS,
            "delta.dataSkippingStatsColumns"
        );
        assert_eq!(
            DELETED_FILE_RETENTION_DURATION,
            "delta.deletedFileRetentionDuration"
        );
        assert_eq!(ENABLE_CHANGE_DATA_FEED, "delta.enableChangeDataFeed");
        assert_eq!(ENABLE_DELETION_VECTORS, "delta.enableDeletionVectors");
        assert_eq!(ENABLE_TYPE_WIDENING, "delta.enableTypeWidening");
        assert_eq!(ENABLE_ICEBERG_COMPAT_V1, "delta.enableIcebergCompatV1");
        assert_eq!(ENABLE_ICEBERG_COMPAT_V2, "delta.enableIcebergCompatV2");
        assert_eq!(ISOLATION_LEVEL, "delta.isolationLevel");
        assert_eq!(LOG_RETENTION_DURATION, "delta.logRetentionDuration");
        assert_eq!(ENABLE_EXPIRED_LOG_CLEANUP, "delta.enableExpiredLogCleanup");
        assert_eq!(RANDOMIZE_FILE_PREFIXES, "delta.randomizeFilePrefixes");
        assert_eq!(RANDOM_PREFIX_LENGTH, "delta.randomPrefixLength");
        assert_eq!(
            SET_TRANSACTION_RETENTION_DURATION,
            "delta.setTransactionRetentionDuration"
        );
        assert_eq!(TARGET_FILE_SIZE, "delta.targetFileSize");
        assert_eq!(
            TUNE_FILE_SIZES_FOR_REWRITES,
            "delta.tuneFileSizesForRewrites"
        );
        assert_eq!(CHECKPOINT_POLICY, "delta.checkpointPolicy");
        assert_eq!(ENABLE_ROW_TRACKING, "delta.enableRowTracking");
        assert_eq!(
            MATERIALIZED_ROW_ID_COLUMN_NAME,
            "delta.rowTracking.materializedRowIdColumnName"
        );
        assert_eq!(
            MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME,
            "delta.rowTracking.materializedRowCommitVersionColumnName"
        );
        assert_eq!(ROW_TRACKING_SUSPENDED, "delta.rowTrackingSuspended");
        assert_eq!(PARQUET_FORMAT_VERSION, "delta.parquet.format.version");
        assert_eq!(
            ENABLE_IN_COMMIT_TIMESTAMPS,
            "delta.enableInCommitTimestamps"
        );
        assert_eq!(
            IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION,
            "delta.inCommitTimestampEnablementVersion"
        );
        assert_eq!(
            IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP,
            "delta.inCommitTimestampEnablementTimestamp"
        );
    }

    #[test]
    fn test_parse_type_widening() {
        let properties = HashMap::from([(ENABLE_TYPE_WIDENING.to_string(), "true".to_string())]);
        let table_properties = TableProperties::from(properties.iter());
        assert_eq!(table_properties.enable_type_widening, Some(true));
        let properties = HashMap::from([(ENABLE_TYPE_WIDENING.to_string(), "false".to_string())]);
        let table_properties = TableProperties::from(properties.iter());
        assert_eq!(table_properties.enable_type_widening, Some(false));
    }

    #[test]
    fn test_parse_iceberg_compat_v1() {
        let properties =
            HashMap::from([(ENABLE_ICEBERG_COMPAT_V1.to_string(), "true".to_string())]);
        let table_properties = TableProperties::from(properties.iter());
        assert_eq!(table_properties.enable_iceberg_compat_v1, Some(true));
        let properties =
            HashMap::from([(ENABLE_ICEBERG_COMPAT_V1.to_string(), "false".to_string())]);
        let table_properties = TableProperties::from(properties.iter());
        assert_eq!(table_properties.enable_iceberg_compat_v1, Some(false));
    }

    #[test]
    fn test_parse_iceberg_compat_v2() {
        let properties =
            HashMap::from([(ENABLE_ICEBERG_COMPAT_V2.to_string(), "true".to_string())]);
        let table_properties = TableProperties::from(properties.iter());
        assert_eq!(table_properties.enable_iceberg_compat_v2, Some(true));
        let properties =
            HashMap::from([(ENABLE_ICEBERG_COMPAT_V2.to_string(), "false".to_string())]);
        let table_properties = TableProperties::from(properties.iter());
        assert_eq!(table_properties.enable_iceberg_compat_v2, Some(false));
    }

    /// A recognized key with an unparseable value is kept verbatim in `unknown_properties`.
    #[test]
    fn known_key_unknown_val() {
        let properties = HashMap::from([(APPEND_ONLY.to_string(), "wack".to_string())]);
        let table_properties = TableProperties::from(properties.iter());
        let unknown_properties = HashMap::from([(APPEND_ONLY.to_string(), "wack".to_string())]);
        let expected = TableProperties {
            unknown_properties,
            ..Default::default()
        };
        assert_eq!(table_properties, expected);
    }

    /// Unrecognized keys are preserved rather than rejected.
    #[test]
    fn allow_unknown_keys() {
        let properties = [("unknown_properties".to_string(), "two words".to_string())];
        let actual = TableProperties::from(properties.clone().into_iter());
        let expected = TableProperties {
            unknown_properties: HashMap::from(properties),
            ..Default::default()
        };
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_empty_table_properties() {
        let map: HashMap<String, String> = HashMap::new();
        let actual = TableProperties::from(map.iter());
        let default_table_properties = TableProperties::default();
        assert_eq!(actual, default_table_properties);
    }

    /// The checkpoint-stats getters fall back to JSON-on / struct-off when the properties are
    /// unset, and honour explicit values otherwise.
    #[test]
    fn test_checkpoint_stats_defaults() {
        let props = TableProperties::default();
        assert!(props.should_write_stats_as_json());
        assert!(!props.should_write_stats_as_struct());

        let props = TableProperties {
            checkpoint_write_stats_as_json: Some(false),
            checkpoint_write_stats_as_struct: Some(true),
            ..Default::default()
        };
        assert!(!props.should_write_stats_as_json());
        assert!(props.should_write_stats_as_struct());

        let props = TableProperties {
            checkpoint_write_stats_as_json: Some(true),
            checkpoint_write_stats_as_struct: Some(false),
            ..Default::default()
        };
        assert!(props.should_write_stats_as_json());
        assert!(!props.should_write_stats_as_struct());
    }

    /// Covers the `-1` sentinel, zero, positive counts, and rejected inputs.
    #[test]
    fn test_data_skipping_num_indexed_cols() {
        assert_eq!(
            DataSkippingNumIndexedCols::default(),
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEXED_COLS)
        );
        assert_eq!(
            DataSkippingNumIndexedCols::try_from("-1").ok(),
            Some(DataSkippingNumIndexedCols::AllColumns)
        );
        assert_eq!(
            DataSkippingNumIndexedCols::try_from("0").ok(),
            Some(DataSkippingNumIndexedCols::NumColumns(0))
        );
        assert_eq!(
            DataSkippingNumIndexedCols::try_from("42").ok(),
            Some(DataSkippingNumIndexedCols::NumColumns(42))
        );
        // Negative values other than -1, non-integers, and empty input are rejected.
        for bad in ["-2", "1.5", "abc", ""] {
            assert!(
                DataSkippingNumIndexedCols::try_from(bad).is_err(),
                "{bad:?} should be rejected"
            );
        }
    }

    /// The strum-derived parsers accept the camelCase variant names; defaults are pinned too.
    #[test]
    fn test_parse_enum_properties() {
        assert_eq!(
            "serializable".parse::<IsolationLevel>().ok(),
            Some(IsolationLevel::Serializable)
        );
        assert_eq!(
            "writeSerializable".parse::<IsolationLevel>().ok(),
            Some(IsolationLevel::WriteSerializable)
        );
        assert_eq!(
            "snapshotIsolation".parse::<IsolationLevel>().ok(),
            Some(IsolationLevel::SnapshotIsolation)
        );
        assert_eq!(IsolationLevel::default(), IsolationLevel::Serializable);

        assert_eq!(
            "classic".parse::<CheckpointPolicy>().ok(),
            Some(CheckpointPolicy::Classic)
        );
        assert_eq!(
            "v2".parse::<CheckpointPolicy>().ok(),
            Some(CheckpointPolicy::V2)
        );
        assert_eq!(CheckpointPolicy::default(), CheckpointPolicy::Classic);
    }

    /// Every typed field is populated from its key when all keys carry valid values.
    #[test]
    fn test_parse_table_properties() {
        let properties = [
            (APPEND_ONLY, "true"),
            (OPTIMIZE_WRITE, "true"),
            (AUTO_COMPACT, "true"),
            (CHECKPOINT_INTERVAL, "101"),
            (CHECKPOINT_WRITE_STATS_AS_JSON, "true"),
            (CHECKPOINT_WRITE_STATS_AS_STRUCT, "true"),
            (COLUMN_MAPPING_MODE, "id"),
            (DATA_SKIPPING_NUM_INDEXED_COLS, "-1"),
            (DATA_SKIPPING_STATS_COLUMNS, "col1,col2"),
            (DELETED_FILE_RETENTION_DURATION, "interval 1 second"),
            (ENABLE_CHANGE_DATA_FEED, "true"),
            (ENABLE_DELETION_VECTORS, "true"),
            (ENABLE_TYPE_WIDENING, "true"),
            (ENABLE_ICEBERG_COMPAT_V1, "true"),
            (ENABLE_ICEBERG_COMPAT_V2, "true"),
            (ISOLATION_LEVEL, "snapshotIsolation"),
            (LOG_RETENTION_DURATION, "interval 2 seconds"),
            (ENABLE_EXPIRED_LOG_CLEANUP, "true"),
            (RANDOMIZE_FILE_PREFIXES, "true"),
            (RANDOM_PREFIX_LENGTH, "1001"),
            (SET_TRANSACTION_RETENTION_DURATION, "interval 60 seconds"),
            (TARGET_FILE_SIZE, "1000000000"),
            (TUNE_FILE_SIZES_FOR_REWRITES, "true"),
            (CHECKPOINT_POLICY, "v2"),
            (ENABLE_ROW_TRACKING, "true"),
            (MATERIALIZED_ROW_ID_COLUMN_NAME, "_row-id-col-some_uuid"),
            (
                MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME,
                "_row-commit-version-col-some_uuid",
            ),
            (ROW_TRACKING_SUSPENDED, "false"),
            (ENABLE_IN_COMMIT_TIMESTAMPS, "true"),
            (IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION, "15"),
            (IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP, "1612345678"),
            (PARQUET_FORMAT_VERSION, "2.12.0"),
        ];
        let actual = TableProperties::from(properties.into_iter());
        let expected = TableProperties {
            append_only: Some(true),
            optimize_write: Some(true),
            auto_compact: Some(true),
            checkpoint_interval: Some(NonZero::new(101).unwrap()),
            checkpoint_write_stats_as_json: Some(true),
            checkpoint_write_stats_as_struct: Some(true),
            column_mapping_mode: Some(ColumnMappingMode::Id),
            data_skipping_num_indexed_cols: Some(DataSkippingNumIndexedCols::AllColumns),
            data_skipping_stats_columns: Some(vec![column_name!("col1"), column_name!("col2")]),
            deleted_file_retention_duration: Some(Duration::new(1, 0)),
            enable_change_data_feed: Some(true),
            enable_deletion_vectors: Some(true),
            enable_type_widening: Some(true),
            enable_iceberg_compat_v1: Some(true),
            enable_iceberg_compat_v2: Some(true),
            isolation_level: Some(IsolationLevel::SnapshotIsolation),
            log_retention_duration: Some(Duration::new(2, 0)),
            enable_expired_log_cleanup: Some(true),
            randomize_file_prefixes: Some(true),
            random_prefix_length: Some(NonZero::new(1001).unwrap()),
            set_transaction_retention_duration: Some(Duration::new(60, 0)),
            target_file_size: Some(NonZero::new(1_000_000_000).unwrap()),
            tune_file_sizes_for_rewrites: Some(true),
            checkpoint_policy: Some(CheckpointPolicy::V2),
            enable_row_tracking: Some(true),
            materialized_row_id_column_name: Some("_row-id-col-some_uuid".to_string()),
            materialized_row_commit_version_column_name: Some(
                "_row-commit-version-col-some_uuid".to_string(),
            ),
            row_tracking_suspended: Some(false),
            enable_in_commit_timestamps: Some(true),
            in_commit_timestamp_enablement_version: Some(15),
            parquet_format_version: Some("2.12.0".to_string()),
            in_commit_timestamp_enablement_timestamp: Some(1_612_345_678),
            unknown_properties: HashMap::new(),
        };
        assert_eq!(actual, expected);
    }
}