use chrono::{DateTime, NaiveDate, Utc};
use crate::expressions::{DecimalData, Scalar};
use crate::{DeltaResult, Error};
/// Day number of 1970-01-01 in the numbering accepted by
/// `NaiveDate::from_num_days_from_ce_opt`. Adding a days-since-Unix-epoch count to this
/// constant converts it to that numbering (so an offset of `0` yields `1970-01-01`).
const UNIX_EPOCH_CE_DAYS: i32 = 719_163;
pub fn serialize_partition_value(value: &Scalar) -> DeltaResult<Option<String>> {
match value {
Scalar::Null(_) => Ok(None),
Scalar::String(s) => Ok(if s.is_empty() { None } else { Some(s.clone()) }),
Scalar::Boolean(v) => Ok(Some(v.to_string())),
Scalar::Byte(v) => Ok(Some(v.to_string())),
Scalar::Short(v) => Ok(Some(v.to_string())),
Scalar::Integer(v) => Ok(Some(v.to_string())),
Scalar::Long(v) => Ok(Some(v.to_string())),
Scalar::Float(v) => Ok(Some(format_f32(*v))),
Scalar::Double(v) => Ok(Some(format_f64(*v))),
Scalar::Date(days) => Ok(Some(format_date(*days)?)),
Scalar::Timestamp(us) => Ok(Some(format_timestamp(*us)?)),
Scalar::TimestampNtz(us) => Ok(Some(format_timestamp_ntz(*us)?)),
Scalar::Decimal(d) => Ok(Some(format_decimal(d))),
Scalar::Binary(b) => {
if b.is_empty() {
Ok(None)
} else {
Ok(Some(format_binary(b)?))
}
}
Scalar::Struct(_) | Scalar::Array(_) | Scalar::Map(_) => Err(Error::generic(format!(
"cannot serialize partition value: type {:?} is not a valid partition column type",
value.data_type()
))),
}
}
/// Renders a floating-point expression (`f32` or `f64`) as a `String` using a
/// Java-`toString`-like layout (as the macro name suggests):
///
/// * NaN is `"NaN"`; infinities are `"Infinity"` / `"-Infinity"`.
/// * Zero keeps its sign: `"0.0"` / `"-0.0"`.
/// * Magnitudes in `[1e-3, 1e7)` use plain decimal notation with at least one fractional
///   digit (`1` becomes `"1.0"`).
/// * Everything else uses scientific notation with an upper-case `E` and at least one
///   fractional digit in the mantissa (`1e8` becomes `"1.0E8"`).
///
/// The macro is a pure expression: it contains no `return`, so it can be used in any
/// expression position (a `let` initializer, a function argument, a tail expression)
/// without affecting the control flow of the enclosing function.
macro_rules! format_java_float {
    ($v:expr) => {{
        // Evaluate the argument exactly once.
        let value = $v;
        if value.is_nan() {
            String::from("NaN")
        } else if value.is_infinite() {
            String::from(if value > 0.0 { "Infinity" } else { "-Infinity" })
        } else if value == 0.0 {
            // `-0.0 == 0.0` is true, so the sign bit has to be inspected explicitly.
            String::from(if value.is_sign_negative() { "-0.0" } else { "0.0" })
        } else if (1e-3..1e7).contains(&value.abs()) {
            // `Display` omits the fraction for integral values; add it back.
            let plain = value.to_string();
            if plain.contains('.') {
                plain
            } else {
                format!("{plain}.0")
            }
        } else {
            // `LowerExp` prints e.g. `1e8`; upper-case the marker and make sure the
            // mantissa carries a fractional digit.
            let scientific = format!("{value:e}").replace('e', "E");
            if scientific.contains('.') {
                scientific
            } else {
                scientific.replacen('E', ".0E", 1)
            }
        }
    }};
}
/// Renders an `f32` as text by expanding `format_java_float!` on `v`
/// (special spellings for NaN/infinity/zero, plain decimal for magnitudes in
/// `[1e-3, 1e7)`, upper-case `E` scientific notation otherwise).
fn format_f32(v: f32) -> String {
format_java_float!(v)
}
/// Renders an `f64` as text by expanding `format_java_float!` on `v`
/// (special spellings for NaN/infinity/zero, plain decimal for magnitudes in
/// `[1e-3, 1e7)`, upper-case `E` scientific notation otherwise).
fn format_f64(v: f64) -> String {
format_java_float!(v)
}
/// Formats a date given as days relative to 1970-01-01 as `YYYY-MM-DD`.
///
/// # Errors
///
/// Returns a generic error when shifting `days` by `UNIX_EPOCH_CE_DAYS` overflows `i32`
/// or when the shifted day number is rejected by `NaiveDate::from_num_days_from_ce_opt`.
fn format_date(days: i32) -> DeltaResult<String> {
    // Both failure modes (arithmetic overflow, unsupported calendar day) collapse into
    // `None` and are reported with the same message.
    UNIX_EPOCH_CE_DAYS
        .checked_add(days)
        .and_then(NaiveDate::from_num_days_from_ce_opt)
        .map(|date| date.format("%Y-%m-%d").to_string())
        .ok_or_else(|| Error::generic(format!("date value {days} days from epoch is out of range")))
}
fn micros_to_datetime(micros: i64, label: &str) -> DeltaResult<DateTime<Utc>> {
let secs = micros.div_euclid(1_000_000);
let subsec_nanos = (micros.rem_euclid(1_000_000) as u32) * 1000;
DateTime::from_timestamp(secs, subsec_nanos).ok_or_else(|| {
Error::generic(format!(
"{label} value {micros} microseconds from epoch is out of range"
))
})
}
/// Formats microseconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SS.ffffffZ`
/// (six fractional digits, literal `T` separator and `Z` suffix).
///
/// # Errors
///
/// Propagates the out-of-range error from `micros_to_datetime`.
fn format_timestamp(micros: i64) -> DeltaResult<String> {
    let instant = micros_to_datetime(micros, "timestamp")?;
    Ok(instant.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string())
}
/// Formats microseconds since the Unix epoch as `YYYY-MM-DD HH:MM:SS.ffffff`
/// (six fractional digits, space separator, no zone suffix).
///
/// # Errors
///
/// Propagates the out-of-range error from `micros_to_datetime`.
fn format_timestamp_ntz(micros: i64) -> DeltaResult<String> {
    let instant = micros_to_datetime(micros, "timestamp_ntz")?;
    let wall_clock = instant.naive_utc();
    Ok(wall_clock.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
}
/// Renders a decimal as plain text with exactly `scale` fractional digits.
///
/// With a scale of zero the unscaled value is printed as an integer. Otherwise the
/// absolute unscaled value is split at `10^scale` into integer and fractional parts, the
/// fraction is left-padded with zeros to `scale` digits, and a leading `-` is emitted for
/// negative values (including those whose integer part is zero, e.g. `-0.05`).
fn format_decimal(d: &DecimalData) -> String {
    let unscaled = d.bits();
    let scale = d.scale();
    if scale == 0 {
        return unscaled.to_string();
    }

    // Work on the magnitude so that division and remainder are sign-free.
    let magnitude = unscaled.unsigned_abs();
    let unit = 10_u128.pow(scale as u32);
    let (whole, fraction) = (magnitude / unit, magnitude % unit);
    let sign = if unscaled < 0 { "-" } else { "" };

    format!(
        "{sign}{whole}.{fraction:0>width$}",
        width = scale as usize
    )
}
fn format_binary(bytes: &[u8]) -> DeltaResult<String> {
std::str::from_utf8(bytes)
.map(|s| s.to_string())
.map_err(|e| Error::generic(format!("binary partition value is not valid UTF-8: {e}")))
}
// Unit tests for `serialize_partition_value` and the per-type formatting helpers.
//
// Tests named `test_spark_ref_*` use case labels of the form `rowNN_*`; these presumably
// refer to rows of an external Spark reference table (not visible here -- TODO confirm).
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::{ArrayData, MapData, Scalar, StructData};
use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField};
use rstest::rstest;
// A typed null of any primitive type serializes to `None`.
#[rstest]
#[case::row05_int(DataType::INTEGER)]
#[case::row08_bigint(DataType::LONG)]
#[case::row11_tinyint(DataType::BYTE)]
#[case::row14_smallint(DataType::SHORT)]
#[case::row22_double(DataType::DOUBLE)]
#[case::row27_float(DataType::FLOAT)]
#[case::row30_boolean(DataType::BOOLEAN)]
#[case::row34_decimal(DataType::decimal(38, 18).unwrap())]
#[case::row39_date(DataType::DATE)]
#[case::row43_timestamp(DataType::TIMESTAMP)]
#[case::row46_timestamp_ntz(DataType::TIMESTAMP_NTZ)]
#[case::row66_string(DataType::STRING)]
#[case::binary(DataType::BINARY)]
fn test_spark_ref_null_returns_none(#[case] dtype: DataType) {
assert_eq!(
serialize_partition_value(&Scalar::Null(dtype)).unwrap(),
None
);
}
// Nullness wins over the type check: a null whose type is an array still yields `None`
// instead of the "not a valid partition column type" error.
#[test]
fn test_null_complex_type_returns_none() {
let val = Scalar::Null(DataType::Array(Box::new(ArrayType::new(
DataType::INTEGER,
false,
))));
assert_eq!(serialize_partition_value(&val).unwrap(), None);
}
// Empty strings and empty binary values are serialized like nulls.
#[rstest]
#[case::row65_empty_string(Scalar::String(String::new()))]
#[case::row49_empty_binary(Scalar::Binary(vec![]))]
fn test_spark_ref_empty_value_returns_none(#[case] input: Scalar) {
assert_eq!(serialize_partition_value(&input).unwrap(), None);
}
// Integer types are printed in plain base-10, including the extreme values of each width.
#[rstest]
#[case::row01_int_zero(Scalar::Integer(0), "0")]
#[case::row02_int_neg(Scalar::Integer(-1), "-1")]
#[case::row03_int_max(Scalar::Integer(i32::MAX), "2147483647")]
#[case::row04_int_min(Scalar::Integer(i32::MIN), "-2147483648")]
#[case::row06_bigint_max(Scalar::Long(i64::MAX), "9223372036854775807")]
#[case::row07_bigint_min(Scalar::Long(i64::MIN), "-9223372036854775808")]
#[case::row09_tinyint_max(Scalar::Byte(i8::MAX), "127")]
#[case::row10_tinyint_min(Scalar::Byte(i8::MIN), "-128")]
#[case::row12_smallint_max(Scalar::Short(i16::MAX), "32767")]
#[case::row13_smallint_min(Scalar::Short(i16::MIN), "-32768")]
fn test_spark_ref_integer_types(#[case] input: Scalar, #[case] expected: &str) {
assert_eq!(
serialize_partition_value(&input).unwrap(),
Some(expected.to_string())
);
}
// Doubles: signed zero, the extremes (scientific notation with upper-case `E`), the
// special values, and an integral value that must keep a `.0` suffix. The smallest
// subnormal is rendered as `5.0E-324` here.
#[rstest]
#[case::row15_zero(0.0, "0.0")]
#[case::row16_neg_zero(-0.0, "-0.0")] #[case::row17_max(f64::MAX, "1.7976931348623157E308")]
#[case::row17b_min_positive_normal(f64::MIN_POSITIVE, "2.2250738585072014E-308")]
#[case::row18_min_subnormal(5e-324f64, "5.0E-324")]
#[case::row19_nan(f64::NAN, "NaN")]
#[case::row20_inf(f64::INFINITY, "Infinity")]
#[case::row21_neg_inf(f64::NEG_INFINITY, "-Infinity")]
#[case::integer_valued(1.0, "1.0")] fn test_spark_ref_double(#[case] input: f64, #[case] expected: &str) {
assert_eq!(
serialize_partition_value(&Scalar::Double(input)).unwrap(),
Some(expected.to_string())
);
}
// Floats follow the same layout rules as doubles, with `f32` precision.
#[rstest]
#[case::row23_zero(0.0f32, "0.0")]
#[case::row24_nan(f32::NAN, "NaN")]
#[case::row25_inf(f32::INFINITY, "Infinity")]
#[case::row26_neg_inf(f32::NEG_INFINITY, "-Infinity")]
#[case::neg_zero(-0.0f32, "-0.0")]
#[case::scientific_small(1e-4f32, "1.0E-4")]
#[case::scientific_large(1e8f32, "1.0E8")]
#[case::f32_max(f32::MAX, "3.4028235E38")]
fn test_spark_ref_float(#[case] input: f32, #[case] expected: &str) {
assert_eq!(
serialize_partition_value(&Scalar::Float(input)).unwrap(),
Some(expected.to_string())
);
}
// Booleans are printed as lower-case `true` / `false`.
#[rstest]
#[case::row28_true(true, "true")]
#[case::row29_false(false, "false")]
fn test_spark_ref_boolean(#[case] input: bool, #[case] expected: &str) {
assert_eq!(
serialize_partition_value(&Scalar::Boolean(input)).unwrap(),
Some(expected.to_string())
);
}
// Decimals always show exactly `scale` fractional digits (none when the scale is zero);
// negative values between -1 and 0 keep both the sign and the leading `0`.
#[rstest]
#[case::row31_zero(0, 38, 18, "0.000000000000000000")]
#[case::row32_positive(1_230_000_000_000_000_000, 38, 18, "1.230000000000000000")]
#[case::row33_negative(-1_230_000_000_000_000_000, 38, 18, "-1.230000000000000000")]
#[case::scale_zero(42, 5, 0, "42")]
#[case::trailing_zeros(4200, 5, 2, "42.00")]
#[case::neg_between_zero_and_one(-5, 3, 2, "-0.05")]
#[case::neg_with_scale(-12345, 5, 2, "-123.45")]
#[case::pos_with_scale(12345, 5, 2, "123.45")]
fn test_spark_ref_decimal(
#[case] bits: i128,
#[case] precision: u8,
#[case] scale: u8,
#[case] expected: &str,
) {
let d = Scalar::decimal(bits, precision, scale).unwrap();
assert_eq!(
serialize_partition_value(&d).unwrap(),
Some(expected.to_string())
);
}
// Dates are days relative to 1970-01-01 and are printed as `YYYY-MM-DD`.
#[rstest]
#[case::row35_recent(19723, "2024-01-01")]
#[case::row36_epoch(0, "1970-01-01")]
#[case::row37_year_one(-719_162, "0001-01-01")]
#[case::row38_year_9999(2_932_896, "9999-12-31")]
#[case::pre_epoch(-1, "1969-12-31")]
fn test_spark_ref_date(#[case] days: i32, #[case] expected: &str) {
assert_eq!(
serialize_partition_value(&Scalar::Date(days)).unwrap(),
Some(expected.to_string())
);
}
// Timestamps are microseconds relative to the epoch, printed with a `T` separator, six
// fractional digits and a `Z` suffix. `-1` checks that negative values borrow correctly
// from the seconds field.
#[rstest]
#[case::row40_afternoon(1_718_479_845_000_000, "2024-06-15T19:30:45.000000Z")]
#[case::row41_epoch_offset(28_800_000_000, "1970-01-01T08:00:00.000000Z")]
#[case::row42_with_micros(1_718_521_199_999_999, "2024-06-16T06:59:59.999999Z")]
#[case::pre_epoch(-1, "1969-12-31T23:59:59.999999Z")]
fn test_spark_ref_timestamp(#[case] micros: i64, #[case] expected: &str) {
assert_eq!(
serialize_partition_value(&Scalar::Timestamp(micros)).unwrap(),
Some(expected.to_string())
);
}
// Zone-less timestamps use a space separator and no zone suffix.
#[rstest]
#[case::row44_afternoon(1_718_454_645_000_000, "2024-06-15 12:30:45.000000")]
#[case::row45_epoch(0, "1970-01-01 00:00:00.000000")]
fn test_spark_ref_timestamp_ntz(#[case] micros: i64, #[case] expected: &str) {
assert_eq!(
serialize_partition_value(&Scalar::TimestampNtz(micros)).unwrap(),
Some(expected.to_string())
);
}
// Non-empty strings are returned unchanged: no escaping or encoding is applied at this
// layer, even for control characters, path-like characters or non-ASCII text.
#[rstest]
#[case::row47_nul("\x00")]
#[case::row48_embedded_nul("before\x00after")]
#[case::row54_left_brace("a{b")]
#[case::row55_right_brace("a}b")]
#[case::row56_space("hello world")]
#[case::row57_umlaut("M\u{00FC}nchen")]
#[case::row58_cjk("\u{65E5}\u{672C}\u{8A9E}")]
#[case::row59_emoji("\u{1F3B5}\u{1F3B6}")]
#[case::row60_angle_pipe("a<b>c|d")]
#[case::row61_at_bang_parens("a@b!c(d)")]
#[case::row62_special_ascii("a&b+c$d;e,f")]
#[case::row63_slash_percent("Serbia/srb%")]
#[case::row64_percent_literal("100%25")]
#[case::row67_single_space(" ")]
#[case::row68_double_space(" ")]
fn test_spark_ref_string_passthrough(#[case] input: &str) {
assert_eq!(
serialize_partition_value(&Scalar::String(input.to_string())).unwrap(),
Some(input.to_string())
);
}
// Binary values that are valid UTF-8 are returned as the decoded text.
#[rstest]
#[case::row51_hello(b"HELLO".to_vec(), "HELLO")]
#[case::row53_special_chars(vec![0x2F, 0x3D, 0x25], "/=%")]
fn test_spark_ref_binary_utf8(#[case] input: Vec<u8>, #[case] expected: &str) {
assert_eq!(
serialize_partition_value(&Scalar::Binary(input)).unwrap(),
Some(expected.to_string())
);
}
// Serializing and then parsing with `PrimitiveType::parse_scalar` must give back the
// original scalar for every supported primitive type.
#[rstest]
#[case::integer(Scalar::Integer(42), PrimitiveType::Integer)]
#[case::integer_neg(Scalar::Integer(-1), PrimitiveType::Integer)]
#[case::long(Scalar::Long(9_876_543_210), PrimitiveType::Long)]
#[case::byte(Scalar::Byte(42), PrimitiveType::Byte)]
#[case::short(Scalar::Short(-1000), PrimitiveType::Short)]
#[case::boolean_true(Scalar::Boolean(true), PrimitiveType::Boolean)]
#[case::boolean_false(Scalar::Boolean(false), PrimitiveType::Boolean)]
#[case::string(Scalar::String("hello".into()), PrimitiveType::String)]
#[case::binary(Scalar::Binary(b"HELLO".to_vec()), PrimitiveType::Binary)]
#[case::date(Scalar::Date(19723), PrimitiveType::Date)]
#[case::timestamp(Scalar::Timestamp(1_718_521_199_999_999), PrimitiveType::Timestamp)]
#[case::timestamp_ntz(
Scalar::TimestampNtz(1_718_454_645_000_000),
PrimitiveType::TimestampNtz
)]
#[case::float(Scalar::Float(3.125), PrimitiveType::Float)]
#[case::float_one(Scalar::Float(1.0), PrimitiveType::Float)]
#[case::float_scientific(Scalar::Float(1e-4), PrimitiveType::Float)]
#[case::double(Scalar::Double(3.125), PrimitiveType::Double)]
#[case::double_max(Scalar::Double(f64::MAX), PrimitiveType::Double)]
#[case::double_min_positive(Scalar::Double(f64::MIN_POSITIVE), PrimitiveType::Double)]
#[case::decimal(
Scalar::decimal(1_230_000_000_000_000_000i128, 38, 18).unwrap(),
PrimitiveType::decimal(38, 18).unwrap()
)]
fn test_roundtrip(#[case] input: Scalar, #[case] ptype: PrimitiveType) {
let serialized = serialize_partition_value(&input).unwrap().unwrap();
let parsed = ptype.parse_scalar(&serialized).unwrap();
assert_eq!(parsed, input);
}
// The smallest subnormal double is serialized by this module as `5.0E-324`, while
// `spark_str` holds the spelling `4.9E-324` (presumably what Spark emits -- TODO confirm).
// The two strings differ textually but must parse to the same double.
#[test]
fn test_subnormal_double_kernel_and_spark_representations_parse_to_same_value() {
let kernel_str = serialize_partition_value(&Scalar::Double(5e-324f64))
.unwrap()
.unwrap();
let spark_str = "4.9E-324";
let from_kernel = PrimitiveType::Double.parse_scalar(&kernel_str).unwrap();
let from_spark = PrimitiveType::Double.parse_scalar(spark_str).unwrap();
assert_eq!(from_kernel, from_spark);
}
// Temporal values beyond what the date-time library can represent produce an error
// rather than a panic or a wrapped value.
#[rstest]
#[case::date_max(Scalar::Date(i32::MAX))]
#[case::date_min(Scalar::Date(i32::MIN))]
#[case::timestamp(Scalar::Timestamp(i64::MAX))]
#[case::timestamp_ntz(Scalar::TimestampNtz(i64::MAX))]
fn test_temporal_out_of_range_returns_error(#[case] input: Scalar) {
assert!(serialize_partition_value(&input).is_err());
}
// Binary values that are not valid UTF-8 are rejected.
#[rstest]
#[case::row50_deadbeef(vec![0xDE, 0xAD, 0xBE, 0xEF])]
#[case::row52_nul_and_high_byte(vec![0x00, 0xFF])]
fn test_non_utf8_binary_returns_error(#[case] input: Vec<u8>) {
assert!(serialize_partition_value(&Scalar::Binary(input)).is_err());
}
// Non-null nested values (struct, array, map) are not valid partition values.
#[test]
fn test_non_null_struct_returns_error() {
let data = StructData::try_new(
vec![StructField::new("x", DataType::INTEGER, true)],
vec![Scalar::Integer(1)],
)
.unwrap();
assert!(serialize_partition_value(&Scalar::Struct(data)).is_err());
}
#[test]
fn test_non_null_array_returns_error() {
let data = ArrayData::try_new(ArrayType::new(DataType::INTEGER, false), [1i32]).unwrap();
assert!(serialize_partition_value(&Scalar::Array(data)).is_err());
}
#[test]
fn test_non_null_map_returns_error() {
let data = MapData::try_new(
MapType::new(DataType::STRING, DataType::INTEGER, false),
[("k".to_string(), 1i32)],
)
.unwrap();
assert!(serialize_partition_value(&Scalar::Map(data)).is_err());
}
}