use std::num::NonZero;
use std::time::Duration;
use tracing::warn;
use super::*;
use crate::expressions::ColumnName;
use crate::table_features::ColumnMappingMode;
use crate::utils::require;
const SECONDS_PER_MINUTE: u64 = 60;
const SECONDS_PER_HOUR: u64 = 60 * SECONDS_PER_MINUTE;
const SECONDS_PER_DAY: u64 = 24 * SECONDS_PER_HOUR;
const SECONDS_PER_WEEK: u64 = 7 * SECONDS_PER_DAY;
impl<K, V, I> From<I> for TableProperties
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str> + Into<String>,
V: AsRef<str> + Into<String>,
{
fn from(unparsed: I) -> Self {
let mut props = TableProperties::default();
let unparsed = unparsed.into_iter().filter(|(k, v)| {
try_parse(&mut props, k.as_ref(), v.as_ref()).is_none()
});
props.unknown_properties = unparsed.map(|(k, v)| (k.into(), v.into())).collect();
props
}
}
fn try_parse(props: &mut TableProperties, k: &str, v: &str) -> Option<()> {
match k {
APPEND_ONLY => props.append_only = Some(parse_bool(v)?),
AUTO_COMPACT => props.auto_compact = Some(parse_bool(v)?),
OPTIMIZE_WRITE => props.optimize_write = Some(parse_bool(v)?),
CHECKPOINT_INTERVAL => props.checkpoint_interval = Some(parse_positive_int(v)?),
CHECKPOINT_WRITE_STATS_AS_JSON => {
props.checkpoint_write_stats_as_json = Some(parse_bool(v)?)
}
CHECKPOINT_WRITE_STATS_AS_STRUCT => {
props.checkpoint_write_stats_as_struct = Some(parse_bool(v)?)
}
COLUMN_MAPPING_MODE => props.column_mapping_mode = ColumnMappingMode::try_from(v).ok(),
DATA_SKIPPING_NUM_INDEXED_COLS => {
props.data_skipping_num_indexed_cols = DataSkippingNumIndexedCols::try_from(v).ok()
}
DATA_SKIPPING_STATS_COLUMNS => {
props.data_skipping_stats_columns = Some(parse_column_names(v)?)
}
DELETED_FILE_RETENTION_DURATION => {
props.deleted_file_retention_duration = Some(parse_interval(v)?)
}
ENABLE_CHANGE_DATA_FEED => props.enable_change_data_feed = Some(parse_bool(v)?),
ENABLE_DELETION_VECTORS => props.enable_deletion_vectors = Some(parse_bool(v)?),
ENABLE_TYPE_WIDENING => props.enable_type_widening = Some(parse_bool(v)?),
ENABLE_ICEBERG_COMPAT_V1 => props.enable_iceberg_compat_v1 = Some(parse_bool(v)?),
ENABLE_ICEBERG_COMPAT_V2 => props.enable_iceberg_compat_v2 = Some(parse_bool(v)?),
ISOLATION_LEVEL => props.isolation_level = IsolationLevel::try_from(v).ok(),
LOG_RETENTION_DURATION => props.log_retention_duration = Some(parse_interval(v)?),
ENABLE_EXPIRED_LOG_CLEANUP => props.enable_expired_log_cleanup = Some(parse_bool(v)?),
RANDOMIZE_FILE_PREFIXES => props.randomize_file_prefixes = Some(parse_bool(v)?),
RANDOM_PREFIX_LENGTH => props.random_prefix_length = Some(parse_positive_int(v)?),
SET_TRANSACTION_RETENTION_DURATION => {
props.set_transaction_retention_duration = Some(parse_interval(v)?)
}
TARGET_FILE_SIZE => props.target_file_size = Some(parse_positive_int(v)?),
TUNE_FILE_SIZES_FOR_REWRITES => props.tune_file_sizes_for_rewrites = Some(parse_bool(v)?),
CHECKPOINT_POLICY => props.checkpoint_policy = CheckpointPolicy::try_from(v).ok(),
ENABLE_ROW_TRACKING => props.enable_row_tracking = Some(parse_bool(v)?),
MATERIALIZED_ROW_ID_COLUMN_NAME => {
props.materialized_row_id_column_name = Some(v.to_string())
}
MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME => {
props.materialized_row_commit_version_column_name = Some(v.to_string())
}
ROW_TRACKING_SUSPENDED => props.row_tracking_suspended = Some(parse_bool(v)?),
PARQUET_FORMAT_VERSION => props.parquet_format_version = Some(v.to_string()),
ENABLE_IN_COMMIT_TIMESTAMPS => props.enable_in_commit_timestamps = Some(parse_bool(v)?),
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION => {
props.in_commit_timestamp_enablement_version = Some(parse_non_negative(v)?)
}
IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP => {
props.in_commit_timestamp_enablement_timestamp = Some(parse_non_negative(v)?)
}
_ => return None,
}
Some(())
}
pub(crate) fn parse_positive_int(s: &str) -> Option<NonZero<u64>> {
NonZero::new(parse_non_negative(s)?)
}
pub(crate) fn parse_non_negative<T>(s: &str) -> Option<T>
where
i64: TryInto<T>,
{
let n: i64 = s.parse().ok().filter(|&i| i >= 0)?;
n.try_into().ok()
}
pub(crate) fn parse_bool(s: &str) -> Option<bool> {
match s {
"true" => Some(true),
"false" => Some(false),
_ => None,
}
}
pub(crate) fn parse_column_names(s: &str) -> Option<Vec<ColumnName>> {
ColumnName::parse_column_name_list(s)
.inspect_err(|e| warn!("column name list failed to parse: {e}"))
.ok()
}
pub(crate) fn parse_interval(s: &str) -> Option<Duration> {
parse_interval_impl(s).ok()
}
#[derive(thiserror::Error, Debug)]
pub enum ParseIntervalError {
#[error("'{0}' is not an interval")]
NotAnInterval(String),
#[error("Unable to parse '{0}' as an integer")]
ParseIntError(String),
#[error("Interval '{0}' cannot be negative")]
NegativeInterval(String),
#[error("Unsupported interval '{0}'")]
UnsupportedInterval(String),
#[error("Unknown interval unit '{0}'")]
UnknownUnit(String),
}
fn parse_interval_impl(value: &str) -> Result<Duration, ParseIntervalError> {
let mut it = value.split_whitespace();
if it.next() != Some("interval") {
return Err(ParseIntervalError::NotAnInterval(value.to_string()));
}
let number = it
.next()
.ok_or_else(|| ParseIntervalError::NotAnInterval(value.to_string()))?;
let number: i64 = number
.parse()
.map_err(|_| ParseIntervalError::ParseIntError(number.into()))?;
require!(
number >= 0,
ParseIntervalError::NegativeInterval(value.to_string())
);
let number = number as u64;
let duration = match it
.next()
.ok_or_else(|| ParseIntervalError::NotAnInterval(value.to_string()))?
{
"nanosecond" | "nanoseconds" => Duration::from_nanos(number),
"microsecond" | "microseconds" => Duration::from_micros(number),
"millisecond" | "milliseconds" => Duration::from_millis(number),
"second" | "seconds" => Duration::from_secs(number),
"minute" | "minutes" => Duration::from_secs(number * SECONDS_PER_MINUTE),
"hour" | "hours" => Duration::from_secs(number * SECONDS_PER_HOUR),
"day" | "days" => Duration::from_secs(number * SECONDS_PER_DAY),
"week" | "weeks" => Duration::from_secs(number * SECONDS_PER_WEEK),
unit @ ("month" | "months") => {
return Err(ParseIntervalError::UnsupportedInterval(unit.to_string()));
}
unit => {
return Err(ParseIntervalError::UnknownUnit(unit.to_string()));
}
};
Ok(duration)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_bool() {
assert!(parse_bool("true").unwrap());
assert!(!parse_bool("false").unwrap());
assert_eq!(parse_bool("whatever"), None);
}
#[test]
fn test_parse_int() {
assert_eq!(parse_positive_int("12").unwrap().get(), 12);
assert_eq!(parse_positive_int("0"), None);
assert_eq!(parse_positive_int("-12"), None);
assert_eq!(parse_non_negative::<u64>("12").unwrap(), 12);
assert_eq!(parse_non_negative::<u64>("0").unwrap(), 0);
assert_eq!(parse_non_negative::<u64>("-12"), None);
assert_eq!(parse_non_negative::<i64>("12").unwrap(), 12);
assert_eq!(parse_non_negative::<i64>("0").unwrap(), 0);
assert_eq!(parse_non_negative::<i64>("-12"), None);
}
#[test]
fn test_parse_interval() {
assert_eq!(
parse_interval("interval 123 nanosecond").unwrap(),
Duration::from_nanos(123)
);
assert_eq!(
parse_interval("interval 123 nanoseconds").unwrap(),
Duration::from_nanos(123)
);
assert_eq!(
parse_interval("interval 123 microsecond").unwrap(),
Duration::from_micros(123)
);
assert_eq!(
parse_interval("interval 123 microseconds").unwrap(),
Duration::from_micros(123)
);
assert_eq!(
parse_interval("interval 123 millisecond").unwrap(),
Duration::from_millis(123)
);
assert_eq!(
parse_interval("interval 123 milliseconds").unwrap(),
Duration::from_millis(123)
);
assert_eq!(
parse_interval("interval 123 second").unwrap(),
Duration::from_secs(123)
);
assert_eq!(
parse_interval("interval 123 seconds").unwrap(),
Duration::from_secs(123)
);
assert_eq!(
parse_interval("interval 123 minute").unwrap(),
Duration::from_secs(123 * 60)
);
assert_eq!(
parse_interval("interval 123 minutes").unwrap(),
Duration::from_secs(123 * 60)
);
assert_eq!(
parse_interval("interval 123 hour").unwrap(),
Duration::from_secs(123 * 3600)
);
assert_eq!(
parse_interval("interval 123 hours").unwrap(),
Duration::from_secs(123 * 3600)
);
assert_eq!(
parse_interval("interval 123 day").unwrap(),
Duration::from_secs(123 * 86400)
);
assert_eq!(
parse_interval("interval 123 days").unwrap(),
Duration::from_secs(123 * 86400)
);
assert_eq!(
parse_interval("interval 123 week").unwrap(),
Duration::from_secs(123 * 604800)
);
assert_eq!(
parse_interval("interval 123 week").unwrap(),
Duration::from_secs(123 * 604800)
);
}
#[test]
fn test_invalid_parse_interval() {
assert_eq!(
parse_interval_impl("whatever").err().unwrap().to_string(),
"'whatever' is not an interval".to_string()
);
assert_eq!(
parse_interval_impl("interval").err().unwrap().to_string(),
"'interval' is not an interval".to_string()
);
assert_eq!(
parse_interval_impl("interval 2").err().unwrap().to_string(),
"'interval 2' is not an interval".to_string()
);
assert_eq!(
parse_interval_impl("interval 2 months")
.err()
.unwrap()
.to_string(),
"Unsupported interval 'months'".to_string()
);
assert_eq!(
parse_interval_impl("interval 2 years")
.err()
.unwrap()
.to_string(),
"Unknown interval unit 'years'".to_string()
);
assert_eq!(
parse_interval_impl("interval two years")
.err()
.unwrap()
.to_string(),
"Unable to parse 'two' as an integer".to_string()
);
assert_eq!(
parse_interval_impl("interval -25 hours")
.err()
.unwrap()
.to_string(),
"Interval 'interval -25 hours' cannot be negative".to_string()
);
}
}