use std::collections::HashMap;
use std::sync::Arc;
use rand::Rng;
use url::Url;
use crate::actions::deletion_vector::DeletionVectorPath;
use crate::expressions::{ColumnName, ExpressionRef};
use crate::partition::hive::build_partition_path;
use crate::schema::SchemaRef;
use crate::table_features::ColumnMappingMode;
/// Table-level write state that [`WriteContext`] holds behind an [`Arc`], so several contexts
/// (which differ only in their partition values) can share a single copy.
#[derive(Debug)]
pub(super) struct SharedWriteState {
    /// URL of the table root; `WriteContext::write_dir` derives data-file directories from it.
    pub(super) table_root: Url,
    /// Logical schema of the data being written (user-facing column names).
    pub(super) logical_schema: SchemaRef,
    /// Physical schema of the data being written.
    /// NOTE(review): presumably the schema with column-mapping names applied — confirm.
    pub(super) physical_schema: SchemaRef,
    /// Expression converting logical data to its physical form.
    /// NOTE(review): presumably evaluated on each batch before writing — confirm with callers.
    pub(super) logical_to_physical: ExpressionRef,
    /// Column mapping mode of the table; selects the directory layout used by
    /// `WriteContext::write_dir` (Hive-style paths when `None`, random prefixes otherwise).
    pub(super) column_mapping_mode: ColumnMappingMode,
    /// Columns exposed via `WriteContext::stats_columns`.
    /// NOTE(review): presumably the columns to collect file statistics for — confirm.
    pub(super) stats_columns: Vec<ColumnName>,
    /// Partition column names in logical form. Their order is the directory order of the
    /// Hive-style partition path built when column mapping is off.
    pub(super) logical_partition_columns: Vec<String>,
}
/// Context for writing data files: shared table-level write state plus the partition values
/// of the files being written with this context.
#[derive(Debug)]
pub struct WriteContext {
    /// Table-level state, shared with other contexts via [`Arc`].
    pub(super) shared: Arc<SharedWriteState>,
    /// Partition values keyed by partition column name; `None` denotes a null partition value.
    /// NOTE(review): keys are presumably physical column names (per the field name); they are
    /// looked up by logical name only when column mapping is off.
    pub(super) physical_partition_values: HashMap<String, Option<String>>,
}
impl WriteContext {
    /// Returns the URL of the table's root directory.
    pub fn table_root_dir(&self) -> &Url {
        &self.shared.table_root
    }

    /// Returns the directory URL under which new data files should be written.
    ///
    /// The layout depends on the table's column mapping mode:
    /// - [`ColumnMappingMode::None`], unpartitioned: the table root itself, returned unchanged.
    /// - [`ColumnMappingMode::None`], partitioned: the table root followed by the Hive-style
    ///   partition path built from this context's partition values (e.g. `year=2024/month=03/`).
    /// - [`ColumnMappingMode::Id`] / [`ColumnMappingMode::Name`]: the table root followed by a
    ///   freshly generated two-character alphanumeric directory (e.g. `aZ/`). No Hive-style
    ///   directories are used, and each call may return a different directory.
    ///
    /// Whenever a suffix is appended, a `/` is first inserted if the table root's path does not
    /// already end with one. The suffix therefore always becomes a child directory of the root
    /// instead of being glued onto its last path segment (`/table` + `ab/` yields `/table/ab/`,
    /// not `/tableab/`).
    pub fn write_dir(&self) -> Url {
        let suffix = match self.shared.column_mapping_mode {
            ColumnMappingMode::None => {
                if self.shared.logical_partition_columns.is_empty() {
                    // Unpartitioned, no column mapping: files go directly into the table root.
                    return self.shared.table_root.clone();
                }
                self.hive_partition_path_suffix()
            }
            ColumnMappingMode::Id | ColumnMappingMode::Name => {
                format!("{}/", random_alphanumeric_prefix())
            }
        };
        append_dir_suffix(&self.shared.table_root, &suffix)
    }

    /// Returns the logical (user-facing) schema of the data being written.
    pub fn logical_schema(&self) -> &SchemaRef {
        &self.shared.logical_schema
    }

    /// Returns the physical schema of the data being written.
    pub fn physical_schema(&self) -> &SchemaRef {
        &self.shared.physical_schema
    }

    /// Returns a clone of the shared logical-to-physical transformation expression.
    pub fn logical_to_physical(&self) -> ExpressionRef {
        self.shared.logical_to_physical.clone()
    }

    /// Returns the table's column mapping mode.
    pub fn column_mapping_mode(&self) -> ColumnMappingMode {
        self.shared.column_mapping_mode
    }

    /// Returns the configured statistics columns.
    pub fn stats_columns(&self) -> &[ColumnName] {
        &self.shared.stats_columns
    }

    /// Returns this context's partition values; `None` values denote null partition values.
    pub fn physical_partition_values(&self) -> &HashMap<String, Option<String>> {
        &self.physical_partition_values
    }

    /// Builds the Hive-style partition path suffix (e.g. `year=2024/month=03/`) for this
    /// context, using the logical partition column order.
    ///
    /// Only valid with column mapping off. In that mode this relies on physical and logical
    /// names coinciding, so each logical column name is used directly as the key into
    /// `physical_partition_values`. A column with no entry in the map is treated the same as
    /// an explicit null value.
    fn hive_partition_path_suffix(&self) -> String {
        debug_assert!(
            self.shared.column_mapping_mode == ColumnMappingMode::None,
            "Hive-style paths should only be used when column mapping is OFF"
        );
        let columns: Vec<(&str, Option<&str>)> = self
            .shared
            .logical_partition_columns
            .iter()
            .map(|logical_name| {
                let value = self
                    .physical_partition_values
                    .get(logical_name.as_str())
                    .and_then(|v| v.as_deref());
                (logical_name.as_str(), value)
            })
            .collect();
        build_partition_path(&columns)
    }

    /// Creates a [`DeletionVectorPath`] rooted at the table root, using `random_prefix`.
    pub fn new_deletion_vector_path(&self, random_prefix: String) -> DeletionVectorPath {
        DeletionVectorPath::new(self.shared.table_root.clone(), random_prefix)
    }
}

/// Returns a copy of `base` with `suffix` appended to its path as a child path.
///
/// A `/` separator is inserted when `base`'s path does not already end with one. Otherwise
/// the suffix would be concatenated onto the last path segment.
fn append_dir_suffix(base: &Url, suffix: &str) -> Url {
    let base_path = base.path();
    let separator = if base_path.ends_with('/') { "" } else { "/" };
    let mut url = base.clone();
    url.set_path(&format!("{base_path}{separator}{suffix}"));
    url
}
/// Generates a random two-character string drawn uniformly from ASCII lowercase letters,
/// uppercase letters and digits.
///
/// [`WriteContext::write_dir`] uses it as the data-file subdirectory name when column mapping
/// is enabled.
fn random_alphanumeric_prefix() -> String {
    const ALPHABET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    const PREFIX_LEN: usize = 2;

    let mut generator = rand::rng();
    let mut prefix = String::with_capacity(PREFIX_LEN);
    for _ in 0..PREFIX_LEN {
        let pick = generator.random_range(0..ALPHABET.len());
        prefix.push(char::from(ALPHABET[pick]));
    }
    prefix
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::Expression;
    use crate::schema::{DataType, StructField, StructType};
    use rstest::rstest;
    use std::collections::HashSet;
    use std::sync::Arc;

    /// Builds a [`WriteContext`] rooted at `s3://bucket/table/` over a single nullable integer
    /// column named `value`, using the given column mapping mode and partition setup.
    fn make_write_context(
        cm_mode: ColumnMappingMode,
        partition_columns: Vec<String>,
        partition_values: HashMap<String, Option<String>>,
    ) -> WriteContext {
        let value_field = StructField::nullable("value", DataType::INTEGER);
        let schema = Arc::new(StructType::new_unchecked(vec![value_field]));
        let shared = SharedWriteState {
            table_root: Url::parse("s3://bucket/table/").unwrap(),
            logical_schema: Arc::clone(&schema),
            physical_schema: schema,
            logical_to_physical: Arc::new(Expression::literal(true)),
            column_mapping_mode: cm_mode,
            stats_columns: Vec::new(),
            logical_partition_columns: partition_columns,
        };
        WriteContext {
            shared: Arc::new(shared),
            physical_partition_values: partition_values,
        }
    }

    #[rstest]
    fn test_write_dir_structure(
        #[values(
            ColumnMappingMode::None,
            ColumnMappingMode::Name,
            ColumnMappingMode::Id
        )]
        cm_mode: ColumnMappingMode,
        #[values(true, false)] is_partitioned: bool,
    ) {
        let mut cols: Vec<String> = Vec::new();
        let mut pvs: HashMap<String, Option<String>> = HashMap::new();
        if is_partitioned {
            for (name, value) in [("year", "2024"), ("month", "03")] {
                cols.push(name.to_string());
                pvs.insert(name.to_string(), Some(value.to_string()));
            }
        }

        let path = make_write_context(cm_mode, cols, pvs)
            .write_dir()
            .path()
            .to_string();

        match (cm_mode, is_partitioned) {
            (ColumnMappingMode::None, false) => assert_eq!(
                path, "/table/",
                "CM off, unpartitioned: should be table root"
            ),
            (ColumnMappingMode::None, true) => assert_eq!(
                path, "/table/year=2024/month=03/",
                "CM off, partitioned: full Hive-style path"
            ),
            (ColumnMappingMode::Name | ColumnMappingMode::Id, _) => {
                assert!(
                    !path.contains("year="),
                    "CM on: should NOT contain Hive-style dirs, got: {path}"
                );
                let prefix_dir = path
                    .strip_prefix("/table/")
                    .and_then(|rest| rest.strip_suffix('/'))
                    .unwrap();
                assert_eq!(
                    prefix_dir.len(),
                    2,
                    "expected 2-char prefix, got: {prefix_dir}"
                );
                assert!(
                    prefix_dir.bytes().all(|b| b.is_ascii_alphanumeric()),
                    "prefix should be alphanumeric, got: {prefix_dir}"
                );
            }
        }
    }

    #[test]
    fn test_write_dir_cm_on_generates_different_prefixes_per_call() {
        let wc = make_write_context(ColumnMappingMode::Name, Vec::new(), HashMap::new());
        let unique: HashSet<String> =
            std::iter::repeat_with(|| wc.write_dir().path().to_string())
                .take(20)
                .collect();
        assert!(
            unique.len() > 1,
            "20 calls should produce at least 2 distinct prefixes"
        );
    }

    #[test]
    fn test_write_dir_cm_off_partitioned_null_value_uses_hive_default() {
        let partition_values: HashMap<String, Option<String>> =
            [("region".to_string(), None)].into_iter().collect();
        let path = make_write_context(
            ColumnMappingMode::None,
            vec!["region".to_string()],
            partition_values,
        )
        .write_dir()
        .path()
        .to_string();
        assert!(
            path.contains("__HIVE_DEFAULT_PARTITION__"),
            "null partition value should use HIVE_DEFAULT_PARTITION, got: {path}"
        );
    }

    #[test]
    fn test_random_alphanumeric_prefix_format() {
        for prefix in (0..100).map(|_| random_alphanumeric_prefix()) {
            assert_eq!(prefix.len(), 2, "prefix should be exactly 2 chars");
            assert!(
                prefix.bytes().all(|b| b.is_ascii_alphanumeric()),
                "prefix should be alphanumeric, got: {prefix}"
            );
        }
    }
}