Skip to main content

supertable_core/
partition.rs

//! # SuperTable Partitioning
//!
//! This module defines the partitioning strategy for SuperTable, supporting
//! **Hidden Partitioning**.
//!
//! ## Hidden Partitioning
//!
//! Unlike Hive-style partitioning, where users must explicitly query partition columns
//! (e.g., `WHERE year = 2024`), SuperTable tracks the relationship between
//! data columns and partition values. Users query the data column (e.g., `WHERE timestamp > ...`)
//! and the query engine automatically prunes partitions based on the transform.
//!
//! ## Transforms
//!
//! Supported transforms:
//! - **Identity**: Value is used as-is (e.g., `category`)
//! - **Bucket(N)**: Hash of the value modulo N (e.g., `bucket(user_id, 16)`)
//! - **Truncate(W)**: Truncate string/binary to width W (e.g., `truncate(name, 1)`)
//! - **Year/Month/Day/Hour**: Extract the date/time component
//! - **Void**: Always null (used for evolution when dropping a partition field)
20
21use serde::{Deserialize, Serialize};
22
23use crate::schema::{Schema, Type};
24
25/// A partition spec defines how a table is partitioned.
26///
27/// Use `PartitionSpec::builder()` to create new specs.
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
29#[serde(rename_all = "kebab-case")]
30pub struct PartitionSpec {
31    /// Unique identifier for this partition spec.
32    pub spec_id: i32,
33
34    /// The list of fields that make up the partition tuple.
35    pub fields: Vec<PartitionField>,
36}
37
38/// A field in a partition spec.
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
40#[serde(rename_all = "kebab-case")]
41pub struct PartitionField {
42    /// The source column ID from the table schema.
43    pub source_id: i32,
44
45    /// A unique ID for this partition field within the spec.
46    pub field_id: i32,
47
48    /// A name for the partition field.
49    pub name: String,
50
51    /// The transform applied to the source column.
52    pub transform: Transform,
53}
54
55/// Supported partition transforms.
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(rename_all = "lowercase")]
58pub enum Transform {
59    /// Use the value as-is.
60    Identity,
61    /// Hash implementation (murmur3 32-bit recommended) % N.
62    Bucket(u32),
63    /// Truncate to width W.
64    Truncate(u32),
65    /// Extract year from date/timestamp.
66    Year,
67    /// Extract month from date/timestamp.
68    Month,
69    /// Extract day from date/timestamp.
70    Day,
71    /// Extract hour from timestamp.
72    Hour,
73    /// Always null (used for evolution when dropping a partition field).
74    Void,
75}
76
77impl PartitionSpec {
78    pub fn builder(schema: &Schema) -> PartitionSpecBuilder<'_> {
79        PartitionSpecBuilder::new(schema)
80    }
81
82    /// Returns true if the table is unpartitioned.
83    pub fn is_unpartitioned(&self) -> bool {
84        self.fields.is_empty()
85    }
86}
87
88pub struct PartitionSpecBuilder<'a> {
89    schema: &'a Schema,
90    fields: Vec<PartitionField>,
91    next_field_id: i32,
92}
93
94impl<'a> PartitionSpecBuilder<'a> {
95    pub fn new(schema: &'a Schema) -> Self {
96        Self {
97            schema,
98            fields: Vec::new(),
99            next_field_id: 1000, // Partition field IDs usually start high to avoid collision
100        }
101    }
102
103    pub fn add_identity(self, source_name: &str) -> Result<Self, String> {
104        self.add_field(source_name, Transform::Identity, None)
105    }
106
107    pub fn add_bucket(self, source_name: &str, num_buckets: u32) -> Result<Self, String> {
108        let name = format!("{}_bucket_{}", source_name, num_buckets);
109        self.add_field(source_name, Transform::Bucket(num_buckets), Some(&name))
110    }
111
112    pub fn add_truncate(self, source_name: &str, width: u32) -> Result<Self, String> {
113        let name = format!("{}_trunc_{}", source_name, width);
114        self.add_field(source_name, Transform::Truncate(width), Some(&name))
115    }
116
117    pub fn add_year(self, source_name: &str) -> Result<Self, String> {
118        let name = format!("{}_year", source_name);
119        self.add_field(source_name, Transform::Year, Some(&name))
120    }
121
122    pub fn add_month(self, source_name: &str) -> Result<Self, String> {
123        let name = format!("{}_month", source_name);
124        self.add_field(source_name, Transform::Month, Some(&name))
125    }
126
127    pub fn add_day(self, source_name: &str) -> Result<Self, String> {
128        let name = format!("{}_day", source_name);
129        self.add_field(source_name, Transform::Day, Some(&name))
130    }
131
132    pub fn add_hour(self, source_name: &str) -> Result<Self, String> {
133        let name = format!("{}_hour", source_name);
134        self.add_field(source_name, Transform::Hour, Some(&name))
135    }
136
137    fn add_field(
138        mut self,
139        source_name: &str,
140        transform: Transform,
141        rename: Option<&str>,
142    ) -> Result<Self, String> {
143        let source_field = self
144            .schema
145            .find_field_by_name(source_name)
146            .ok_or_else(|| format!("Column not found: {}", source_name))?;
147
148        // Validate transform compatibility
149        if !transform.can_apply_to(&source_field.field_type) {
150            return Err(format!(
151                "Cannot apply {:?} to column {} of type {:?}",
152                transform, source_name, source_field.field_type
153            ));
154        }
155
156        let name = rename.unwrap_or(&source_field.name).to_string();
157        let field_id = self.next_field_id;
158        self.next_field_id += 1;
159
160        self.fields.push(PartitionField {
161            source_id: source_field.id,
162            field_id,
163            name,
164            transform,
165        });
166
167        Ok(self)
168    }
169
170    pub fn build(self) -> PartitionSpec {
171        PartitionSpec {
172            spec_id: 0, // Should be assigned by TableMetadata
173            fields: self.fields,
174        }
175    }
176}
177
178impl Transform {
179    /// Returns true if this transform can be applied to the given type.
180    pub fn can_apply_to(&self, _type: &Type) -> bool {
181        match self {
182            Transform::Identity => true,  // Can partition by anything
183            Transform::Bucket(_) => true, // Can hash anything
184            Transform::Truncate(_) => matches!(_type, Type::String | Type::Binary),
185            Transform::Year | Transform::Month | Transform::Day => {
186                matches!(_type, Type::Date | Type::Timestamp { .. })
187            }
188            Transform::Hour => matches!(_type, Type::Timestamp { .. }),
189            Transform::Void => true,
190        }
191    }
192
193    /// Returns the result type of the transform.
194    pub fn result_type(&self, source_type: &Type) -> Type {
195        match self {
196            Transform::Identity => source_type.clone(),
197            Transform::Bucket(_) => Type::Int,
198            Transform::Truncate(_) => source_type.clone(),
199            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => Type::Int,
200            Transform::Void => source_type.clone(), // Or Null?
201        }
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the three-column schema shared by the tests: a long `id`,
    /// a timezone-aware timestamp `ts`, and a string `category`.
    fn sample_schema() -> Schema {
        let ts_type = Type::Timestamp {
            with_timezone: true,
        };

        Schema::builder(0)
            .with_field(1, "id", Type::Long, true)
            .with_field(2, "ts", ts_type, false)
            .with_field(3, "category", Type::String, false)
            .build()
    }

    /// Chaining day, identity and bucket fields yields them in insertion
    /// order, with the day field named after its source column.
    #[test]
    fn test_partition_builder() {
        let schema = sample_schema();

        let builder = PartitionSpec::builder(&schema);
        let builder = builder.add_day("ts").unwrap();
        let builder = builder.add_identity("category").unwrap();
        let spec = builder.add_bucket("id", 16).unwrap().build();

        // Comparing the whole list checks both the count and the order.
        let transforms: Vec<Transform> = spec.fields.iter().map(|f| f.transform).collect();
        assert_eq!(
            transforms,
            vec![Transform::Day, Transform::Identity, Transform::Bucket(16)]
        );
        assert_eq!(spec.fields[0].name, "ts_day");
    }

    /// An hour transform on a string column must be rejected.
    #[test]
    fn test_invalid_transform() {
        let schema = sample_schema();

        let outcome = PartitionSpec::builder(&schema).add_hour("category");

        assert!(outcome.is_err());
    }
}