ff_core/partition.rs
1//! Partition families, keys, and routing helpers.
2//!
3//! # Opaque wire keys (issue #91)
4//!
5//! Public wire surfaces carry [`PartitionKey`] — an opaque
6//! `#[serde(transparent)]` newtype over the hash-tag literal
7//! (`"{fp:7}"`) — rather than the internal [`Partition`] struct. The
8//! [`PartitionFamily`] enum (including its `Execution` / `Flow`
9//! alias) remains a public `ff-core` type — in-tree consumers that
10//! construct a [`Partition`] directly still name it — but it is no
11//! longer part of the HTTP / SDK wire DTOs, so external consumers
12//! never deserialize it off the wire.
13//!
14//! ## Migration note (0.2 → 0.3)
15//!
16//! The HTTP response body for `POST /v1/workers/{id}/claim` changed
17//! from
18//!
19//! ```json
20//! {
21//! "execution_id": "{fp:7}:...",
22//! "partition_family": "flow",
23//! "partition_index": 7,
24//! "grant_key": "...",
25//! "expires_at_ms": 0
26//! }
27//! ```
28//!
29//! to
30//!
31//! ```json
32//! {
33//! "execution_id": "{fp:7}:...",
34//! "partition_key": "{fp:7}",
35//! "grant_key": "...",
36//! "expires_at_ms": 0
37//! }
38//! ```
39//!
40//! Consumers that need the parsed family/index call
41//! [`PartitionKey::parse`] (or [`crate::contracts::ClaimGrant::partition`] /
42//! [`crate::contracts::ReclaimGrant::partition`]) — these return a
43//! typed [`Partition`] so routing code stays unchanged.
44//!
45//! The `Execution` family label collapses to `Flow` on round-trip
46//! (both produce `{fp:N}` under RFC-011 §11 co-location). Consumers
47//! that read `grant.partition()?.family` for LOGGING will see
48//! `Flow` where `Execution` previously appeared — cosmetically
49//! visible, routing-equivalent. See [`PartitionKey`] rustdoc for
50//! the full contract.
51
52use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, QuotaPolicyId};
53use serde::{Deserialize, Serialize};
54
55/// The partition families in FlowFabric.
56///
57/// Post-RFC-011: `Execution` and `Flow` are now routing **aliases** —
58/// both produce the same `{fp:N}` hash-tag, because execution keys
59/// co-locate with their parent flow's partition under hash-tag
60/// co-location. The `Execution` variant is **deliberately retained**
61/// per RFC-011 §11 Non-goals ("Deleting `PartitionFamily::Execution`
62/// from the public API. The variant stays for API compatibility; only
63/// its routing behaviour changes."), so downstream crates like
64/// cairn-fabric that construct `Partition { family: Execution, .. }`
65/// continue to compile and route correctly without source changes.
66///
67/// New FF-internal code should prefer `PartitionFamily::Flow` for
68/// clarity — the `Execution` alias exists solely to preserve the
69/// public-API contract promised by RFC-011. The logical distinction
70/// between exec-scoped and flow-scoped keys continues to live in the
71/// key-name prefix (`ff:exec:*` vs `ff:flow:*`), not in the
72/// `PartitionFamily` discriminator.
73#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
74#[serde(rename_all = "snake_case")]
75pub enum PartitionFamily {
76 /// Flow-structural + execution partition: `{fp:N}` — flow topology
77 /// and all per-execution keys co-located with their parent flow.
78 Flow,
79 /// Execution partition — **routing alias for [`PartitionFamily::Flow`]**
80 /// under RFC-011 co-location. Produces `{fp:N}` hash-tags identical to
81 /// `Flow` and indexes into `num_flow_partitions`. Kept as a distinct
82 /// variant per RFC-011 §11 for downstream-API compatibility (cairn-
83 /// fabric and other consumers that construct this variant directly).
84 Execution,
85 /// Budget partition: `{b:M}` — budget definitions and usage.
86 Budget,
87 /// Quota partition: `{q:K}` — quota policies and sliding windows.
88 Quota,
89}
90
91impl PartitionFamily {
92 /// Hash tag prefix for this family.
93 ///
94 /// `Flow` and `Execution` are aliases and both return `"fp"` — see
95 /// the enum-level rustdoc for the RFC-011 §11 compatibility rationale.
96 fn prefix(self) -> &'static str {
97 match self {
98 Self::Flow | Self::Execution => "fp",
99 Self::Budget => "b",
100 Self::Quota => "q",
101 }
102 }
103}
104
105/// Partition counts for each family. Fixed at deployment time.
106///
107/// Post-RFC-011: `num_execution_partitions` is retired. All execution
108/// keys route via `num_flow_partitions` (exec + flow share a slot under
109/// hash-tag co-location). Default bumped 64 → 256 to preserve today's
110/// total keyspace fanout.
111#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
112pub struct PartitionConfig {
113 pub num_flow_partitions: u16,
114 pub num_budget_partitions: u16,
115 pub num_quota_partitions: u16,
116}
117
118impl Default for PartitionConfig {
119 fn default() -> Self {
120 Self {
121 num_flow_partitions: 256,
122 num_budget_partitions: 32,
123 num_quota_partitions: 32,
124 }
125 }
126}
127
128impl PartitionConfig {
129 /// Get the partition count for a given family.
130 ///
131 /// `Flow` and `Execution` return the same value (`num_flow_partitions`)
132 /// — they are routing aliases under RFC-011 co-location.
133 pub fn count_for(&self, family: PartitionFamily) -> u16 {
134 match family {
135 PartitionFamily::Flow | PartitionFamily::Execution => self.num_flow_partitions,
136 PartitionFamily::Budget => self.num_budget_partitions,
137 PartitionFamily::Quota => self.num_quota_partitions,
138 }
139 }
140}
141
142/// A resolved partition within a family.
143#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
144pub struct Partition {
145 pub family: PartitionFamily,
146 pub index: u16,
147}
148
149impl Partition {
150 /// Returns the Valkey hash tag for this partition, e.g. `{fp:7}`, `{b:0}`.
151 pub fn hash_tag(&self) -> String {
152 format!("{{{prefix}:{index}}}", prefix = self.family.prefix(), index = self.index)
153 }
154}
155
156impl std::fmt::Display for Partition {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 write!(f, "{}", self.hash_tag())
159 }
160}
161
162/// Opaque routing handle for a partition, as exchanged on public wire
163/// surfaces.
164///
165/// `PartitionKey` is the hash-tag literal (`"{fp:7}"`, `"{b:0}"`, ...)
166/// wrapped in a `#[serde(transparent)]` newtype. It is the wire form
167/// of [`Partition`] on the public API — crates like `ff-server` and
168/// `ff-sdk` exchange `PartitionKey`, not [`Partition`], so the
169/// internal `PartitionFamily` enum (with its RFC-011 §11 alias
170/// between `Flow` and `Execution`) is never exposed on the wire.
171///
172/// # Opaque-ness contract
173///
174/// Consumers MUST treat a `PartitionKey` as opaque: pass it back to
175/// FlowFabric on subsequent calls, but do NOT parse the interior hash
176/// tag to make routing or policy decisions. Compatibility is only
177/// guaranteed for opaque round-tripping of keys PRODUCED by
178/// FlowFabric — consumers must not hand-construct hash-tag strings
179/// nor rely on non-canonical shapes being accepted. FlowFabric may
180/// narrow the accepted shape (e.g. hash-tag alphabet, length bounds)
181/// in future minor releases for producer-minted keys without a
182/// semver bump, because every such key still round-trips under the
183/// new rules. Consumers that need the parsed form call
184/// [`PartitionKey::parse`] / [`Self::as_partition`], which returns a
185/// typed error on malformed input.
186///
187/// # Round-trip + alias collapse
188///
189/// `From<&Partition> -> PartitionKey` is infallible (hash-tag
190/// construction never fails). `PartitionKey::parse` → `Partition` is
191/// fallible (rejects malformed keys). The round trip
192/// `Partition -> PartitionKey -> Partition` is **lossy for the
193/// `Execution` alias** — a `Partition { family: Execution, .. }`
194/// produces `"{fp:N}"`, which parses back to
195/// `Partition { family: Flow, .. }`. Routing and key construction are
196/// unaffected (both families emit identical hash tags under RFC-011
197/// §11), but consumers that read `grant.partition()?.family` for
198/// logging will see `Flow` where `Execution` previously appeared.
199#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
200#[serde(transparent)]
201pub struct PartitionKey(String);
202
203impl PartitionKey {
204 /// Return the underlying hash-tag literal (`"{fp:7}"`). Intended
205 /// for consumers that pass the key through to another FlowFabric
206 /// call; NOT for parsing out the partition family/index.
207 pub fn as_str(&self) -> &str {
208 &self.0
209 }
210
211 /// Parse a hash-tag literal into a typed [`Partition`]. Accepts
212 /// the exact shape produced by [`Partition::hash_tag`] (`{fp:N}`,
213 /// `{b:N}`, `{q:N}` where `N` is a `u16`).
214 pub fn parse(&self) -> Result<Partition, PartitionKeyParseError> {
215 Partition::try_from(self)
216 }
217
218 /// Convenience alias for [`Self::parse`]. Present so callers
219 /// reading a `ClaimGrant.partition()` helper can chain without
220 /// naming the conversion explicitly.
221 pub fn as_partition(&self) -> Result<Partition, PartitionKeyParseError> {
222 self.parse()
223 }
224}
225
226impl std::fmt::Display for PartitionKey {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 f.write_str(&self.0)
229 }
230}
231
232impl From<&Partition> for PartitionKey {
233 fn from(p: &Partition) -> Self {
234 Self(p.hash_tag())
235 }
236}
237
238impl From<Partition> for PartitionKey {
239 fn from(p: Partition) -> Self {
240 Self::from(&p)
241 }
242}
243
244/// Errors produced when parsing a [`PartitionKey`] back into a
245/// [`Partition`].
246///
247/// The key is treated as opaque on the wire; parsing failures surface
248/// as `Err` so malformed producer output fails loudly instead of
249/// silently routing to a ghost partition.
250#[derive(Clone, Debug, PartialEq, Eq)]
251pub enum PartitionKeyParseError {
252 /// Key did not start with `{` and end with `}` (e.g. missing braces).
253 MissingBraces,
254 /// Interior did not split into exactly `<prefix>:<index>`.
255 MalformedBody,
256 /// `<prefix>` did not match a known family (`fp`, `b`, `q`).
257 UnknownFamilyPrefix(String),
258 /// `<index>` did not parse as a `u16`.
259 InvalidIndex(String),
260}
261
262impl std::fmt::Display for PartitionKeyParseError {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 match self {
265 Self::MissingBraces => {
266 f.write_str("partition key must be wrapped in '{...}'")
267 }
268 Self::MalformedBody => {
269 f.write_str("partition key body must be '<prefix>:<index>'")
270 }
271 Self::UnknownFamilyPrefix(p) => {
272 write!(f, "unknown partition family prefix '{p}' (expected 'fp', 'b', 'q')")
273 }
274 Self::InvalidIndex(s) => {
275 write!(f, "partition index '{s}' is not a valid u16")
276 }
277 }
278 }
279}
280
281impl std::error::Error for PartitionKeyParseError {}
282
283impl TryFrom<&PartitionKey> for Partition {
284 type Error = PartitionKeyParseError;
285
286 fn try_from(key: &PartitionKey) -> Result<Self, Self::Error> {
287 let s = key.as_str();
288 let inner = s
289 .strip_prefix('{')
290 .and_then(|rest| rest.strip_suffix('}'))
291 .ok_or(PartitionKeyParseError::MissingBraces)?;
292 let (prefix, index_s) = inner
293 .split_once(':')
294 .ok_or(PartitionKeyParseError::MalformedBody)?;
295 let family = match prefix {
296 // `fp` collapses to `Flow`; the `Execution` alias is lost
297 // on round-trip by design (RFC-011 §11 — both produce the
298 // same hash tag, so routing is preserved; only the
299 // metadata-layer family label diverges).
300 "fp" => PartitionFamily::Flow,
301 "b" => PartitionFamily::Budget,
302 "q" => PartitionFamily::Quota,
303 other => {
304 return Err(PartitionKeyParseError::UnknownFamilyPrefix(
305 other.to_owned(),
306 ));
307 }
308 };
309 let index: u16 = index_s
310 .parse()
311 .map_err(|_| PartitionKeyParseError::InvalidIndex(index_s.to_owned()))?;
312 Ok(Partition { family, index })
313 }
314}
315
316/// Compute CRC16-CCITT of the given bytes, same algorithm as Valkey Cluster.
317fn crc16_ccitt(bytes: &[u8]) -> u16 {
318 crc16::State::<crc16::XMODEM>::calculate(bytes)
319}
320
321/// Compute the partition index for a UUID-based entity.
322/// Panics if `num_partitions` is 0 — this is a configuration error.
323fn partition_for_uuid(uuid_bytes: &[u8; 16], num_partitions: u16) -> u16 {
324 assert!(num_partitions > 0, "num_partitions must be > 0 (division by zero)");
325 crc16_ccitt(uuid_bytes) % num_partitions
326}
327
328/// Compute the partition for an execution ID.
329///
330/// Post-RFC-011: decodes the hash-tag prefix out of the id string; does NOT
331/// re-hash the UUID. The partition index is baked into the id at mint time
332/// via [`ExecutionId::for_flow`] / [`ExecutionId::solo`]. The `config` arg
333/// is retained for API symmetry with the other partition functions, but is
334/// unused — the exec id carries its partition intrinsically.
335pub fn execution_partition(eid: &ExecutionId, _config: &PartitionConfig) -> Partition {
336 // `Execution` family preserves exec-scoped semantics at the metadata
337 // layer (logs, tracing spans, metric labels) even though routing is
338 // aliased to `Flow`'s hash-tag prefix under RFC-011 co-location.
339 // See PartitionFamily enum rustdoc for the alias contract.
340 Partition {
341 family: PartitionFamily::Execution,
342 index: eid.partition(),
343 }
344}
345
346/// Compute the partition for a flow ID.
347pub fn flow_partition(fid: &FlowId, config: &PartitionConfig) -> Partition {
348 Partition {
349 family: PartitionFamily::Flow,
350 index: partition_for_uuid(fid.as_bytes(), config.num_flow_partitions),
351 }
352}
353
354/// Strategy for picking a solo execution's partition from its lane id.
355///
356/// RFC-011 §5.6 defines the birthday-paradox traffic-amplification
357/// mitigation: solo execs hash their lane id to a flow partition with
358/// crc16, which can collide with a flow's own partition. Operators
359/// that hit a persistent collision install a custom strategy at boot
360/// time via [`solo_partition_with`] instead of rebuilding `ff-server`.
361///
362/// The default impl [`Crc16SoloPartitioner`] matches the algorithm
363/// used by every other partition family. Replace only when a real
364/// collision surfaces via `ff-server admin partition-collisions`.
365pub trait SoloPartitioner: Send + Sync {
366 /// Return the partition index for a solo execution on the given lane.
367 ///
368 /// Must return a value in `0..config.num_flow_partitions`.
369 /// Must be deterministic — the same `(lane, config)` always produces
370 /// the same index (violated determinism → exec keys would route
371 /// differently on each mint, breaking all cross-lookup invariants).
372 fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16;
373}
374
375/// Default [`SoloPartitioner`]: `crc16_ccitt(lane_utf8) % num_flow_partitions`.
376///
377/// Matches the hashing used by [`flow_partition`], [`budget_partition`],
378/// [`quota_partition`] — same crc16-CCITT algorithm Valkey Cluster uses
379/// for slot assignment.
380#[derive(Clone, Copy, Debug, Default)]
381pub struct Crc16SoloPartitioner;
382
383impl SoloPartitioner for Crc16SoloPartitioner {
384 fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16 {
385 assert!(
386 config.num_flow_partitions > 0,
387 "num_flow_partitions must be > 0 (division by zero)"
388 );
389 crc16_ccitt(lane.as_str().as_bytes()) % config.num_flow_partitions
390 }
391}
392
393/// Compute the partition for a solo (flow-less) execution's lane shard.
394///
395/// Uses the default [`Crc16SoloPartitioner`]. Deployments that hit a
396/// traffic-amplification hotspot (per RFC-011 §5.6) should call
397/// [`solo_partition_with`] with a custom partitioner instead.
398pub fn solo_partition(lane: &LaneId, config: &PartitionConfig) -> Partition {
399 solo_partition_with(lane, config, &Crc16SoloPartitioner)
400}
401
402/// Compute the partition for a solo execution using a custom
403/// [`SoloPartitioner`] strategy.
404///
405/// The operator-facing escape hatch for RFC-011 §5.6 traffic-amplification
406/// collisions. A deployment that observes a collision via
407/// `ff-server admin partition-collisions` instantiates an alternate
408/// [`SoloPartitioner`] impl and routes solo mint paths through this
409/// function. The default [`Crc16SoloPartitioner`] is used by
410/// [`solo_partition`] and [`ExecutionId::solo`] — neither signature
411/// changes under this extension point.
412pub fn solo_partition_with(
413 lane: &LaneId,
414 config: &PartitionConfig,
415 partitioner: &dyn SoloPartitioner,
416) -> Partition {
417 // Solo execs are execution-scoped — preserve `Execution` family at
418 // the metadata layer. Routing is aliased to `Flow`'s `{fp:N}` tag
419 // under RFC-011 co-location; see PartitionFamily rustdoc for the
420 // alias contract.
421 Partition {
422 family: PartitionFamily::Execution,
423 index: partitioner.partition_for_lane(lane, config),
424 }
425}
426
427/// Compute the partition for a budget ID.
428pub fn budget_partition(bid: &BudgetId, config: &PartitionConfig) -> Partition {
429 Partition {
430 family: PartitionFamily::Budget,
431 index: partition_for_uuid(bid.as_bytes(), config.num_budget_partitions),
432 }
433}
434
435/// Compute the partition for a quota policy (by scope ID).
436pub fn quota_partition(qid: &QuotaPolicyId, config: &PartitionConfig) -> Partition {
437 Partition {
438 family: PartitionFamily::Quota,
439 index: partition_for_uuid(qid.as_bytes(), config.num_quota_partitions),
440 }
441}
442
443#[cfg(test)]
444mod tests {
445 use super::*;
446
447 #[test]
448 fn partition_hash_tag_format() {
449 let p = Partition { family: PartitionFamily::Flow, index: 7 };
450 assert_eq!(p.hash_tag(), "{fp:7}");
451
452 let p = Partition { family: PartitionFamily::Execution, index: 7 };
453 assert_eq!(p.hash_tag(), "{fp:7}", "Execution must alias Flow (RFC-011 §11)");
454
455 let p = Partition { family: PartitionFamily::Budget, index: 0 };
456 assert_eq!(p.hash_tag(), "{b:0}");
457
458 let p = Partition { family: PartitionFamily::Quota, index: 31 };
459 assert_eq!(p.hash_tag(), "{q:31}");
460 }
461
462 /// Execution and Flow are deliberate routing aliases post-RFC-011 §11.
463 /// This test pins the alias contract so a future edit that diverges the
464 /// two routes fails loudly rather than silently breaking co-location.
465 #[test]
466 fn execution_family_aliases_flow() {
467 // Same hash-tag at every index.
468 for index in [0u16, 1, 7, 42, 255, 65535] {
469 let flow = Partition { family: PartitionFamily::Flow, index };
470 let exec = Partition { family: PartitionFamily::Execution, index };
471 assert_eq!(
472 flow.hash_tag(),
473 exec.hash_tag(),
474 "Flow and Execution must produce identical hash-tags at index {index}"
475 );
476 }
477
478 // count_for returns the same value.
479 let config = PartitionConfig::default();
480 assert_eq!(
481 config.count_for(PartitionFamily::Flow),
482 config.count_for(PartitionFamily::Execution),
483 "count_for(Flow) == count_for(Execution) — both route via num_flow_partitions"
484 );
485
486 // A Partition minted with family=Execution produces the same
487 // hash-tag as one minted with family=Flow, given the same index.
488 // This is the key property cairn-fabric (and any other consumer
489 // that constructs `Partition { family: Execution, .. }`) depends on.
490 let p_exec = Partition { family: PartitionFamily::Execution, index: 42 };
491 let p_flow = Partition { family: PartitionFamily::Flow, index: 42 };
492 assert_eq!(p_exec.hash_tag(), p_flow.hash_tag());
493 assert_eq!(p_exec.hash_tag(), "{fp:42}");
494 }
495
496 #[test]
497 fn all_families_produce_distinct_tags() {
498 // Post-RFC-011: Flow and Execution deliberately share `{fp:N}` — see
499 // `execution_family_aliases_flow`. The three *distinct* tag spaces are
500 // {fp:N}, {b:N}, {q:N}. This test asserts that alias-after-collapse.
501 let tags: Vec<String> = [
502 PartitionFamily::Flow,
503 PartitionFamily::Budget,
504 PartitionFamily::Quota,
505 ]
506 .iter()
507 .map(|f| Partition { family: *f, index: 0 }.hash_tag())
508 .collect();
509 let unique: std::collections::HashSet<&String> = tags.iter().collect();
510 assert_eq!(unique.len(), 3, "flow/budget/quota must produce distinct hash tags");
511 }
512
513 #[test]
514 fn flow_partition_determinism() {
515 let config = PartitionConfig::default();
516 let fid = FlowId::new();
517 let p1 = flow_partition(&fid, &config);
518 let p2 = flow_partition(&fid, &config);
519 assert_eq!(p1, p2);
520 assert_eq!(p1.family, PartitionFamily::Flow);
521 assert!(p1.index < config.num_flow_partitions);
522 }
523
524 #[test]
525 fn budget_partition_determinism() {
526 let config = PartitionConfig::default();
527 let bid = BudgetId::new();
528 let p1 = budget_partition(&bid, &config);
529 let p2 = budget_partition(&bid, &config);
530 assert_eq!(p1, p2);
531 assert_eq!(p1.family, PartitionFamily::Budget);
532 assert!(p1.index < config.num_budget_partitions);
533 }
534
535 #[test]
536 fn default_config_values() {
537 let config = PartitionConfig::default();
538 assert_eq!(config.num_flow_partitions, 256);
539 assert_eq!(config.num_budget_partitions, 32);
540 assert_eq!(config.num_quota_partitions, 32);
541 }
542
543 // ── RFC-011 phase-1 tests ──
544
545 #[test]
546 fn execution_id_for_flow_determinism() {
547 let config = PartitionConfig::default();
548 let fid = FlowId::new();
549 let a = ExecutionId::for_flow(&fid, &config);
550 let b = ExecutionId::for_flow(&fid, &config);
551 // Same flow → same partition (UUID suffix differs).
552 assert_eq!(a.partition(), b.partition());
553 }
554
555 #[test]
556 fn execution_id_solo_determinism() {
557 let config = PartitionConfig::default();
558 let lane = LaneId::new("workers-a");
559 let a = ExecutionId::solo(&lane, &config);
560 let b = ExecutionId::solo(&lane, &config);
561 // Same lane → same partition.
562 assert_eq!(a.partition(), b.partition());
563 }
564
565 #[test]
566 fn execution_id_partition_matches_flow_partition() {
567 let config = PartitionConfig::default();
568 let fid = FlowId::new();
569 let eid = ExecutionId::for_flow(&fid, &config);
570 let fp = flow_partition(&fid, &config);
571 assert_eq!(eid.partition(), fp.index);
572 let ep = execution_partition(&eid, &config);
573 // Indices match (RFC-011 co-location) but family is Execution —
574 // preserves exec-scoped semantics at the metadata layer.
575 assert_eq!(ep.index, fp.index);
576 assert_eq!(ep.family, PartitionFamily::Execution);
577 // Alias contract: Execution and Flow produce identical hash-tags.
578 assert_eq!(ep.hash_tag(), fp.hash_tag());
579 }
580
581 #[test]
582 fn execution_partition_reads_hash_tag_not_uuid() {
583 // Construct an ExecutionId with a KNOWN partition (0) and a UUID
584 // whose crc16-partition lands somewhere else. execution_partition
585 // must pick the hash-tag (0), not re-hash the UUID.
586 let known_uuid = "550e8400-e29b-41d4-a716-446655440000";
587 let s = format!("{{fp:0}}:{known_uuid}");
588 let eid = ExecutionId::parse(&s).unwrap();
589 let config = PartitionConfig::default();
590 let p = execution_partition(&eid, &config);
591 assert_eq!(p.index, 0, "must read hash-tag, not re-hash UUID");
592 }
593
594 #[test]
595 fn execution_partition_ignores_config_value() {
596 // Safety test per manager ask #7 (edge case 3):
597 // An id minted in one config must decode to the same partition
598 // in any other config — the partition is baked into the id,
599 // not computed from the config.
600 let small = PartitionConfig { num_flow_partitions: 4, ..Default::default() };
601 let fid = FlowId::new();
602 let eid = ExecutionId::for_flow(&fid, &small);
603 let minted_partition = eid.partition();
604
605 // Decode via a different config — must match.
606 let big = PartitionConfig { num_flow_partitions: 1024, ..Default::default() };
607 let p = execution_partition(&eid, &big);
608 assert_eq!(
609 p.index, minted_partition,
610 "hash-tag is authoritative; config value must not change decoding"
611 );
612 }
613
614 #[test]
615 fn execution_id_parse_rejects_bare_uuid() {
616 let bare = "550e8400-e29b-41d4-a716-446655440000";
617 match ExecutionId::parse(bare) {
618 Err(crate::types::ExecutionIdParseError::MissingTag(_)) => {}
619 other => panic!("expected MissingTag, got {other:?}"),
620 }
621 }
622
623 #[test]
624 fn execution_id_parse_accepts_wellformed_shape() {
625 let s = "{fp:42}:550e8400-e29b-41d4-a716-446655440000";
626 let eid = ExecutionId::parse(s).unwrap();
627 assert_eq!(eid.partition(), 42);
628 assert_eq!(eid.as_str(), s);
629 }
630
631 #[test]
632 fn execution_id_parse_rejects_bad_partition_index() {
633 // Non-integer partition
634 match ExecutionId::parse("{fp:xx}:550e8400-e29b-41d4-a716-446655440000") {
635 Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
636 other => panic!("expected InvalidPartitionIndex, got {other:?}"),
637 }
638 // u16-overflow partition (65536)
639 match ExecutionId::parse("{fp:65536}:550e8400-e29b-41d4-a716-446655440000") {
640 Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
641 other => panic!("expected InvalidPartitionIndex for u16 overflow, got {other:?}"),
642 }
643 }
644
645 #[test]
646 fn execution_id_parse_rejects_bad_uuid() {
647 match ExecutionId::parse("{fp:0}:not-a-uuid") {
648 Err(crate::types::ExecutionIdParseError::InvalidUuid(_)) => {}
649 other => panic!("expected InvalidUuid, got {other:?}"),
650 }
651 }
652
653 #[test]
654 fn solo_partition_determinism() {
655 let config = PartitionConfig::default();
656 let lane = LaneId::new("workers-a");
657 let p1 = solo_partition(&lane, &config);
658 let p2 = solo_partition(&lane, &config);
659 assert_eq!(p1, p2);
660 // Solo execs are execution-scoped — family is Execution, routing
661 // is aliased to Flow's `{fp:N}` via PartitionFamily's prefix map.
662 assert_eq!(p1.family, PartitionFamily::Execution);
663 assert!(p1.index < config.num_flow_partitions);
664 }
665
666 #[test]
667 fn solo_partition_different_lanes_usually_differ() {
668 // Not strict — collisions are possible (and §5.6 acknowledges) —
669 // but over 100 distinct lane names we expect most to be distinct.
670 let config = PartitionConfig::default();
671 let mut seen = std::collections::HashSet::new();
672 for i in 0..100 {
673 let lane = LaneId::new(format!("lane-{i}"));
674 let p = solo_partition(&lane, &config);
675 seen.insert(p.index);
676 }
677 // At 256 partitions with 100 inputs, expect >50 distinct.
678 assert!(
679 seen.len() > 50,
680 "solo_partition distribution too narrow: only {} distinct of 100",
681 seen.len()
682 );
683 }
684
685 // ── SoloPartitioner trait (RFC-011 §5.6 mitigation #3) ──
686
687 #[test]
688 fn crc16_solo_partitioner_matches_legacy_behavior() {
689 // The default Crc16SoloPartitioner must produce the same index as
690 // the pre-trait solo_partition() function — otherwise installing
691 // the trait under an existing deployment would silently re-route
692 // every solo exec.
693 let config = PartitionConfig::default();
694 let lane = LaneId::new("workers-a");
695 let default_idx = Crc16SoloPartitioner.partition_for_lane(&lane, &config);
696 let expected = crc16::State::<crc16::XMODEM>::calculate(lane.as_str().as_bytes())
697 % config.num_flow_partitions;
698 assert_eq!(default_idx, expected);
699 }
700
701 #[test]
702 fn solo_partition_with_custom_partitioner_routes_through_trait() {
703 // Stub partitioner always returns index 0; a custom impl must
704 // override the default routing.
705 struct AlwaysZero;
706 impl SoloPartitioner for AlwaysZero {
707 fn partition_for_lane(&self, _lane: &LaneId, _config: &PartitionConfig) -> u16 {
708 0
709 }
710 }
711 let config = PartitionConfig::default();
712 let lane = LaneId::new("pick-me");
713 let p = solo_partition_with(&lane, &config, &AlwaysZero);
714 assert_eq!(p.index, 0);
715 // Solo execs → Execution family (aliases Flow for routing).
716 assert_eq!(p.family, PartitionFamily::Execution);
717 }
718
719 #[test]
720 fn solo_partition_default_matches_solo_partition_with_crc16() {
721 // solo_partition() and solo_partition_with(Crc16SoloPartitioner)
722 // must produce identical Partitions — this pins the default impl
723 // as the identity override.
724 let config = PartitionConfig::default();
725 let lane = LaneId::new("workers-b");
726 let default = solo_partition(&lane, &config);
727 let explicit = solo_partition_with(&lane, &config, &Crc16SoloPartitioner);
728 assert_eq!(default, explicit);
729 }
730
731 #[test]
732 fn execution_id_serde_via_deserialize_validates() {
733 // Valid shape deserialises.
734 let json = r#""{fp:0}:550e8400-e29b-41d4-a716-446655440000""#;
735 let eid: ExecutionId = serde_json::from_str(json).unwrap();
736 assert_eq!(eid.partition(), 0);
737
738 // Bare UUID fails Deserialize.
739 let bare = r#""550e8400-e29b-41d4-a716-446655440000""#;
740 assert!(serde_json::from_str::<ExecutionId>(bare).is_err());
741 }
742
743 // ── PartitionKey (issue #91) ──
744
745 #[test]
746 fn partition_key_from_partition_matches_hash_tag() {
747 for (family, expected_prefix) in [
748 (PartitionFamily::Flow, "fp"),
749 (PartitionFamily::Execution, "fp"),
750 (PartitionFamily::Budget, "b"),
751 (PartitionFamily::Quota, "q"),
752 ] {
753 let p = Partition { family, index: 42 };
754 let k: PartitionKey = (&p).into();
755 assert_eq!(k.as_str(), &format!("{{{expected_prefix}:42}}"));
756 assert_eq!(k.as_str(), p.hash_tag());
757 }
758 }
759
760 #[test]
761 fn partition_key_round_trip_flow_budget_quota() {
762 // Exact round-trip for non-alias families.
763 for family in [
764 PartitionFamily::Flow,
765 PartitionFamily::Budget,
766 PartitionFamily::Quota,
767 ] {
768 let p = Partition { family, index: 7 };
769 let k = PartitionKey::from(&p);
770 let back = k.parse().expect("must parse");
771 assert_eq!(back, p, "round-trip must be identity for {family:?}");
772 }
773 }
774
775 /// RFC-011 §11 alias collapse: `Execution` round-trips to `Flow`.
776 /// Routing is preserved (same hash tag, same `count_for`), only
777 /// the metadata-layer label differs. This test pins the documented
778 /// collapse so consumers can't quietly depend on `Execution`
779 /// surviving a wire hop.
780 #[test]
781 fn partition_key_collapses_execution_to_flow_on_parse() {
782 let p_exec = Partition { family: PartitionFamily::Execution, index: 3 };
783 let k = PartitionKey::from(&p_exec);
784 assert_eq!(k.as_str(), "{fp:3}");
785 let back = k.parse().expect("must parse");
786 // Alias collapse: index preserved, family normalises to Flow.
787 assert_eq!(back.family, PartitionFamily::Flow);
788 assert_eq!(back.index, 3);
789 // Routing equivalence: both produce identical hash tags.
790 assert_eq!(back.hash_tag(), p_exec.hash_tag());
791 }
792
793 #[test]
794 fn partition_key_serde_transparent() {
795 let p = Partition { family: PartitionFamily::Flow, index: 9 };
796 let k = PartitionKey::from(&p);
797 let json = serde_json::to_string(&k).unwrap();
798 assert_eq!(json, r#""{fp:9}""#, "must serialise as a bare string");
799 let parsed: PartitionKey = serde_json::from_str(&json).unwrap();
800 assert_eq!(parsed, k);
801 }
802
803 #[test]
804 fn partition_key_parse_rejects_missing_braces() {
805 let k = PartitionKey("fp:0".to_owned());
806 assert_eq!(k.parse(), Err(PartitionKeyParseError::MissingBraces));
807 }
808
809 #[test]
810 fn partition_key_parse_rejects_malformed_body() {
811 let k = PartitionKey("{fp0}".to_owned());
812 assert_eq!(k.parse(), Err(PartitionKeyParseError::MalformedBody));
813 }
814
815 #[test]
816 fn partition_key_parse_rejects_unknown_prefix() {
817 let k = PartitionKey("{zz:0}".to_owned());
818 match k.parse() {
819 Err(PartitionKeyParseError::UnknownFamilyPrefix(p)) => assert_eq!(p, "zz"),
820 other => panic!("expected UnknownFamilyPrefix, got {other:?}"),
821 }
822 }
823
824 #[test]
825 fn partition_key_parse_rejects_invalid_index() {
826 // Non-numeric
827 let k = PartitionKey("{fp:xx}".to_owned());
828 assert!(matches!(
829 k.parse(),
830 Err(PartitionKeyParseError::InvalidIndex(_))
831 ));
832 // u16 overflow
833 let k = PartitionKey("{fp:65536}".to_owned());
834 assert!(matches!(
835 k.parse(),
836 Err(PartitionKeyParseError::InvalidIndex(_))
837 ));
838 }
839}