1use serde::{Deserialize, Serialize};
22
23use crate::schema::{Schema, Type};
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
29#[serde(rename_all = "kebab-case")]
30pub struct PartitionSpec {
31 pub spec_id: i32,
33
34 pub fields: Vec<PartitionField>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
40#[serde(rename_all = "kebab-case")]
41pub struct PartitionField {
42 pub source_id: i32,
44
45 pub field_id: i32,
47
48 pub name: String,
50
51 pub transform: Transform,
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "lowercase")]
58pub enum Transform {
59 Identity,
61 Bucket(u32),
63 Truncate(u32),
65 Year,
67 Month,
69 Day,
71 Hour,
73 Void,
75}
76
77impl PartitionSpec {
78 pub fn builder(schema: &Schema) -> PartitionSpecBuilder<'_> {
79 PartitionSpecBuilder::new(schema)
80 }
81
82 pub fn is_unpartitioned(&self) -> bool {
84 self.fields.is_empty()
85 }
86}
87
88pub struct PartitionSpecBuilder<'a> {
89 schema: &'a Schema,
90 fields: Vec<PartitionField>,
91 next_field_id: i32,
92}
93
94impl<'a> PartitionSpecBuilder<'a> {
95 pub fn new(schema: &'a Schema) -> Self {
96 Self {
97 schema,
98 fields: Vec::new(),
99 next_field_id: 1000, }
101 }
102
103 pub fn add_identity(self, source_name: &str) -> Result<Self, String> {
104 self.add_field(source_name, Transform::Identity, None)
105 }
106
107 pub fn add_bucket(self, source_name: &str, num_buckets: u32) -> Result<Self, String> {
108 let name = format!("{}_bucket_{}", source_name, num_buckets);
109 self.add_field(source_name, Transform::Bucket(num_buckets), Some(&name))
110 }
111
112 pub fn add_truncate(self, source_name: &str, width: u32) -> Result<Self, String> {
113 let name = format!("{}_trunc_{}", source_name, width);
114 self.add_field(source_name, Transform::Truncate(width), Some(&name))
115 }
116
117 pub fn add_year(self, source_name: &str) -> Result<Self, String> {
118 let name = format!("{}_year", source_name);
119 self.add_field(source_name, Transform::Year, Some(&name))
120 }
121
122 pub fn add_month(self, source_name: &str) -> Result<Self, String> {
123 let name = format!("{}_month", source_name);
124 self.add_field(source_name, Transform::Month, Some(&name))
125 }
126
127 pub fn add_day(self, source_name: &str) -> Result<Self, String> {
128 let name = format!("{}_day", source_name);
129 self.add_field(source_name, Transform::Day, Some(&name))
130 }
131
132 pub fn add_hour(self, source_name: &str) -> Result<Self, String> {
133 let name = format!("{}_hour", source_name);
134 self.add_field(source_name, Transform::Hour, Some(&name))
135 }
136
137 fn add_field(
138 mut self,
139 source_name: &str,
140 transform: Transform,
141 rename: Option<&str>,
142 ) -> Result<Self, String> {
143 let source_field = self
144 .schema
145 .find_field_by_name(source_name)
146 .ok_or_else(|| format!("Column not found: {}", source_name))?;
147
148 if !transform.can_apply_to(&source_field.field_type) {
150 return Err(format!(
151 "Cannot apply {:?} to column {} of type {:?}",
152 transform, source_name, source_field.field_type
153 ));
154 }
155
156 let name = rename.unwrap_or(&source_field.name).to_string();
157 let field_id = self.next_field_id;
158 self.next_field_id += 1;
159
160 self.fields.push(PartitionField {
161 source_id: source_field.id,
162 field_id,
163 name,
164 transform,
165 });
166
167 Ok(self)
168 }
169
170 pub fn build(self) -> PartitionSpec {
171 PartitionSpec {
172 spec_id: 0, fields: self.fields,
174 }
175 }
176}
177
178impl Transform {
179 pub fn can_apply_to(&self, _type: &Type) -> bool {
181 match self {
182 Transform::Identity => true, Transform::Bucket(_) => true, Transform::Truncate(_) => matches!(_type, Type::String | Type::Binary),
185 Transform::Year | Transform::Month | Transform::Day => {
186 matches!(_type, Type::Date | Type::Timestamp { .. })
187 }
188 Transform::Hour => matches!(_type, Type::Timestamp { .. }),
189 Transform::Void => true,
190 }
191 }
192
193 pub fn result_type(&self, source_type: &Type) -> Type {
195 match self {
196 Transform::Identity => source_type.clone(),
197 Transform::Bucket(_) => Type::Int,
198 Transform::Truncate(_) => source_type.clone(),
199 Transform::Year | Transform::Month | Transform::Day | Transform::Hour => Type::Int,
200 Transform::Void => source_type.clone(), }
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema fixture shared by the tests: columns `id` (long), `ts` (timestamp with
    /// timezone) and `category` (string).
    fn sample_schema() -> Schema {
        let ts_type = Type::Timestamp {
            with_timezone: true,
        };

        Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "ts", ts_type, false)
            .with_field(3, "category", Type::String, false)
            .build()
    }

    /// A day, an identity and a bucket partition are recorded in insertion order.
    #[test]
    fn test_partition_builder() {
        let schema = sample_schema();

        let builder = PartitionSpec::builder(&schema);
        let builder = builder.add_day("ts").unwrap();
        let builder = builder.add_identity("category").unwrap();
        let builder = builder.add_bucket("id", 16).unwrap();
        let spec = builder.build();

        assert_eq!(spec.fields.len(), 3);

        let day_field = &spec.fields[0];
        assert_eq!(day_field.transform, Transform::Day);
        assert_eq!(day_field.name, "ts_day");

        let identity_field = &spec.fields[1];
        assert_eq!(identity_field.transform, Transform::Identity);

        let bucket_field = &spec.fields[2];
        assert_eq!(bucket_field.transform, Transform::Bucket(16));
    }

    /// An hour transform on a string column must be rejected.
    #[test]
    fn test_invalid_transform() {
        let schema = sample_schema();
        let outcome = PartitionSpec::builder(&schema).add_hour("category");
        assert!(outcome.is_err());
    }
}