use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, QuotaPolicyId};
use serde::{Deserialize, Serialize};
/// Identifies which family of partitions a `Partition` belongs to.
///
/// The family decides the hash-tag prefix (`PartitionFamily::prefix`) and
/// which count in `PartitionConfig` applies (`PartitionConfig::count_for`).
/// Variants are (de)serialized by serde under their `snake_case` names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartitionFamily {
/// Flow partitions; rendered with the `fp` prefix.
Flow,
/// Execution partitions; share the `fp` prefix and the flow partition
/// count with `Flow`.
Execution,
/// Budget partitions; rendered with the `b` prefix.
Budget,
/// Quota partitions; rendered with the `q` prefix.
Quota,
}
impl PartitionFamily {
fn prefix(self) -> &'static str {
match self {
Self::Flow | Self::Execution => "fp",
Self::Budget => "b",
Self::Quota => "q",
}
}
}
/// Number of partitions configured for each partition family.
///
/// The hashing helpers in this file use these counts as a modulus and panic
/// when the relevant count is zero.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PartitionConfig {
/// Partition count used for both the `Flow` and `Execution` families
/// (256 in `Default`).
pub num_flow_partitions: u16,
/// Partition count used for the `Budget` family (32 in `Default`).
pub num_budget_partitions: u16,
/// Partition count used for the `Quota` family (32 in `Default`).
pub num_quota_partitions: u16,
}
impl Default for PartitionConfig {
fn default() -> Self {
Self {
num_flow_partitions: 256,
num_budget_partitions: 32,
num_quota_partitions: 32,
}
}
}
impl PartitionConfig {
    /// Returns the configured partition count for `family`.
    ///
    /// `Flow` and `Execution` both resolve to `num_flow_partitions`; `Budget`
    /// and `Quota` each have their own field.
    pub fn count_for(&self, family: PartitionFamily) -> u16 {
        match family {
            PartitionFamily::Quota => self.num_quota_partitions,
            PartitionFamily::Budget => self.num_budget_partitions,
            // Execution shares the flow partition space.
            PartitionFamily::Execution | PartitionFamily::Flow => self.num_flow_partitions,
        }
    }
}
/// A single partition: a family plus an index within that family.
///
/// Note that equality and hashing are derived field-wise, so a `Flow` and an
/// `Execution` partition with the same index compare unequal even though
/// `hash_tag` renders them identically.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Partition {
/// Family this partition belongs to; selects the hash-tag prefix.
pub family: PartitionFamily,
/// Index of the partition within its family.
pub index: u16,
}
impl Partition {
    /// Renders this partition as `{<prefix>:<index>}`, e.g. `{fp:7}`.
    ///
    /// The prefix comes from the partition's family, so `Flow` and
    /// `Execution` partitions with the same index render identically.
    // NOTE(review): presumably consumed as a cluster hash tag in storage keys
    // — confirm against the key-building code.
    pub fn hash_tag(&self) -> String {
        let prefix = self.family.prefix();
        let index = self.index;
        // Doubled braces are escapes for literal `{` / `}` in the output.
        format!("{{{prefix}:{index}}}")
    }
}
impl std::fmt::Display for Partition {
    /// Writes exactly the text produced by `Partition::hash_tag`.
    ///
    /// Width, fill and alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tag = self.hash_tag();
        f.write_str(&tag)
    }
}
/// Computes a 16-bit CRC of `bytes` using the `crc16` crate's `XMODEM`
/// parameter set.
// NOTE(review): the function name says "ccitt" while the variant selected is
// specifically XMODEM — keep that in mind when comparing against other
// CRC-16 implementations.
fn crc16_ccitt(bytes: &[u8]) -> u16 {
    type Xmodem = crc16::State<crc16::XMODEM>;
    Xmodem::calculate(bytes)
}
/// Maps the 16 raw bytes of a UUID onto a partition index in
/// `0..num_partitions`.
///
/// The index is the CRC-16 of the bytes (via `crc16_ccitt`) reduced modulo
/// `num_partitions`.
///
/// # Panics
///
/// Panics if `num_partitions` is zero, since the modulo would otherwise
/// divide by zero.
fn partition_for_uuid(uuid_bytes: &[u8; 16], num_partitions: u16) -> u16 {
    assert!(num_partitions != 0, "num_partitions must be > 0 (division by zero)");
    let checksum = crc16_ccitt(uuid_bytes);
    checksum % num_partitions
}
/// Returns the `Execution`-family partition recorded in `eid`.
///
/// The index is read from the id itself via `ExecutionId::partition`; the
/// `_config` argument is accepted but never consulted, so the result does
/// not change with the configured partition counts.
pub fn execution_partition(eid: &ExecutionId, _config: &PartitionConfig) -> Partition {
    let index = eid.partition();
    let family = PartitionFamily::Execution;
    Partition { family, index }
}
/// Computes the `Flow`-family partition for `fid`.
///
/// The flow id's bytes are hashed by `partition_for_uuid` and reduced modulo
/// `config.num_flow_partitions`.
///
/// # Panics
///
/// Panics if `config.num_flow_partitions` is zero.
pub fn flow_partition(fid: &FlowId, config: &PartitionConfig) -> Partition {
    let index = partition_for_uuid(fid.as_bytes(), config.num_flow_partitions);
    let family = PartitionFamily::Flow;
    Partition { family, index }
}
/// Strategy for choosing the partition index of a lane.
///
/// Implementations must be `Send + Sync`. `solo_partition_with` uses the
/// returned index as-is; it performs no range check against the counts in
/// `config`.
pub trait SoloPartitioner: Send + Sync {
/// Returns the partition index to use for `lane` under `config`.
fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16;
}
/// The `SoloPartitioner` used by `solo_partition`: CRC-16 of the lane's
/// string form, reduced modulo `num_flow_partitions`.
#[derive(Clone, Copy, Debug, Default)]
pub struct Crc16SoloPartitioner;
impl SoloPartitioner for Crc16SoloPartitioner {
    /// Hashes the UTF-8 bytes of `lane.as_str()` with `crc16_ccitt` and
    /// reduces the checksum modulo `config.num_flow_partitions`.
    ///
    /// # Panics
    ///
    /// Panics if `config.num_flow_partitions` is zero.
    fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16 {
        let modulus = config.num_flow_partitions;
        assert!(
            modulus != 0,
            "num_flow_partitions must be > 0 (division by zero)"
        );
        let lane_bytes = lane.as_str().as_bytes();
        crc16_ccitt(lane_bytes) % modulus
    }
}
/// Computes the partition for `lane` using the default
/// `Crc16SoloPartitioner`.
///
/// Equivalent to calling `solo_partition_with` with `&Crc16SoloPartitioner`.
pub fn solo_partition(lane: &LaneId, config: &PartitionConfig) -> Partition {
    let default_partitioner = Crc16SoloPartitioner;
    solo_partition_with(lane, config, &default_partitioner)
}
/// Computes the partition for `lane` using a caller-supplied `partitioner`.
///
/// The result is always in the `Execution` family. The index is whatever the
/// partitioner returns; it is not checked against the counts in `config`.
pub fn solo_partition_with(
    lane: &LaneId,
    config: &PartitionConfig,
    partitioner: &dyn SoloPartitioner,
) -> Partition {
    let index = partitioner.partition_for_lane(lane, config);
    let family = PartitionFamily::Execution;
    Partition { family, index }
}
/// Computes the `Budget`-family partition for `bid`.
///
/// The budget id's bytes are hashed by `partition_for_uuid` and reduced
/// modulo `config.num_budget_partitions`.
///
/// # Panics
///
/// Panics if `config.num_budget_partitions` is zero.
pub fn budget_partition(bid: &BudgetId, config: &PartitionConfig) -> Partition {
    let index = partition_for_uuid(bid.as_bytes(), config.num_budget_partitions);
    let family = PartitionFamily::Budget;
    Partition { family, index }
}
/// Computes the `Quota`-family partition for `qid`.
///
/// The quota policy id's bytes are hashed by `partition_for_uuid` and
/// reduced modulo `config.num_quota_partitions`.
///
/// # Panics
///
/// Panics if `config.num_quota_partitions` is zero.
pub fn quota_partition(qid: &QuotaPolicyId, config: &PartitionConfig) -> Partition {
    let index = partition_for_uuid(qid.as_bytes(), config.num_quota_partitions);
    let family = PartitionFamily::Quota;
    Partition { family, index }
}
// Unit tests for partition routing: hash-tag rendering, the Flow/Execution
// aliasing, per-family hashing helpers, `ExecutionId` parsing/minting as it
// relates to partitions, and the pluggable solo partitioner.
#[cfg(test)]
mod tests {
use super::*;
// Pins the exact `{prefix:index}` text rendered for each family.
#[test]
fn partition_hash_tag_format() {
let p = Partition { family: PartitionFamily::Flow, index: 7 };
assert_eq!(p.hash_tag(), "{fp:7}");
let p = Partition { family: PartitionFamily::Execution, index: 7 };
assert_eq!(p.hash_tag(), "{fp:7}", "Execution must alias Flow (RFC-011 §11)");
let p = Partition { family: PartitionFamily::Budget, index: 0 };
assert_eq!(p.hash_tag(), "{b:0}");
let p = Partition { family: PartitionFamily::Quota, index: 31 };
assert_eq!(p.hash_tag(), "{q:31}");
}
// Flow and Execution must be indistinguishable by hash tag and by
// configured partition count, across a spread of indexes including the
// u16 extremes.
#[test]
fn execution_family_aliases_flow() {
for index in [0u16, 1, 7, 42, 255, 65535] {
let flow = Partition { family: PartitionFamily::Flow, index };
let exec = Partition { family: PartitionFamily::Execution, index };
assert_eq!(
flow.hash_tag(),
exec.hash_tag(),
"Flow and Execution must produce identical hash-tags at index {index}"
);
}
let config = PartitionConfig::default();
assert_eq!(
config.count_for(PartitionFamily::Flow),
config.count_for(PartitionFamily::Execution),
"count_for(Flow) == count_for(Execution) — both route via num_flow_partitions"
);
let p_exec = Partition { family: PartitionFamily::Execution, index: 42 };
let p_flow = Partition { family: PartitionFamily::Flow, index: 42 };
assert_eq!(p_exec.hash_tag(), p_flow.hash_tag());
assert_eq!(p_exec.hash_tag(), "{fp:42}");
}
// Flow, Budget and Quota must each render a different tag at the same
// index. Execution is left out because it shares Flow's prefix.
#[test]
fn all_families_produce_distinct_tags() {
let tags: Vec<String> = [
PartitionFamily::Flow,
PartitionFamily::Budget,
PartitionFamily::Quota,
]
.iter()
.map(|f| Partition { family: *f, index: 0 }.hash_tag())
.collect();
let unique: std::collections::HashSet<&String> = tags.iter().collect();
assert_eq!(unique.len(), 3, "flow/budget/quota must produce distinct hash tags");
}
// The same flow id must map to the same in-range Flow partition each time.
#[test]
fn flow_partition_determinism() {
let config = PartitionConfig::default();
let fid = FlowId::new();
let p1 = flow_partition(&fid, &config);
let p2 = flow_partition(&fid, &config);
assert_eq!(p1, p2);
assert_eq!(p1.family, PartitionFamily::Flow);
assert!(p1.index < config.num_flow_partitions);
}
// The same budget id must map to the same in-range Budget partition each
// time.
#[test]
fn budget_partition_determinism() {
let config = PartitionConfig::default();
let bid = BudgetId::new();
let p1 = budget_partition(&bid, &config);
let p2 = budget_partition(&bid, &config);
assert_eq!(p1, p2);
assert_eq!(p1.family, PartitionFamily::Budget);
assert!(p1.index < config.num_budget_partitions);
}
// Guards the default partition counts against accidental change.
#[test]
fn default_config_values() {
let config = PartitionConfig::default();
assert_eq!(config.num_flow_partitions, 256);
assert_eq!(config.num_budget_partitions, 32);
assert_eq!(config.num_quota_partitions, 32);
}
// Two execution ids minted for the same flow under the same config must
// carry the same partition.
#[test]
fn execution_id_for_flow_determinism() {
let config = PartitionConfig::default();
let fid = FlowId::new();
let a = ExecutionId::for_flow(&fid, &config);
let b = ExecutionId::for_flow(&fid, &config);
assert_eq!(a.partition(), b.partition());
}
// Two solo execution ids minted for the same lane under the same config
// must carry the same partition.
#[test]
fn execution_id_solo_determinism() {
let config = PartitionConfig::default();
let lane = LaneId::new("workers-a");
let a = ExecutionId::solo(&lane, &config);
let b = ExecutionId::solo(&lane, &config);
assert_eq!(a.partition(), b.partition());
}
// An execution id minted for a flow must land on the flow's own partition,
// both by index and by rendered hash tag.
#[test]
fn execution_id_partition_matches_flow_partition() {
let config = PartitionConfig::default();
let fid = FlowId::new();
let eid = ExecutionId::for_flow(&fid, &config);
let fp = flow_partition(&fid, &config);
assert_eq!(eid.partition(), fp.index);
let ep = execution_partition(&eid, &config);
assert_eq!(ep.index, fp.index);
assert_eq!(ep.family, PartitionFamily::Execution);
assert_eq!(ep.hash_tag(), fp.hash_tag());
}
// The id is parsed with an explicit `{fp:0}` tag; `execution_partition`
// must report that tagged index (0).
#[test]
fn execution_partition_reads_hash_tag_not_uuid() {
let known_uuid = "550e8400-e29b-41d4-a716-446655440000";
let s = format!("{{fp:0}}:{known_uuid}");
let eid = ExecutionId::parse(&s).unwrap();
let config = PartitionConfig::default();
let p = execution_partition(&eid, &config);
assert_eq!(p.index, 0, "must read hash-tag, not re-hash UUID");
}
// Mints an id under a 4-partition config, then decodes it under a
// 1024-partition config: the index must be the one minted.
#[test]
fn execution_partition_ignores_config_value() {
let small = PartitionConfig { num_flow_partitions: 4, ..Default::default() };
let fid = FlowId::new();
let eid = ExecutionId::for_flow(&fid, &small);
let minted_partition = eid.partition();
let big = PartitionConfig { num_flow_partitions: 1024, ..Default::default() };
let p = execution_partition(&eid, &big);
assert_eq!(
p.index, minted_partition,
"hash-tag is authoritative; config value must not change decoding"
);
}
// A UUID with no `{fp:N}:` tag in front must be rejected as `MissingTag`.
#[test]
fn execution_id_parse_rejects_bare_uuid() {
let bare = "550e8400-e29b-41d4-a716-446655440000";
match ExecutionId::parse(bare) {
Err(crate::types::ExecutionIdParseError::MissingTag(_)) => {}
other => panic!("expected MissingTag, got {other:?}"),
}
}
// A well-formed `{fp:N}:<uuid>` string parses, exposes N as the partition
// and round-trips through `as_str` unchanged.
#[test]
fn execution_id_parse_accepts_wellformed_shape() {
let s = "{fp:42}:550e8400-e29b-41d4-a716-446655440000";
let eid = ExecutionId::parse(s).unwrap();
assert_eq!(eid.partition(), 42);
assert_eq!(eid.as_str(), s);
}
// Both a non-numeric index (`xx`) and one just past `u16::MAX` (65536)
// must be rejected as `InvalidPartitionIndex`.
#[test]
fn execution_id_parse_rejects_bad_partition_index() {
match ExecutionId::parse("{fp:xx}:550e8400-e29b-41d4-a716-446655440000") {
Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
other => panic!("expected InvalidPartitionIndex, got {other:?}"),
}
match ExecutionId::parse("{fp:65536}:550e8400-e29b-41d4-a716-446655440000") {
Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
other => panic!("expected InvalidPartitionIndex for u16 overflow, got {other:?}"),
}
}
// A valid tag followed by a malformed UUID must be rejected as
// `InvalidUuid`.
#[test]
fn execution_id_parse_rejects_bad_uuid() {
match ExecutionId::parse("{fp:0}:not-a-uuid") {
Err(crate::types::ExecutionIdParseError::InvalidUuid(_)) => {}
other => panic!("expected InvalidUuid, got {other:?}"),
}
}
// The same lane must map to the same in-range Execution partition each
// time.
#[test]
fn solo_partition_determinism() {
let config = PartitionConfig::default();
let lane = LaneId::new("workers-a");
let p1 = solo_partition(&lane, &config);
let p2 = solo_partition(&lane, &config);
assert_eq!(p1, p2);
assert_eq!(p1.family, PartitionFamily::Execution);
assert!(p1.index < config.num_flow_partitions);
}
// Coarse distribution check: 100 lanes named `lane-0`..`lane-99` must hit
// more than 50 distinct partition indexes under the default config.
#[test]
fn solo_partition_different_lanes_usually_differ() {
let config = PartitionConfig::default();
let mut seen = std::collections::HashSet::new();
for i in 0..100 {
let lane = LaneId::new(format!("lane-{i}"));
let p = solo_partition(&lane, &config);
seen.insert(p.index);
}
assert!(
seen.len() > 50,
"solo_partition distribution too narrow: only {} distinct of 100",
seen.len()
);
}
// The default partitioner must equal a direct XMODEM CRC-16 of the lane
// string modulo the flow partition count.
#[test]
fn crc16_solo_partitioner_matches_legacy_behavior() {
let config = PartitionConfig::default();
let lane = LaneId::new("workers-a");
let default_idx = Crc16SoloPartitioner.partition_for_lane(&lane, &config);
let expected = crc16::State::<crc16::XMODEM>::calculate(lane.as_str().as_bytes())
% config.num_flow_partitions;
assert_eq!(default_idx, expected);
}
// A custom partitioner's answer must be used verbatim, and the resulting
// partition is still in the Execution family.
#[test]
fn solo_partition_with_custom_partitioner_routes_through_trait() {
struct AlwaysZero;
impl SoloPartitioner for AlwaysZero {
fn partition_for_lane(&self, _lane: &LaneId, _config: &PartitionConfig) -> u16 {
0
}
}
let config = PartitionConfig::default();
let lane = LaneId::new("pick-me");
let p = solo_partition_with(&lane, &config, &AlwaysZero);
assert_eq!(p.index, 0);
assert_eq!(p.family, PartitionFamily::Execution);
}
// `solo_partition` must be exactly `solo_partition_with` using
// `Crc16SoloPartitioner`.
#[test]
fn solo_partition_default_matches_solo_partition_with_crc16() {
let config = PartitionConfig::default();
let lane = LaneId::new("workers-b");
let default = solo_partition(&lane, &config);
let explicit = solo_partition_with(&lane, &config, &Crc16SoloPartitioner);
assert_eq!(default, explicit);
}
// Deserializing through serde must accept a tagged id and reject a bare
// UUID.
#[test]
fn execution_id_serde_via_deserialize_validates() {
let json = r#""{fp:0}:550e8400-e29b-41d4-a716-446655440000""#;
let eid: ExecutionId = serde_json::from_str(json).unwrap();
assert_eq!(eid.partition(), 0);
let bare = r#""550e8400-e29b-41d4-a716-446655440000""#;
assert!(serde_json::from_str::<ExecutionId>(bare).is_err());
}
}