use serde::{
de::{Error as SerdeError, IntoDeserializer},
Deserialize, Deserializer, Serialize, Serializer,
};
use derive_builder::Builder;
use crate::error::Error;
use super::types::{StructType, Type};
/// A partition transform that is written and read as a single string.
///
/// With `rename_all = "lowercase"` the unit variants go by their lowercase
/// name (for example `"day"`), while `Bucket` and `Truncate` carry a `u32`
/// that `serialize_bucket` / `serialize_truncate` render as `"bucket[N]"` and
/// `"truncate[N]"`.
///
/// `remote = "Self"` makes the serde derives emit inherent
/// `Transform::serialize` / `Transform::deserialize` functions instead of
/// trait impls; the hand-written `Serialize` / `Deserialize` impls for this
/// type delegate to those functions.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", remote = "Self")]
pub enum Transform {
/// Named `"identity"` on the wire.
Identity,
/// Written as `"bucket[N]"`, where `N` is the wrapped value.
// NOTE(review): presumably `N` is the number of buckets; confirm.
#[serde(serialize_with = "serialize_bucket")]
Bucket(u32),
/// Written as `"truncate[N]"`, where `N` is the wrapped value.
// NOTE(review): presumably `N` is the truncation width; confirm.
#[serde(serialize_with = "serialize_truncate")]
Truncate(u32),
/// Named `"year"` on the wire.
Year,
/// Named `"month"` on the wire.
Month,
/// Named `"day"` on the wire.
Day,
/// Named `"hour"` on the wire.
Hour,
/// Named `"void"` on the wire.
Void,
}
impl<'de> Deserialize<'de> for Transform {
    /// Reads a transform from its string form.
    ///
    /// The input is first read as an owned `String`. Text starting with
    /// `bucket` or `truncate` is handed to the matching bracket parser;
    /// anything else goes through the derive-generated inherent
    /// `Transform::deserialize`, which recognises the unit variants.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        match String::deserialize(deserializer)? {
            text if text.starts_with("bucket") => deserialize_bucket(text.into_deserializer()),
            text if text.starts_with("truncate") => {
                deserialize_truncate(text.into_deserializer())
            }
            // Resolves to the inherent function produced by `remote = "Self"`,
            // not to this trait method, so there is no recursion.
            text => Transform::deserialize(text.into_deserializer()),
        }
    }
}
impl Serialize for Transform {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
Transform::serialize(self, serializer)
}
}
/// Parses the string form `bucket[N]` into [`Transform::Bucket`].
///
/// The input is read as a `String`; it must start with `bucket[` and end with
/// `]`, and the text in between must parse as a `u32`.
///
/// # Errors
///
/// Returns a custom deserialization error when either delimiter is missing or
/// when the enclosed text is not a valid `u32`.
fn deserialize_bucket<'de, D>(deserializer: D) -> Result<Transform, D::Error>
where
    D: Deserializer<'de>,
{
    let raw = String::deserialize(deserializer)?;
    // `strip_prefix` / `strip_suffix` remove each delimiter exactly once and
    // fail when it is absent. The `trim_*_matches` helpers strip repeatedly
    // and silently tolerate a missing delimiter, which would accept malformed
    // input such as `bucket[bucket[16]]]` or `bucket[16`.
    let digits = raw
        .strip_prefix("bucket[")
        .and_then(|rest| rest.strip_suffix(']'))
        .ok_or_else(|| D::Error::custom(format!("expected `bucket[N]`, found `{raw}`")))?;
    digits
        .parse()
        .map(Transform::Bucket)
        .map_err(D::Error::custom)
}
fn serialize_bucket<S>(value: &u32, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("bucket[{value}]"))
}
/// Parses the string form `truncate[N]` into [`Transform::Truncate`].
///
/// The input is read as a `String`; it must start with `truncate[` and end
/// with `]`, and the text in between must parse as a `u32`.
///
/// # Errors
///
/// Returns a custom deserialization error when either delimiter is missing or
/// when the enclosed text is not a valid `u32`.
fn deserialize_truncate<'de, D>(deserializer: D) -> Result<Transform, D::Error>
where
    D: Deserializer<'de>,
{
    let raw = String::deserialize(deserializer)?;
    // `strip_prefix` / `strip_suffix` remove each delimiter exactly once and
    // fail when it is absent. The `trim_*_matches` helpers strip repeatedly
    // and silently tolerate a missing delimiter, which would accept malformed
    // input such as `truncate[truncate[4]]]` or `truncate[4`.
    let digits = raw
        .strip_prefix("truncate[")
        .and_then(|rest| rest.strip_suffix(']'))
        .ok_or_else(|| D::Error::custom(format!("expected `truncate[N]`, found `{raw}`")))?;
    digits
        .parse()
        .map(Transform::Truncate)
        .map_err(D::Error::custom)
}
fn serialize_truncate<S>(value: &u32, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("truncate[{value}]"))
}
/// A single entry of a [`PartitionSpec`].
///
/// Field names are kebab-case when (de)serialized: `source-id`, `field-id`,
/// `name` and `transform`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionField {
/// Identifies the source column; `PartitionSpec::data_types` passes it to
/// `StructType::get` to find the column's type.
pub source_id: i32,
// NOTE(review): presumably the id assigned to this partition field itself; confirm.
pub field_id: i32,
/// Name of the partition field.
pub name: String,
/// Transform stored for this partition field.
pub transform: Transform,
}
/// An identified, ordered list of [`PartitionField`]s.
///
/// Field names are kebab-case when (de)serialized (`spec-id`, `fields`). The
/// type also derives `Default` and a `derive_builder` builder.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
/// Identifier of this spec.
pub spec_id: i32,
/// The partition fields, in order. The `each` setter option additionally
/// gives the builder a `with_partition_field` method that adds one field at
/// a time.
#[builder(setter(each(name = "with_partition_field")))]
pub fields: Vec<PartitionField>,
}
impl PartitionSpec {
pub fn data_types(&self, schema: &StructType) -> Result<Vec<Type>, Error> {
self.fields
.iter()
.map(|field| {
schema
.get(field.source_id as usize)
.map(|x| x.field_type.clone())
})
.collect::<Option<Vec<_>>>()
.ok_or(Error::InvalidFormat("partition spec".to_string()))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes a three-field partition spec from JSON and checks every field,
    /// covering a plain transform name as well as the bracketed `bucket[N]`
    /// and `truncate[N]` forms.
    #[test]
    fn partition_spec() {
        let json = r#"
{
"spec-id": 1,
"fields": [ {
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
}
"#;
        let spec: PartitionSpec = serde_json::from_str(json).unwrap();

        // (source id, field id, name, transform) for each field, in order.
        let expected = [
            (4, 1000, "ts_day", Transform::Day),
            (1, 1001, "id_bucket", Transform::Bucket(16)),
            (2, 1002, "id_truncate", Transform::Truncate(4)),
        ];
        for (idx, (source_id, field_id, name, transform)) in expected.iter().enumerate() {
            let field = &spec.fields[idx];
            assert_eq!(*source_id, field.source_id);
            assert_eq!(*field_id, field.field_id);
            assert_eq!(*name, field.name);
            assert_eq!(*transform, field.transform);
        }
    }
}