use std::time::{SystemTime, UNIX_EPOCH};
use crate::supertable::{
error::{CommitError, ManifestError},
manifest::{SuperfileEntry, list::PartitionStrategy},
};
/// Typed identifier of the partition bucket a superfile belongs to.
///
/// Each variant carries an integer of a different width; that width is also
/// the length of the little-endian byte form produced by
/// [`encode_partition_key`] (8, 4 and 2 bytes respectively).
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PartitionKey {
    /// 64-bit bucket index. Produced for both the `TimeRange` and the
    /// `IngestionTime` strategies.
    TimeRange(u64),
    /// 32-bit hash bucket index.
    Hash(u32),
    /// 16-bit column-range index.
    ColumnRange(u16),
}
/// Serializes a [`PartitionKey`] into its little-endian byte representation.
///
/// The length of the returned vector depends on the variant: 8 bytes for
/// `TimeRange`, 4 bytes for `Hash` and 2 bytes for `ColumnRange`. No variant
/// tag is written, so the bytes alone do not identify the variant.
pub fn encode_partition_key(key: &PartitionKey) -> Vec<u8> {
    match *key {
        PartitionKey::TimeRange(bucket) => Vec::from(bucket.to_le_bytes()),
        PartitionKey::Hash(bucket) => Vec::from(bucket.to_le_bytes()),
        PartitionKey::ColumnRange(index) => Vec::from(index.to_le_bytes()),
    }
}
/// Reconstructs a [`PartitionKey`] from its little-endian byte form.
///
/// The bytes carry no variant tag, so `strategy` selects both the expected
/// width and the resulting variant:
///
/// | strategy        | expected bytes | result                      |
/// |-----------------|----------------|-----------------------------|
/// | `TimeRange`     | 8              | `PartitionKey::TimeRange`   |
/// | `Hash`          | 4              | `PartitionKey::Hash`        |
/// | `ColumnRange`   | 2              | `PartitionKey::ColumnRange` |
/// | `IngestionTime` | 8              | `PartitionKey::TimeRange`   |
///
/// # Errors
///
/// Returns `CommitError::PointerParse` naming the strategy, the expected
/// width and the actual length when `bytes` is not exactly the expected size.
pub fn decode_partition_key(
    bytes: &[u8],
    strategy: &PartitionStrategy,
) -> Result<PartitionKey, CommitError> {
    match strategy {
        PartitionStrategy::TimeRange { .. } => {
            let parsed: Result<[u8; 8], _> = bytes.try_into();
            match parsed {
                Ok(raw) => Ok(PartitionKey::TimeRange(u64::from_le_bytes(raw))),
                Err(_) => Err(CommitError::PointerParse(format!(
                    "TimeRange partition_key must be 8 bytes; got {}",
                    bytes.len()
                ))),
            }
        }
        PartitionStrategy::Hash { .. } => {
            let parsed: Result<[u8; 4], _> = bytes.try_into();
            match parsed {
                Ok(raw) => Ok(PartitionKey::Hash(u32::from_le_bytes(raw))),
                Err(_) => Err(CommitError::PointerParse(format!(
                    "Hash partition_key must be 4 bytes; got {}",
                    bytes.len()
                ))),
            }
        }
        PartitionStrategy::ColumnRange { .. } => {
            let parsed: Result<[u8; 2], _> = bytes.try_into();
            match parsed {
                Ok(raw) => Ok(PartitionKey::ColumnRange(u16::from_le_bytes(raw))),
                Err(_) => Err(CommitError::PointerParse(format!(
                    "ColumnRange partition_key must be 2 bytes; got {}",
                    bytes.len()
                ))),
            }
        }
        // Ingestion-time buckets share the 8-byte `TimeRange` key form.
        PartitionStrategy::IngestionTime { .. } => {
            let parsed: Result<[u8; 8], _> = bytes.try_into();
            match parsed {
                Ok(raw) => Ok(PartitionKey::TimeRange(u64::from_le_bytes(raw))),
                Err(_) => Err(CommitError::PointerParse(format!(
                    "IngestionTime partition_key must be 8 bytes; got {}",
                    bytes.len()
                ))),
            }
        }
    }
}
/// Determines which partition `seg` belongs to under `strategy`.
///
/// Behavior per strategy:
///
/// - `TimeRange`: reads the min/max of `column` from `seg.scalar_stats`,
///   divides both by `granularity_secs` with Euclidean (floor-style) division
///   and requires the two results to be the same bucket. A negative bucket is
///   stored as its two's-complement `u64` bit pattern.
/// - `Hash`: with `n_buckets <= 1` everything goes to bucket 0 and the hint is
///   ignored; otherwise `seg.partition_hint` must be `Some(b)` with
///   `b < n_buckets`.
/// - `ColumnRange`: always rejected; assignment is not implemented here.
/// - `IngestionTime`: buckets the current wall-clock time (whole seconds since
///   the Unix epoch) by `granularity_secs`; the key is a
///   `PartitionKey::TimeRange`. The result therefore depends on when the call
///   happens, not on the contents of `seg`.
///
/// NOTE(review): for `TimeRange`, the min/max come from `downcast_i64`, which
/// returns Timestamp values in their native unit (s/ms/us/ns) without
/// conversion, yet they are divided by `granularity_secs`. Confirm that
/// non-second timestamp columns are meant to be bucketed this way.
///
/// # Errors
///
/// Every failure is reported as `ManifestError::SuperfileSpansPartition` with
/// a descriptive `detail`: non-positive granularity, missing or unusable
/// column stats, a superfile whose min and max fall into different buckets, a
/// missing or out-of-range hash hint, the unsupported `ColumnRange` strategy,
/// or a system clock that reads earlier than the Unix epoch.
pub fn assign_partition(
    seg: &SuperfileEntry,
    strategy: &PartitionStrategy,
) -> Result<PartitionKey, ManifestError> {
    // All failures share one error variant; only the detail text differs.
    let reject = |detail: String| ManifestError::SuperfileSpansPartition { detail };

    match strategy {
        PartitionStrategy::TimeRange {
            column,
            granularity_secs,
        } => {
            let g = *granularity_secs;
            if g <= 0 {
                return Err(reject(format!(
                    "TimeRange granularity_secs must be > 0; got {granularity_secs}"
                )));
            }

            let (min, max) = scalar_i64_minmax(seg, column)?;
            // div_euclid rounds toward negative infinity for a positive
            // divisor, so pre-epoch values still land in contiguous buckets.
            let min_bucket = min.div_euclid(g);
            let max_bucket = max.div_euclid(g);

            if min_bucket == max_bucket {
                Ok(PartitionKey::TimeRange(min_bucket as u64))
            } else {
                Err(reject(format!(
                    "superfile {} column {column:?} [{min}, {max}] spans buckets \
                     {min_bucket}..={max_bucket}; reduce commit_threshold_size_mb \
                     or flush at granularity boundaries",
                    seg.uri.0
                )))
            }
        }

        PartitionStrategy::Hash { n_buckets, .. } => {
            // Zero or one bucket: there is only one possible destination.
            if *n_buckets <= 1 {
                return Ok(PartitionKey::Hash(0));
            }

            match seg.partition_hint {
                None => Err(reject(format!(
                    "Hash{{n_buckets:{n_buckets}}} strategy requires pre-sharded \
                     superfiles; SuperfileEntry.partition_hint must be Some(bucket) \
                     (superfile {})",
                    seg.uri.0
                ))),
                Some(bucket) if bucket >= *n_buckets => Err(reject(format!(
                    "Hash{{n_buckets:{n_buckets}}} got partition_hint={bucket} \
                     (out of range)"
                ))),
                Some(bucket) => Ok(PartitionKey::Hash(bucket)),
            }
        }

        PartitionStrategy::ColumnRange { .. } => Err(reject(String::from(
            "ColumnRange partition assignment lands in a follow-up; \
             no writer currently emits ColumnRange-partitioned commits",
        ))),

        PartitionStrategy::IngestionTime { granularity_secs } => {
            let g = *granularity_secs;
            if g <= 0 {
                return Err(reject(format!(
                    "IngestionTime granularity_secs must be > 0; got {granularity_secs}"
                )));
            }

            // duration_since fails only when the clock reads before the epoch.
            let since_epoch = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_err(|_| reject("failed to get current system time".into()))?;
            let now = since_epoch.as_secs() as i64;

            Ok(PartitionKey::TimeRange(now.div_euclid(g) as u64))
        }
    }
}
/// Fetches the aggregated stats for `column` from `seg.scalar_stats` and
/// returns their `(min, max)` as `i64` values.
///
/// # Errors
///
/// Returns `ManifestError::SuperfileSpansPartition` when the superfile has no
/// stats entry for `column`, or when either bound cannot be read as an `i64`
/// by [`downcast_i64`]. The min bound is converted first, so its error wins
/// when both are unusable.
fn scalar_i64_minmax(seg: &SuperfileEntry, column: &str) -> Result<(i64, i64), ManifestError> {
    let agg = match seg.scalar_stats.get(column) {
        Some(found) => found,
        None => {
            return Err(ManifestError::SuperfileSpansPartition {
                detail: format!(
                    "TimeRange strategy: superfile {} has no scalar_stats \
                     for column {column:?}",
                    seg.uri.0
                ),
            });
        }
    };

    let lower = downcast_i64(agg.min.as_ref(), column, seg)?;
    let upper = downcast_i64(agg.max.as_ref(), column, seg)?;
    Ok((lower, upper))
}
/// Reads element 0 of a stats array as a raw `i64`.
///
/// Supported array types are `Int64` and `Timestamp` in any of the four time
/// units (the timezone component is ignored). Timestamp values are returned
/// exactly as stored, in the array's own unit; no conversion to seconds is
/// performed.
///
/// NOTE(review): `assign_partition` divides this value by `granularity_secs`.
/// Confirm that millisecond/microsecond/nanosecond columns are intended to be
/// bucketed on their raw tick counts.
///
/// `column` and `seg` are used only to build error messages.
///
/// # Errors
///
/// Returns `ManifestError::SuperfileSpansPartition` when the array is empty,
/// when element 0 is null, when the data type is not one of the supported
/// ones, or when the array's concrete type does not match its reported data
/// type. The empty/null check runs before the type check.
fn downcast_i64(
    arr: &dyn arrow_array::Array,
    column: &str,
    seg: &SuperfileEntry,
) -> Result<i64, ManifestError> {
    use arrow_array::*;
    use arrow_schema::{DataType, TimeUnit};

    // Downcasts `arr` to the concrete array type and reads slot 0; yields
    // `None` when the concrete type does not match.
    macro_rules! first_as {
        ($ty:ty) => {
            arr.as_any()
                .downcast_ref::<$ty>()
                .map(|typed| typed.value(0))
        };
    }

    // Guard first: `value(0)` below must only be reached for a present slot.
    if arr.is_empty() || arr.is_null(0) {
        return Err(ManifestError::SuperfileSpansPartition {
            detail: format!(
                "TimeRange strategy: superfile {} column {column:?} stats array \
                 is empty or null at index 0",
                seg.uri.0
            ),
        });
    }

    let first = match arr.data_type() {
        DataType::Int64 => first_as!(Int64Array),
        DataType::Timestamp(TimeUnit::Second, _) => first_as!(TimestampSecondArray),
        DataType::Timestamp(TimeUnit::Millisecond, _) => first_as!(TimestampMillisecondArray),
        DataType::Timestamp(TimeUnit::Microsecond, _) => first_as!(TimestampMicrosecondArray),
        DataType::Timestamp(TimeUnit::Nanosecond, _) => first_as!(TimestampNanosecondArray),
        other => {
            return Err(ManifestError::SuperfileSpansPartition {
                detail: format!(
                    "TimeRange strategy: superfile {} column {column:?} has \
                     unsupported type {other:?}; expected Int64 or Timestamp*",
                    seg.uri.0
                ),
            });
        }
    };

    match first {
        Some(value) => Ok(value),
        None => Err(ManifestError::SuperfileSpansPartition {
            detail: format!(
                "TimeRange strategy: superfile {} column {column:?} downcast failed",
                seg.uri.0
            ),
        }),
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for partition-key encoding/decoding and for
    // `assign_partition` across every `PartitionStrategy` variant.

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        ArrayRef, Int32Array, Int64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
        TimestampNanosecondArray, TimestampSecondArray,
    };

    use super::*;
    use crate::supertable::manifest::{ScalarStatsAgg, SuperfileEntry, SuperfileUri};

    /// Builds a superfile entry with no stats, no hint and nil identifiers.
    fn empty_seg() -> SuperfileEntry {
        SuperfileEntry {
            superfile_id: uuid::Uuid::nil(),
            uri: SuperfileUri(uuid::Uuid::nil()),
            n_docs: 0,
            id_min: 0,
            id_max: 0,
            scalar_stats: HashMap::new(),
            fts_summary: HashMap::new(),
            vector_summary: HashMap::new(),
            partition_key: Vec::new(),
            partition_hint: None,
            subsection_offsets: None,
        }
    }

    /// Builds a superfile entry whose only stats are Int64 min/max for `column`.
    fn seg_with_i64(column: &str, min: i64, max: i64) -> SuperfileEntry {
        let mut s = empty_seg();
        let mn: ArrayRef = Arc::new(Int64Array::from(vec![min]));
        let mx: ArrayRef = Arc::new(Int64Array::from(vec![max]));
        s.scalar_stats
            .insert(column.to_string(), ScalarStatsAgg::from_min_max(mn, mx));
        s
    }

    /// Asserts that `err` is `SuperfileSpansPartition` and that its detail
    /// text contains `needle`.
    fn assert_spans_partition(err: ManifestError, needle: &str) {
        match err {
            ManifestError::SuperfileSpansPartition { detail } => assert!(
                detail.contains(needle),
                "expected `{needle}` in detail; got: {detail}"
            ),
            other => panic!("expected SuperfileSpansPartition; got {other:?}"),
        }
    }

    // ---- encode_partition_key ------------------------------------------

    #[test]
    fn encode_partition_key_time_range_emits_le_u64() {
        let bytes = encode_partition_key(&PartitionKey::TimeRange(0x01_02_03_04_05_06_07_08));
        assert_eq!(bytes.len(), 8);
        assert_eq!(bytes, 0x01_02_03_04_05_06_07_08u64.to_le_bytes().to_vec());
    }

    #[test]
    fn encode_partition_key_hash_emits_le_u32() {
        let bytes = encode_partition_key(&PartitionKey::Hash(0xCAFEBABE));
        assert_eq!(bytes.len(), 4);
        assert_eq!(bytes, 0xCAFEBABEu32.to_le_bytes().to_vec());
    }

    #[test]
    fn encode_partition_key_column_range_emits_le_u16() {
        let bytes = encode_partition_key(&PartitionKey::ColumnRange(0xDEAD));
        assert_eq!(bytes.len(), 2);
        assert_eq!(bytes, 0xDEADu16.to_le_bytes().to_vec());
    }

    // ---- decode_partition_key ------------------------------------------

    #[test]
    fn decode_partition_key_round_trips_time_range() {
        let original = PartitionKey::TimeRange(42);
        let bytes = encode_partition_key(&original);
        let strategy = PartitionStrategy::TimeRange {
            column: "_id".into(),
            granularity_secs: 86_400,
        };
        let decoded = decode_partition_key(&bytes, &strategy).expect("decode");
        assert_eq!(decoded, original);
    }

    #[test]
    fn decode_partition_key_round_trips_hash() {
        let original = PartitionKey::Hash(5);
        let bytes = encode_partition_key(&original);
        let strategy = PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 16,
        };
        let decoded = decode_partition_key(&bytes, &strategy).expect("decode");
        assert_eq!(decoded, original);
    }

    #[test]
    fn decode_partition_key_round_trips_column_range() {
        let original = PartitionKey::ColumnRange(3);
        let bytes = encode_partition_key(&original);
        let strategy = PartitionStrategy::ColumnRange {
            column: "_id".into(),
            boundaries: vec![vec![]],
        };
        let decoded = decode_partition_key(&bytes, &strategy).expect("decode");
        assert_eq!(decoded, original);
    }

    #[test]
    fn decode_partition_key_round_trips_ingestion_time() {
        // IngestionTime keys use the 8-byte TimeRange form.
        let original = PartitionKey::TimeRange(42);
        let bytes = encode_partition_key(&original);
        let strategy = PartitionStrategy::IngestionTime {
            granularity_secs: 3_600,
        };
        let decoded = decode_partition_key(&bytes, &strategy).expect("decode");
        assert_eq!(decoded, original);
    }

    #[test]
    fn decode_partition_key_rejects_wrong_size_time_range() {
        let strategy = PartitionStrategy::TimeRange {
            column: "_id".into(),
            granularity_secs: 1,
        };
        let err = decode_partition_key(&[1, 2, 3], &strategy).expect_err("must error");
        match err {
            CommitError::PointerParse(msg) => {
                assert!(msg.contains("TimeRange"), "{msg}");
                assert!(msg.contains("8 bytes"), "{msg}");
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn decode_partition_key_rejects_wrong_size_hash() {
        let strategy = PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 4,
        };
        let err = decode_partition_key(&[1, 2, 3], &strategy).expect_err("must error");
        match err {
            CommitError::PointerParse(msg) => {
                assert!(msg.contains("Hash"), "{msg}");
                assert!(msg.contains("4 bytes"), "{msg}");
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn decode_partition_key_rejects_wrong_size_column_range() {
        let strategy = PartitionStrategy::ColumnRange {
            column: "_id".into(),
            boundaries: vec![vec![]],
        };
        let err = decode_partition_key(&[1, 2, 3], &strategy).expect_err("must error");
        match err {
            CommitError::PointerParse(msg) => {
                assert!(msg.contains("ColumnRange"), "{msg}");
                assert!(msg.contains("2 bytes"), "{msg}");
            }
            other => panic!("got {other:?}"),
        }
    }

    #[test]
    fn decode_partition_key_rejects_wrong_size_ingestion_time() {
        let strategy = PartitionStrategy::IngestionTime {
            granularity_secs: 1,
        };
        let err = decode_partition_key(&[1, 2, 3], &strategy).expect_err("must error");
        match err {
            CommitError::PointerParse(msg) => {
                assert!(msg.contains("IngestionTime"), "{msg}");
                assert!(msg.contains("8 bytes"), "{msg}");
            }
            other => panic!("got {other:?}"),
        }
    }

    // ---- assign_partition: TimeRange -----------------------------------

    #[test]
    fn assign_partition_time_range_single_bucket() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: 86_400,
        };
        let seg = seg_with_i64("ts", 100, 80_000);
        let key = assign_partition(&seg, &strategy).expect("assign");
        assert_eq!(key, PartitionKey::TimeRange(0));
    }

    #[test]
    fn assign_partition_time_range_rejects_superfile_spanning_buckets() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: 86_400,
        };
        let seg = seg_with_i64("ts", 100, 100_000);
        let err = assign_partition(&seg, &strategy).expect_err("must span");
        assert_spans_partition(err, "spans buckets");
    }

    #[test]
    fn assign_partition_time_range_rejects_zero_granularity() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: 0,
        };
        let seg = seg_with_i64("ts", 0, 0);
        let err = assign_partition(&seg, &strategy).expect_err("must reject");
        assert_spans_partition(err, "granularity_secs must be > 0");
    }

    #[test]
    fn assign_partition_time_range_rejects_negative_granularity() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: -1,
        };
        let seg = seg_with_i64("ts", 0, 0);
        let err = assign_partition(&seg, &strategy).expect_err("must reject");
        assert_spans_partition(err, "granularity_secs must be > 0");
    }

    #[test]
    fn assign_partition_time_range_rejects_missing_column_stats() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: 86_400,
        };
        let seg = empty_seg();
        let err = assign_partition(&seg, &strategy).expect_err("missing stats");
        assert_spans_partition(err, "no scalar_stats");
    }

    #[test]
    fn assign_partition_time_range_supports_timestamp_columns() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: 86_400,
        };
        // One (min, max) pair per supported Timestamp unit.
        let cases: Vec<(ArrayRef, ArrayRef)> = vec![
            (
                Arc::new(TimestampSecondArray::from(vec![100])),
                Arc::new(TimestampSecondArray::from(vec![200])),
            ),
            (
                Arc::new(TimestampMillisecondArray::from(vec![100])),
                Arc::new(TimestampMillisecondArray::from(vec![200])),
            ),
            (
                Arc::new(TimestampMicrosecondArray::from(vec![100])),
                Arc::new(TimestampMicrosecondArray::from(vec![200])),
            ),
            (
                Arc::new(TimestampNanosecondArray::from(vec![100])),
                Arc::new(TimestampNanosecondArray::from(vec![200])),
            ),
        ];
        for (mn, mx) in cases {
            let mut seg = empty_seg();
            seg.scalar_stats
                .insert("ts".into(), ScalarStatsAgg::from_min_max(mn, mx));
            let key = assign_partition(&seg, &strategy).expect("assign");
            assert_eq!(key, PartitionKey::TimeRange(0));
        }
    }

    #[test]
    fn assign_partition_time_range_rejects_unsupported_column_type() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: 86_400,
        };
        let mut seg = empty_seg();
        let mn: ArrayRef = Arc::new(Int32Array::from(vec![100]));
        let mx: ArrayRef = Arc::new(Int32Array::from(vec![200]));
        seg.scalar_stats
            .insert("ts".into(), ScalarStatsAgg::from_min_max(mn, mx));
        let err = assign_partition(&seg, &strategy).expect_err("unsupported");
        assert_spans_partition(err, "unsupported type");
    }

    #[test]
    fn assign_partition_time_range_rejects_null_stats_array() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: 86_400,
        };
        let mut seg = empty_seg();
        let nulls: Vec<Option<i64>> = vec![None];
        let mn: ArrayRef = Arc::new(Int64Array::from(nulls.clone()));
        let mx: ArrayRef = Arc::new(Int64Array::from(nulls));
        seg.scalar_stats
            .insert("ts".into(), ScalarStatsAgg::from_min_max(mn, mx));
        let err = assign_partition(&seg, &strategy).expect_err("null stats");
        assert_spans_partition(err, "empty or null at index 0");
    }

    #[test]
    fn assign_partition_time_range_handles_negative_values_with_div_euclid() {
        let strategy = PartitionStrategy::TimeRange {
            column: "ts".into(),
            granularity_secs: 10,
        };
        // -25 and -21 both floor to bucket -3, stored as its u64 bit pattern.
        let seg = seg_with_i64("ts", -25, -21);
        let key = assign_partition(&seg, &strategy).expect("assign");
        assert_eq!(key, PartitionKey::TimeRange(-3i64 as u64));
    }

    // ---- assign_partition: Hash ----------------------------------------

    #[test]
    fn assign_partition_hash_single_bucket_short_circuits() {
        let strategy = PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 1,
        };
        let seg = empty_seg();
        let key = assign_partition(&seg, &strategy).expect("assign");
        assert_eq!(key, PartitionKey::Hash(0));
    }

    #[test]
    fn assign_partition_hash_zero_buckets_treats_as_one() {
        let strategy = PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 0,
        };
        let seg = empty_seg();
        let key = assign_partition(&seg, &strategy).expect("assign");
        assert_eq!(key, PartitionKey::Hash(0));
    }

    #[test]
    fn assign_partition_hash_uses_partition_hint() {
        let strategy = PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 4,
        };
        let mut seg = empty_seg();
        seg.partition_hint = Some(2);
        let key = assign_partition(&seg, &strategy).expect("assign");
        assert_eq!(key, PartitionKey::Hash(2));
    }

    #[test]
    fn assign_partition_hash_requires_hint_when_multi_bucket() {
        let strategy = PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 4,
        };
        let seg = empty_seg();
        let err = assign_partition(&seg, &strategy).expect_err("must reject");
        assert_spans_partition(err, "requires pre-sharded");
    }

    #[test]
    fn assign_partition_hash_rejects_out_of_range_hint() {
        let strategy = PartitionStrategy::Hash {
            column: "_id".into(),
            n_buckets: 4,
        };
        let mut seg = empty_seg();
        seg.partition_hint = Some(4);
        let err = assign_partition(&seg, &strategy).expect_err("out of range");
        assert_spans_partition(err, "out of range");
    }

    // ---- assign_partition: ColumnRange ---------------------------------

    #[test]
    fn assign_partition_column_range_is_not_yet_supported() {
        let strategy = PartitionStrategy::ColumnRange {
            column: "_id".into(),
            boundaries: vec![vec![]],
        };
        let seg = empty_seg();
        let err = assign_partition(&seg, &strategy).expect_err("not impl");
        assert_spans_partition(err, "ColumnRange partition assignment lands");
    }

    // ---- assign_partition: IngestionTime -------------------------------

    #[test]
    fn assign_partition_ingestion_time_rejects_zero_granularity() {
        let strategy = PartitionStrategy::IngestionTime {
            granularity_secs: 0,
        };
        let seg = empty_seg();
        let err = assign_partition(&seg, &strategy).expect_err("must reject");
        assert_spans_partition(err, "IngestionTime granularity_secs must be > 0");
    }

    #[test]
    fn assign_partition_ingestion_time_rejects_negative_granularity() {
        let strategy = PartitionStrategy::IngestionTime {
            granularity_secs: -1,
        };
        let seg = empty_seg();
        let err = assign_partition(&seg, &strategy).expect_err("must reject");
        assert_spans_partition(err, "IngestionTime granularity_secs must be > 0");
    }

    #[test]
    fn assign_partition_ingestion_time_buckets_current_time() {
        use std::time::{SystemTime, UNIX_EPOCH};

        let strategy = PartitionStrategy::IngestionTime {
            granularity_secs: 3_600,
        };
        let current_bucket = || {
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system clock is after the Unix epoch")
                .as_secs()
                / 3_600
        };

        // Sample the clock on both sides of the call so the assertion holds
        // even when an hour boundary is crossed mid-test.
        let before = current_bucket();
        let key = assign_partition(&empty_seg(), &strategy).expect("assign");
        let after = current_bucket();

        match key {
            PartitionKey::TimeRange(bucket) => assert!(
                (before..=after).contains(&bucket),
                "bucket {bucket} outside [{before}, {after}]"
            ),
            other => panic!("expected TimeRange key; got {other:?}"),
        }
    }
}