use std::collections::HashMap;
// Option keys looked up in the options map by the `CoreOptions` accessors below.
const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
const DATA_EVOLUTION_ENABLED_OPTION: &str = "data-evolution.enabled";
const GLOBAL_INDEX_ENABLED_OPTION: &str = "global-index.enabled";
const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
const BUCKET_KEY_OPTION: &str = "bucket-key";
const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
const BUCKET_OPTION: &str = "bucket";
// Returned by `CoreOptions::bucket` when the `bucket` option is absent or is not a valid `i32`.
const DEFAULT_BUCKET: i32 = -1;
// Keys for commit retry behaviour, file compression, row tracking and the Parquet write buffer.
const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
const COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait";
const FILE_COMPRESSION_OPTION: &str = "file.compression";
const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level";
const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size";
// Fallbacks for the commit accessors. The configured values are parsed as plain integers,
// so the `_MS` names presumably mean the options are expected in milliseconds (confirm with callers).
const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10;
const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000;
const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000;
const DEFAULT_COMMIT_MAX_RETRY_WAIT_MS: u64 = 10_000;
// Time-travel selector keys. They are public, and `CoreOptions::try_time_travel_selector`
// rejects an options map in which both are present.
pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
pub const SCAN_VERSION_OPTION: &str = "scan.version";
// Fallbacks used when the matching option is absent or cannot be parsed.
// The numeric ones are returned as-is by accessors that otherwise yield `parse_memory_size` results.
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
/// Typed, read-only view over a borrowed map of string options.
///
/// The struct stores only a shared reference, which is why it can be `Copy`;
/// it never mutates the underlying map.
#[derive(Debug, Clone, Copy)]
pub struct CoreOptions<'a> {
// Raw key/value pairs; values are parsed by the accessors on every call.
options: &'a HashMap<String, String>,
}
/// Time-travel target produced by `CoreOptions::try_time_travel_selector`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum TimeTravelSelector<'a> {
/// Value of the `scan.timestamp-millis` option, parsed as an `i64`.
TimestampMillis(i64),
/// Value of the `scan.version` option, borrowed from the options map and
/// passed through uninterpreted (a numeric string such as `"3"` stays a string).
Version(&'a str),
}
impl<'a> CoreOptions<'a> {
pub fn new(options: &'a HashMap<String, String>) -> Self {
Self { options }
}
pub fn deletion_vectors_enabled(&self) -> bool {
self.options
.get(DELETION_VECTORS_ENABLED_OPTION)
.map(|value| value.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
pub fn data_evolution_enabled(&self) -> bool {
self.options
.get(DATA_EVOLUTION_ENABLED_OPTION)
.map(|value| value.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
pub fn global_index_enabled(&self) -> bool {
self.options
.get(GLOBAL_INDEX_ENABLED_OPTION)
.map(|value| value.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
pub fn source_split_target_size(&self) -> i64 {
self.options
.get(SOURCE_SPLIT_TARGET_SIZE_OPTION)
.and_then(|value| parse_memory_size(value))
.unwrap_or(DEFAULT_SOURCE_SPLIT_TARGET_SIZE)
}
pub fn source_split_open_file_cost(&self) -> i64 {
self.options
.get(SOURCE_SPLIT_OPEN_FILE_COST_OPTION)
.and_then(|value| parse_memory_size(value))
.unwrap_or(DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST)
}
pub fn partition_default_name(&self) -> &str {
self.options
.get(PARTITION_DEFAULT_NAME_OPTION)
.map(String::as_str)
.unwrap_or(DEFAULT_PARTITION_DEFAULT_NAME)
}
pub fn legacy_partition_name(&self) -> bool {
self.options
.get(PARTITION_LEGACY_NAME_OPTION)
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(true)
}
fn parse_i64_option(&self, option_name: &'static str) -> crate::Result<Option<i64>> {
match self.options.get(option_name) {
Some(value) => value
.parse::<i64>()
.map(Some)
.map_err(|e| crate::Error::DataInvalid {
message: format!("Invalid value for {option_name}: '{value}'"),
source: Some(Box::new(e)),
}),
None => Ok(None),
}
}
pub fn scan_timestamp_millis(&self) -> Option<i64> {
self.options
.get(SCAN_TIMESTAMP_MILLIS_OPTION)
.and_then(|v| v.parse().ok())
}
fn configured_time_travel_selectors(&self) -> Vec<&'static str> {
let mut selectors = Vec::with_capacity(2);
if self.options.contains_key(SCAN_TIMESTAMP_MILLIS_OPTION) {
selectors.push(SCAN_TIMESTAMP_MILLIS_OPTION);
}
if self.options.contains_key(SCAN_VERSION_OPTION) {
selectors.push(SCAN_VERSION_OPTION);
}
selectors
}
pub(crate) fn try_time_travel_selector(&self) -> crate::Result<Option<TimeTravelSelector<'a>>> {
let selectors = self.configured_time_travel_selectors();
if selectors.len() > 1 {
return Err(crate::Error::DataInvalid {
message: format!(
"Only one time-travel selector may be set, found: {}",
selectors.join(", ")
),
source: None,
});
}
if let Some(ts) = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? {
Ok(Some(TimeTravelSelector::TimestampMillis(ts)))
} else if let Some(version) = self.options.get(SCAN_VERSION_OPTION).map(String::as_str) {
Ok(Some(TimeTravelSelector::Version(version)))
} else {
Ok(None)
}
}
pub fn bucket_key(&self) -> Option<Vec<String>> {
self.options
.get(BUCKET_KEY_OPTION)
.map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
}
pub fn commit_max_retries(&self) -> u32 {
self.options
.get(COMMIT_MAX_RETRIES_OPTION)
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_COMMIT_MAX_RETRIES)
}
pub fn commit_timeout_ms(&self) -> u64 {
self.options
.get(COMMIT_TIMEOUT_OPTION)
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_COMMIT_TIMEOUT_MS)
}
pub fn commit_min_retry_wait_ms(&self) -> u64 {
self.options
.get(COMMIT_MIN_RETRY_WAIT_OPTION)
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_COMMIT_MIN_RETRY_WAIT_MS)
}
pub fn commit_max_retry_wait_ms(&self) -> u64 {
self.options
.get(COMMIT_MAX_RETRY_WAIT_OPTION)
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_COMMIT_MAX_RETRY_WAIT_MS)
}
pub fn row_tracking_enabled(&self) -> bool {
self.options
.get(ROW_TRACKING_ENABLED_OPTION)
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
pub fn bucket(&self) -> i32 {
self.options
.get(BUCKET_OPTION)
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_BUCKET)
}
pub fn is_default_bucket_function(&self) -> bool {
self.options
.get(BUCKET_FUNCTION_TYPE_OPTION)
.map(|v| v.eq_ignore_ascii_case("default"))
.unwrap_or(true)
}
pub fn target_file_size(&self) -> i64 {
self.options
.get("target-file-size")
.and_then(|v| parse_memory_size(v))
.unwrap_or(DEFAULT_TARGET_FILE_SIZE)
}
pub fn file_compression(&self) -> &str {
self.options
.get(FILE_COMPRESSION_OPTION)
.map(String::as_str)
.unwrap_or("zstd")
}
pub fn file_compression_zstd_level(&self) -> i32 {
self.options
.get(FILE_COMPRESSION_ZSTD_LEVEL_OPTION)
.and_then(|v| v.parse().ok())
.unwrap_or(1)
}
pub fn write_parquet_buffer_size(&self) -> i64 {
self.options
.get(WRITE_PARQUET_BUFFER_SIZE_OPTION)
.and_then(|v| parse_memory_size(v))
.unwrap_or(DEFAULT_WRITE_PARQUET_BUFFER_SIZE)
}
}
/// Parses a human-readable memory size such as `"128 mb"` or `"4MB"` into a
/// number of bytes.
///
/// The input is a run of ASCII digits optionally followed by a unit.
/// Surrounding whitespace and whitespace between number and unit are ignored,
/// and the unit is matched case-insensitively. Recognised units are `b`,
/// `k`/`kb`, `m`/`mb`, `g`/`gb` and `t`/`tb`, each a power of 1024; a bare
/// number is taken as bytes.
///
/// Returns `None` when the input is empty, does not start with digits, uses an
/// unrecognised unit, or when the resulting byte count does not fit in an `i64`.
fn parse_memory_size(value: &str) -> Option<i64> {
    let value = value.trim();
    if value.is_empty() {
        return None;
    }
    // Byte offset of the first non-digit; everything before it is the number.
    let digits_end = value
        .find(|c: char| !c.is_ascii_digit())
        .unwrap_or(value.len());
    let (num_str, unit_str) = value.split_at(digits_end);
    let num: i64 = num_str.parse().ok()?;
    let multiplier: i64 = match unit_str.trim().to_ascii_lowercase().as_str() {
        "" | "b" => 1,
        "kb" | "k" => 1024,
        "mb" | "m" => 1024 * 1024,
        "gb" | "g" => 1024 * 1024 * 1024,
        "tb" | "t" => 1024 * 1024 * 1024 * 1024,
        _ => return None,
    };
    // A plain `*` would panic in debug builds and wrap in release builds for
    // oversized inputs; treat those as unparseable instead.
    num.checked_mul(multiplier)
}
/// Unit tests for `CoreOptions` accessors and `parse_memory_size`.
#[cfg(test)]
mod tests {
    use super::*;

    /// With an empty map the split-size accessors return their built-in defaults.
    #[test]
    fn test_source_split_defaults() {
        let options = HashMap::new();
        let core_options = CoreOptions::new(&options);
        assert_eq!(core_options.source_split_target_size(), 128 * 1024 * 1024);
        assert_eq!(core_options.source_split_open_file_cost(), 4 * 1024 * 1024);
    }

    /// Configured split sizes are parsed as memory sizes.
    #[test]
    fn test_source_split_custom_values() {
        let options = HashMap::from([
            (
                SOURCE_SPLIT_TARGET_SIZE_OPTION.to_string(),
                "256 mb".to_string(),
            ),
            (
                SOURCE_SPLIT_OPEN_FILE_COST_OPTION.to_string(),
                "8 mb".to_string(),
            ),
        ]);
        let core_options = CoreOptions::new(&options);
        assert_eq!(core_options.source_split_target_size(), 256 * 1024 * 1024);
        assert_eq!(core_options.source_split_open_file_cost(), 8 * 1024 * 1024);
    }

    /// Covers bare numbers, spaced and unspaced units, case-insensitivity and rejects.
    #[test]
    fn test_parse_memory_size() {
        assert_eq!(parse_memory_size("1024"), Some(1024));
        assert_eq!(parse_memory_size("128 mb"), Some(128 * 1024 * 1024));
        assert_eq!(parse_memory_size("128mb"), Some(128 * 1024 * 1024));
        assert_eq!(parse_memory_size("4MB"), Some(4 * 1024 * 1024));
        assert_eq!(parse_memory_size("1 gb"), Some(1024 * 1024 * 1024));
        assert_eq!(parse_memory_size("1024 kb"), Some(1024 * 1024));
        assert_eq!(parse_memory_size("100 b"), Some(100));
        assert_eq!(parse_memory_size(""), None);
        assert_eq!(parse_memory_size("abc"), None);
    }

    /// Partition options fall back to the default name and legacy naming enabled.
    #[test]
    fn test_partition_options_defaults() {
        let options = HashMap::new();
        let core = CoreOptions::new(&options);
        assert_eq!(core.partition_default_name(), "__DEFAULT_PARTITION__");
        assert!(core.legacy_partition_name());
    }

    /// Explicit partition options override the defaults.
    #[test]
    fn test_partition_options_custom() {
        let options = HashMap::from([
            (
                PARTITION_DEFAULT_NAME_OPTION.to_string(),
                "NULL_PART".to_string(),
            ),
            (
                PARTITION_LEGACY_NAME_OPTION.to_string(),
                "false".to_string(),
            ),
        ]);
        let core = CoreOptions::new(&options);
        assert_eq!(core.partition_default_name(), "NULL_PART");
        assert!(!core.legacy_partition_name());
    }

    /// Setting both selector keys is an error that names both keys.
    #[test]
    fn test_try_time_travel_selector_rejects_conflicting_selectors() {
        let options = HashMap::from([
            (SCAN_VERSION_OPTION.to_string(), "tag1".to_string()),
            (SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string()),
        ]);
        let core = CoreOptions::new(&options);
        let err = core
            .try_time_travel_selector()
            .expect_err("conflicting selectors should fail");
        match err {
            crate::Error::DataInvalid { message, .. } => {
                assert!(message.contains("Only one time-travel selector may be set"));
                assert!(message.contains(SCAN_VERSION_OPTION));
                assert!(message.contains(SCAN_TIMESTAMP_MILLIS_OPTION));
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    /// A non-numeric timestamp is reported as `DataInvalid` naming the option.
    #[test]
    fn test_try_time_travel_selector_rejects_invalid_numeric_values() {
        let timestamp_options =
            HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "xyz".to_string())]);
        let timestamp_core = CoreOptions::new(&timestamp_options);
        let timestamp_err = timestamp_core
            .try_time_travel_selector()
            .expect_err("invalid timestamp millis should fail");
        match timestamp_err {
            crate::Error::DataInvalid { message, .. } => {
                assert!(message.contains(SCAN_TIMESTAMP_MILLIS_OPTION));
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    /// With an empty map the bucket, commit and row-tracking accessors return defaults.
    #[test]
    fn test_commit_options_defaults() {
        let options = HashMap::new();
        let core = CoreOptions::new(&options);
        assert_eq!(core.bucket(), -1);
        assert_eq!(core.commit_max_retries(), 10);
        assert_eq!(core.commit_timeout_ms(), 120_000);
        assert_eq!(core.commit_min_retry_wait_ms(), 1_000);
        assert_eq!(core.commit_max_retry_wait_ms(), 10_000);
        assert!(!core.row_tracking_enabled());
    }

    /// Explicit bucket, commit and row-tracking values override the defaults.
    #[test]
    fn test_commit_options_custom() {
        let options = HashMap::from([
            (BUCKET_OPTION.to_string(), "4".to_string()),
            (COMMIT_MAX_RETRIES_OPTION.to_string(), "20".to_string()),
            (COMMIT_TIMEOUT_OPTION.to_string(), "60000".to_string()),
            (COMMIT_MIN_RETRY_WAIT_OPTION.to_string(), "500".to_string()),
            (COMMIT_MAX_RETRY_WAIT_OPTION.to_string(), "5000".to_string()),
            (ROW_TRACKING_ENABLED_OPTION.to_string(), "true".to_string()),
        ]);
        let core = CoreOptions::new(&options);
        assert_eq!(core.bucket(), 4);
        assert_eq!(core.commit_max_retries(), 20);
        assert_eq!(core.commit_timeout_ms(), 60_000);
        assert_eq!(core.commit_min_retry_wait_ms(), 500);
        assert_eq!(core.commit_max_retry_wait_ms(), 5_000);
        assert!(core.row_tracking_enabled());
    }

    /// A single valid selector resolves; a numeric version string stays a string.
    #[test]
    fn test_try_time_travel_selector_normalizes_valid_selector() {
        let timestamp_options =
            HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string())]);
        let timestamp_core = CoreOptions::new(&timestamp_options);
        assert_eq!(
            timestamp_core
                .try_time_travel_selector()
                .expect("timestamp selector"),
            Some(TimeTravelSelector::TimestampMillis(1234))
        );

        let version_options =
            HashMap::from([(SCAN_VERSION_OPTION.to_string(), "my-tag".to_string())]);
        let version_core = CoreOptions::new(&version_options);
        assert_eq!(
            version_core
                .try_time_travel_selector()
                .expect("version selector"),
            Some(TimeTravelSelector::Version("my-tag"))
        );

        let version_num_options =
            HashMap::from([(SCAN_VERSION_OPTION.to_string(), "3".to_string())]);
        let version_num_core = CoreOptions::new(&version_num_options);
        assert_eq!(
            version_num_core
                .try_time_travel_selector()
                .expect("version numeric selector"),
            Some(TimeTravelSelector::Version("3"))
        );
    }

    /// With an empty map the Parquet buffer size is the built-in default.
    #[test]
    fn test_write_options_defaults() {
        let options = HashMap::new();
        let core = CoreOptions::new(&options);
        assert_eq!(core.write_parquet_buffer_size(), 256 * 1024 * 1024);
    }

    /// A configured Parquet buffer size is parsed as a memory size.
    #[test]
    fn test_write_options_custom() {
        let options = HashMap::from([(
            WRITE_PARQUET_BUFFER_SIZE_OPTION.to_string(),
            "32mb".to_string(),
        )]);
        let core = CoreOptions::new(&options);
        assert_eq!(core.write_parquet_buffer_size(), 32 * 1024 * 1024);
    }
}