use std::collections::HashMap;
/// Looks up `key` in `properties` and parses its value as `T`.
///
/// If `key` is not present, `default` is returned as-is. If it is present,
/// the stored string is converted with [`str::parse`].
///
/// # Errors
///
/// Returns an error whose message has the form
/// `Invalid value for <key>: <parse error>` when the stored value cannot be
/// parsed as `T`.
fn parse_property<T: std::str::FromStr>(
    properties: &HashMap<String, String>,
    key: &str,
    default: T,
) -> Result<T, anyhow::Error>
where
    <T as std::str::FromStr>::Err: std::fmt::Display,
{
    match properties.get(key) {
        // Absent key: fall back to the caller-supplied default.
        None => Ok(default),
        // Present key: parse it, tagging any failure with the offending key.
        Some(raw) => raw
            .parse::<T>()
            .map_err(|e| anyhow::anyhow!("Invalid value for {key}: {e}")),
    }
}
/// Typed, parsed subset of a table's string-keyed properties.
///
/// Each field corresponds to one property key; the key and its fallback
/// value are defined as associated constants on `TableProperties`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableProperties {
    /// Parsed from [`TableProperties::PROPERTY_COMMIT_NUM_RETRIES`]; falls back to
    /// [`TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT`].
    pub commit_num_retries: usize,
    /// Parsed from [`TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS`]; falls back to
    /// [`TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT`].
    pub commit_min_retry_wait_ms: u64,
    /// Parsed from [`TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS`]; falls back to
    /// [`TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT`].
    pub commit_max_retry_wait_ms: u64,
    /// Parsed from [`TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS`]; falls back to
    /// [`TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT`].
    pub commit_total_retry_timeout_ms: u64,
    /// Parsed from [`TableProperties::PROPERTY_DEFAULT_FILE_FORMAT`]; falls back to
    /// [`TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT`].
    pub write_format_default: String,
    /// Parsed from [`TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES`]; falls back to
    /// [`TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT`].
    pub write_target_file_size_bytes: usize,
    /// Parsed from [`TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED`]; falls back
    /// to [`TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT`].
    pub write_datafusion_fanout_enabled: bool,
}
impl TableProperties {
    // Reserved keys: each of these is listed in `RESERVED_PROPERTIES` below.

    /// Reserved key `format-version`.
    pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
    /// Reserved key `uuid`.
    pub const PROPERTY_UUID: &str = "uuid";
    /// Reserved key `snapshot-count`.
    pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
    /// Reserved key `current-snapshot-summary`.
    pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
    /// Reserved key `current-snapshot-id`.
    pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
    /// Reserved key `current-snapshot-timestamp-ms`.
    pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
    /// Reserved key `current-schema`.
    pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
    /// Reserved key `default-partition-spec`.
    pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
    /// Reserved key `default-sort-order`.
    pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";

    /// Key `write.metadata.previous-versions-max`.
    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
        "write.metadata.previous-versions-max";
    /// Default value paired with [`Self::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX`].
    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
    /// Key `write.summary.partition-limit`.
    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
    /// Default value paired with [`Self::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT`].
    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;

    /// All reserved keys declared above, in a single array.
    ///
    /// NOTE(review): how these are enforced (e.g. rejecting user-supplied
    /// reserved keys) is not visible here — confirm at the call sites.
    pub const RESERVED_PROPERTIES: [&str; 9] = [
        Self::PROPERTY_FORMAT_VERSION,
        Self::PROPERTY_UUID,
        Self::PROPERTY_SNAPSHOT_COUNT,
        Self::PROPERTY_CURRENT_SNAPSHOT_ID,
        Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
        Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
        Self::PROPERTY_CURRENT_SCHEMA,
        Self::PROPERTY_DEFAULT_PARTITION_SPEC,
        Self::PROPERTY_DEFAULT_SORT_ORDER,
    ];

    /// Key parsed into `commit_num_retries`.
    pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
    /// Fallback for [`Self::PROPERTY_COMMIT_NUM_RETRIES`] when the key is absent.
    pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
    /// Key parsed into `commit_min_retry_wait_ms`.
    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
    /// Fallback for [`Self::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS`] (100 ms).
    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
    /// Key parsed into `commit_max_retry_wait_ms`.
    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
    /// Fallback for [`Self::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS`] (one minute).
    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000;
    /// Key parsed into `commit_total_retry_timeout_ms`.
    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
    /// Fallback for [`Self::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS`] (30 minutes).
    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000;

    /// Key parsed into `write_format_default`.
    pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
    /// Key `write.delete.format.default`.
    ///
    /// NOTE(review): no `_DEFAULT` constant or struct field exists for this key
    /// here; presumably it falls back to the data-file format — confirm.
    pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
    /// Fallback for [`Self::PROPERTY_DEFAULT_FILE_FORMAT`].
    pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
    /// Key parsed into `write_target_file_size_bytes`.
    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
    /// Fallback for [`Self::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES`] (512 MiB).
    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024;
    /// Key parsed into `write_datafusion_fanout_enabled`.
    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
    /// Fallback for [`Self::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED`].
    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
}
impl TryFrom<&HashMap<String, String>> for TableProperties {
    type Error = anyhow::Error;

    /// Builds `TableProperties` from a raw key/value property map.
    ///
    /// Every key that is missing from `props` takes the matching `*_DEFAULT`
    /// associated constant; keys not listed here are ignored.
    ///
    /// # Errors
    ///
    /// Propagates the error from `parse_property` for the first key, in the
    /// order parsed below, whose value cannot be parsed into the field type.
    fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
        let commit_num_retries = parse_property(
            props,
            Self::PROPERTY_COMMIT_NUM_RETRIES,
            Self::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
        )?;
        let commit_min_retry_wait_ms = parse_property(
            props,
            Self::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
            Self::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
        )?;
        let commit_max_retry_wait_ms = parse_property(
            props,
            Self::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
            Self::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
        )?;
        let commit_total_retry_timeout_ms = parse_property(
            props,
            Self::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
            Self::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
        )?;
        let write_format_default = parse_property(
            props,
            Self::PROPERTY_DEFAULT_FILE_FORMAT,
            Self::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
        )?;
        let write_target_file_size_bytes = parse_property(
            props,
            Self::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
            Self::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
        )?;
        let write_datafusion_fanout_enabled = parse_property(
            props,
            Self::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
            Self::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
        )?;

        Ok(Self {
            commit_num_retries,
            commit_min_retry_wait_ms,
            commit_max_retry_wait_ms,
            commit_total_retry_timeout_ms,
            write_format_default,
            write_target_file_size_bytes,
            write_datafusion_fanout_enabled,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned property map from borrowed `(key, value)` pairs.
    fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    #[test]
    fn test_table_properties_default() {
        let props = HashMap::new();
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.commit_num_retries,
            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
        );
        assert_eq!(
            table_properties.commit_min_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_max_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_total_retry_timeout_ms,
            TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
        );
        assert_eq!(
            table_properties.write_format_default,
            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
        );
        assert_eq!(
            table_properties.write_target_file_size_bytes,
            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
        );
        assert_eq!(
            table_properties.write_datafusion_fanout_enabled,
            TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT
        );
    }

    #[test]
    fn test_table_properties_valid() {
        let props = make_props(&[
            (TableProperties::PROPERTY_COMMIT_NUM_RETRIES, "10"),
            (TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, "5"),
            (TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, "20"),
            (TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, "1000"),
            (TableProperties::PROPERTY_DEFAULT_FILE_FORMAT, "avro"),
            (TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, "512"),
            (
                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
                "false",
            ),
        ]);
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(table_properties.commit_num_retries, 10);
        assert_eq!(table_properties.commit_min_retry_wait_ms, 5);
        assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
        assert_eq!(table_properties.commit_total_retry_timeout_ms, 1000);
        assert_eq!(table_properties.write_format_default, "avro".to_string());
        assert_eq!(table_properties.write_target_file_size_bytes, 512);
        assert!(!table_properties.write_datafusion_fanout_enabled);
    }

    #[test]
    fn test_table_properties_ignores_unrelated_keys() {
        let props = make_props(&[("some.unrelated.key", "abc")]);
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.commit_num_retries,
            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
        );
    }

    #[test]
    fn test_table_properties_invalid() {
        // Each key constant is paired with its literal spelling so the test also
        // pins the exact key text that appears in the error message.
        let numeric_keys = [
            (
                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
                "commit.retry.num-retries",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
                "commit.retry.min-wait-ms",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
                "commit.retry.max-wait-ms",
            ),
            (
                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
                "commit.retry.total-timeout-ms",
            ),
            (
                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
                "write.target-file-size-bytes",
            ),
        ];
        for &(key, literal) in &numeric_keys {
            let err = TableProperties::try_from(&make_props(&[(key, "abc")])).unwrap_err();
            let expected = format!("Invalid value for {literal}: invalid digit found in string");
            assert!(
                err.to_string().contains(&expected),
                "unexpected error for {literal}: {err}"
            );
        }

        let err = TableProperties::try_from(&make_props(&[(
            TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
            "yes",
        )]))
        .unwrap_err();
        assert!(
            err.to_string()
                .contains("Invalid value for write.datafusion.fanout.enabled:"),
            "unexpected error for write.datafusion.fanout.enabled: {err}"
        );
    }
}