Skip to main content

ff_core/
partition.rs

1use crate::types::{BudgetId, ExecutionId, FlowId, LaneId, QuotaPolicyId};
2use serde::{Deserialize, Serialize};
3
4/// The partition families in FlowFabric.
5///
6/// Post-RFC-011: `Execution` and `Flow` are now routing **aliases** —
7/// both produce the same `{fp:N}` hash-tag, because execution keys
8/// co-locate with their parent flow's partition under hash-tag
9/// co-location. The `Execution` variant is **deliberately retained**
10/// per RFC-011 §11 Non-goals ("Deleting `PartitionFamily::Execution`
11/// from the public API. The variant stays for API compatibility; only
12/// its routing behaviour changes."), so downstream crates like
13/// cairn-fabric that construct `Partition { family: Execution, .. }`
14/// continue to compile and route correctly without source changes.
15///
16/// New FF-internal code should prefer `PartitionFamily::Flow` for
17/// clarity — the `Execution` alias exists solely to preserve the
18/// public-API contract promised by RFC-011. The logical distinction
19/// between exec-scoped and flow-scoped keys continues to live in the
20/// key-name prefix (`ff:exec:*` vs `ff:flow:*`), not in the
21/// `PartitionFamily` discriminator.
22#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub enum PartitionFamily {
25    /// Flow-structural + execution partition: `{fp:N}` — flow topology
26    /// and all per-execution keys co-located with their parent flow.
27    Flow,
28    /// Execution partition — **routing alias for [`PartitionFamily::Flow`]**
29    /// under RFC-011 co-location. Produces `{fp:N}` hash-tags identical to
30    /// `Flow` and indexes into `num_flow_partitions`. Kept as a distinct
31    /// variant per RFC-011 §11 for downstream-API compatibility (cairn-
32    /// fabric and other consumers that construct this variant directly).
33    Execution,
34    /// Budget partition: `{b:M}` — budget definitions and usage.
35    Budget,
36    /// Quota partition: `{q:K}` — quota policies and sliding windows.
37    Quota,
38}
39
40impl PartitionFamily {
41    /// Hash tag prefix for this family.
42    ///
43    /// `Flow` and `Execution` are aliases and both return `"fp"` — see
44    /// the enum-level rustdoc for the RFC-011 §11 compatibility rationale.
45    fn prefix(self) -> &'static str {
46        match self {
47            Self::Flow | Self::Execution => "fp",
48            Self::Budget => "b",
49            Self::Quota => "q",
50        }
51    }
52}
53
54/// Partition counts for each family. Fixed at deployment time.
55///
56/// Post-RFC-011: `num_execution_partitions` is retired. All execution
57/// keys route via `num_flow_partitions` (exec + flow share a slot under
58/// hash-tag co-location). Default bumped 64 → 256 to preserve today's
59/// total keyspace fanout.
60#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub struct PartitionConfig {
62    pub num_flow_partitions: u16,
63    pub num_budget_partitions: u16,
64    pub num_quota_partitions: u16,
65}
66
67impl Default for PartitionConfig {
68    fn default() -> Self {
69        Self {
70            num_flow_partitions: 256,
71            num_budget_partitions: 32,
72            num_quota_partitions: 32,
73        }
74    }
75}
76
77impl PartitionConfig {
78    /// Get the partition count for a given family.
79    ///
80    /// `Flow` and `Execution` return the same value (`num_flow_partitions`)
81    /// — they are routing aliases under RFC-011 co-location.
82    pub fn count_for(&self, family: PartitionFamily) -> u16 {
83        match family {
84            PartitionFamily::Flow | PartitionFamily::Execution => self.num_flow_partitions,
85            PartitionFamily::Budget => self.num_budget_partitions,
86            PartitionFamily::Quota => self.num_quota_partitions,
87        }
88    }
89}
90
91/// A resolved partition within a family.
92#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
93pub struct Partition {
94    pub family: PartitionFamily,
95    pub index: u16,
96}
97
98impl Partition {
99    /// Returns the Valkey hash tag for this partition, e.g. `{fp:7}`, `{b:0}`.
100    pub fn hash_tag(&self) -> String {
101        format!("{{{prefix}:{index}}}", prefix = self.family.prefix(), index = self.index)
102    }
103}
104
105impl std::fmt::Display for Partition {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        write!(f, "{}", self.hash_tag())
108    }
109}
110
111/// Compute CRC16-CCITT of the given bytes, same algorithm as Valkey Cluster.
112fn crc16_ccitt(bytes: &[u8]) -> u16 {
113    crc16::State::<crc16::XMODEM>::calculate(bytes)
114}
115
116/// Compute the partition index for a UUID-based entity.
117/// Panics if `num_partitions` is 0 — this is a configuration error.
118fn partition_for_uuid(uuid_bytes: &[u8; 16], num_partitions: u16) -> u16 {
119    assert!(num_partitions > 0, "num_partitions must be > 0 (division by zero)");
120    crc16_ccitt(uuid_bytes) % num_partitions
121}
122
123/// Compute the partition for an execution ID.
124///
125/// Post-RFC-011: decodes the hash-tag prefix out of the id string; does NOT
126/// re-hash the UUID. The partition index is baked into the id at mint time
127/// via [`ExecutionId::for_flow`] / [`ExecutionId::solo`]. The `config` arg
128/// is retained for API symmetry with the other partition functions, but is
129/// unused — the exec id carries its partition intrinsically.
130pub fn execution_partition(eid: &ExecutionId, _config: &PartitionConfig) -> Partition {
131    // `Execution` family preserves exec-scoped semantics at the metadata
132    // layer (logs, tracing spans, metric labels) even though routing is
133    // aliased to `Flow`'s hash-tag prefix under RFC-011 co-location.
134    // See PartitionFamily enum rustdoc for the alias contract.
135    Partition {
136        family: PartitionFamily::Execution,
137        index: eid.partition(),
138    }
139}
140
141/// Compute the partition for a flow ID.
142pub fn flow_partition(fid: &FlowId, config: &PartitionConfig) -> Partition {
143    Partition {
144        family: PartitionFamily::Flow,
145        index: partition_for_uuid(fid.as_bytes(), config.num_flow_partitions),
146    }
147}
148
149/// Strategy for picking a solo execution's partition from its lane id.
150///
151/// RFC-011 §5.6 defines the birthday-paradox traffic-amplification
152/// mitigation: solo execs hash their lane id to a flow partition with
153/// crc16, which can collide with a flow's own partition. Operators
154/// that hit a persistent collision install a custom strategy at boot
155/// time via [`solo_partition_with`] instead of rebuilding `ff-server`.
156///
157/// The default impl [`Crc16SoloPartitioner`] matches the algorithm
158/// used by every other partition family. Replace only when a real
159/// collision surfaces via `ff-server admin partition-collisions`.
160pub trait SoloPartitioner: Send + Sync {
161    /// Return the partition index for a solo execution on the given lane.
162    ///
163    /// Must return a value in `0..config.num_flow_partitions`.
164    /// Must be deterministic — the same `(lane, config)` always produces
165    /// the same index (violated determinism → exec keys would route
166    /// differently on each mint, breaking all cross-lookup invariants).
167    fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16;
168}
169
/// The stock [`SoloPartitioner`]: `crc16_ccitt(lane_utf8) % num_flow_partitions`.
///
/// It hashes with the same crc16-CCITT routine as [`flow_partition`],
/// [`budget_partition`] and [`quota_partition`], which is the algorithm
/// Valkey Cluster uses for slot assignment.
#[derive(Debug, Default, Clone, Copy)]
pub struct Crc16SoloPartitioner;
177
178impl SoloPartitioner for Crc16SoloPartitioner {
179    fn partition_for_lane(&self, lane: &LaneId, config: &PartitionConfig) -> u16 {
180        assert!(
181            config.num_flow_partitions > 0,
182            "num_flow_partitions must be > 0 (division by zero)"
183        );
184        crc16_ccitt(lane.as_str().as_bytes()) % config.num_flow_partitions
185    }
186}
187
188/// Compute the partition for a solo (flow-less) execution's lane shard.
189///
190/// Uses the default [`Crc16SoloPartitioner`]. Deployments that hit a
191/// traffic-amplification hotspot (per RFC-011 §5.6) should call
192/// [`solo_partition_with`] with a custom partitioner instead.
193pub fn solo_partition(lane: &LaneId, config: &PartitionConfig) -> Partition {
194    solo_partition_with(lane, config, &Crc16SoloPartitioner)
195}
196
197/// Compute the partition for a solo execution using a custom
198/// [`SoloPartitioner`] strategy.
199///
200/// The operator-facing escape hatch for RFC-011 §5.6 traffic-amplification
201/// collisions. A deployment that observes a collision via
202/// `ff-server admin partition-collisions` instantiates an alternate
203/// [`SoloPartitioner`] impl and routes solo mint paths through this
204/// function. The default [`Crc16SoloPartitioner`] is used by
205/// [`solo_partition`] and [`ExecutionId::solo`] — neither signature
206/// changes under this extension point.
207pub fn solo_partition_with(
208    lane: &LaneId,
209    config: &PartitionConfig,
210    partitioner: &dyn SoloPartitioner,
211) -> Partition {
212    // Solo execs are execution-scoped — preserve `Execution` family at
213    // the metadata layer. Routing is aliased to `Flow`'s `{fp:N}` tag
214    // under RFC-011 co-location; see PartitionFamily rustdoc for the
215    // alias contract.
216    Partition {
217        family: PartitionFamily::Execution,
218        index: partitioner.partition_for_lane(lane, config),
219    }
220}
221
222/// Compute the partition for a budget ID.
223pub fn budget_partition(bid: &BudgetId, config: &PartitionConfig) -> Partition {
224    Partition {
225        family: PartitionFamily::Budget,
226        index: partition_for_uuid(bid.as_bytes(), config.num_budget_partitions),
227    }
228}
229
230/// Compute the partition for a quota policy (by scope ID).
231pub fn quota_partition(qid: &QuotaPolicyId, config: &PartitionConfig) -> Partition {
232    Partition {
233        family: PartitionFamily::Quota,
234        index: partition_for_uuid(qid.as_bytes(), config.num_quota_partitions),
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn partition_hash_tag_format() {
        let p = Partition { family: PartitionFamily::Flow, index: 7 };
        assert_eq!(p.hash_tag(), "{fp:7}");

        let p = Partition { family: PartitionFamily::Execution, index: 7 };
        assert_eq!(p.hash_tag(), "{fp:7}", "Execution must alias Flow (RFC-011 §11)");

        let p = Partition { family: PartitionFamily::Budget, index: 0 };
        assert_eq!(p.hash_tag(), "{b:0}");

        let p = Partition { family: PartitionFamily::Quota, index: 31 };
        assert_eq!(p.hash_tag(), "{q:31}");
    }

    /// Execution and Flow are deliberate routing aliases post-RFC-011 §11.
    /// This test pins the alias contract so a future edit that diverges the
    /// two routes fails loudly rather than silently breaking co-location.
    #[test]
    fn execution_family_aliases_flow() {
        // Same hash-tag at every index.
        for index in [0u16, 1, 7, 42, 255, 65535] {
            let flow = Partition { family: PartitionFamily::Flow, index };
            let exec = Partition { family: PartitionFamily::Execution, index };
            assert_eq!(
                flow.hash_tag(),
                exec.hash_tag(),
                "Flow and Execution must produce identical hash-tags at index {index}"
            );
        }

        // count_for returns the same value.
        let config = PartitionConfig::default();
        assert_eq!(
            config.count_for(PartitionFamily::Flow),
            config.count_for(PartitionFamily::Execution),
            "count_for(Flow) == count_for(Execution) — both route via num_flow_partitions"
        );

        // A Partition minted with family=Execution produces the same
        // hash-tag as one minted with family=Flow, given the same index.
        // This is the key property cairn-fabric (and any other consumer
        // that constructs `Partition { family: Execution, .. }`) depends on.
        let p_exec = Partition { family: PartitionFamily::Execution, index: 42 };
        let p_flow = Partition { family: PartitionFamily::Flow, index: 42 };
        assert_eq!(p_exec.hash_tag(), p_flow.hash_tag());
        assert_eq!(p_exec.hash_tag(), "{fp:42}");
    }

    #[test]
    fn all_families_produce_distinct_tags() {
        // Post-RFC-011: Flow and Execution deliberately share `{fp:N}` — see
        // `execution_family_aliases_flow`. That leaves three *distinct* tag
        // spaces: {fp:N}, {b:N}, {q:N}. This test asserts they stay distinct
        // after the alias collapse.
        let tags: Vec<String> = [
            PartitionFamily::Flow,
            PartitionFamily::Budget,
            PartitionFamily::Quota,
        ]
        .iter()
        .map(|f| Partition { family: *f, index: 0 }.hash_tag())
        .collect();
        let unique: std::collections::HashSet<&String> = tags.iter().collect();
        assert_eq!(unique.len(), 3, "flow/budget/quota must produce distinct hash tags");
    }

    #[test]
    fn flow_partition_determinism() {
        let config = PartitionConfig::default();
        let fid = FlowId::new();
        let p1 = flow_partition(&fid, &config);
        let p2 = flow_partition(&fid, &config);
        assert_eq!(p1, p2);
        assert_eq!(p1.family, PartitionFamily::Flow);
        assert!(p1.index < config.num_flow_partitions);
    }

    #[test]
    fn budget_partition_determinism() {
        let config = PartitionConfig::default();
        let bid = BudgetId::new();
        let p1 = budget_partition(&bid, &config);
        let p2 = budget_partition(&bid, &config);
        assert_eq!(p1, p2);
        assert_eq!(p1.family, PartitionFamily::Budget);
        assert!(p1.index < config.num_budget_partitions);
    }

    #[test]
    fn default_config_values() {
        let config = PartitionConfig::default();
        assert_eq!(config.num_flow_partitions, 256);
        assert_eq!(config.num_budget_partitions, 32);
        assert_eq!(config.num_quota_partitions, 32);
    }

    // ── RFC-011 phase-1 tests ──

    #[test]
    fn execution_id_for_flow_determinism() {
        let config = PartitionConfig::default();
        let fid = FlowId::new();
        let a = ExecutionId::for_flow(&fid, &config);
        let b = ExecutionId::for_flow(&fid, &config);
        // Same flow → same partition (UUID suffix differs).
        assert_eq!(a.partition(), b.partition());
    }

    #[test]
    fn execution_id_solo_determinism() {
        let config = PartitionConfig::default();
        let lane = LaneId::new("workers-a");
        let a = ExecutionId::solo(&lane, &config);
        let b = ExecutionId::solo(&lane, &config);
        // Same lane → same partition.
        assert_eq!(a.partition(), b.partition());
    }

    #[test]
    fn execution_id_partition_matches_flow_partition() {
        let config = PartitionConfig::default();
        let fid = FlowId::new();
        let eid = ExecutionId::for_flow(&fid, &config);
        let fp = flow_partition(&fid, &config);
        assert_eq!(eid.partition(), fp.index);
        let ep = execution_partition(&eid, &config);
        // Indices match (RFC-011 co-location) but family is Execution —
        // preserves exec-scoped semantics at the metadata layer.
        assert_eq!(ep.index, fp.index);
        assert_eq!(ep.family, PartitionFamily::Execution);
        // Alias contract: Execution and Flow produce identical hash-tags.
        assert_eq!(ep.hash_tag(), fp.hash_tag());
    }

    #[test]
    fn execution_partition_reads_hash_tag_not_uuid() {
        // Parse the SAME UUID under two different hash-tags. Re-hashing the
        // UUID would give one fixed index for both ids, so it could agree
        // with at most one of the tags; requiring both to round-trip proves
        // the index is decoded from the tag. (Checking a single tag could
        // pass by coincidence if the UUID happened to hash to that value.)
        let known_uuid = "550e8400-e29b-41d4-a716-446655440000";
        let config = PartitionConfig::default();
        for tagged in [0u16, 1] {
            let s = format!("{{fp:{tagged}}}:{known_uuid}");
            let eid = ExecutionId::parse(&s).unwrap();
            let p = execution_partition(&eid, &config);
            assert_eq!(p.index, tagged, "must read hash-tag, not re-hash UUID");
        }
    }

    #[test]
    fn execution_partition_ignores_config_value() {
        // Safety test per manager ask #7 (edge case 3):
        // An id minted in one config must decode to the same partition
        // in any other config — the partition is baked into the id,
        // not computed from the config.
        let small = PartitionConfig { num_flow_partitions: 4, ..Default::default() };
        let fid = FlowId::new();
        let eid = ExecutionId::for_flow(&fid, &small);
        let minted_partition = eid.partition();

        // Decode via a different config — must match.
        let big = PartitionConfig { num_flow_partitions: 1024, ..Default::default() };
        let p = execution_partition(&eid, &big);
        assert_eq!(
            p.index, minted_partition,
            "hash-tag is authoritative; config value must not change decoding"
        );
    }

    #[test]
    fn execution_id_parse_rejects_bare_uuid() {
        let bare = "550e8400-e29b-41d4-a716-446655440000";
        match ExecutionId::parse(bare) {
            Err(crate::types::ExecutionIdParseError::MissingTag(_)) => {}
            other => panic!("expected MissingTag, got {other:?}"),
        }
    }

    #[test]
    fn execution_id_parse_accepts_wellformed_shape() {
        let s = "{fp:42}:550e8400-e29b-41d4-a716-446655440000";
        let eid = ExecutionId::parse(s).unwrap();
        assert_eq!(eid.partition(), 42);
        assert_eq!(eid.as_str(), s);
    }

    #[test]
    fn execution_id_parse_rejects_bad_partition_index() {
        // Non-integer partition
        match ExecutionId::parse("{fp:xx}:550e8400-e29b-41d4-a716-446655440000") {
            Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
            other => panic!("expected InvalidPartitionIndex, got {other:?}"),
        }
        // u16-overflow partition (65536)
        match ExecutionId::parse("{fp:65536}:550e8400-e29b-41d4-a716-446655440000") {
            Err(crate::types::ExecutionIdParseError::InvalidPartitionIndex(_)) => {}
            other => panic!("expected InvalidPartitionIndex for u16 overflow, got {other:?}"),
        }
    }

    #[test]
    fn execution_id_parse_rejects_bad_uuid() {
        match ExecutionId::parse("{fp:0}:not-a-uuid") {
            Err(crate::types::ExecutionIdParseError::InvalidUuid(_)) => {}
            other => panic!("expected InvalidUuid, got {other:?}"),
        }
    }

    #[test]
    fn solo_partition_determinism() {
        let config = PartitionConfig::default();
        let lane = LaneId::new("workers-a");
        let p1 = solo_partition(&lane, &config);
        let p2 = solo_partition(&lane, &config);
        assert_eq!(p1, p2);
        // Solo execs are execution-scoped — family is Execution, routing
        // is aliased to Flow's `{fp:N}` via PartitionFamily's prefix map.
        assert_eq!(p1.family, PartitionFamily::Execution);
        assert!(p1.index < config.num_flow_partitions);
    }

    #[test]
    fn solo_partition_different_lanes_usually_differ() {
        // Not strict — collisions are possible (and §5.6 acknowledges) —
        // but over 100 distinct lane names we expect most to be distinct.
        let config = PartitionConfig::default();
        let mut seen = std::collections::HashSet::new();
        for i in 0..100 {
            let lane = LaneId::new(format!("lane-{i}"));
            let p = solo_partition(&lane, &config);
            seen.insert(p.index);
        }
        // At 256 partitions with 100 inputs, expect >50 distinct.
        assert!(
            seen.len() > 50,
            "solo_partition distribution too narrow: only {} distinct of 100",
            seen.len()
        );
    }

    // ── SoloPartitioner trait (RFC-011 §5.6 mitigation #3) ──

    #[test]
    fn crc16_solo_partitioner_matches_legacy_behavior() {
        // The default Crc16SoloPartitioner must produce the same index as
        // the pre-trait solo_partition() function — otherwise installing
        // the trait under an existing deployment would silently re-route
        // every solo exec.
        let config = PartitionConfig::default();
        let lane = LaneId::new("workers-a");
        let default_idx = Crc16SoloPartitioner.partition_for_lane(&lane, &config);
        let expected = crc16::State::<crc16::XMODEM>::calculate(lane.as_str().as_bytes())
            % config.num_flow_partitions;
        assert_eq!(default_idx, expected);
    }

    #[test]
    fn solo_partition_with_custom_partitioner_routes_through_trait() {
        // Stub partitioner always returns index 0; a custom impl must
        // override the default routing.
        struct AlwaysZero;
        impl SoloPartitioner for AlwaysZero {
            fn partition_for_lane(&self, _lane: &LaneId, _config: &PartitionConfig) -> u16 {
                0
            }
        }
        let config = PartitionConfig::default();
        let lane = LaneId::new("pick-me");
        let p = solo_partition_with(&lane, &config, &AlwaysZero);
        assert_eq!(p.index, 0);
        // Solo execs → Execution family (aliases Flow for routing).
        assert_eq!(p.family, PartitionFamily::Execution);
    }

    #[test]
    fn solo_partition_default_matches_solo_partition_with_crc16() {
        // solo_partition() and solo_partition_with(Crc16SoloPartitioner)
        // must produce identical Partitions — this pins the default impl
        // as the identity override.
        let config = PartitionConfig::default();
        let lane = LaneId::new("workers-b");
        let default = solo_partition(&lane, &config);
        let explicit = solo_partition_with(&lane, &config, &Crc16SoloPartitioner);
        assert_eq!(default, explicit);
    }

    #[test]
    fn execution_id_serde_via_deserialize_validates() {
        // Valid shape deserialises.
        let json = r#""{fp:0}:550e8400-e29b-41d4-a716-446655440000""#;
        let eid: ExecutionId = serde_json::from_str(json).unwrap();
        assert_eq!(eid.partition(), 0);

        // Bare UUID fails Deserialize.
        let bare = r#""550e8400-e29b-41d4-a716-446655440000""#;
        assert!(serde_json::from_str::<ExecutionId>(bare).is_err());
    }
}