use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, QuotaPolicyId};
use serde::{Deserialize, Serialize};
/// Logical family a partition belongs to.
///
/// Each family maps to a hash-tag prefix (see `prefix`) and to one of the
/// partition counts held by `PartitionConfig` (see `PartitionConfig::count_for`).
/// serde represents the variants with snake_case names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartitionFamily {
/// Flow partition; hash-tag prefix `fp`.
Flow,
/// Execution partition; uses the same `fp` prefix and the same partition count as `Flow`.
Execution,
/// Budget partition; hash-tag prefix `b`.
Budget,
/// Quota partition; hash-tag prefix `q`.
Quota,
}
impl PartitionFamily {
fn prefix(self) -> &'static str {
match self {
Self::Flow | Self::Execution => "fp",
Self::Budget => "b",
Self::Quota => "q",
}
}
}
/// Number of partitions configured for each partition family.
///
/// The routing helpers in this file reduce a CRC-16 modulo these counts and
/// assert that the count they use is non-zero, so a zero value makes those
/// helpers panic.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PartitionConfig {
/// Partition count used for both the `Flow` and `Execution` families.
pub num_flow_partitions: u16,
/// Partition count used for the `Budget` family.
pub num_budget_partitions: u16,
/// Partition count used for the `Quota` family.
pub num_quota_partitions: u16,
}
impl Default for PartitionConfig {
fn default() -> Self {
Self {
num_flow_partitions: 256,
num_budget_partitions: 32,
num_quota_partitions: 32,
}
}
}
impl PartitionConfig {
    /// Returns the configured partition count for `family`.
    ///
    /// `Execution` has no count of its own; it reads `num_flow_partitions`
    /// exactly like `Flow`.
    pub fn count_for(&self, family: PartitionFamily) -> u16 {
        match family {
            PartitionFamily::Budget => self.num_budget_partitions,
            PartitionFamily::Quota => self.num_quota_partitions,
            PartitionFamily::Execution => self.num_flow_partitions,
            PartitionFamily::Flow => self.num_flow_partitions,
        }
    }
}
/// A single partition: a family plus an index within that family.
///
/// Its textual form is the hash tag produced by `hash_tag`, e.g. `{fp:7}`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Partition {
/// Family this partition belongs to; selects the hash-tag prefix.
pub family: PartitionFamily,
/// Index of the partition within its family.
pub index: u16,
}
impl Partition {
pub fn hash_tag(&self) -> String {
format!("{{{prefix}:{index}}}", prefix = self.family.prefix(), index = self.index)
}
}
impl std::fmt::Display for Partition {
    /// Writes the partition's hash tag (see `hash_tag`) verbatim.
    ///
    /// Width, fill and alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tag = self.hash_tag();
        f.write_str(&tag)
    }
}
/// String form of a partition hash tag, e.g. `{fp:7}`.
///
/// In this file keys are built from a `Partition` through the `From` impls.
/// Because the type is `#[serde(transparent)]`, it (de)serializes as a bare
/// string, and a deserialized key may hold arbitrary text — hence the fallible
/// `parse` / `as_partition` accessors.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct PartitionKey(String);
impl PartitionKey {
pub fn as_str(&self) -> &str {
&self.0
}
pub fn parse(&self) -> Result<Partition, PartitionKeyParseError> {
Partition::try_from(self)
}
pub fn as_partition(&self) -> Result<Partition, PartitionKeyParseError> {
self.parse()
}
}
impl std::fmt::Display for PartitionKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl From<&Partition> for PartitionKey {
fn from(p: &Partition) -> Self {
Self(p.hash_tag())
}
}
impl From<Partition> for PartitionKey {
fn from(p: Partition) -> Self {
Self::from(&p)
}
}
/// Reasons a `PartitionKey` cannot be decoded into a `Partition`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PartitionKeyParseError {
/// The text does not start with `{` and end with `}`.
MissingBraces,
/// The text between the braces contains no `:` separator.
MalformedBody,
/// The part before `:` is not one of `fp`, `b`, `q`; carries the offending prefix.
UnknownFamilyPrefix(String),
/// The part after `:` was not accepted as a `u16` index; carries the offending text.
InvalidIndex(String),
}
impl std::fmt::Display for PartitionKeyParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MissingBraces => {
f.write_str("partition key must be wrapped in '{...}'")
}
Self::MalformedBody => {
f.write_str("partition key body must be '<prefix>:<index>'")
}
Self::UnknownFamilyPrefix(p) => {
write!(f, "unknown partition family prefix '{p}' (expected 'fp', 'b', 'q')")
}
Self::InvalidIndex(s) => {
write!(f, "partition index '{s}' is not a valid u16")
}
}
}
}
// Empty impl: the default `std::error::Error` methods are sufficient; the type
// already provides the required `Debug` and `Display`.
impl std::error::Error for PartitionKeyParseError {}
/// Decodes a `{<prefix>:<index>}` hash tag back into a [`Partition`].
///
/// The `fp` prefix always decodes to [`PartitionFamily::Flow`]: `Execution`
/// shares that prefix, so the distinction cannot be recovered from the key.
///
/// Only the canonical spelling of the index is accepted (plain decimal digits,
/// no sign, no zero padding), so a successfully parsed partition always renders
/// back to exactly the key it was parsed from.
///
/// # Errors
/// - [`PartitionKeyParseError::MissingBraces`] — text is not wrapped in `{` … `}`.
/// - [`PartitionKeyParseError::MalformedBody`] — no `:` between the braces.
/// - [`PartitionKeyParseError::UnknownFamilyPrefix`] — prefix is not `fp`, `b` or `q`.
/// - [`PartitionKeyParseError::InvalidIndex`] — index is not a canonical decimal `u16`.
impl TryFrom<&PartitionKey> for Partition {
    type Error = PartitionKeyParseError;

    fn try_from(key: &PartitionKey) -> Result<Self, Self::Error> {
        let inner = key
            .as_str()
            .strip_prefix('{')
            .and_then(|rest| rest.strip_suffix('}'))
            .ok_or(PartitionKeyParseError::MissingBraces)?;

        let (prefix, index_s) = inner
            .split_once(':')
            .ok_or(PartitionKeyParseError::MalformedBody)?;

        let family = match prefix {
            "fp" => PartitionFamily::Flow,
            "b" => PartitionFamily::Budget,
            "q" => PartitionFamily::Quota,
            other => {
                return Err(PartitionKeyParseError::UnknownFamilyPrefix(
                    other.to_owned(),
                ));
            }
        };

        // `str::parse::<u16>` tolerates a leading '+' and zero padding ("+7", "007").
        // Such keys would decode to a partition whose hash tag differs from the
        // key text, so only the canonical digit string is accepted. An empty
        // index passes this check vacuously and is rejected by `parse` below.
        let is_canonical = index_s.bytes().all(|b| b.is_ascii_digit())
            && (index_s.len() == 1 || !index_s.starts_with('0'));

        let index = index_s
            .parse::<u16>()
            .ok()
            .filter(|_| is_canonical)
            .ok_or_else(|| PartitionKeyParseError::InvalidIndex(index_s.to_owned()))?;

        Ok(Partition { family, index })
    }
}
/// Computes the 16-bit CRC of `bytes` using the `crc16` crate's `XMODEM` variant.
fn crc16_ccitt(bytes: &[u8]) -> u16 {
    type Xmodem = crc16::State<crc16::XMODEM>;
    Xmodem::calculate(bytes)
}
/// Maps 16 UUID bytes onto a partition index in `0..num_partitions`.
///
/// The index is the CRC-16 of the bytes (via `crc16_ccitt`) reduced modulo
/// `num_partitions`.
///
/// # Panics
/// Panics when `num_partitions` is zero.
fn partition_for_uuid(uuid_bytes: &[u8; 16], num_partitions: u16) -> u16 {
    assert!(num_partitions != 0, "num_partitions must be > 0 (division by zero)");
    let checksum = crc16_ccitt(uuid_bytes);
    checksum % num_partitions
}
/// Returns the `Execution`-family partition recorded in an execution id.
///
/// The index is taken directly from `eid.partition()`; `_config` is accepted
/// for signature symmetry with the other helpers but is never read.
pub fn execution_partition(eid: &ExecutionId, _config: &PartitionConfig) -> Partition {
    let index = eid.partition();
    Partition {
        index,
        family: PartitionFamily::Execution,
    }
}
/// Routes a flow id to its `Flow`-family partition.
///
/// The index is derived from the id's bytes by `partition_for_uuid`, using
/// `config.num_flow_partitions` as the modulus.
///
/// # Panics
/// Panics when `config.num_flow_partitions` is zero.
pub fn flow_partition(fid: &FlowId, config: &PartitionConfig) -> Partition {
    let partition_count = config.num_flow_partitions;
    let index = partition_for_uuid(fid.as_bytes(), partition_count);
    Partition {
        index,
        family: PartitionFamily::Flow,
    }
}
/// Strategy for choosing the partition index for a lane.
///
/// `solo_partition_with` calls this and places the result in an
/// `Execution`-family `Partition`. Implementors must be `Send + Sync`.
pub trait SoloPartitioner: Send + Sync {
/// Returns the partition index to use for `lane` under `config`.
///
/// NOTE(review): `solo_partition_with` uses the returned value as-is; presumably
/// it should stay below `config.num_flow_partitions` — confirm with callers.
fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16;
}
/// `SoloPartitioner` used by `solo_partition`: CRC-16 of the lane's string form,
/// reduced modulo `num_flow_partitions`.
#[derive(Clone, Copy, Debug, Default)]
pub struct Crc16SoloPartitioner;
impl SoloPartitioner for Crc16SoloPartitioner {
    /// Hashes the lane's string form with CRC-16 and reduces it modulo
    /// `config.num_flow_partitions`.
    ///
    /// # Panics
    /// Panics when `config.num_flow_partitions` is zero.
    fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16 {
        let partition_count = config.num_flow_partitions;
        assert!(
            partition_count > 0,
            "num_flow_partitions must be > 0 (division by zero)"
        );
        let lane_bytes = lane.as_str().as_bytes();
        crc16_ccitt(lane_bytes) % partition_count
    }
}
/// Routes a lane to its partition using the default [`Crc16SoloPartitioner`].
///
/// Equivalent to calling [`solo_partition_with`] with that partitioner.
pub fn solo_partition(lane: &LaneId, config: &PartitionConfig) -> Partition {
    let default_partitioner = Crc16SoloPartitioner;
    solo_partition_with(lane, config, &default_partitioner)
}
/// Routes a lane to a partition using a caller-supplied [`SoloPartitioner`].
///
/// The partitioner picks the index; the returned partition is always of the
/// `Execution` family.
pub fn solo_partition_with(
    lane: &LaneId,
    config: &PartitionConfig,
    partitioner: &dyn SoloPartitioner,
) -> Partition {
    let index = partitioner.partition_for_lane(lane, config);
    Partition {
        index,
        family: PartitionFamily::Execution,
    }
}
/// Routes a budget id to its `Budget`-family partition.
///
/// The index is derived from the id's bytes by `partition_for_uuid`, using
/// `config.num_budget_partitions` as the modulus.
///
/// # Panics
/// Panics when `config.num_budget_partitions` is zero.
pub fn budget_partition(bid: &BudgetId, config: &PartitionConfig) -> Partition {
    let partition_count = config.num_budget_partitions;
    let index = partition_for_uuid(bid.as_bytes(), partition_count);
    Partition {
        index,
        family: PartitionFamily::Budget,
    }
}
/// Routes a quota policy id to its `Quota`-family partition.
///
/// The index is derived from the id's bytes by `partition_for_uuid`, using
/// `config.num_quota_partitions` as the modulus.
///
/// # Panics
/// Panics when `config.num_quota_partitions` is zero.
pub fn quota_partition(qid: &QuotaPolicyId, config: &PartitionConfig) -> Partition {
    let partition_count = config.num_quota_partitions;
    let index = partition_for_uuid(qid.as_bytes(), partition_count);
    Partition {
        index,
        family: PartitionFamily::Quota,
    }
}
// Unit tests covering hash-tag formatting, partition routing helpers,
// `ExecutionId` parsing, and `PartitionKey` conversion/parsing.
#[cfg(test)]
mod tests {
use super::*;
// Pins the exact hash-tag text produced for each family.
#[test]
fn partition_hash_tag_format() {
let p = Partition { family: PartitionFamily::Flow, index: 7 };
assert_eq!(p.hash_tag(), "{fp:7}");
let p = Partition { family: PartitionFamily::Execution, index: 7 };
assert_eq!(p.hash_tag(), "{fp:7}", "Execution must alias Flow (RFC-011 §11)");
let p = Partition { family: PartitionFamily::Budget, index: 0 };
assert_eq!(p.hash_tag(), "{b:0}");
let p = Partition { family: PartitionFamily::Quota, index: 31 };
assert_eq!(p.hash_tag(), "{q:31}");
}
// Flow and Execution must share both the hash tag and the configured partition count.
#[test]
fn execution_family_aliases_flow() {
for index in [0u16, 1, 7, 42, 255, 65535] {
let flow = Partition { family: PartitionFamily::Flow, index };
let exec = Partition { family: PartitionFamily::Execution, index };
assert_eq!(
flow.hash_tag(),
exec.hash_tag(),
"Flow and Execution must produce identical hash-tags at index {index}"
);
}
let config = PartitionConfig::default();
assert_eq!(
config.count_for(PartitionFamily::Flow),
config.count_for(PartitionFamily::Execution),
"count_for(Flow) == count_for(Execution) — both route via num_flow_partitions"
);
let p_exec = Partition { family: PartitionFamily::Execution, index: 42 };
let p_flow = Partition { family: PartitionFamily::Flow, index: 42 };
assert_eq!(p_exec.hash_tag(), p_flow.hash_tag());
assert_eq!(p_exec.hash_tag(), "{fp:42}");
}
// Flow, Budget and Quota tags at the same index must be pairwise different.
#[test]
fn all_families_produce_distinct_tags() {
let tags: Vec<String> = [
PartitionFamily::Flow,
PartitionFamily::Budget,
PartitionFamily::Quota,
]
.iter()
.map(|f| Partition { family: *f, index: 0 }.hash_tag())
.collect();
let unique: std::collections::HashSet<&String> = tags.iter().collect();
assert_eq!(unique.len(), 3, "flow/budget/quota must produce distinct hash tags");
}
// Routing the same flow id twice gives the same in-range Flow partition.
#[test]
fn flow_partition_determinism() {
let config = PartitionConfig::default();
let fid = FlowId::new();
let p1 = flow_partition(&fid, &config);
let p2 = flow_partition(&fid, &config);
assert_eq!(p1, p2);
assert_eq!(p1.family, PartitionFamily::Flow);
assert!(p1.index < config.num_flow_partitions);
}
// Routing the same budget id twice gives the same in-range Budget partition.
#[test]
fn budget_partition_determinism() {
let config = PartitionConfig::default();
let bid = BudgetId::new();
let p1 = budget_partition(&bid, &config);
let p2 = budget_partition(&bid, &config);
assert_eq!(p1, p2);
assert_eq!(p1.family, PartitionFamily::Budget);
assert!(p1.index < config.num_budget_partitions);
}
// Pins the default partition counts.
#[test]
fn default_config_values() {
let config = PartitionConfig::default();
assert_eq!(config.num_flow_partitions, 256);
assert_eq!(config.num_budget_partitions, 32);
assert_eq!(config.num_quota_partitions, 32);
}
// Two execution ids minted for the same flow report the same partition.
#[test]
fn execution_id_for_flow_determinism() {
let config = PartitionConfig::default();
let fid = FlowId::new();
let a = ExecutionId::for_flow(&fid, &config);
let b = ExecutionId::for_flow(&fid, &config);
assert_eq!(a.partition(), b.partition());
}
// Two solo execution ids minted for the same lane report the same partition.
#[test]
fn execution_id_solo_determinism() {
let config = PartitionConfig::default();
let lane = LaneId::new("workers-a");
let a = ExecutionId::solo(&lane, &config);
let b = ExecutionId::solo(&lane, &config);
assert_eq!(a.partition(), b.partition());
}
// An execution id minted for a flow lands on that flow's partition index and hash tag.
#[test]
fn execution_id_partition_matches_flow_partition() {
let config = PartitionConfig::default();
let fid = FlowId::new();
let eid = ExecutionId::for_flow(&fid, &config);
let fp = flow_partition(&fid, &config);
assert_eq!(eid.partition(), fp.index);
let ep = execution_partition(&eid, &config);
assert_eq!(ep.index, fp.index);
assert_eq!(ep.family, PartitionFamily::Execution);
assert_eq!(ep.hash_tag(), fp.hash_tag());
}
// The partition index comes from the id's embedded hash tag, not from hashing the UUID.
#[test]
fn execution_partition_reads_hash_tag_not_uuid() {
let known_uuid = "550e8400-e29b-41d4-a716-446655440000";
let s = format!("{{fp:0}}:{known_uuid}");
let eid = ExecutionId::parse(&s).unwrap();
let config = PartitionConfig::default();
let p = execution_partition(&eid, &config);
assert_eq!(p.index, 0, "must read hash-tag, not re-hash UUID");
}
// Decoding an id with a different config than it was minted with must not change the index.
#[test]
fn execution_partition_ignores_config_value() {
let small = PartitionConfig { num_flow_partitions: 4, ..Default::default() };
let fid = FlowId::new();
let eid = ExecutionId::for_flow(&fid, &small);
let minted_partition = eid.partition();
let big = PartitionConfig { num_flow_partitions: 1024, ..Default::default() };
let p = execution_partition(&eid, &big);
assert_eq!(
p.index, minted_partition,
"hash-tag is authoritative; config value must not change decoding"
);
}
// A UUID without the leading hash tag is rejected with `MissingTag`.
#[test]
fn execution_id_parse_rejects_bare_uuid() {
let bare = "550e8400-e29b-41d4-a716-446655440000";
match ExecutionId::parse(bare) {
Err(crate::types::ExecutionIdParseError::MissingTag(_)) => {}
other => panic!("expected MissingTag, got {other:?}"),
}
}
// A well-formed `{fp:N}:<uuid>` string parses and round-trips through `as_str`.
#[test]
fn execution_id_parse_accepts_wellformed_shape() {
let s = "{fp:42}:550e8400-e29b-41d4-a716-446655440000";
let eid = ExecutionId::parse(s).unwrap();
assert_eq!(eid.partition(), 42);
assert_eq!(eid.as_str(), s);
}
// Non-numeric and out-of-range (> u16::MAX) indices are rejected.
#[test]
fn execution_id_parse_rejects_bad_partition_index() {
match ExecutionId::parse("{fp:xx}:550e8400-e29b-41d4-a716-446655440000") {
Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
other => panic!("expected InvalidPartitionIndex, got {other:?}"),
}
match ExecutionId::parse("{fp:65536}:550e8400-e29b-41d4-a716-446655440000") {
Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
other => panic!("expected InvalidPartitionIndex for u16 overflow, got {other:?}"),
}
}
// A valid tag followed by something that is not a UUID is rejected with `InvalidUuid`.
#[test]
fn execution_id_parse_rejects_bad_uuid() {
match ExecutionId::parse("{fp:0}:not-a-uuid") {
Err(crate::types::ExecutionIdParseError::InvalidUuid(_)) => {}
other => panic!("expected InvalidUuid, got {other:?}"),
}
}
// Routing the same lane twice gives the same in-range Execution partition.
#[test]
fn solo_partition_determinism() {
let config = PartitionConfig::default();
let lane = LaneId::new("workers-a");
let p1 = solo_partition(&lane, &config);
let p2 = solo_partition(&lane, &config);
assert_eq!(p1, p2);
assert_eq!(p1.family, PartitionFamily::Execution);
assert!(p1.index < config.num_flow_partitions);
}
// Spread check: 100 distinct lane names must hit more than 50 distinct partitions.
#[test]
fn solo_partition_different_lanes_usually_differ() {
let config = PartitionConfig::default();
let mut seen = std::collections::HashSet::new();
for i in 0..100 {
let lane = LaneId::new(format!("lane-{i}"));
let p = solo_partition(&lane, &config);
seen.insert(p.index);
}
assert!(
seen.len() > 50,
"solo_partition distribution too narrow: only {} distinct of 100",
seen.len()
);
}
// The default partitioner must equal CRC-16/XMODEM of the lane name modulo the flow count.
#[test]
fn crc16_solo_partitioner_matches_legacy_behavior() {
let config = PartitionConfig::default();
let lane = LaneId::new("workers-a");
let default_idx = Crc16SoloPartitioner.partition_for_lane(&lane, &config);
let expected = crc16::State::<crc16::XMODEM>::calculate(lane.as_str().as_bytes())
% config.num_flow_partitions;
assert_eq!(default_idx, expected);
}
// A custom partitioner's answer is used verbatim, and the family stays Execution.
#[test]
fn solo_partition_with_custom_partitioner_routes_through_trait() {
struct AlwaysZero;
impl SoloPartitioner for AlwaysZero {
fn partition_for_lane(&self, _lane: &LaneId, _config: &PartitionConfig) -> u16 {
0
}
}
let config = PartitionConfig::default();
let lane = LaneId::new("pick-me");
let p = solo_partition_with(&lane, &config, &AlwaysZero);
assert_eq!(p.index, 0);
assert_eq!(p.family, PartitionFamily::Execution);
}
// `solo_partition` must agree with `solo_partition_with` given `Crc16SoloPartitioner`.
#[test]
fn solo_partition_default_matches_solo_partition_with_crc16() {
let config = PartitionConfig::default();
let lane = LaneId::new("workers-b");
let default = solo_partition(&lane, &config);
let explicit = solo_partition_with(&lane, &config, &Crc16SoloPartitioner);
assert_eq!(default, explicit);
}
// Deserializing an `ExecutionId` accepts the tagged form and rejects a bare UUID.
#[test]
fn execution_id_serde_via_deserialize_validates() {
let json = r#""{fp:0}:550e8400-e29b-41d4-a716-446655440000""#;
let eid: ExecutionId = serde_json::from_str(json).unwrap();
assert_eq!(eid.partition(), 0);
let bare = r#""550e8400-e29b-41d4-a716-446655440000""#;
assert!(serde_json::from_str::<ExecutionId>(bare).is_err());
}
// A key built from a partition holds exactly that partition's hash tag, for every family.
#[test]
fn partition_key_from_partition_matches_hash_tag() {
for (family, expected_prefix) in [
(PartitionFamily::Flow, "fp"),
(PartitionFamily::Execution, "fp"),
(PartitionFamily::Budget, "b"),
(PartitionFamily::Quota, "q"),
] {
let p = Partition { family, index: 42 };
let k: PartitionKey = (&p).into();
assert_eq!(k.as_str(), &format!("{{{expected_prefix}:42}}"));
assert_eq!(k.as_str(), p.hash_tag());
}
}
// Partition -> key -> partition is the identity for Flow, Budget and Quota.
#[test]
fn partition_key_round_trip_flow_budget_quota() {
for family in [
PartitionFamily::Flow,
PartitionFamily::Budget,
PartitionFamily::Quota,
] {
let p = Partition { family, index: 7 };
let k = PartitionKey::from(&p);
let back = k.parse().expect("must parse");
assert_eq!(back, p, "round-trip must be identity for {family:?}");
}
}
// An Execution partition's key parses back as Flow (same index and hash tag).
#[test]
fn partition_key_collapses_execution_to_flow_on_parse() {
let p_exec = Partition { family: PartitionFamily::Execution, index: 3 };
let k = PartitionKey::from(&p_exec);
assert_eq!(k.as_str(), "{fp:3}");
let back = k.parse().expect("must parse");
assert_eq!(back.family, PartitionFamily::Flow);
assert_eq!(back.index, 3);
assert_eq!(back.hash_tag(), p_exec.hash_tag());
}
// `PartitionKey` serializes as a bare JSON string and deserializes back to an equal key.
#[test]
fn partition_key_serde_transparent() {
let p = Partition { family: PartitionFamily::Flow, index: 9 };
let k = PartitionKey::from(&p);
let json = serde_json::to_string(&k).unwrap();
assert_eq!(json, r#""{fp:9}""#, "must serialise as a bare string");
let parsed: PartitionKey = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, k);
}
// Text without the surrounding braces is rejected with `MissingBraces`.
#[test]
fn partition_key_parse_rejects_missing_braces() {
let k = PartitionKey("fp:0".to_owned());
assert_eq!(k.parse(), Err(PartitionKeyParseError::MissingBraces));
}
// A braced body without a ':' separator is rejected with `MalformedBody`.
#[test]
fn partition_key_parse_rejects_malformed_body() {
let k = PartitionKey("{fp0}".to_owned());
assert_eq!(k.parse(), Err(PartitionKeyParseError::MalformedBody));
}
// An unrecognized prefix is rejected and echoed back in the error.
#[test]
fn partition_key_parse_rejects_unknown_prefix() {
let k = PartitionKey("{zz:0}".to_owned());
match k.parse() {
Err(PartitionKeyParseError::UnknownFamilyPrefix(p)) => assert_eq!(p, "zz"),
other => panic!("expected UnknownFamilyPrefix, got {other:?}"),
}
}
// Non-numeric and out-of-range (> u16::MAX) indices are rejected with `InvalidIndex`.
#[test]
fn partition_key_parse_rejects_invalid_index() {
let k = PartitionKey("{fp:xx}".to_owned());
assert!(matches!(
k.parse(),
Err(PartitionKeyParseError::InvalidIndex(_))
));
let k = PartitionKey("{fp:65536}".to_owned());
assert!(matches!(
k.parse(),
Err(PartitionKeyParseError::InvalidIndex(_))
));
}
}