/// A partition-key value in one of three scalar forms.
///
/// Only `PartialEq` (not `Eq`) is derived because the `Float` variant carries an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub enum PartitionValue {
/// Signed 64-bit integer key.
Int(i64),
/// 64-bit floating-point key.
Float(f64),
/// Textual key.
Text(String),
}
impl PartitionValue {
fn as_i64(&self) -> i64 {
match self {
PartitionValue::Int(v) => *v,
PartitionValue::Float(v) => *v as i64,
PartitionValue::Text(s) => s.parse::<i64>().unwrap_or(0),
}
}
fn to_string_repr(&self) -> String {
match self {
PartitionValue::Int(v) => v.to_string(),
PartitionValue::Float(v) => v.to_string(),
PartitionValue::Text(s) => s.clone(),
}
}
}
/// Outcome of `PartitionMap::route_or_expand`.
#[derive(Debug, Clone, PartialEq)]
pub enum RouteResult {
/// The key maps to an existing sub-table; the payload is its name
/// (`"<table>__p_part_<index>"`).
Routed(String),
/// The key lies at or beyond the upper end of the last range and a new range
/// partition is proposed for it.
NeedsExpansion {
/// Name proposed for the new sub-table.
new_table: String,
/// `(lo, hi)` bounds proposed for the new partition; `lo` is the upper bound of
/// the current last range (or 0 when there are no ranges).
new_bounds: (i64, i64),
},
}
/// Partitioning strategy of a table.
///
/// NOTE(review): the `column` fields are not read by the routing code visible in this
/// file; they presumably name the partition-key column — confirm against callers.
#[derive(Debug, Clone)]
pub enum PartitionType {
/// Range partitioning over integer keys.
Range {
column: String,
/// One `(lo, hi)` pair per partition, matched as the half-open interval
/// `lo <= v < hi`; the position in the vector is the partition index.
bounds: Vec<(i64, i64)>,
/// Optional `(interval, max_partitions)`: new ranges are proposed in multiples of
/// `interval` while the map's partition count is below `max_partitions`.
auto_expand_interval: Option<(i64, usize)>,
},
/// Hash partitioning: the key's string form is hashed and reduced modulo
/// `num_partitions`.
Hash {
column: String,
/// Modulus applied to the hash.
num_partitions: usize,
},
/// List partitioning: the key goes to the first group that contains its string form.
List {
column: String,
/// One group of accepted string values per partition; the position in the outer
/// vector is the partition index.
values: Vec<Vec<String>>,
},
}
/// Maps partition-key values of one logical table to sub-table names of the form
/// `"<table>__p_part_<index>"`.
#[derive(Debug, Clone)]
pub struct PartitionMap {
/// Logical table name; used as the prefix of every sub-table name.
pub table: String,
/// Strategy that decides the partition index for a key.
pub partition_type: PartitionType,
/// Number of partitions enumerated when listing all sub-tables; also the basis of the
/// range fallback index and of the next index proposed on expansion.
/// NOTE(review): `PartitionType::Hash` carries its own `num_partitions`, which is the
/// one used as the hash modulus — the two are presumably expected to agree; confirm.
pub num_partitions: usize,
}
impl PartitionMap {
    /// Returns the name of the sub-table that `key_value` is routed to.
    pub fn route_key(&self, key_value: &PartitionValue) -> String {
        self.sub_table_name(self.partition_index(key_value))
    }

    /// Builds the sub-table name `"<table>__p_part_<idx>"`.
    fn sub_table_name(&self, idx: usize) -> String {
        format!("{}__p_part_{}", self.table, idx)
    }

    /// Position of the first half-open range `[lo, hi)` in `bounds` that contains `v`.
    fn range_slot(bounds: &[(i64, i64)], v: i64) -> Option<usize> {
        bounds.iter().position(|&(lo, hi)| lo <= v && v < hi)
    }

    /// Computes the partition index for `key_value`.
    ///
    /// * Hash: FNV-1a of the key's string form modulo the variant's `num_partitions`
    ///   (index 0 when that count is 0).
    /// * Range: index of the containing range, else the last partition index.
    /// * List: index of the first group containing the key's string form, else 0.
    fn partition_index(&self, key_value: &PartitionValue) -> usize {
        match &self.partition_type {
            PartitionType::Hash { num_partitions, .. } => {
                let digest = fnv1a_hash(key_value.to_string_repr().as_bytes());
                // `checked_rem` yields `None` for a zero modulus; fall back to index 0
                // instead of panicking with a division by zero.
                digest.checked_rem(*num_partitions).unwrap_or(0)
            }
            PartitionType::Range { bounds, .. } => Self::range_slot(bounds, key_value.as_i64())
                .unwrap_or_else(|| self.num_partitions.saturating_sub(1)),
            PartitionType::List { values, .. } => {
                let wanted = key_value.to_string_repr();
                values
                    .iter()
                    .position(|group| group.iter().any(|candidate| *candidate == wanted))
                    .unwrap_or(0)
            }
        }
    }

    /// Lists the sub-tables that must be scanned.
    ///
    /// With no filter every partition `0..num_partitions` is returned; with a filter
    /// value only the single sub-table that value routes to.
    pub fn pruned_partitions(&self, filter_value: Option<&PartitionValue>) -> Vec<String> {
        match filter_value {
            None => (0..self.num_partitions)
                .map(|idx| self.sub_table_name(idx))
                .collect(),
            Some(value) => vec![self.route_key(value)],
        }
    }

    /// Names of all `num_partitions` sub-tables, in index order.
    pub fn all_partitions(&self) -> Vec<String> {
        self.pruned_partitions(None)
    }

    /// Routes `key_value`, or proposes a new range partition for it.
    ///
    /// For hash and list maps this is `route_key` wrapped in [`RouteResult::Routed`].
    /// For range maps:
    ///
    /// 1. A key inside an existing range is routed to that range's sub-table.
    /// 2. Otherwise, when auto-expansion is configured and `num_partitions` is below
    ///    the configured maximum:
    ///    * a key below the last range's upper bound is routed to partition 0;
    ///    * a key at or above it yields [`RouteResult::NeedsExpansion`] whose bounds
    ///      start at that upper bound and extend by the smallest whole number of
    ///      intervals that covers the key (clamped to `i64::MAX`). A non-positive
    ///      interval never expands.
    /// 3. In every remaining case the key is routed to the last partition.
    ///
    /// NOTE(review): an uncovered key below the last upper bound goes to partition 0
    /// while expansion capacity remains (step 2) but to the last partition otherwise
    /// (step 3 and `route_key`). That asymmetry is preserved here as found — confirm
    /// which target is intended.
    pub fn route_or_expand(&self, key_value: &PartitionValue) -> RouteResult {
        let (bounds, auto_expand_interval) = match &self.partition_type {
            PartitionType::Range {
                bounds,
                auto_expand_interval,
                ..
            } => (bounds, auto_expand_interval),
            PartitionType::Hash { .. } | PartitionType::List { .. } => {
                return RouteResult::Routed(self.route_key(key_value));
            }
        };

        let v = key_value.as_i64();
        if let Some(slot) = Self::range_slot(bounds, v) {
            return RouteResult::Routed(self.sub_table_name(slot));
        }

        if let Some((interval, max_parts)) = *auto_expand_interval {
            if self.num_partitions < max_parts {
                let last_hi = bounds.last().map_or(0, |&(_, hi)| hi);
                if v < last_hi {
                    return RouteResult::Routed(self.sub_table_name(0));
                }
                // A zero interval would divide by zero and a negative one would
                // produce inverted bounds, so only a positive interval may expand.
                if interval > 0 {
                    // Widen to i128: `v - last_hi` and the final sum can both exceed
                    // the i64 range for extreme keys.
                    let base = i128::from(last_hi);
                    let step = i128::from(interval);
                    let steps = (i128::from(v) - base) / step + 1;
                    // The sum is always greater than `last_hi`, so only the upper
                    // side needs clamping; the cast is lossless after the clamp.
                    let new_hi = (base + steps * step).min(i128::from(i64::MAX)) as i64;
                    return RouteResult::NeedsExpansion {
                        new_table: self.sub_table_name(self.num_partitions),
                        new_bounds: (last_hi, new_hi),
                    };
                }
            }
        }

        RouteResult::Routed(self.sub_table_name(self.num_partitions.saturating_sub(1)))
    }
}
/// 32-bit FNV-1a hash of `data`, widened to `usize`.
///
/// Each byte is XOR-ed into the running state, which is then multiplied by the FNV
/// prime with wrap-around; an empty slice returns the offset basis.
fn fnv1a_hash(data: &[u8]) -> usize {
    const OFFSET_BASIS: u32 = 0x811c_9dc5;
    const PRIME: u32 = 0x0100_0193;

    let digest = data.iter().fold(OFFSET_BASIS, |state, &octet| {
        (state ^ u32::from(octet)).wrapping_mul(PRIME)
    });
    digest as usize
}
// Unit tests for partition routing, pruning and the FNV-1a helper.
#[cfg(test)]
mod tests {
use super::*;
// Hash routing produces sub-table names carrying the `users__p_part_` prefix.
#[test]
fn test_hash_partition_routing() {
let map = PartitionMap {
table: "users".into(),
partition_type: PartitionType::Hash {
column: "id".into(),
num_partitions: 4,
},
num_partitions: 4,
};
let t0 = map.route_key(&PartitionValue::Int(0));
let t3 = map.route_key(&PartitionValue::Int(3));
assert!(
t0.contains("part_"),
"서브테이블 이름에 part_ 포함되어야 함"
);
assert!(t0.starts_with("users__p_part_"));
assert!(t3.starts_with("users__p_part_"));
}
// Every integer key 0..100 hashes to an index strictly below the partition count.
#[test]
fn test_hash_partition_all_valid_indices() {
let n = 8usize;
let map = PartitionMap {
table: "logs".into(),
partition_type: PartitionType::Hash {
column: "id".into(),
num_partitions: n,
},
num_partitions: n,
};
for i in 0i64..100 {
let sub = map.route_key(&PartitionValue::Int(i));
// The index is the text after the final underscore of the sub-table name.
let idx: usize = sub.split('_').last().unwrap().parse().unwrap();
assert!(idx < n, "인덱스 {}가 범위 이상", idx);
}
}
// Keys inside each configured range route to the partition at that range's position.
#[test]
fn test_range_partition_routing() {
let map = PartitionMap {
table: "orders".into(),
partition_type: PartitionType::Range {
column: "amount".into(),
bounds: vec![(0, 100), (100, 500), (500, 10000)],
auto_expand_interval: None,
},
num_partitions: 3,
};
let p0 = map.route_key(&PartitionValue::Int(50));
let p1 = map.route_key(&PartitionValue::Int(200));
let p2 = map.route_key(&PartitionValue::Int(1000));
assert!(p0.ends_with("_0"));
assert!(p1.ends_with("_1"));
assert!(p2.ends_with("_2"));
}
// A filter value prunes the scan to one partition; no filter returns all of them.
#[test]
fn test_range_partition_pruning() {
let map = PartitionMap {
table: "logs".into(),
partition_type: PartitionType::Range {
column: "ts".into(),
bounds: vec![(0, 1000), (1000, 2000), (2000, 3000)],
auto_expand_interval: None,
},
num_partitions: 3,
};
let partitions = map.pruned_partitions(Some(&PartitionValue::Int(1500)));
assert_eq!(partitions.len(), 1, "단일 파티션만 스캔해야 함");
assert!(partitions[0].ends_with("_1"), "1000-2000 범위는 파티션 1");
let all = map.pruned_partitions(None);
assert_eq!(all.len(), 3);
}
// Text keys route to the index of the value group that lists them.
#[test]
fn test_list_partition_routing() {
let map = PartitionMap {
table: "regions".into(),
partition_type: PartitionType::List {
column: "country".into(),
values: vec![
vec!["KR".into(), "JP".into()],
vec!["US".into(), "CA".into()],
],
},
num_partitions: 2,
};
let kr = map.route_key(&PartitionValue::Text("KR".into()));
let us = map.route_key(&PartitionValue::Text("US".into()));
assert!(kr.ends_with("_0"));
assert!(us.ends_with("_1"));
}
// `all_partitions` yields `data__p_part_0` .. `data__p_part_4` in index order.
#[test]
fn test_all_partitions() {
let map = PartitionMap {
table: "data".into(),
partition_type: PartitionType::Hash {
column: "id".into(),
num_partitions: 5,
},
num_partitions: 5,
};
let all = map.all_partitions();
assert_eq!(all.len(), 5);
for (i, name) in all.iter().enumerate() {
assert_eq!(*name, format!("data__p_part_{}", i));
}
}
// The hash is repeatable for equal input and differs between "hello" and "world".
#[test]
fn test_fnv1a_hash_deterministic() {
let h1 = fnv1a_hash(b"hello");
let h2 = fnv1a_hash(b"hello");
assert_eq!(h1, h2, "FNV1a는 결정론적이어야 함");
let h3 = fnv1a_hash(b"world");
assert_ne!(h1, h3, "다른 입력은 다른 해시");
}
}
/// Statistics record for a partition.
///
/// The derived `Default` sets every field to zero, so a default `min_value` /
/// `max_value` of 0 cannot be told apart from an observed value of 0.
///
/// NOTE(review): nothing in the visible code reads or writes this type; the field
/// descriptions below are inferred from the names — confirm against the producer.
#[derive(Debug, Clone, Default)]
pub struct PartitionStats {
/// Presumably the number of rows in the partition.
pub row_count: usize,
/// Presumably the smallest key value observed.
pub min_value: i64,
/// Presumably the largest key value observed.
pub max_value: i64,
/// Presumably the number of rows whose key is null.
pub null_count: usize,
/// Presumably the number of distinct key values.
pub distinct_count: usize,
}
/// Age thresholds, in days, for a partition's lifecycle.
///
/// NOTE(review): not referenced by the visible code; semantics are inferred from the
/// field names — confirm what the age is measured from and whether
/// `archive_after_days <= delete_after_days` is required.
#[derive(Debug, Clone)]
pub struct PartitionLifecycle {
/// Presumably the age at which a partition is archived.
pub archive_after_days: u32,
/// Presumably the age at which a partition is deleted.
pub delete_after_days: u32,
}
/// Storage-tier hint for a partition; `Hot` is the default.
///
/// NOTE(review): not referenced by the visible code; how each tier is acted on is not
/// shown here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PartitionTierHint {
/// Default tier.
#[default]
Hot,
/// Intermediate tier.
Warm,
/// Coldest tier.
Cold,
}