use std::collections::HashMap;
use std::num::NonZero;
use std::sync::Arc;
use rand::Rng;
use url::Url;
use crate::actions::deletion_vector::DeletionVectorPath;
use crate::expressions::{ColumnName, ExpressionRef};
use crate::partition::hive::{build_partition_path, uri_encode_path};
use crate::schema::SchemaRef;
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Error};
/// Table-level write settings that a [`WriteContext`] reads through a shared `Arc`.
///
/// Nothing here is specific to one set of partition values; those live on `WriteContext` itself.
#[derive(Debug)]
pub(super) struct SharedWriteState {
/// Root URL of the table. `WriteContext::write_dir` appends to its path and
/// `WriteContext::resolve_file_path` relativizes file URLs against it.
pub(super) table_root: Url,
/// Logical schema, handed out by `WriteContext::logical_schema`.
pub(super) logical_schema: SchemaRef,
/// Physical schema, handed out by `WriteContext::physical_schema`.
pub(super) physical_schema: SchemaRef,
/// Expression handed out by `WriteContext::logical_to_physical`.
/// NOTE(review): presumably maps logical data to its physical layout; confirm where it is built.
pub(super) logical_to_physical: ExpressionRef,
/// Column mapping mode. Any mode other than `ColumnMappingMode::None` makes
/// `WriteContext::write_dir` use a random prefix directory instead of Hive-style directories.
pub(super) column_mapping_mode: ColumnMappingMode,
/// Column names handed out by `WriteContext::stats_columns`.
/// NOTE(review): presumably the columns statistics are collected for; confirm at the call site.
pub(super) stats_columns: Vec<ColumnName>,
/// Logical names of the partition columns, in the order they are passed to the Hive path
/// builder. When empty, `write_dir` adds no partition directories.
pub(super) logical_partition_columns: Vec<String>,
/// When `true`, `write_dir` uses a random prefix directory even with column mapping off.
pub(super) randomize_file_prefixes: bool,
/// Number of characters in the random prefix directory name; non-zero by type.
pub(super) random_prefix_length: NonZero<usize>,
}
/// Context for writing data files: shared table-level state plus one set of partition values.
#[derive(Debug)]
pub struct WriteContext {
/// Table-level state, shared through an `Arc`.
pub(super) shared: Arc<SharedWriteState>,
/// Partition values keyed by column name; a `None` value represents a null partition value.
/// The Hive path builder looks entries up by *logical* column name, which it only does when
/// column mapping is off.
/// NOTE(review): the field name suggests keys are physical names; confirm they coincide with
/// logical names in that mode.
pub(super) physical_partition_values: HashMap<String, Option<String>>,
}
impl WriteContext {
    /// Returns the root URL of the table this context writes into.
    pub fn table_root_dir(&self) -> &Url {
        &self.shared.table_root
    }

    /// Returns the directory URL under which new data files should be written.
    ///
    /// The result depends on the shared write state:
    ///
    /// - If column mapping is enabled (any mode other than `ColumnMappingMode::None`) or prefix
    ///   randomization is requested, the result is `<table_root><random prefix>/`. A fresh
    ///   prefix is generated on every call and no Hive-style partition directories are added.
    /// - Otherwise, if the table has partition columns, the Hive-style partition path is
    ///   URI-encoded and appended to the table root path.
    /// - Otherwise the table root itself is returned.
    pub fn write_dir(&self) -> Url {
        // Both branches below append by plain string concatenation.
        // NOTE(review): this relies on the table root path ending in '/'; confirm that is
        // guaranteed where `table_root` is constructed.
        let mut url = self.shared.table_root.clone();
        let should_prefix = self.shared.column_mapping_mode != ColumnMappingMode::None
            || self.shared.randomize_file_prefixes;
        if should_prefix {
            let prefix = random_alphanumeric_prefix(self.shared.random_prefix_length);
            url.set_path(&format!("{}{}/", url.path(), prefix));
        } else if !self.shared.logical_partition_columns.is_empty() {
            // The Hive-escaped suffix is URI-encoded on top of its Hive escaping, so a `%`
            // produced by Hive escaping ends up as `%25` in the URL path.
            let hive_escaped = self.hive_partition_path_suffix();
            let uri_encoded = uri_encode_path(&hive_escaped);
            url.set_path(&format!("{}{}", url.path(), uri_encoded));
        }
        url
    }

    /// Returns the logical schema held in the shared write state.
    pub fn logical_schema(&self) -> &SchemaRef {
        &self.shared.logical_schema
    }

    /// Returns the physical schema held in the shared write state.
    pub fn physical_schema(&self) -> &SchemaRef {
        &self.shared.physical_schema
    }

    /// Returns a clone of the shared logical-to-physical expression handle.
    pub fn logical_to_physical(&self) -> ExpressionRef {
        self.shared.logical_to_physical.clone()
    }

    /// Returns the column mapping mode of the table being written.
    pub fn column_mapping_mode(&self) -> ColumnMappingMode {
        self.shared.column_mapping_mode
    }

    /// Returns the stats column names held in the shared write state.
    pub fn stats_columns(&self) -> &[ColumnName] {
        &self.shared.stats_columns
    }

    /// Returns the partition values carried by this context (`None` is a null value).
    pub fn physical_partition_values(&self) -> &HashMap<String, Option<String>> {
        &self.physical_partition_values
    }

    /// Builds the Hive-style partition path suffix from this context's partition values.
    ///
    /// Columns are emitted in `logical_partition_columns` order. A column that has no entry in
    /// the partition-value map is passed on exactly like a null value (`None`).
    ///
    /// Must only be called when column mapping is off; this is checked in debug builds.
    fn hive_partition_path_suffix(&self) -> String {
        debug_assert!(
            self.shared.column_mapping_mode == ColumnMappingMode::None,
            "Hive-style paths should only be used when column mapping is OFF"
        );
        let columns: Vec<(&str, Option<&str>)> = self
            .shared
            .logical_partition_columns
            .iter()
            .map(|logical_name| {
                let value = self
                    .physical_partition_values
                    .get(logical_name.as_str())
                    .and_then(|v| v.as_deref());
                (logical_name.as_str(), value)
            })
            .collect();
        build_partition_path(&columns)
    }

    /// Converts an absolute file URL into a path relative to the table root.
    ///
    /// # Errors
    ///
    /// Returns an internal error when `file_location` cannot be expressed relative to the table
    /// root (`Url::make_relative` returns `None`), or when the relative form climbs out of the
    /// table root through a leading `..` segment.
    ///
    /// Only a whole leading `..` segment counts as leaving the table root. A file whose name
    /// merely begins with two dots (for example `..data.parquet`) lives inside the table and is
    /// accepted.
    pub fn resolve_file_path(&self, file_location: &Url) -> DeltaResult<String> {
        // Both failure modes report the same error; build it in one place.
        let not_under_root = || {
            Error::internal_error(format!(
                "file '{}' is not under table root '{}'",
                file_location, self.shared.table_root
            ))
        };
        let relative = self
            .shared
            .table_root
            .make_relative(file_location)
            .ok_or_else(not_under_root)?;
        // A bare prefix test on ".." would also reject names such as "..data.parquet".
        if relative == ".." || relative.starts_with("../") {
            return Err(not_under_root());
        }
        Ok(relative)
    }

    /// Creates a `DeletionVectorPath` from a clone of the table root and the given random prefix.
    pub fn new_deletion_vector_path(&self, random_prefix: String) -> DeletionVectorPath {
        DeletionVectorPath::new(self.shared.table_root.clone(), random_prefix)
    }
}
/// Generates a string of exactly `len` characters drawn from `[a-zA-Z0-9]`.
///
/// Each character is chosen independently by sampling an index into the 62-entry alphabet with
/// the generator returned by `rand::rng()`.
fn random_alphanumeric_prefix(len: NonZero<usize>) -> String {
    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    let char_count = len.get();
    let mut rng = rand::rng();
    // Every alphabet entry is ASCII, so the byte length equals the character count.
    let mut prefix = String::with_capacity(char_count);
    for _ in 0..char_count {
        let idx = rng.random_range(0..CHARSET.len());
        prefix.push(CHARSET[idx] as char);
    }
    prefix
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use rstest::rstest;
use super::*;
use crate::expressions::Expression;
use crate::schema::{DataType, StructField, StructType};
// Builds a `WriteContext` rooted at `s3://bucket/table/` whose logical and physical schema are
// the same single nullable integer column named `value`.
//
// Panics if `random_prefix_length` is zero.
fn make_write_context(
    cm_mode: ColumnMappingMode,
    partition_columns: Vec<String>,
    partition_values: HashMap<String, Option<String>>,
    randomize_file_prefixes: bool,
    random_prefix_length: usize,
) -> WriteContext {
    let value_field = StructField::nullable("value", DataType::INTEGER);
    let schema = Arc::new(StructType::new_unchecked(vec![value_field]));
    let table_root = Url::parse("s3://bucket/table/").unwrap();
    let random_prefix_length =
        NonZero::new(random_prefix_length).expect("test prefix length must be > 0");
    let state = SharedWriteState {
        table_root,
        logical_schema: Arc::clone(&schema),
        physical_schema: schema,
        // The tests never evaluate this expression; any valid expression will do.
        logical_to_physical: Arc::new(Expression::literal(true)),
        column_mapping_mode: cm_mode,
        stats_columns: Vec::new(),
        logical_partition_columns: partition_columns,
        randomize_file_prefixes,
        random_prefix_length,
    };
    WriteContext {
        shared: Arc::new(state),
        physical_partition_values: partition_values,
    }
}
// Checks the shape of `write_dir` for every column mapping mode, with and without partition
// columns, while prefix randomization is left off.
#[rstest]
fn test_write_dir_structure(
    #[values(
        ColumnMappingMode::None,
        ColumnMappingMode::Name,
        ColumnMappingMode::Id
    )]
    cm_mode: ColumnMappingMode,
    #[values(true, false)] is_partitioned: bool,
) {
    let mut cols: Vec<String> = Vec::new();
    let mut pvs: HashMap<String, Option<String>> = HashMap::new();
    if is_partitioned {
        for (name, value) in [("year", "2024"), ("month", "03")] {
            cols.push(name.to_string());
            pvs.insert(name.to_string(), Some(value.to_string()));
        }
    }
    let wc = make_write_context(cm_mode, cols, pvs, false, 2);
    let path = wc.write_dir().path().to_string();
    match (cm_mode, is_partitioned) {
        (ColumnMappingMode::None, false) => {
            assert_eq!(
                path, "/table/",
                "CM off, unpartitioned: should be table root"
            );
        }
        (ColumnMappingMode::None, true) => {
            assert_eq!(
                path, "/table/year=2024/month=03/",
                "CM off, partitioned: full Hive-style path"
            );
        }
        (ColumnMappingMode::Name | ColumnMappingMode::Id, _) => {
            assert!(
                !path.contains("year="),
                "CM on: should NOT contain Hive-style dirs, got: {path}"
            );
            // What remains between the table root and the trailing slash is the prefix.
            let prefix_dir = path
                .strip_prefix("/table/")
                .and_then(|rest| rest.strip_suffix('/'))
                .unwrap();
            assert_eq!(
                prefix_dir.len(),
                2,
                "expected 2-char prefix, got: {prefix_dir}"
            );
            assert!(
                prefix_dir.bytes().all(|b| b.is_ascii_alphanumeric()),
                "prefix should be alphanumeric, got: {prefix_dir}"
            );
        }
    }
}
// With column mapping on, repeated `write_dir` calls should not all yield the same prefix.
#[test]
fn test_write_dir_cm_on_generates_different_prefixes_per_call() {
    let wc = make_write_context(
        ColumnMappingMode::Name,
        Vec::new(),
        HashMap::new(),
        false,
        2,
    );
    let mut seen = HashSet::new();
    for _ in 0..20 {
        seen.insert(wc.write_dir().path().to_string());
    }
    assert!(
        seen.len() > 1,
        "20 calls should produce at least 2 distinct prefixes"
    );
}
// A null partition value must show up as the Hive default-partition marker in the path.
#[test]
fn test_write_dir_cm_off_partitioned_null_value_uses_hive_default() {
    let partition_columns = vec![String::from("region")];
    let mut partition_values = HashMap::new();
    partition_values.insert(String::from("region"), None);
    let wc = make_write_context(
        ColumnMappingMode::None,
        partition_columns,
        partition_values,
        false,
        2,
    );
    let dir = wc.write_dir();
    let path = dir.path();
    assert!(
        path.contains("__HIVE_DEFAULT_PARTITION__"),
        "null partition value should use HIVE_DEFAULT_PARTITION, got: {path}"
    );
}
// Characters escaped by the Hive path builder are URI-encoded a second time in the URL path,
// so each Hive-produced `%XX` appears as `%25XX`.
#[rstest]
#[case::colon("p", "2025-03-31T15:30:00Z", "/table/p=2025-03-31T15%253A30%253A00Z/")]
#[case::slash("region", "US/East", "/table/region=US%252FEast/")]
#[case::percent_literal("col", "100%", "/table/col=100%2525/")]
fn test_write_dir_cm_off_partitioned_double_encodes_hive_output(
    #[case] col: &str,
    #[case] value: &str,
    #[case] expected_path: &str,
) {
    let column = col.to_owned();
    let partition_values = HashMap::from([(column.clone(), Some(value.to_owned()))]);
    let wc = make_write_context(
        ColumnMappingMode::None,
        vec![column],
        partition_values,
        false,
        2,
    );
    let dir = wc.write_dir();
    assert_eq!(dir.path(), expected_path);
}
// With column mapping on, the generated directory needs no percent-encoding at all.
#[rstest]
#[case::name_mode(ColumnMappingMode::Name)]
#[case::id_mode(ColumnMappingMode::Id)]
fn test_write_dir_cm_on_prefix_is_uri_safe(#[case] cm_mode: ColumnMappingMode) {
    let wc = make_write_context(cm_mode, vec![String::from("p")], HashMap::new(), false, 2);
    let dir = wc.write_dir();
    let path = dir.path();
    assert!(
        !path.contains('%'),
        "CM-on path must not contain '%': {path}"
    );
    let prefix = path
        .strip_prefix("/table/")
        .and_then(|rest| rest.strip_suffix('/'))
        .unwrap();
    assert!(
        prefix.bytes().all(|b| b.is_ascii_alphanumeric()),
        "prefix should be URI-safe: {prefix:?}"
    );
}
// Every generated prefix has the requested length and contains only ASCII letters and digits.
#[rstest]
#[case(1)]
#[case(2)]
#[case(8)]
#[case(32)]
fn test_random_alphanumeric_prefix_format(#[case] len: usize) {
    let nz_len = NonZero::new(len).unwrap();
    // Sample repeatedly, since a single draw could pass by luck.
    (0..100)
        .map(|_| random_alphanumeric_prefix(nz_len))
        .for_each(|prefix| {
            assert_eq!(prefix.len(), len, "prefix should be exactly {len} chars");
            assert!(
                prefix.bytes().all(|b| b.is_ascii_alphanumeric()),
                "prefix should be alphanumeric, got: {prefix}"
            );
        });
}
// Exercises every combination of column mapping mode, randomization flag and partitioning.
// A random prefix is expected whenever column mapping is on or randomization is requested,
// and it replaces the Hive-style directories.
#[rstest]
fn test_write_dir_with_randomize_property(
    #[values(
        ColumnMappingMode::None,
        ColumnMappingMode::Name,
        ColumnMappingMode::Id
    )]
    cm_mode: ColumnMappingMode,
    #[values(true, false)] randomize: bool,
    #[values(true, false)] is_partitioned: bool,
) {
    let cols: Vec<String> = if is_partitioned {
        vec!["year".to_string()]
    } else {
        Vec::new()
    };
    let pvs: HashMap<String, Option<String>> = cols
        .iter()
        .map(|name| (name.clone(), Some("2024".to_string())))
        .collect();
    let wc = make_write_context(cm_mode, cols, pvs, randomize, 2);
    let path = wc.write_dir().path().to_string();

    let expect_random_prefix = randomize || cm_mode != ColumnMappingMode::None;
    if !expect_random_prefix {
        // Deterministic layouts: Hive-style directory or the bare table root.
        if is_partitioned {
            assert_eq!(path, "/table/year=2024/");
        } else {
            assert_eq!(path, "/table/");
        }
        return;
    }

    assert!(
        !path.contains("year="),
        "random prefix should suppress Hive dirs, got: {path}"
    );
    let prefix = path
        .strip_prefix("/table/")
        .and_then(|rest| rest.strip_suffix('/'))
        .unwrap();
    assert_eq!(prefix.len(), 2, "expected 2-char prefix, got: {prefix}");
    assert!(
        prefix.bytes().all(|b| b.is_ascii_alphanumeric()),
        "prefix should be alphanumeric, got: {prefix}"
    );
}
// The configured prefix length is honored whether the random prefix is triggered by column
// mapping or by the randomization flag.
#[rstest]
#[case::cm_on_default(ColumnMappingMode::Name, false, 2)]
#[case::cm_on_short(ColumnMappingMode::Name, false, 1)]
#[case::cm_on_long(ColumnMappingMode::Name, false, 16)]
#[case::randomize_short(ColumnMappingMode::None, true, 1)]
#[case::randomize_default(ColumnMappingMode::None, true, 2)]
#[case::randomize_long(ColumnMappingMode::None, true, 16)]
fn test_write_dir_random_prefix_length_property(
    #[case] cm_mode: ColumnMappingMode,
    #[case] randomize: bool,
    #[case] prefix_len: usize,
) {
    let wc = make_write_context(cm_mode, Vec::new(), HashMap::new(), randomize, prefix_len);
    let dir = wc.write_dir();
    let prefix = dir
        .path()
        .strip_prefix("/table/")
        .and_then(|rest| rest.strip_suffix('/'))
        .unwrap();
    assert_eq!(
        prefix.len(),
        prefix_len,
        "expected {prefix_len}-char prefix, got: {prefix:?}"
    );
    assert!(
        prefix.bytes().all(|b| b.is_ascii_alphanumeric()),
        "prefix should be alphanumeric, got: {prefix}"
    );
}
// Even with column mapping off, requesting randomization replaces Hive-style directories
// with a random prefix directory.
#[test]
fn test_write_dir_cm_off_randomize_suppresses_hive() {
    let partition_columns = vec![String::from("year"), String::from("month")];
    let partition_values = HashMap::from([
        (String::from("year"), Some(String::from("2024"))),
        (String::from("month"), Some(String::from("03"))),
    ]);
    let wc = make_write_context(
        ColumnMappingMode::None,
        partition_columns,
        partition_values,
        true,
        2,
    );
    let dir = wc.write_dir();
    let path = dir.path();
    assert!(
        !path.contains('='),
        "randomize=true should suppress Hive dirs, got: {path}"
    );
    let prefix = path
        .strip_prefix("/table/")
        .and_then(|rest| rest.strip_suffix('/'))
        .unwrap();
    assert_eq!(prefix.len(), 2);
}
// `resolve_file_path` returns the table-relative path unchanged (percent-encoding included)
// for files under the table root, and fails for URLs on another scheme, host or directory.
#[rstest]
#[case::relative_bare_file("s3://bucket/table/abc.parquet", Ok("abc.parquet"))]
#[case::relative_with_subdirectory(
    "s3://bucket/table/year=2024/abc.parquet",
    Ok("year=2024/abc.parquet")
)]
#[case::uri_encoded_partition(
    "s3://bucket/table/p=a%253Ab/uuid.parquet",
    Ok("p=a%253Ab/uuid.parquet")
)]
#[case::double_percent_partition(
    "s3://bucket/table/p=100%252525/uuid.parquet",
    Ok("p=100%252525/uuid.parquet")
)]
#[case::multi_partition_encoded(
    "s3://bucket/table/year=2025/region=US%252FEast/uuid.parquet",
    Ok("year=2025/region=US%252FEast/uuid.parquet")
)]
#[case::error_different_scheme("gs://other-bucket/table/abc.parquet", Err(()))]
#[case::error_different_host("s3://other-bucket/table/abc.parquet", Err(()))]
#[case::error_outside_table_root("s3://bucket/other/abc.parquet", Err(()))]
#[test]
fn test_resolve_file_path(#[case] file_url: &str, #[case] expected: Result<&str, ()>) {
    let wc = make_write_context(
        ColumnMappingMode::None,
        Vec::new(),
        HashMap::new(),
        false,
        2,
    );
    let file = Url::parse(file_url).unwrap();
    let resolved = wc.resolve_file_path(&file);
    match expected {
        Ok(exp) => assert_eq!(resolved.unwrap(), exp),
        Err(()) => assert!(resolved.is_err()),
    }
}
}