use std::collections::HashMap;
use std::num::NonZero;
use std::time::Duration;
use crate::expressions::ColumnName;
use crate::table_features::ColumnMappingMode;
use crate::Error;
use strum::EnumString;
mod deserialize;
pub use deserialize::ParseIntervalError;
/// Typed view of a Delta table's string key/value properties.
///
/// Every recognized property is stored as an `Option` that is `None` when the property was not
/// supplied (the derived `Default` yields all-`None` fields and an empty `unknown_properties`
/// map). Entries that are not recognized, as well as recognized keys whose value fails to parse
/// (see the `known_key_unknown_val` test), are preserved verbatim in `unknown_properties`.
///
/// The property keys named on each field below are the ones exercised by
/// `test_parse_table_properties` in this file. The parsing itself lives outside this chunk
/// (presumably in the `deserialize` submodule -- confirm there).
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct TableProperties {
/// Value of `delta.appendOnly`.
pub append_only: Option<bool>,
/// Value of `delta.autoOptimize.autoCompact`.
pub auto_compact: Option<bool>,
/// Value of `delta.autoOptimize.optimizeWrite`.
pub optimize_write: Option<bool>,
/// Value of `delta.checkpointInterval`; the type rules out zero.
pub checkpoint_interval: Option<NonZero<u64>>,
/// Value of `delta.checkpoint.writeStatsAsJson`.
pub checkpoint_write_stats_as_json: Option<bool>,
/// Value of `delta.checkpoint.writeStatsAsStruct`.
pub checkpoint_write_stats_as_struct: Option<bool>,
/// Value of `delta.columnMapping.mode` (e.g. `"id"` parses to `ColumnMappingMode::Id`).
pub column_mapping_mode: Option<ColumnMappingMode>,
/// Value of `delta.dataSkippingNumIndexedCols`; `"-1"` parses to
/// `DataSkippingNumIndexedCols::AllColumns`.
pub data_skipping_num_indexed_cols: Option<DataSkippingNumIndexedCols>,
/// Value of `delta.dataSkippingStatsColumns`; the test supplies a comma-separated list
/// (`"col1,col2"`) that becomes one `ColumnName` per entry.
pub data_skipping_stats_columns: Option<Vec<ColumnName>>,
/// Value of `delta.deletedFileRetentionDuration`, given as an interval string such as
/// `"interval 1 second"`.
pub deleted_file_retention_duration: Option<Duration>,
/// Value of `delta.enableChangeDataFeed`.
pub enable_change_data_feed: Option<bool>,
/// Value of `delta.enableDeletionVectors`.
pub enable_deletion_vectors: Option<bool>,
/// Value of `delta.isolationLevel` (e.g. `"snapshotIsolation"`).
pub isolation_level: Option<IsolationLevel>,
/// Value of `delta.logRetentionDuration`, given as an interval string such as
/// `"interval 2 seconds"`.
pub log_retention_duration: Option<Duration>,
/// Value of `delta.enableExpiredLogCleanup`.
pub enable_expired_log_cleanup: Option<bool>,
/// Value of `delta.randomizeFilePrefixes`.
pub randomize_file_prefixes: Option<bool>,
/// Value of `delta.randomPrefixLength`; the type rules out zero.
pub random_prefix_length: Option<NonZero<u64>>,
/// Value of `delta.setTransactionRetentionDuration`, given as an interval string such as
/// `"interval 60 seconds"`.
pub set_transaction_retention_duration: Option<Duration>,
/// Value of `delta.targetFileSize`; the type rules out zero.
/// NOTE(review): unit is presumably bytes -- confirm against the Delta documentation.
pub target_file_size: Option<NonZero<u64>>,
/// Value of `delta.tuneFileSizesForRewrites`.
pub tune_file_sizes_for_rewrites: Option<bool>,
/// Value of `delta.checkpointPolicy` (e.g. `"v2"`).
pub checkpoint_policy: Option<CheckpointPolicy>,
/// Value of `delta.enableRowTracking`.
pub enable_row_tracking: Option<bool>,
/// Every key/value pair that was not turned into one of the typed fields above, kept as the
/// original strings.
pub unknown_properties: HashMap<String, String>,
}
/// Parsed form of the `delta.dataSkippingNumIndexedCols` property value.
///
/// Built from a string via the `TryFrom<&str>` impl in this file.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DataSkippingNumIndexedCols {
/// Produced when the property value is exactly `-1`.
AllColumns,
/// Produced for any other integer value that fits in a `u64` (zero included).
NumColumns(u64),
}
impl TryFrom<&str> for DataSkippingNumIndexedCols {
type Error = Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let num: i64 = value.parse().map_err(|_| {
Error::generic("couldn't parse DataSkippingNumIndexedCols to an integer")
})?;
match num {
-1 => Ok(DataSkippingNumIndexedCols::AllColumns),
x => Ok(DataSkippingNumIndexedCols::NumColumns(
x.try_into().map_err(|_| {
Error::generic("couldn't parse DataSkippingNumIndexedCols to positive integer")
})?,
)),
}
}
}
/// Parsed form of the `delta.isolationLevel` property value.
///
/// String parsing is derived through strum's `EnumString` with camelCase names, so the accepted
/// spellings are the camelCase forms of the variant names (the in-file test parses
/// `"snapshotIsolation"`).
/// NOTE(review): the concurrency guarantees behind each level are defined by the Delta protocol,
/// not by this file -- consult it before relying on a particular level.
#[derive(Debug, EnumString, Default, Copy, Clone, PartialEq, Eq)]
#[strum(serialize_all = "camelCase")]
pub enum IsolationLevel {
/// The variant returned by `IsolationLevel::default()`.
#[default]
Serializable,
/// Write-serializable isolation level.
WriteSerializable,
/// Snapshot isolation level.
SnapshotIsolation,
}
/// Parsed form of the `delta.checkpointPolicy` property value.
///
/// String parsing is derived through strum's `EnumString` with camelCase names; the in-file test
/// parses `"v2"` into `CheckpointPolicy::V2`.
#[derive(Debug, EnumString, Default, Clone, PartialEq, Eq)]
#[strum(serialize_all = "camelCase")]
pub enum CheckpointPolicy {
/// The variant returned by `CheckpointPolicy::default()`.
#[default]
Classic,
/// The "v2" checkpoint policy.
V2,
}
// Unit tests for building `TableProperties` from string key/value pairs.
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::column_name;
use std::collections::HashMap;
// A recognized key ("delta.appendOnly") carrying a value that cannot be parsed as its type
// ("wack" is not a bool) must not fail the conversion: the pair is expected to show up
// untouched in `unknown_properties` while the typed field stays `None`.
#[test]
fn known_key_unknown_val() {
let properties = HashMap::from([("delta.appendOnly".to_string(), "wack".to_string())]);
let table_properties = TableProperties::from(properties.iter());
let unknown_properties =
HashMap::from([("delta.appendOnly".to_string(), "wack".to_string())]);
let expected = TableProperties {
unknown_properties,
..Default::default()
};
assert_eq!(table_properties, expected);
}
// A key that is not a known table property is kept verbatim in `unknown_properties`.
#[test]
fn allow_unknown_keys() {
let properties = [("unknown_properties".to_string(), "two words".to_string())];
let actual = TableProperties::from(properties.clone().into_iter());
let expected = TableProperties {
unknown_properties: HashMap::from(properties),
..Default::default()
};
assert_eq!(actual, expected);
}
// Converting an empty map yields exactly `TableProperties::default()`.
#[test]
fn test_empty_table_properties() {
let map: HashMap<String, String> = HashMap::new();
let actual = TableProperties::from(map.iter());
let default_table_properties = TableProperties::default();
assert_eq!(actual, default_table_properties);
}
// Supplies one valid value for every typed field and checks each is parsed into its field,
// leaving `unknown_properties` empty.
#[test]
fn test_parse_table_properties() {
let properties = [
("delta.appendOnly", "true"),
("delta.autoOptimize.optimizeWrite", "true"),
("delta.autoOptimize.autoCompact", "true"),
("delta.checkpointInterval", "101"),
("delta.checkpoint.writeStatsAsJson", "true"),
("delta.checkpoint.writeStatsAsStruct", "true"),
("delta.columnMapping.mode", "id"),
// "-1" is the sentinel that maps to `DataSkippingNumIndexedCols::AllColumns`.
("delta.dataSkippingNumIndexedCols", "-1"),
("delta.dataSkippingStatsColumns", "col1,col2"),
// Durations are written as "interval <n> <unit>"; both singular and plural units are used.
("delta.deletedFileRetentionDuration", "interval 1 second"),
("delta.enableChangeDataFeed", "true"),
("delta.enableDeletionVectors", "true"),
("delta.isolationLevel", "snapshotIsolation"),
("delta.logRetentionDuration", "interval 2 seconds"),
("delta.enableExpiredLogCleanup", "true"),
("delta.randomizeFilePrefixes", "true"),
("delta.randomPrefixLength", "1001"),
(
"delta.setTransactionRetentionDuration",
"interval 60 seconds",
),
("delta.targetFileSize", "1000000000"),
("delta.tuneFileSizesForRewrites", "true"),
("delta.checkpointPolicy", "v2"),
("delta.enableRowTracking", "true"),
];
let actual = TableProperties::from(properties.into_iter());
// Expected values mirror the inputs above, one field per property key.
let expected = TableProperties {
append_only: Some(true),
optimize_write: Some(true),
auto_compact: Some(true),
checkpoint_interval: Some(NonZero::new(101).unwrap()),
checkpoint_write_stats_as_json: Some(true),
checkpoint_write_stats_as_struct: Some(true),
column_mapping_mode: Some(ColumnMappingMode::Id),
data_skipping_num_indexed_cols: Some(DataSkippingNumIndexedCols::AllColumns),
data_skipping_stats_columns: Some(vec![column_name!("col1"), column_name!("col2")]),
deleted_file_retention_duration: Some(Duration::new(1, 0)),
enable_change_data_feed: Some(true),
enable_deletion_vectors: Some(true),
isolation_level: Some(IsolationLevel::SnapshotIsolation),
log_retention_duration: Some(Duration::new(2, 0)),
enable_expired_log_cleanup: Some(true),
randomize_file_prefixes: Some(true),
random_prefix_length: Some(NonZero::new(1001).unwrap()),
set_transaction_retention_duration: Some(Duration::new(60, 0)),
target_file_size: Some(NonZero::new(1_000_000_000).unwrap()),
tune_file_sizes_for_rewrites: Some(true),
checkpoint_policy: Some(CheckpointPolicy::V2),
enable_row_tracking: Some(true),
unknown_properties: HashMap::new(),
};
assert_eq!(actual, expected);
}
}