Skip to main content

ff_server/
admin.rs

//! Administrative subcommands for operator use.
//!
//! Invoked as `ff-server admin <subcommand> [args]`. Subcommands are
//! read-only probes that share the `ff-server` binary and its env-var
//! conventions but load only the minimal config subset they need (see
//! [`load_probe_inputs`]). They do not connect to Valkey, start HTTP, or
//! spawn scanners. Each probe completes synchronously and exits.
//!
//! # Subcommands
//!
//! - `partition-collisions` — RFC-011 §5.6 observability. Computes the
//!   solo-partition assignment for every configured lane and reports any
//!   lane that shares a partition with another lane (birthday-paradox collision).
14
15use ff_core::partition::{solo_partition, PartitionConfig};
16use ff_core::types::LaneId;
17
18/// Load the minimal config subset the probe needs, directly from env.
19///
20/// Unlike [`crate::config::ServerConfig::from_env`], this skips the HMAC
21/// secret, CORS, listener address, and engine intervals — a probe doesn't
22/// bind HTTP, start scanners, or touch signalling, so demanding those
23/// variables just creates operator-facing friction. Reads:
24///
25/// - `FF_LANES` (default `"default"`)
26/// - `FF_FLOW_PARTITIONS` (default `256`)
27/// - `FF_BUDGET_PARTITIONS` (default `32`) — present for struct symmetry, unused by the collisions probe
28/// - `FF_QUOTA_PARTITIONS` (default `32`) — same
29///
30/// Returns `Err(String)` with an operator-actionable message on invalid
31/// values (empty `FF_LANES`, non-positive partition count, etc.).
32pub fn load_probe_inputs() -> Result<(Vec<LaneId>, PartitionConfig), String> {
33    // Parse + validate each lane name via try_new (length, ASCII-printable
34    // shape per types.rs LANE_ID_MAX_BYTES). Reject duplicates so the
35    // collision table is not polluted by a miscounted total.
36    let raw = std::env::var("FF_LANES").unwrap_or_else(|_| "default".to_string());
37    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
38    let mut lanes: Vec<LaneId> = Vec::new();
39    for token in raw.split(',') {
40        let trimmed = token.trim();
41        if trimmed.is_empty() {
42            continue;
43        }
44        let lane = LaneId::try_new(trimmed).map_err(|e| {
45            format!("FF_LANES: invalid lane name '{trimmed}': {e}")
46        })?;
47        if !seen.insert(lane.as_str().to_string()) {
48            return Err(format!(
49                "FF_LANES: duplicate lane name '{trimmed}' — remove one of the entries"
50            ));
51        }
52        lanes.push(lane);
53    }
54    if lanes.is_empty() {
55        return Err(
56            "FF_LANES: at least one non-empty lane name is required".to_string(),
57        );
58    }
59
60    let num_flow_partitions = parse_u16_positive("FF_FLOW_PARTITIONS", 256)?;
61    let num_budget_partitions = parse_u16_positive("FF_BUDGET_PARTITIONS", 32)?;
62    let num_quota_partitions = parse_u16_positive("FF_QUOTA_PARTITIONS", 32)?;
63
64    Ok((
65        lanes,
66        PartitionConfig {
67            num_flow_partitions,
68            num_budget_partitions,
69            num_quota_partitions,
70        },
71    ))
72}
73
/// Read a strictly positive `u16` from the environment variable `var`.
///
/// Returns `Ok(default)` only when the variable is not set at all; the
/// default itself is returned as-is without validation.
///
/// # Errors
///
/// Returns an operator-facing message prefixed with the variable name when
/// the value is not valid UTF-8, does not parse as a `u16`, or is `0`.
/// A non-UTF-8 value is reported rather than treated as "unset", so a
/// garbled setting can never be silently replaced by the default.
fn parse_u16_positive(var: &str, default: u16) -> Result<u16, String> {
    let raw = match std::env::var(var) {
        Ok(raw) => raw,
        Err(std::env::VarError::NotPresent) => return Ok(default),
        Err(std::env::VarError::NotUnicode(_)) => {
            return Err(format!("{var}: value is not valid UTF-8"));
        }
    };
    match raw.parse::<u16>() {
        Ok(0) => Err(format!("{var}: must be > 0")),
        Ok(n) => Ok(n),
        Err(_) => Err(format!("{var}: '{raw}' is not a valid u16 (1-65535)")),
    }
}
88
89/// Result of a single lane's partition assignment during the
90/// partition-collisions probe.
91#[derive(Debug, Clone, PartialEq, Eq)]
92pub struct LanePartition {
93    /// The lane id as configured.
94    pub lane: LaneId,
95    /// The partition index the lane routes to (0..num_flow_partitions).
96    pub index: u16,
97    /// Lanes that collide on this same index (excluding `self`). Empty if
98    /// the lane is the sole occupant of its partition.
99    pub collides_with: Vec<LaneId>,
100}
101
/// How serious a collision report is, bucketed by the share of lanes that
/// collide. The thresholds mirror the runbook at
/// `docs/rfc011-operator-runbook.md`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollisionSeverity {
    /// Every lane has a partition to itself; nothing collides.
    Clean,
    /// Fewer than 5% of lanes collide. Keep an eye on it; no remediation yet.
    Watch,
    /// Between 5% and 15% of lanes collide. Remediation is worthwhile
    /// (rename a lane or raise the partition count).
    Elevated,
    /// More than 15% of lanes collide. Remediate; hot-spot risk is real
    /// under load.
    Remediate,
}
115
116/// Aggregated output of the partition-collisions probe.
117#[derive(Debug, Clone)]
118pub struct PartitionCollisionsReport {
119    pub partitions: u16,
120    pub total_lanes: usize,
121    pub colliding_lanes: usize,
122    pub severity: CollisionSeverity,
123    /// Per-lane results, sorted by partition index (then by lane name within
124    /// a partition) for deterministic output.
125    pub entries: Vec<LanePartition>,
126}
127
128impl PartitionCollisionsReport {
129    /// Compute the report for a set of configured lanes under the given
130    /// partition config.
131    ///
132    /// Pure function — no Valkey connection, no IO. Uses
133    /// [`ff_core::partition::solo_partition`] (and therefore
134    /// `ff_core::partition::Crc16SoloPartitioner`). Deployments that have
135    /// installed a custom `ff_core::partition::SoloPartitioner` at boot
136    /// time need to feed the alternate partitioner in explicitly — not
137    /// yet wired through this subcommand; the current probe assumes the
138    /// default.
139    pub fn compute(lanes: &[LaneId], config: &PartitionConfig) -> Self {
140        // Group lane indices by the partition they hash to.
141        let mut by_partition: std::collections::BTreeMap<u16, Vec<LaneId>> =
142            std::collections::BTreeMap::new();
143        for lane in lanes {
144            let p = solo_partition(lane, config);
145            by_partition.entry(p.index).or_default().push(lane.clone());
146        }
147
148        // Build per-lane entries. Sort each partition's siblings ONCE
149        // up-front (O(M log M) per partition), then derive the
150        // collides_with set by filtering the already-sorted slice — an
151        // O(M) walk per lane. Previous shape re-sorted the siblings slice
152        // for every lane in the partition, hitting O(M² log M) when a
153        // partition holds many lanes. For normal deployments where most
154        // partitions have at most 1 lane, both shapes are fast; the fix
155        // matters only under severe collision density, but the cheaper
156        // shape is also clearer to read.
157        // Guard against a caller passing a lane list with duplicate names.
158        // The CLI path already rejects duplicates in [`load_probe_inputs`],
159        // but `compute` is reachable from library callers too. For each
160        // lane, build `collides_with` by filtering the sorted sibling
161        // slice but skipping exactly ONE occurrence of the same name —
162        // via a `seen_self` guard. Any remaining matching entries are
163        // real duplicates and correctly appear in collides_with.
164        let mut entries: Vec<LanePartition> = Vec::with_capacity(lanes.len());
165        let mut colliding_lanes = 0usize;
166        for (index, siblings) in &by_partition {
167            let mut sorted_siblings: Vec<LaneId> = siblings.clone();
168            sorted_siblings.sort_by(|a, b| a.as_str().cmp(b.as_str()));
169            for lane in siblings {
170                let mut seen_self = false;
171                let others: Vec<LaneId> = sorted_siblings
172                    .iter()
173                    .filter(|sib| {
174                        if sib.as_str() == lane.as_str() && !seen_self {
175                            // Skip the first occurrence only.
176                            seen_self = true;
177                            false
178                        } else {
179                            true
180                        }
181                    })
182                    .cloned()
183                    .collect();
184                if !others.is_empty() {
185                    colliding_lanes += 1;
186                }
187                entries.push(LanePartition {
188                    lane: lane.clone(),
189                    index: *index,
190                    collides_with: others,
191                });
192            }
193        }
194        // Sort by (index, lane) for deterministic output. BTreeMap above
195        // already orders by index; within an index we need lane ordering.
196        entries.sort_by(|a, b| {
197            a.index
198                .cmp(&b.index)
199                .then_with(|| a.lane.as_str().cmp(b.lane.as_str()))
200        });
201
202        let severity = classify_severity(colliding_lanes, lanes.len());
203
204        Self {
205            partitions: config.num_flow_partitions,
206            total_lanes: lanes.len(),
207            colliding_lanes,
208            severity,
209            entries,
210        }
211    }
212
213    /// Render the report as a plain-text table, deterministic and
214    /// operator-friendly. Emits to stdout in the CLI; returned as a
215    /// `String` for unit testing.
216    pub fn format_plain(&self) -> String {
217        let mut out = String::new();
218        out.push_str(&format!(
219            "FlowFabric partition-collisions probe (RFC-011 §5.6)\n\
220             \n\
221             num_flow_partitions: {partitions}\n\
222             lanes configured:    {total}\n\
223             lanes colliding:     {colliding} ({pct:.1}%)\n\
224             severity:            {severity:?}\n\
225             \n",
226            partitions = self.partitions,
227            total = self.total_lanes,
228            colliding = self.colliding_lanes,
229            pct = if self.total_lanes == 0 {
230                0.0
231            } else {
232                100.0 * self.colliding_lanes as f64 / self.total_lanes as f64
233            },
234            severity = self.severity,
235        ));
236
237        // Lane column width adapts to the longest configured lane name
238        // so tables stay aligned even with long names (LaneId::try_new
239        // permits up to 64 bytes). Minimum width 16 keeps the table
240        // readable on deployments with very short names.
241        let lane_width = self
242            .entries
243            .iter()
244            .map(|e| e.lane.as_str().len())
245            .max()
246            .unwrap_or(0)
247            .max(16);
248        out.push_str(&format!(
249            "{:>9} | {:<width$} | collides_with\n",
250            "partition",
251            "lane",
252            width = lane_width,
253        ));
254        out.push_str(&format!(
255            "{} | {} | {}\n",
256            "-".repeat(9),
257            "-".repeat(lane_width),
258            "-".repeat(40),
259        ));
260        for entry in &self.entries {
261            let collides = if entry.collides_with.is_empty() {
262                "—".to_string()
263            } else {
264                entry
265                    .collides_with
266                    .iter()
267                    .map(|l| l.as_str().to_string())
268                    .collect::<Vec<_>>()
269                    .join(", ")
270            };
271            out.push_str(&format!(
272                "{:>9} | {:<width$} | {}\n",
273                entry.index,
274                entry.lane.as_str(),
275                collides,
276                width = lane_width,
277            ));
278        }
279
280        if self.colliding_lanes > 0 {
281            // Backslash line-continuation strips leading whitespace, so the
282            // numbered list is spelled as explicit "\n  1. ..." entries
283            // (NOT multi-line raw strings with visually-indented lines).
284            out.push('\n');
285            out.push_str("Remediation (see docs/rfc011-operator-runbook.md §Partition-collision observability):\n");
286            out.push_str("  1. Rename a colliding lane to hash differently (cheapest).\n");
287            out.push_str("  2. Bump FF_FLOW_PARTITIONS to halve collision probability (requires clean state).\n");
288            out.push_str("  3. Install a custom SoloPartitioner via solo_partition_with (advanced; requires fork).\n");
289        }
290        out
291    }
292}
293
294fn classify_severity(colliding: usize, total: usize) -> CollisionSeverity {
295    if colliding == 0 {
296        return CollisionSeverity::Clean;
297    }
298    if total == 0 {
299        return CollisionSeverity::Clean;
300    }
301    // Integer arithmetic to avoid floating-point boundary surprises
302    // (exactly 5% and exactly 15% previously landed on the stricter
303    // bucket due to float comparison with `ratio < 0.05` / `< 0.15`).
304    // Runbook contract:
305    //   <5%    → Watch       (colliding * 100 < 5 * total)
306    //   5-15%  → Elevated    (5 * total <= colliding * 100 <= 15 * total)
307    //   >15%   → Remediate   (colliding * 100 > 15 * total)
308    let colliding_bp = colliding.saturating_mul(100); // basis points ×100 (not bp proper)
309    let five_pct = total.saturating_mul(5);
310    let fifteen_pct = total.saturating_mul(15);
311    if colliding_bp < five_pct {
312        CollisionSeverity::Watch
313    } else if colliding_bp <= fifteen_pct {
314        CollisionSeverity::Elevated
315    } else {
316        CollisionSeverity::Remediate
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Partition config with the given flow-partition count; the budget
    /// and quota counts are irrelevant to the collisions probe.
    fn cfg(num_flow: u16) -> PartitionConfig {
        PartitionConfig {
            num_flow_partitions: num_flow,
            num_budget_partitions: 32,
            num_quota_partitions: 32,
        }
    }

    /// Build a lane id from a name known to be valid.
    fn lane(name: &str) -> LaneId {
        LaneId::try_new(name).expect("valid lane id")
    }

    /// Split every table line of `out` (header, separator, data rows) into
    /// its three trimmed columns. Lines without exactly three
    /// `|`-separated columns are ignored.
    fn table_rows(out: &str) -> Vec<Vec<&str>> {
        out.lines()
            .map(|line| line.split('|').map(str::trim).collect::<Vec<_>>())
            .filter(|cols| cols.len() == 3)
            .collect()
    }

    #[test]
    fn zero_lanes_is_clean() {
        let r = PartitionCollisionsReport::compute(&[], &cfg(256));
        assert_eq!(r.total_lanes, 0);
        assert_eq!(r.colliding_lanes, 0);
        assert_eq!(r.severity, CollisionSeverity::Clean);
        assert!(r.entries.is_empty());
    }

    #[test]
    fn single_lane_is_clean() {
        let lanes = vec![lane("default")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        assert_eq!(r.colliding_lanes, 0);
        assert_eq!(r.severity, CollisionSeverity::Clean);
        assert_eq!(r.entries.len(), 1);
        assert!(r.entries[0].collides_with.is_empty());
    }

    #[test]
    fn forced_collision_via_tiny_partition_count() {
        // 3 lanes on 1 partition — all collide.
        let lanes = vec![lane("a"), lane("b"), lane("c")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(1));
        assert_eq!(r.colliding_lanes, 3);
        assert_eq!(r.severity, CollisionSeverity::Remediate);
        // Every entry has the other two listed.
        for entry in &r.entries {
            assert_eq!(entry.index, 0);
            assert_eq!(entry.collides_with.len(), 2);
        }
    }

    #[test]
    fn duplicate_lane_names_collide_with_each_other() {
        // `compute` is reachable by library callers that skip the CLI's
        // duplicate rejection. Each copy must leave out exactly one
        // occurrence of its own name, so the other copy is reported.
        let lanes = vec![lane("dup"), lane("dup")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        assert_eq!(r.total_lanes, 2);
        assert_eq!(r.colliding_lanes, 2);
        assert_eq!(r.entries.len(), 2);
        for entry in &r.entries {
            assert_eq!(entry.lane, lane("dup"));
            assert_eq!(entry.collides_with, vec![lane("dup")]);
        }
        assert_eq!(r.entries[0].index, r.entries[1].index);
    }

    #[test]
    fn severity_thresholds() {
        // 0% collision → Clean
        assert_eq!(classify_severity(0, 100), CollisionSeverity::Clean);
        // 4% → Watch
        assert_eq!(classify_severity(4, 100), CollisionSeverity::Watch);
        // 10% → Elevated
        assert_eq!(classify_severity(10, 100), CollisionSeverity::Elevated);
        // 20% → Remediate
        assert_eq!(classify_severity(20, 100), CollisionSeverity::Remediate);
        // Boundary cases per runbook: 5% → Elevated (inclusive),
        // 15% → Elevated (inclusive), 16% → Remediate.
        assert_eq!(classify_severity(5, 100), CollisionSeverity::Elevated);
        assert_eq!(classify_severity(15, 100), CollisionSeverity::Elevated);
        assert_eq!(classify_severity(16, 100), CollisionSeverity::Remediate);
    }

    #[test]
    fn entries_sorted_deterministically() {
        // Even with lanes passed in arbitrary order, output is sorted by
        // (partition, lane). Lets operators diff two runs cleanly.
        let lanes = vec![lane("zzz"), lane("aaa"), lane("mmm")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        for pair in r.entries.windows(2) {
            let a = &pair[0];
            let b = &pair[1];
            assert!(
                a.index < b.index
                    || (a.index == b.index && a.lane.as_str() <= b.lane.as_str()),
                "entries not sorted: {a:?} before {b:?}"
            );
        }
    }

    #[test]
    fn format_plain_clean_deployment() {
        let lanes = vec![lane("default")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        let out = r.format_plain();
        assert!(out.contains("num_flow_partitions: 256"));
        assert!(out.contains("lanes configured:    1"));
        assert!(out.contains("lanes colliding:     0"));
        assert!(out.contains("Clean"));
        assert!(out.contains("default"));
        // Clean deployments get NO remediation section.
        assert!(!out.contains("Remediation"));
    }

    #[test]
    fn format_plain_adapts_width_to_long_lane_name() {
        // LaneId permits up to 64 bytes. A 40-byte name should NOT
        // produce a broken-looking table — column width must adapt.
        let long = "x".repeat(40);
        let lanes = vec![lane(&long), lane("short")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        let out = r.format_plain();

        // Byte offsets of the first two column separators on every table
        // line (header, dashes, one row per lane). Everything left of the
        // second separator is ASCII here, so byte offsets equal columns.
        let separators: Vec<(usize, usize)> = out
            .lines()
            .filter_map(|line| {
                let first = line.find('|')?;
                let second = first + 1 + line[first + 1..].find('|')?;
                Some((first, second))
            })
            .collect();
        assert_eq!(
            separators.len(),
            4,
            "expected header, separator and two data rows in: {out:?}"
        );
        // All table lines agree on where the columns start.
        assert!(
            separators.windows(2).all(|pair| pair[0] == pair[1]),
            "table columns are misaligned in: {out:?}"
        );
        // Between the separators: one space, the padded lane, one space.
        let (first, second) = separators[0];
        assert_eq!(
            second - first - 3,
            long.len(),
            "lane column did not widen to the longest lane name in: {out:?}"
        );
        // And crucially: the long lane name appears intact.
        assert!(out.contains(&long));
    }

    #[test]
    fn format_plain_forced_collision_includes_remediation() {
        let lanes = vec![lane("a"), lane("b")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(1));
        let out = r.format_plain();
        assert!(out.contains("Remediate"));
        assert!(out.contains("Remediation"));
        assert!(out.contains("FF_FLOW_PARTITIONS"));
        assert!(out.contains("SoloPartitioner"));
        // Each lane's row lists the other as collides_with. Checked on
        // the parsed table rows: a bare `contains("a")` would be satisfied
        // by the banner text alone and could never fail.
        let rows = table_rows(&out);
        assert!(
            rows.contains(&vec!["0", "a", "b"]),
            "missing row for lane 'a' colliding with 'b' in: {out:?}"
        );
        assert!(
            rows.contains(&vec!["0", "b", "a"]),
            "missing row for lane 'b' colliding with 'a' in: {out:?}"
        );
        // Numbered list items retain the intended two-space indent. The
        // backslash line-continuation Rust string idiom strips leading
        // whitespace, so each numbered line is pushed explicitly; this
        // test pins that we didn't regress back to the continuation shape.
        assert!(
            out.contains("\n  1. Rename"),
            "remediation step 1 missing two-space indent in: {out:?}"
        );
        assert!(
            out.contains("\n  2. Bump FF_FLOW_PARTITIONS"),
            "remediation step 2 missing two-space indent in: {out:?}"
        );
        assert!(
            out.contains("\n  3. Install a custom SoloPartitioner"),
            "remediation step 3 missing two-space indent in: {out:?}"
        );
    }
}