use serde::{Deserialize, Serialize};
use crate::schema::{Schema, Type};
/// A partition spec: an identifier plus the list of partition fields.
///
/// Field names are (de)serialized in kebab-case, e.g. `spec_id` becomes `spec-id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
/// Identifier of this spec. `PartitionSpecBuilder::build` always sets it to 0.
pub spec_id: i32,
/// Partition fields that make up this spec; empty means no partitioning.
pub fields: Vec<PartitionField>,
}
/// One partition field: a transform applied to a single source column.
///
/// Field names are (de)serialized in kebab-case, e.g. `source_id` becomes `source-id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionField {
/// Id of the schema column this partition field is derived from.
pub source_id: i32,
/// Id of this partition field; `PartitionSpecBuilder` assigns these sequentially from 1000.
pub field_id: i32,
/// Name of the partition field.
pub name: String,
/// Transform applied to the source column.
pub transform: Transform,
}
/// Transform applied to a source column to derive a partition value.
///
/// NOTE(review): with the derived serde impls and `rename_all = "lowercase"`, the
/// parameterised variants are presumably written as tagged values (e.g. `{"bucket": 16}`)
/// rather than the `bucket[16]` / `truncate[4]` strings used by the Iceberg table spec.
/// Confirm whether interoperability with Iceberg metadata JSON is required.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Transform {
/// Accepted for every column type; the result type is the source type.
Identity,
/// Bucketing; the payload is the number of buckets. The result type is `Type::Int`.
Bucket(u32),
/// Truncation; the payload is the width. The result type is the source type.
Truncate(u32),
/// Accepted for `Type::Date` and `Type::Timestamp` columns; the result type is `Type::Int`.
Year,
/// Accepted for `Type::Date` and `Type::Timestamp` columns; the result type is `Type::Int`.
Month,
/// Accepted for `Type::Date` and `Type::Timestamp` columns; the result type is `Type::Int`.
Day,
/// Accepted for `Type::Timestamp` columns only; the result type is `Type::Int`.
Hour,
/// Accepted for every column type; the result type is the source type.
/// Presumably Iceberg's "always null" transform; confirm.
Void,
}
impl PartitionSpec {
pub fn builder(schema: &Schema) -> PartitionSpecBuilder<'_> {
PartitionSpecBuilder::new(schema)
}
pub fn is_unpartitioned(&self) -> bool {
self.fields.is_empty()
}
}
/// Incrementally builds a [`PartitionSpec`] against a borrowed [`Schema`].
pub struct PartitionSpecBuilder<'a> {
/// Schema used to resolve source column names.
schema: &'a Schema,
/// Partition fields accumulated so far.
fields: Vec<PartitionField>,
/// Field id that the next added partition field will receive.
next_field_id: i32,
}
impl<'a> PartitionSpecBuilder<'a> {
    /// Creates an empty builder that resolves source columns against `schema`.
    ///
    /// Partition field ids are handed out sequentially, starting at 1000.
    pub fn new(schema: &'a Schema) -> Self {
        Self {
            schema,
            fields: Vec::new(),
            next_field_id: 1000,
        }
    }

    /// Adds an identity partition on `source_name`.
    ///
    /// The partition field is named after the source column.
    ///
    /// # Errors
    ///
    /// Fails if the column does not exist, the transform cannot be applied to
    /// the column's type, or the partition name is already in use.
    pub fn add_identity(self, source_name: &str) -> Result<Self, String> {
        self.add_field(source_name, Transform::Identity, None)
    }

    /// Adds a bucket partition on `source_name`, named `<source>_bucket_<num_buckets>`.
    ///
    /// # Errors
    ///
    /// Fails if `num_buckets` is zero, the column does not exist, the transform
    /// cannot be applied to the column's type, or the partition name is already
    /// in use.
    pub fn add_bucket(self, source_name: &str, num_buckets: u32) -> Result<Self, String> {
        // A bucket count of zero cannot produce any bucket (and would divide by
        // zero once values are hashed into buckets), so reject it up front.
        if num_buckets == 0 {
            return Err(format!(
                "Invalid number of buckets for column {}: {} (must be > 0)",
                source_name, num_buckets
            ));
        }
        let name = format!("{}_bucket_{}", source_name, num_buckets);
        self.add_field(source_name, Transform::Bucket(num_buckets), Some(name))
    }

    /// Adds a truncate partition on `source_name`, named `<source>_trunc_<width>`.
    ///
    /// # Errors
    ///
    /// Fails if `width` is zero, the column does not exist, the transform cannot
    /// be applied to the column's type, or the partition name is already in use.
    pub fn add_truncate(self, source_name: &str, width: u32) -> Result<Self, String> {
        // Truncating to a width of zero is meaningless, so reject it up front.
        if width == 0 {
            return Err(format!(
                "Invalid truncate width for column {}: {} (must be > 0)",
                source_name, width
            ));
        }
        let name = format!("{}_trunc_{}", source_name, width);
        self.add_field(source_name, Transform::Truncate(width), Some(name))
    }

    /// Adds a year partition on `source_name`, named `<source>_year`.
    ///
    /// # Errors
    ///
    /// Fails if the column does not exist, the transform cannot be applied to
    /// the column's type, or the partition name is already in use.
    pub fn add_year(self, source_name: &str) -> Result<Self, String> {
        let name = format!("{}_year", source_name);
        self.add_field(source_name, Transform::Year, Some(name))
    }

    /// Adds a month partition on `source_name`, named `<source>_month`.
    ///
    /// # Errors
    ///
    /// Fails if the column does not exist, the transform cannot be applied to
    /// the column's type, or the partition name is already in use.
    pub fn add_month(self, source_name: &str) -> Result<Self, String> {
        let name = format!("{}_month", source_name);
        self.add_field(source_name, Transform::Month, Some(name))
    }

    /// Adds a day partition on `source_name`, named `<source>_day`.
    ///
    /// # Errors
    ///
    /// Fails if the column does not exist, the transform cannot be applied to
    /// the column's type, or the partition name is already in use.
    pub fn add_day(self, source_name: &str) -> Result<Self, String> {
        let name = format!("{}_day", source_name);
        self.add_field(source_name, Transform::Day, Some(name))
    }

    /// Adds an hour partition on `source_name`, named `<source>_hour`.
    ///
    /// # Errors
    ///
    /// Fails if the column does not exist, the transform cannot be applied to
    /// the column's type, or the partition name is already in use.
    pub fn add_hour(self, source_name: &str) -> Result<Self, String> {
        let name = format!("{}_hour", source_name);
        self.add_field(source_name, Transform::Hour, Some(name))
    }

    /// Shared implementation behind the `add_*` methods.
    ///
    /// Resolves `source_name` in the schema, checks that `transform` is allowed
    /// for the column's type, makes sure the partition name is unique within
    /// this builder, assigns the next field id and appends the field.
    /// `rename` overrides the partition name; `None` reuses the column's name.
    fn add_field(
        mut self,
        source_name: &str,
        transform: Transform,
        rename: Option<String>,
    ) -> Result<Self, String> {
        let source_field = self
            .schema
            .find_field_by_name(source_name)
            .ok_or_else(|| format!("Column not found: {}", source_name))?;

        if !transform.can_apply_to(&source_field.field_type) {
            return Err(format!(
                "Cannot apply {:?} to column {} of type {:?}",
                transform, source_name, source_field.field_type
            ));
        }

        // Take the caller-provided name as-is to avoid a second allocation;
        // only copy the column name when no override was given.
        let name = rename.unwrap_or_else(|| source_field.name.to_string());

        // Two partition fields with the same name would be indistinguishable
        // to anything that looks fields up by name, so refuse the duplicate.
        if self.fields.iter().any(|existing| existing.name == name) {
            return Err(format!(
                "Cannot use partition name more than once: {}",
                name
            ));
        }

        // Only consume a field id once every validation has passed.
        let field_id = self.next_field_id;
        self.next_field_id += 1;

        self.fields.push(PartitionField {
            source_id: source_field.id,
            field_id,
            name,
            transform,
        });
        Ok(self)
    }

    /// Finishes the builder and returns the spec; its `spec_id` is always 0.
    pub fn build(self) -> PartitionSpec {
        PartitionSpec {
            spec_id: 0,
            fields: self.fields,
        }
    }
}
impl Transform {
    /// Reports whether this transform may be applied to a column of `source_type`.
    ///
    /// * `Identity`, `Bucket` and `Void` are accepted for every type.
    /// * `Truncate` is accepted for `Int`, `Long`, `String` and `Binary`.
    /// * `Year`, `Month` and `Day` are accepted for `Date` and `Timestamp`.
    /// * `Hour` is accepted for `Timestamp` only.
    pub fn can_apply_to(&self, source_type: &Type) -> bool {
        match self {
            // NOTE(review): the Iceberg spec excludes some types (e.g. boolean and
            // floating point) from bucketing; tighten this once the full set of
            // `Type` variants is confirmed.
            Transform::Identity | Transform::Bucket(_) | Transform::Void => true,
            // The Iceberg spec defines truncate for integers as well as for
            // strings and binary, so `Int` and `Long` must be accepted too.
            // NOTE(review): the spec also allows decimals; add that variant if
            // `Type` has one.
            Transform::Truncate(_) => matches!(
                source_type,
                Type::Int | Type::Long | Type::String | Type::Binary
            ),
            Transform::Year | Transform::Month | Transform::Day => {
                matches!(source_type, Type::Date | Type::Timestamp { .. })
            }
            // A plain date has no time-of-day component to take the hour from.
            Transform::Hour => matches!(source_type, Type::Timestamp { .. }),
        }
    }

    /// Returns the type of the partition values this transform produces for a
    /// column of `source_type`.
    ///
    /// `Identity`, `Truncate` and `Void` keep the source type; `Bucket` and the
    /// date/time transforms produce `Type::Int`.
    pub fn result_type(&self, source_type: &Type) -> Type {
        match self {
            Transform::Identity | Transform::Truncate(_) | Transform::Void => source_type.clone(),
            Transform::Bucket(_)
            | Transform::Year
            | Transform::Month
            | Transform::Day
            | Transform::Hour => Type::Int,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema shared by the tests: a `Long` id, a timezone-aware timestamp and a
    /// string category column.
    fn sample_schema() -> Schema {
        let ts_type = Type::Timestamp {
            with_timezone: true,
        };
        Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "ts", ts_type, false)
            .with_field(3, "category", Type::String, false)
            .build()
    }

    /// Valid transforms are accepted and recorded in the order they were added.
    #[test]
    fn test_partition_builder() {
        let schema = sample_schema();
        let spec = PartitionSpec::builder(&schema)
            .add_day("ts")
            .and_then(|builder| builder.add_identity("category"))
            .and_then(|builder| builder.add_bucket("id", 16))
            .unwrap()
            .build();

        // Comparing the whole list checks both the field count and each transform.
        let transforms: Vec<Transform> = spec.fields.iter().map(|field| field.transform).collect();
        assert_eq!(
            transforms,
            vec![Transform::Day, Transform::Identity, Transform::Bucket(16)]
        );
        assert_eq!(spec.fields[0].name, "ts_day");
    }

    /// An hour transform on a string column must be rejected.
    #[test]
    fn test_invalid_transform() {
        let schema = sample_schema();
        let outcome = PartitionSpec::builder(&schema).add_hour("category");
        assert!(outcome.is_err());
    }
}