1use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, QuotaPolicyId};
2use serde::{Deserialize, Serialize};
3
4#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub enum PartitionFamily {
25 Flow,
28 Execution,
34 Budget,
36 Quota,
38}
39
40impl PartitionFamily {
41 fn prefix(self) -> &'static str {
46 match self {
47 Self::Flow | Self::Execution => "fp",
48 Self::Budget => "b",
49 Self::Quota => "q",
50 }
51 }
52}
53
54#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub struct PartitionConfig {
62 pub num_flow_partitions: u16,
63 pub num_budget_partitions: u16,
64 pub num_quota_partitions: u16,
65}
66
67impl Default for PartitionConfig {
68 fn default() -> Self {
69 Self {
70 num_flow_partitions: 256,
71 num_budget_partitions: 32,
72 num_quota_partitions: 32,
73 }
74 }
75}
76
77impl PartitionConfig {
78 pub fn count_for(&self, family: PartitionFamily) -> u16 {
83 match family {
84 PartitionFamily::Flow | PartitionFamily::Execution => self.num_flow_partitions,
85 PartitionFamily::Budget => self.num_budget_partitions,
86 PartitionFamily::Quota => self.num_quota_partitions,
87 }
88 }
89}
90
91#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
93pub struct Partition {
94 pub family: PartitionFamily,
95 pub index: u16,
96}
97
98impl Partition {
99 pub fn hash_tag(&self) -> String {
101 format!("{{{prefix}:{index}}}", prefix = self.family.prefix(), index = self.index)
102 }
103}
104
105impl std::fmt::Display for Partition {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 write!(f, "{}", self.hash_tag())
108 }
109}
110
111fn crc16_ccitt(bytes: &[u8]) -> u16 {
113 crc16::State::<crc16::XMODEM>::calculate(bytes)
114}
115
116fn partition_for_uuid(uuid_bytes: &[u8; 16], num_partitions: u16) -> u16 {
119 assert!(num_partitions > 0, "num_partitions must be > 0 (division by zero)");
120 crc16_ccitt(uuid_bytes) % num_partitions
121}
122
123pub fn execution_partition(eid: &ExecutionId, _config: &PartitionConfig) -> Partition {
131 Partition {
136 family: PartitionFamily::Execution,
137 index: eid.partition(),
138 }
139}
140
141pub fn flow_partition(fid: &FlowId, config: &PartitionConfig) -> Partition {
143 Partition {
144 family: PartitionFamily::Flow,
145 index: partition_for_uuid(fid.as_bytes(), config.num_flow_partitions),
146 }
147}
148
149pub trait SoloPartitioner: Send + Sync {
161 fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16;
168}
169
170#[derive(Clone, Copy, Debug, Default)]
176pub struct Crc16SoloPartitioner;
177
178impl SoloPartitioner for Crc16SoloPartitioner {
179 fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16 {
180 assert!(
181 config.num_flow_partitions > 0,
182 "num_flow_partitions must be > 0 (division by zero)"
183 );
184 crc16_ccitt(lane.as_str().as_bytes()) % config.num_flow_partitions
185 }
186}
187
188pub fn solo_partition(lane: &LaneId, config: &PartitionConfig) -> Partition {
194 solo_partition_with(lane, config, &Crc16SoloPartitioner)
195}
196
197pub fn solo_partition_with(
208 lane: &LaneId,
209 config: &PartitionConfig,
210 partitioner: &dyn SoloPartitioner,
211) -> Partition {
212 Partition {
217 family: PartitionFamily::Execution,
218 index: partitioner.partition_for_lane(lane, config),
219 }
220}
221
222pub fn budget_partition(bid: &BudgetId, config: &PartitionConfig) -> Partition {
224 Partition {
225 family: PartitionFamily::Budget,
226 index: partition_for_uuid(bid.as_bytes(), config.num_budget_partitions),
227 }
228}
229
230pub fn quota_partition(qid: &QuotaPolicyId, config: &PartitionConfig) -> Partition {
232 Partition {
233 family: PartitionFamily::Quota,
234 index: partition_for_uuid(qid.as_bytes(), config.num_quota_partitions),
235 }
236}
237
238#[cfg(test)]
239mod tests {
240 use super::*;
241
242 #[test]
243 fn partition_hash_tag_format() {
244 let p = Partition { family: PartitionFamily::Flow, index: 7 };
245 assert_eq!(p.hash_tag(), "{fp:7}");
246
247 let p = Partition { family: PartitionFamily::Execution, index: 7 };
248 assert_eq!(p.hash_tag(), "{fp:7}", "Execution must alias Flow (RFC-011 §11)");
249
250 let p = Partition { family: PartitionFamily::Budget, index: 0 };
251 assert_eq!(p.hash_tag(), "{b:0}");
252
253 let p = Partition { family: PartitionFamily::Quota, index: 31 };
254 assert_eq!(p.hash_tag(), "{q:31}");
255 }
256
257 #[test]
261 fn execution_family_aliases_flow() {
262 for index in [0u16, 1, 7, 42, 255, 65535] {
264 let flow = Partition { family: PartitionFamily::Flow, index };
265 let exec = Partition { family: PartitionFamily::Execution, index };
266 assert_eq!(
267 flow.hash_tag(),
268 exec.hash_tag(),
269 "Flow and Execution must produce identical hash-tags at index {index}"
270 );
271 }
272
273 let config = PartitionConfig::default();
275 assert_eq!(
276 config.count_for(PartitionFamily::Flow),
277 config.count_for(PartitionFamily::Execution),
278 "count_for(Flow) == count_for(Execution) — both route via num_flow_partitions"
279 );
280
281 let p_exec = Partition { family: PartitionFamily::Execution, index: 42 };
286 let p_flow = Partition { family: PartitionFamily::Flow, index: 42 };
287 assert_eq!(p_exec.hash_tag(), p_flow.hash_tag());
288 assert_eq!(p_exec.hash_tag(), "{fp:42}");
289 }
290
291 #[test]
292 fn all_families_produce_distinct_tags() {
293 let tags: Vec<String> = [
297 PartitionFamily::Flow,
298 PartitionFamily::Budget,
299 PartitionFamily::Quota,
300 ]
301 .iter()
302 .map(|f| Partition { family: *f, index: 0 }.hash_tag())
303 .collect();
304 let unique: std::collections::HashSet<&String> = tags.iter().collect();
305 assert_eq!(unique.len(), 3, "flow/budget/quota must produce distinct hash tags");
306 }
307
308 #[test]
309 fn flow_partition_determinism() {
310 let config = PartitionConfig::default();
311 let fid = FlowId::new();
312 let p1 = flow_partition(&fid, &config);
313 let p2 = flow_partition(&fid, &config);
314 assert_eq!(p1, p2);
315 assert_eq!(p1.family, PartitionFamily::Flow);
316 assert!(p1.index < config.num_flow_partitions);
317 }
318
319 #[test]
320 fn budget_partition_determinism() {
321 let config = PartitionConfig::default();
322 let bid = BudgetId::new();
323 let p1 = budget_partition(&bid, &config);
324 let p2 = budget_partition(&bid, &config);
325 assert_eq!(p1, p2);
326 assert_eq!(p1.family, PartitionFamily::Budget);
327 assert!(p1.index < config.num_budget_partitions);
328 }
329
330 #[test]
331 fn default_config_values() {
332 let config = PartitionConfig::default();
333 assert_eq!(config.num_flow_partitions, 256);
334 assert_eq!(config.num_budget_partitions, 32);
335 assert_eq!(config.num_quota_partitions, 32);
336 }
337
338 #[test]
341 fn execution_id_for_flow_determinism() {
342 let config = PartitionConfig::default();
343 let fid = FlowId::new();
344 let a = ExecutionId::for_flow(&fid, &config);
345 let b = ExecutionId::for_flow(&fid, &config);
346 assert_eq!(a.partition(), b.partition());
348 }
349
350 #[test]
351 fn execution_id_solo_determinism() {
352 let config = PartitionConfig::default();
353 let lane = LaneId::new("workers-a");
354 let a = ExecutionId::solo(&lane, &config);
355 let b = ExecutionId::solo(&lane, &config);
356 assert_eq!(a.partition(), b.partition());
358 }
359
360 #[test]
361 fn execution_id_partition_matches_flow_partition() {
362 let config = PartitionConfig::default();
363 let fid = FlowId::new();
364 let eid = ExecutionId::for_flow(&fid, &config);
365 let fp = flow_partition(&fid, &config);
366 assert_eq!(eid.partition(), fp.index);
367 let ep = execution_partition(&eid, &config);
368 assert_eq!(ep.index, fp.index);
371 assert_eq!(ep.family, PartitionFamily::Execution);
372 assert_eq!(ep.hash_tag(), fp.hash_tag());
374 }
375
376 #[test]
377 fn execution_partition_reads_hash_tag_not_uuid() {
378 let known_uuid = "550e8400-e29b-41d4-a716-446655440000";
382 let s = format!("{{fp:0}}:{known_uuid}");
383 let eid = ExecutionId::parse(&s).unwrap();
384 let config = PartitionConfig::default();
385 let p = execution_partition(&eid, &config);
386 assert_eq!(p.index, 0, "must read hash-tag, not re-hash UUID");
387 }
388
389 #[test]
390 fn execution_partition_ignores_config_value() {
391 let small = PartitionConfig { num_flow_partitions: 4, ..Default::default() };
396 let fid = FlowId::new();
397 let eid = ExecutionId::for_flow(&fid, &small);
398 let minted_partition = eid.partition();
399
400 let big = PartitionConfig { num_flow_partitions: 1024, ..Default::default() };
402 let p = execution_partition(&eid, &big);
403 assert_eq!(
404 p.index, minted_partition,
405 "hash-tag is authoritative; config value must not change decoding"
406 );
407 }
408
409 #[test]
410 fn execution_id_parse_rejects_bare_uuid() {
411 let bare = "550e8400-e29b-41d4-a716-446655440000";
412 match ExecutionId::parse(bare) {
413 Err(crate::types::ExecutionIdParseError::MissingTag(_)) => {}
414 other => panic!("expected MissingTag, got {other:?}"),
415 }
416 }
417
418 #[test]
419 fn execution_id_parse_accepts_wellformed_shape() {
420 let s = "{fp:42}:550e8400-e29b-41d4-a716-446655440000";
421 let eid = ExecutionId::parse(s).unwrap();
422 assert_eq!(eid.partition(), 42);
423 assert_eq!(eid.as_str(), s);
424 }
425
426 #[test]
427 fn execution_id_parse_rejects_bad_partition_index() {
428 match ExecutionId::parse("{fp:xx}:550e8400-e29b-41d4-a716-446655440000") {
430 Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
431 other => panic!("expected InvalidPartitionIndex, got {other:?}"),
432 }
433 match ExecutionId::parse("{fp:65536}:550e8400-e29b-41d4-a716-446655440000") {
435 Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
436 other => panic!("expected InvalidPartitionIndex for u16 overflow, got {other:?}"),
437 }
438 }
439
440 #[test]
441 fn execution_id_parse_rejects_bad_uuid() {
442 match ExecutionId::parse("{fp:0}:not-a-uuid") {
443 Err(crate::types::ExecutionIdParseError::InvalidUuid(_)) => {}
444 other => panic!("expected InvalidUuid, got {other:?}"),
445 }
446 }
447
448 #[test]
449 fn solo_partition_determinism() {
450 let config = PartitionConfig::default();
451 let lane = LaneId::new("workers-a");
452 let p1 = solo_partition(&lane, &config);
453 let p2 = solo_partition(&lane, &config);
454 assert_eq!(p1, p2);
455 assert_eq!(p1.family, PartitionFamily::Execution);
458 assert!(p1.index < config.num_flow_partitions);
459 }
460
461 #[test]
462 fn solo_partition_different_lanes_usually_differ() {
463 let config = PartitionConfig::default();
466 let mut seen = std::collections::HashSet::new();
467 for i in 0..100 {
468 let lane = LaneId::new(format!("lane-{i}"));
469 let p = solo_partition(&lane, &config);
470 seen.insert(p.index);
471 }
472 assert!(
474 seen.len() > 50,
475 "solo_partition distribution too narrow: only {} distinct of 100",
476 seen.len()
477 );
478 }
479
480 #[test]
483 fn crc16_solo_partitioner_matches_legacy_behavior() {
484 let config = PartitionConfig::default();
489 let lane = LaneId::new("workers-a");
490 let default_idx = Crc16SoloPartitioner.partition_for_lane(&lane, &config);
491 let expected = crc16::State::<crc16::XMODEM>::calculate(lane.as_str().as_bytes())
492 % config.num_flow_partitions;
493 assert_eq!(default_idx, expected);
494 }
495
496 #[test]
497 fn solo_partition_with_custom_partitioner_routes_through_trait() {
498 struct AlwaysZero;
501 impl SoloPartitioner for AlwaysZero {
502 fn partition_for_lane(&self, _lane: &LaneId, _config: &PartitionConfig) -> u16 {
503 0
504 }
505 }
506 let config = PartitionConfig::default();
507 let lane = LaneId::new("pick-me");
508 let p = solo_partition_with(&lane, &config, &AlwaysZero);
509 assert_eq!(p.index, 0);
510 assert_eq!(p.family, PartitionFamily::Execution);
512 }
513
514 #[test]
515 fn solo_partition_default_matches_solo_partition_with_crc16() {
516 let config = PartitionConfig::default();
520 let lane = LaneId::new("workers-b");
521 let default = solo_partition(&lane, &config);
522 let explicit = solo_partition_with(&lane, &config, &Crc16SoloPartitioner);
523 assert_eq!(default, explicit);
524 }
525
526 #[test]
527 fn execution_id_serde_via_deserialize_validates() {
528 let json = r#""{fp:0}:550e8400-e29b-41d4-a716-446655440000""#;
530 let eid: ExecutionId = serde_json::from_str(json).unwrap();
531 assert_eq!(eid.partition(), 0);
532
533 let bare = r#""550e8400-e29b-41d4-a716-446655440000""#;
535 assert!(serde_json::from_str::<ExecutionId>(bare).is_err());
536 }
537}