Skip to main content

ff_core/
partition.rs

1//! Partition families, keys, and routing helpers.
2//!
3//! # Opaque wire keys (issue #91)
4//!
5//! Public wire surfaces carry [`PartitionKey`] — an opaque
6//! `#[serde(transparent)]` newtype over the hash-tag literal
7//! (`"{fp:7}"`) — rather than the internal [`Partition`] struct. The
8//! [`PartitionFamily`] enum (including its `Execution` / `Flow`
9//! alias) remains a public `ff-core` type — in-tree consumers that
10//! construct a [`Partition`] directly still name it — but it is no
11//! longer part of the HTTP / SDK wire DTOs, so external consumers
12//! never deserialize it off the wire.
13//!
14//! ## Migration note (0.2 → 0.3)
15//!
16//! The HTTP response body for `POST /v1/workers/{id}/claim` changed
17//! from
18//!
19//! ```json
20//! {
21//!   "execution_id": "{fp:7}:...",
22//!   "partition_family": "flow",
23//!   "partition_index": 7,
24//!   "grant_key": "...",
25//!   "expires_at_ms": 0
26//! }
27//! ```
28//!
29//! to
30//!
31//! ```json
32//! {
33//!   "execution_id": "{fp:7}:...",
34//!   "partition_key": "{fp:7}",
35//!   "grant_key": "...",
36//!   "expires_at_ms": 0
37//! }
38//! ```
39//!
40//! Consumers that need the parsed family/index call
41//! [`PartitionKey::parse`] (or [`crate::contracts::ClaimGrant::partition`] /
42//! [`crate::contracts::ReclaimGrant::partition`]) — these return a
43//! typed [`Partition`] so routing code stays unchanged.
44//!
45//! The `Execution` family label collapses to `Flow` on round-trip
46//! (both produce `{fp:N}` under RFC-011 §11 co-location). Consumers
47//! that read `grant.partition()?.family` for LOGGING will see
48//! `Flow` where `Execution` previously appeared — cosmetically
49//! visible, routing-equivalent. See [`PartitionKey`] rustdoc for
50//! the full contract.
51
52use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, QuotaPolicyId};
53use serde::{Deserialize, Serialize};
54
55/// The partition families in FlowFabric.
56///
57/// Post-RFC-011: `Execution` and `Flow` are now routing **aliases** —
58/// both produce the same `{fp:N}` hash-tag, because execution keys
59/// co-locate with their parent flow's partition under hash-tag
60/// co-location. The `Execution` variant is **deliberately retained**
61/// per RFC-011 §11 Non-goals ("Deleting `PartitionFamily::Execution`
62/// from the public API. The variant stays for API compatibility; only
63/// its routing behaviour changes."), so downstream crates like
64/// cairn-fabric that construct `Partition { family: Execution, .. }`
65/// continue to compile and route correctly without source changes.
66///
67/// New FF-internal code should prefer `PartitionFamily::Flow` for
68/// clarity — the `Execution` alias exists solely to preserve the
69/// public-API contract promised by RFC-011. The logical distinction
70/// between exec-scoped and flow-scoped keys continues to live in the
71/// key-name prefix (`ff:exec:*` vs `ff:flow:*`), not in the
72/// `PartitionFamily` discriminator.
73#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
74#[serde(rename_all = "snake_case")]
75pub enum PartitionFamily {
76    /// Flow-structural + execution partition: `{fp:N}` — flow topology
77    /// and all per-execution keys co-located with their parent flow.
78    Flow,
79    /// Execution partition — **routing alias for [`PartitionFamily::Flow`]**
80    /// under RFC-011 co-location. Produces `{fp:N}` hash-tags identical to
81    /// `Flow` and indexes into `num_flow_partitions`. Kept as a distinct
82    /// variant per RFC-011 §11 for downstream-API compatibility (cairn-
83    /// fabric and other consumers that construct this variant directly).
84    Execution,
85    /// Budget partition: `{b:M}` — budget definitions and usage.
86    Budget,
87    /// Quota partition: `{q:K}` — quota policies and sliding windows.
88    Quota,
89}
90
91impl PartitionFamily {
92    /// Hash tag prefix for this family.
93    ///
94    /// `Flow` and `Execution` are aliases and both return `"fp"` — see
95    /// the enum-level rustdoc for the RFC-011 §11 compatibility rationale.
96    fn prefix(self) -> &'static str {
97        match self {
98            Self::Flow | Self::Execution => "fp",
99            Self::Budget => "b",
100            Self::Quota => "q",
101        }
102    }
103}
104
105/// Partition counts for each family. Fixed at deployment time.
106///
107/// Post-RFC-011: `num_execution_partitions` is retired. All execution
108/// keys route via `num_flow_partitions` (exec + flow share a slot under
109/// hash-tag co-location). Default bumped 64 → 256 to preserve today's
110/// total keyspace fanout.
111#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
112pub struct PartitionConfig {
113    pub num_flow_partitions: u16,
114    pub num_budget_partitions: u16,
115    pub num_quota_partitions: u16,
116}
117
118impl Default for PartitionConfig {
119    fn default() -> Self {
120        Self {
121            num_flow_partitions: 256,
122            num_budget_partitions: 32,
123            num_quota_partitions: 32,
124        }
125    }
126}
127
128impl PartitionConfig {
129    /// Get the partition count for a given family.
130    ///
131    /// `Flow` and `Execution` return the same value (`num_flow_partitions`)
132    /// — they are routing aliases under RFC-011 co-location.
133    pub fn count_for(&self, family: PartitionFamily) -> u16 {
134        match family {
135            PartitionFamily::Flow | PartitionFamily::Execution => self.num_flow_partitions,
136            PartitionFamily::Budget => self.num_budget_partitions,
137            PartitionFamily::Quota => self.num_quota_partitions,
138        }
139    }
140}
141
142/// A resolved partition within a family.
143#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
144pub struct Partition {
145    pub family: PartitionFamily,
146    pub index: u16,
147}
148
149impl Partition {
150    /// Returns the Valkey hash tag for this partition, e.g. `{fp:7}`, `{b:0}`.
151    pub fn hash_tag(&self) -> String {
152        format!("{{{prefix}:{index}}}", prefix = self.family.prefix(), index = self.index)
153    }
154}
155
156impl std::fmt::Display for Partition {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        write!(f, "{}", self.hash_tag())
159    }
160}
161
162/// Opaque routing handle for a partition, as exchanged on public wire
163/// surfaces.
164///
165/// `PartitionKey` is the hash-tag literal (`"{fp:7}"`, `"{b:0}"`, ...)
166/// wrapped in a `#[serde(transparent)]` newtype. It is the wire form
167/// of [`Partition`] on the public API — crates like `ff-server` and
168/// `ff-sdk` exchange `PartitionKey`, not [`Partition`], so the
169/// internal `PartitionFamily` enum (with its RFC-011 §11 alias
170/// between `Flow` and `Execution`) is never exposed on the wire.
171///
172/// # Opaque-ness contract
173///
174/// Consumers MUST treat a `PartitionKey` as opaque: pass it back to
175/// FlowFabric on subsequent calls, but do NOT parse the interior hash
176/// tag to make routing or policy decisions. Compatibility is only
177/// guaranteed for opaque round-tripping of keys PRODUCED by
178/// FlowFabric — consumers must not hand-construct hash-tag strings
179/// nor rely on non-canonical shapes being accepted. FlowFabric may
180/// narrow the accepted shape (e.g. hash-tag alphabet, length bounds)
181/// in future minor releases for producer-minted keys without a
182/// semver bump, because every such key still round-trips under the
183/// new rules. Consumers that need the parsed form call
184/// [`PartitionKey::parse`] / [`Self::as_partition`], which returns a
185/// typed error on malformed input.
186///
187/// # Round-trip + alias collapse
188///
189/// `From<&Partition> -> PartitionKey` is infallible (hash-tag
190/// construction never fails). `PartitionKey::parse` → `Partition` is
191/// fallible (rejects malformed keys). The round trip
192/// `Partition -> PartitionKey -> Partition` is **lossy for the
193/// `Execution` alias** — a `Partition { family: Execution, .. }`
194/// produces `"{fp:N}"`, which parses back to
195/// `Partition { family: Flow, .. }`. Routing and key construction are
196/// unaffected (both families emit identical hash tags under RFC-011
197/// §11), but consumers that read `grant.partition()?.family` for
198/// logging will see `Flow` where `Execution` previously appeared.
199#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
200#[serde(transparent)]
201pub struct PartitionKey(String);
202
203impl PartitionKey {
204    /// Return the underlying hash-tag literal (`"{fp:7}"`). Intended
205    /// for consumers that pass the key through to another FlowFabric
206    /// call; NOT for parsing out the partition family/index.
207    pub fn as_str(&self) -> &str {
208        &self.0
209    }
210
211    /// Parse a hash-tag literal into a typed [`Partition`]. Accepts
212    /// the exact shape produced by [`Partition::hash_tag`] (`{fp:N}`,
213    /// `{b:N}`, `{q:N}` where `N` is a `u16`).
214    pub fn parse(&self) -> Result<Partition, PartitionKeyParseError> {
215        Partition::try_from(self)
216    }
217
218    /// Convenience alias for [`Self::parse`]. Present so callers
219    /// reading a `ClaimGrant.partition()` helper can chain without
220    /// naming the conversion explicitly.
221    pub fn as_partition(&self) -> Result<Partition, PartitionKeyParseError> {
222        self.parse()
223    }
224}
225
226impl std::fmt::Display for PartitionKey {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        f.write_str(&self.0)
229    }
230}
231
232impl From<&Partition> for PartitionKey {
233    fn from(p: &Partition) -> Self {
234        Self(p.hash_tag())
235    }
236}
237
238impl From<Partition> for PartitionKey {
239    fn from(p: Partition) -> Self {
240        Self::from(&p)
241    }
242}
243
244/// Errors produced when parsing a [`PartitionKey`] back into a
245/// [`Partition`].
246///
247/// The key is treated as opaque on the wire; parsing failures surface
248/// as `Err` so malformed producer output fails loudly instead of
249/// silently routing to a ghost partition.
250#[derive(Clone, Debug, PartialEq, Eq)]
251pub enum PartitionKeyParseError {
252    /// Key did not start with `{` and end with `}` (e.g. missing braces).
253    MissingBraces,
254    /// Interior did not split into exactly `<prefix>:<index>`.
255    MalformedBody,
256    /// `<prefix>` did not match a known family (`fp`, `b`, `q`).
257    UnknownFamilyPrefix(String),
258    /// `<index>` did not parse as a `u16`.
259    InvalidIndex(String),
260}
261
262impl std::fmt::Display for PartitionKeyParseError {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        match self {
265            Self::MissingBraces => {
266                f.write_str("partition key must be wrapped in '{...}'")
267            }
268            Self::MalformedBody => {
269                f.write_str("partition key body must be '<prefix>:<index>'")
270            }
271            Self::UnknownFamilyPrefix(p) => {
272                write!(f, "unknown partition family prefix '{p}' (expected 'fp', 'b', 'q')")
273            }
274            Self::InvalidIndex(s) => {
275                write!(f, "partition index '{s}' is not a valid u16")
276            }
277        }
278    }
279}
280
281impl std::error::Error for PartitionKeyParseError {}
282
283impl TryFrom<&PartitionKey> for Partition {
284    type Error = PartitionKeyParseError;
285
286    fn try_from(key: &PartitionKey) -> Result<Self, Self::Error> {
287        let s = key.as_str();
288        let inner = s
289            .strip_prefix('{')
290            .and_then(|rest| rest.strip_suffix('}'))
291            .ok_or(PartitionKeyParseError::MissingBraces)?;
292        let (prefix, index_s) = inner
293            .split_once(':')
294            .ok_or(PartitionKeyParseError::MalformedBody)?;
295        let family = match prefix {
296            // `fp` collapses to `Flow`; the `Execution` alias is lost
297            // on round-trip by design (RFC-011 §11 — both produce the
298            // same hash tag, so routing is preserved; only the
299            // metadata-layer family label diverges).
300            "fp" => PartitionFamily::Flow,
301            "b" => PartitionFamily::Budget,
302            "q" => PartitionFamily::Quota,
303            other => {
304                return Err(PartitionKeyParseError::UnknownFamilyPrefix(
305                    other.to_owned(),
306                ));
307            }
308        };
309        let index: u16 = index_s
310            .parse()
311            .map_err(|_| PartitionKeyParseError::InvalidIndex(index_s.to_owned()))?;
312        Ok(Partition { family, index })
313    }
314}
315
316/// Compute CRC16-CCITT of the given bytes, same algorithm as Valkey Cluster.
317fn crc16_ccitt(bytes: &[u8]) -> u16 {
318    crc16::State::<crc16::XMODEM>::calculate(bytes)
319}
320
321/// Compute the partition index for a UUID-based entity.
322/// Panics if `num_partitions` is 0 — this is a configuration error.
323fn partition_for_uuid(uuid_bytes: &[u8; 16], num_partitions: u16) -> u16 {
324    assert!(num_partitions > 0, "num_partitions must be > 0 (division by zero)");
325    crc16_ccitt(uuid_bytes) % num_partitions
326}
327
328/// Compute the partition for an execution ID.
329///
330/// Post-RFC-011: decodes the hash-tag prefix out of the id string; does NOT
331/// re-hash the UUID. The partition index is baked into the id at mint time
332/// via [`ExecutionId::for_flow`] / [`ExecutionId::solo`]. The `config` arg
333/// is retained for API symmetry with the other partition functions, but is
334/// unused — the exec id carries its partition intrinsically.
335pub fn execution_partition(eid: &ExecutionId, _config: &PartitionConfig) -> Partition {
336    // `Execution` family preserves exec-scoped semantics at the metadata
337    // layer (logs, tracing spans, metric labels) even though routing is
338    // aliased to `Flow`'s hash-tag prefix under RFC-011 co-location.
339    // See PartitionFamily enum rustdoc for the alias contract.
340    Partition {
341        family: PartitionFamily::Execution,
342        index: eid.partition(),
343    }
344}
345
346/// Compute the partition for a flow ID.
347pub fn flow_partition(fid: &FlowId, config: &PartitionConfig) -> Partition {
348    Partition {
349        family: PartitionFamily::Flow,
350        index: partition_for_uuid(fid.as_bytes(), config.num_flow_partitions),
351    }
352}
353
354/// Strategy for picking a solo execution's partition from its lane id.
355///
356/// RFC-011 §5.6 defines the birthday-paradox traffic-amplification
357/// mitigation: solo execs hash their lane id to a flow partition with
358/// crc16, which can collide with a flow's own partition. Operators
359/// that hit a persistent collision install a custom strategy at boot
360/// time via [`solo_partition_with`] instead of rebuilding `ff-server`.
361///
362/// The default impl [`Crc16SoloPartitioner`] matches the algorithm
363/// used by every other partition family. Replace only when a real
364/// collision surfaces via `ff-server admin partition-collisions`.
365pub trait SoloPartitioner: Send + Sync {
366    /// Return the partition index for a solo execution on the given lane.
367    ///
368    /// Must return a value in `0..config.num_flow_partitions`.
369    /// Must be deterministic — the same `(lane, config)` always produces
370    /// the same index (violated determinism → exec keys would route
371    /// differently on each mint, breaking all cross-lookup invariants).
372    fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16;
373}
374
375/// Default [`SoloPartitioner`]: `crc16_ccitt(lane_utf8) % num_flow_partitions`.
376///
377/// Matches the hashing used by [`flow_partition`], [`budget_partition`],
378/// [`quota_partition`] — same crc16-CCITT algorithm Valkey Cluster uses
379/// for slot assignment.
380#[derive(Clone, Copy, Debug, Default)]
381pub struct Crc16SoloPartitioner;
382
383impl SoloPartitioner for Crc16SoloPartitioner {
384    fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16 {
385        assert!(
386            config.num_flow_partitions > 0,
387            "num_flow_partitions must be > 0 (division by zero)"
388        );
389        crc16_ccitt(lane.as_str().as_bytes()) % config.num_flow_partitions
390    }
391}
392
393/// Compute the partition for a solo (flow-less) execution's lane shard.
394///
395/// Uses the default [`Crc16SoloPartitioner`]. Deployments that hit a
396/// traffic-amplification hotspot (per RFC-011 §5.6) should call
397/// [`solo_partition_with`] with a custom partitioner instead.
398pub fn solo_partition(lane: &LaneId, config: &PartitionConfig) -> Partition {
399    solo_partition_with(lane, config, &Crc16SoloPartitioner)
400}
401
402/// Compute the partition for a solo execution using a custom
403/// [`SoloPartitioner`] strategy.
404///
405/// The operator-facing escape hatch for RFC-011 §5.6 traffic-amplification
406/// collisions. A deployment that observes a collision via
407/// `ff-server admin partition-collisions` instantiates an alternate
408/// [`SoloPartitioner`] impl and routes solo mint paths through this
409/// function. The default [`Crc16SoloPartitioner`] is used by
410/// [`solo_partition`] and [`ExecutionId::solo`] — neither signature
411/// changes under this extension point.
412pub fn solo_partition_with(
413    lane: &LaneId,
414    config: &PartitionConfig,
415    partitioner: &dyn SoloPartitioner,
416) -> Partition {
417    // Solo execs are execution-scoped — preserve `Execution` family at
418    // the metadata layer. Routing is aliased to `Flow`'s `{fp:N}` tag
419    // under RFC-011 co-location; see PartitionFamily rustdoc for the
420    // alias contract.
421    Partition {
422        family: PartitionFamily::Execution,
423        index: partitioner.partition_for_lane(lane, config),
424    }
425}
426
427/// Compute the partition for a budget ID.
428pub fn budget_partition(bid: &BudgetId, config: &PartitionConfig) -> Partition {
429    Partition {
430        family: PartitionFamily::Budget,
431        index: partition_for_uuid(bid.as_bytes(), config.num_budget_partitions),
432    }
433}
434
435/// Compute the partition for a quota policy (by scope ID).
436pub fn quota_partition(qid: &QuotaPolicyId, config: &PartitionConfig) -> Partition {
437    Partition {
438        family: PartitionFamily::Quota,
439        index: partition_for_uuid(qid.as_bytes(), config.num_quota_partitions),
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446
447    #[test]
448    fn partition_hash_tag_format() {
449        let p = Partition { family: PartitionFamily::Flow, index: 7 };
450        assert_eq!(p.hash_tag(), "{fp:7}");
451
452        let p = Partition { family: PartitionFamily::Execution, index: 7 };
453        assert_eq!(p.hash_tag(), "{fp:7}", "Execution must alias Flow (RFC-011 §11)");
454
455        let p = Partition { family: PartitionFamily::Budget, index: 0 };
456        assert_eq!(p.hash_tag(), "{b:0}");
457
458        let p = Partition { family: PartitionFamily::Quota, index: 31 };
459        assert_eq!(p.hash_tag(), "{q:31}");
460    }
461
462    /// Execution and Flow are deliberate routing aliases post-RFC-011 §11.
463    /// This test pins the alias contract so a future edit that diverges the
464    /// two routes fails loudly rather than silently breaking co-location.
465    #[test]
466    fn execution_family_aliases_flow() {
467        // Same hash-tag at every index.
468        for index in [0u16, 1, 7, 42, 255, 65535] {
469            let flow = Partition { family: PartitionFamily::Flow, index };
470            let exec = Partition { family: PartitionFamily::Execution, index };
471            assert_eq!(
472                flow.hash_tag(),
473                exec.hash_tag(),
474                "Flow and Execution must produce identical hash-tags at index {index}"
475            );
476        }
477
478        // count_for returns the same value.
479        let config = PartitionConfig::default();
480        assert_eq!(
481            config.count_for(PartitionFamily::Flow),
482            config.count_for(PartitionFamily::Execution),
483            "count_for(Flow) == count_for(Execution) — both route via num_flow_partitions"
484        );
485
486        // A Partition minted with family=Execution produces the same
487        // hash-tag as one minted with family=Flow, given the same index.
488        // This is the key property cairn-fabric (and any other consumer
489        // that constructs `Partition { family: Execution, .. }`) depends on.
490        let p_exec = Partition { family: PartitionFamily::Execution, index: 42 };
491        let p_flow = Partition { family: PartitionFamily::Flow, index: 42 };
492        assert_eq!(p_exec.hash_tag(), p_flow.hash_tag());
493        assert_eq!(p_exec.hash_tag(), "{fp:42}");
494    }
495
496    #[test]
497    fn all_families_produce_distinct_tags() {
498        // Post-RFC-011: Flow and Execution deliberately share `{fp:N}` — see
499        // `execution_family_aliases_flow`. The three *distinct* tag spaces are
500        // {fp:N}, {b:N}, {q:N}. This test asserts that alias-after-collapse.
501        let tags: Vec<String> = [
502            PartitionFamily::Flow,
503            PartitionFamily::Budget,
504            PartitionFamily::Quota,
505        ]
506        .iter()
507        .map(|f| Partition { family: *f, index: 0 }.hash_tag())
508        .collect();
509        let unique: std::collections::HashSet<&String> = tags.iter().collect();
510        assert_eq!(unique.len(), 3, "flow/budget/quota must produce distinct hash tags");
511    }
512
513    #[test]
514    fn flow_partition_determinism() {
515        let config = PartitionConfig::default();
516        let fid = FlowId::new();
517        let p1 = flow_partition(&fid, &config);
518        let p2 = flow_partition(&fid, &config);
519        assert_eq!(p1, p2);
520        assert_eq!(p1.family, PartitionFamily::Flow);
521        assert!(p1.index < config.num_flow_partitions);
522    }
523
524    #[test]
525    fn budget_partition_determinism() {
526        let config = PartitionConfig::default();
527        let bid = BudgetId::new();
528        let p1 = budget_partition(&bid, &config);
529        let p2 = budget_partition(&bid, &config);
530        assert_eq!(p1, p2);
531        assert_eq!(p1.family, PartitionFamily::Budget);
532        assert!(p1.index < config.num_budget_partitions);
533    }
534
535    #[test]
536    fn default_config_values() {
537        let config = PartitionConfig::default();
538        assert_eq!(config.num_flow_partitions, 256);
539        assert_eq!(config.num_budget_partitions, 32);
540        assert_eq!(config.num_quota_partitions, 32);
541    }
542
543    // ── RFC-011 phase-1 tests ──
544
545    #[test]
546    fn execution_id_for_flow_determinism() {
547        let config = PartitionConfig::default();
548        let fid = FlowId::new();
549        let a = ExecutionId::for_flow(&fid, &config);
550        let b = ExecutionId::for_flow(&fid, &config);
551        // Same flow → same partition (UUID suffix differs).
552        assert_eq!(a.partition(), b.partition());
553    }
554
555    #[test]
556    fn execution_id_solo_determinism() {
557        let config = PartitionConfig::default();
558        let lane = LaneId::new("workers-a");
559        let a = ExecutionId::solo(&lane, &config);
560        let b = ExecutionId::solo(&lane, &config);
561        // Same lane → same partition.
562        assert_eq!(a.partition(), b.partition());
563    }
564
565    #[test]
566    fn execution_id_partition_matches_flow_partition() {
567        let config = PartitionConfig::default();
568        let fid = FlowId::new();
569        let eid = ExecutionId::for_flow(&fid, &config);
570        let fp = flow_partition(&fid, &config);
571        assert_eq!(eid.partition(), fp.index);
572        let ep = execution_partition(&eid, &config);
573        // Indices match (RFC-011 co-location) but family is Execution —
574        // preserves exec-scoped semantics at the metadata layer.
575        assert_eq!(ep.index, fp.index);
576        assert_eq!(ep.family, PartitionFamily::Execution);
577        // Alias contract: Execution and Flow produce identical hash-tags.
578        assert_eq!(ep.hash_tag(), fp.hash_tag());
579    }
580
581    #[test]
582    fn execution_partition_reads_hash_tag_not_uuid() {
583        // Construct an ExecutionId with a KNOWN partition (0) and a UUID
584        // whose crc16-partition lands somewhere else. execution_partition
585        // must pick the hash-tag (0), not re-hash the UUID.
586        let known_uuid = "550e8400-e29b-41d4-a716-446655440000";
587        let s = format!("{{fp:0}}:{known_uuid}");
588        let eid = ExecutionId::parse(&s).unwrap();
589        let config = PartitionConfig::default();
590        let p = execution_partition(&eid, &config);
591        assert_eq!(p.index, 0, "must read hash-tag, not re-hash UUID");
592    }
593
594    #[test]
595    fn execution_partition_ignores_config_value() {
596        // Safety test per manager ask #7 (edge case 3):
597        // An id minted in one config must decode to the same partition
598        // in any other config — the partition is baked into the id,
599        // not computed from the config.
600        let small = PartitionConfig { num_flow_partitions: 4, ..Default::default() };
601        let fid = FlowId::new();
602        let eid = ExecutionId::for_flow(&fid, &small);
603        let minted_partition = eid.partition();
604
605        // Decode via a different config — must match.
606        let big = PartitionConfig { num_flow_partitions: 1024, ..Default::default() };
607        let p = execution_partition(&eid, &big);
608        assert_eq!(
609            p.index, minted_partition,
610            "hash-tag is authoritative; config value must not change decoding"
611        );
612    }
613
614    #[test]
615    fn execution_id_parse_rejects_bare_uuid() {
616        let bare = "550e8400-e29b-41d4-a716-446655440000";
617        match ExecutionId::parse(bare) {
618            Err(crate::types::ExecutionIdParseError::MissingTag(_)) => {}
619            other => panic!("expected MissingTag, got {other:?}"),
620        }
621    }
622
623    #[test]
624    fn execution_id_parse_accepts_wellformed_shape() {
625        let s = "{fp:42}:550e8400-e29b-41d4-a716-446655440000";
626        let eid = ExecutionId::parse(s).unwrap();
627        assert_eq!(eid.partition(), 42);
628        assert_eq!(eid.as_str(), s);
629    }
630
631    #[test]
632    fn execution_id_parse_rejects_bad_partition_index() {
633        // Non-integer partition
634        match ExecutionId::parse("{fp:xx}:550e8400-e29b-41d4-a716-446655440000") {
635            Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
636            other => panic!("expected InvalidPartitionIndex, got {other:?}"),
637        }
638        // u16-overflow partition (65536)
639        match ExecutionId::parse("{fp:65536}:550e8400-e29b-41d4-a716-446655440000") {
640            Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
641            other => panic!("expected InvalidPartitionIndex for u16 overflow, got {other:?}"),
642        }
643    }
644
645    #[test]
646    fn execution_id_parse_rejects_bad_uuid() {
647        match ExecutionId::parse("{fp:0}:not-a-uuid") {
648            Err(crate::types::ExecutionIdParseError::InvalidUuid(_)) => {}
649            other => panic!("expected InvalidUuid, got {other:?}"),
650        }
651    }
652
653    #[test]
654    fn solo_partition_determinism() {
655        let config = PartitionConfig::default();
656        let lane = LaneId::new("workers-a");
657        let p1 = solo_partition(&lane, &config);
658        let p2 = solo_partition(&lane, &config);
659        assert_eq!(p1, p2);
660        // Solo execs are execution-scoped — family is Execution, routing
661        // is aliased to Flow's `{fp:N}` via PartitionFamily's prefix map.
662        assert_eq!(p1.family, PartitionFamily::Execution);
663        assert!(p1.index < config.num_flow_partitions);
664    }
665
666    #[test]
667    fn solo_partition_different_lanes_usually_differ() {
668        // Not strict — collisions are possible (and §5.6 acknowledges) —
669        // but over 100 distinct lane names we expect most to be distinct.
670        let config = PartitionConfig::default();
671        let mut seen = std::collections::HashSet::new();
672        for i in 0..100 {
673            let lane = LaneId::new(format!("lane-{i}"));
674            let p = solo_partition(&lane, &config);
675            seen.insert(p.index);
676        }
677        // At 256 partitions with 100 inputs, expect >50 distinct.
678        assert!(
679            seen.len() > 50,
680            "solo_partition distribution too narrow: only {} distinct of 100",
681            seen.len()
682        );
683    }
684
685    // ── SoloPartitioner trait (RFC-011 §5.6 mitigation #3) ──
686
687    #[test]
688    fn crc16_solo_partitioner_matches_legacy_behavior() {
689        // The default Crc16SoloPartitioner must produce the same index as
690        // the pre-trait solo_partition() function — otherwise installing
691        // the trait under an existing deployment would silently re-route
692        // every solo exec.
693        let config = PartitionConfig::default();
694        let lane = LaneId::new("workers-a");
695        let default_idx = Crc16SoloPartitioner.partition_for_lane(&lane, &config);
696        let expected = crc16::State::<crc16::XMODEM>::calculate(lane.as_str().as_bytes())
697            % config.num_flow_partitions;
698        assert_eq!(default_idx, expected);
699    }
700
701    #[test]
702    fn solo_partition_with_custom_partitioner_routes_through_trait() {
703        // Stub partitioner always returns index 0; a custom impl must
704        // override the default routing.
705        struct AlwaysZero;
706        impl SoloPartitioner for AlwaysZero {
707            fn partition_for_lane(&self, _lane: &LaneId, _config: &PartitionConfig) -> u16 {
708                0
709            }
710        }
711        let config = PartitionConfig::default();
712        let lane = LaneId::new("pick-me");
713        let p = solo_partition_with(&lane, &config, &AlwaysZero);
714        assert_eq!(p.index, 0);
715        // Solo execs → Execution family (aliases Flow for routing).
716        assert_eq!(p.family, PartitionFamily::Execution);
717    }
718
719    #[test]
720    fn solo_partition_default_matches_solo_partition_with_crc16() {
721        // solo_partition() and solo_partition_with(Crc16SoloPartitioner)
722        // must produce identical Partitions — this pins the default impl
723        // as the identity override.
724        let config = PartitionConfig::default();
725        let lane = LaneId::new("workers-b");
726        let default = solo_partition(&lane, &config);
727        let explicit = solo_partition_with(&lane, &config, &Crc16SoloPartitioner);
728        assert_eq!(default, explicit);
729    }
730
731    #[test]
732    fn execution_id_serde_via_deserialize_validates() {
733        // Valid shape deserialises.
734        let json = r#""{fp:0}:550e8400-e29b-41d4-a716-446655440000""#;
735        let eid: ExecutionId = serde_json::from_str(json).unwrap();
736        assert_eq!(eid.partition(), 0);
737
738        // Bare UUID fails Deserialize.
739        let bare = r#""550e8400-e29b-41d4-a716-446655440000""#;
740        assert!(serde_json::from_str::<ExecutionId>(bare).is_err());
741    }
742
743    // ── PartitionKey (issue #91) ──
744
745    #[test]
746    fn partition_key_from_partition_matches_hash_tag() {
747        for (family, expected_prefix) in [
748            (PartitionFamily::Flow, "fp"),
749            (PartitionFamily::Execution, "fp"),
750            (PartitionFamily::Budget, "b"),
751            (PartitionFamily::Quota, "q"),
752        ] {
753            let p = Partition { family, index: 42 };
754            let k: PartitionKey = (&p).into();
755            assert_eq!(k.as_str(), &format!("{{{expected_prefix}:42}}"));
756            assert_eq!(k.as_str(), p.hash_tag());
757        }
758    }
759
760    #[test]
761    fn partition_key_round_trip_flow_budget_quota() {
762        // Exact round-trip for non-alias families.
763        for family in [
764            PartitionFamily::Flow,
765            PartitionFamily::Budget,
766            PartitionFamily::Quota,
767        ] {
768            let p = Partition { family, index: 7 };
769            let k = PartitionKey::from(&p);
770            let back = k.parse().expect("must parse");
771            assert_eq!(back, p, "round-trip must be identity for {family:?}");
772        }
773    }
774
775    /// RFC-011 §11 alias collapse: `Execution` round-trips to `Flow`.
776    /// Routing is preserved (same hash tag, same `count_for`), only
777    /// the metadata-layer label differs. This test pins the documented
778    /// collapse so consumers can't quietly depend on `Execution`
779    /// surviving a wire hop.
780    #[test]
781    fn partition_key_collapses_execution_to_flow_on_parse() {
782        let p_exec = Partition { family: PartitionFamily::Execution, index: 3 };
783        let k = PartitionKey::from(&p_exec);
784        assert_eq!(k.as_str(), "{fp:3}");
785        let back = k.parse().expect("must parse");
786        // Alias collapse: index preserved, family normalises to Flow.
787        assert_eq!(back.family, PartitionFamily::Flow);
788        assert_eq!(back.index, 3);
789        // Routing equivalence: both produce identical hash tags.
790        assert_eq!(back.hash_tag(), p_exec.hash_tag());
791    }
792
793    #[test]
794    fn partition_key_serde_transparent() {
795        let p = Partition { family: PartitionFamily::Flow, index: 9 };
796        let k = PartitionKey::from(&p);
797        let json = serde_json::to_string(&k).unwrap();
798        assert_eq!(json, r#""{fp:9}""#, "must serialise as a bare string");
799        let parsed: PartitionKey = serde_json::from_str(&json).unwrap();
800        assert_eq!(parsed, k);
801    }
802
803    #[test]
804    fn partition_key_parse_rejects_missing_braces() {
805        let k = PartitionKey("fp:0".to_owned());
806        assert_eq!(k.parse(), Err(PartitionKeyParseError::MissingBraces));
807    }
808
809    #[test]
810    fn partition_key_parse_rejects_malformed_body() {
811        let k = PartitionKey("{fp0}".to_owned());
812        assert_eq!(k.parse(), Err(PartitionKeyParseError::MalformedBody));
813    }
814
815    #[test]
816    fn partition_key_parse_rejects_unknown_prefix() {
817        let k = PartitionKey("{zz:0}".to_owned());
818        match k.parse() {
819            Err(PartitionKeyParseError::UnknownFamilyPrefix(p)) => assert_eq!(p, "zz"),
820            other => panic!("expected UnknownFamilyPrefix, got {other:?}"),
821        }
822    }
823
824    #[test]
825    fn partition_key_parse_rejects_invalid_index() {
826        // Non-numeric
827        let k = PartitionKey("{fp:xx}".to_owned());
828        assert!(matches!(
829            k.parse(),
830            Err(PartitionKeyParseError::InvalidIndex(_))
831        ));
832        // u16 overflow
833        let k = PartitionKey("{fp:65536}".to_owned());
834        assert!(matches!(
835            k.parse(),
836            Err(PartitionKeyParseError::InvalidIndex(_))
837        ));
838    }
839}