1use ff_core::partition::{solo_partition, PartitionConfig};
16use ff_core::types::LaneId;
17
18pub fn load_probe_inputs() -> Result<(Vec<LaneId>, PartitionConfig), String> {
33 let raw = std::env::var("FF_LANES").unwrap_or_else(|_| "default".to_string());
37 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
38 let mut lanes: Vec<LaneId> = Vec::new();
39 for token in raw.split(',') {
40 let trimmed = token.trim();
41 if trimmed.is_empty() {
42 continue;
43 }
44 let lane = LaneId::try_new(trimmed).map_err(|e| {
45 format!("FF_LANES: invalid lane name '{trimmed}': {e}")
46 })?;
47 if !seen.insert(lane.as_str().to_string()) {
48 return Err(format!(
49 "FF_LANES: duplicate lane name '{trimmed}' — remove one of the entries"
50 ));
51 }
52 lanes.push(lane);
53 }
54 if lanes.is_empty() {
55 return Err(
56 "FF_LANES: at least one non-empty lane name is required".to_string(),
57 );
58 }
59
60 let num_flow_partitions = parse_u16_positive("FF_FLOW_PARTITIONS", 256)?;
61 let num_budget_partitions = parse_u16_positive("FF_BUDGET_PARTITIONS", 32)?;
62 let num_quota_partitions = parse_u16_positive("FF_QUOTA_PARTITIONS", 32)?;
63
64 Ok((
65 lanes,
66 PartitionConfig {
67 num_flow_partitions,
68 num_budget_partitions,
69 num_quota_partitions,
70 },
71 ))
72}
73
/// Reads the environment variable `var` as a strictly positive `u16`.
///
/// Returns `default` when the variable is not set.
///
/// # Errors
///
/// Returns a message prefixed with the variable name when the value is set
/// but is not valid Unicode, does not parse as a `u16`, or is `0`.
fn parse_u16_positive(var: &str, default: u16) -> Result<u16, String> {
    let raw = match std::env::var(var) {
        Ok(raw) => raw,
        Err(std::env::VarError::NotPresent) => return Ok(default),
        // Set-but-undecodable is an operator mistake; falling back to the
        // default here would hide it.
        Err(std::env::VarError::NotUnicode(_)) => {
            return Err(format!("{var}: value is not valid Unicode"));
        }
    };
    match raw.parse::<u16>() {
        Ok(0) => Err(format!("{var}: must be > 0")),
        Ok(n) => Ok(n),
        Err(_) => Err(format!("{var}: '{raw}' is not a valid u16 (1-65535)")),
    }
}
88
/// One lane's row in a [`PartitionCollisionsReport`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LanePartition {
    /// The lane this entry describes.
    pub lane: LaneId,
    /// Partition index that `solo_partition` returned for `lane`.
    pub index: u16,
    /// The other lanes that were assigned the same partition index, sorted
    /// by name. Empty when the lane has the partition to itself.
    pub collides_with: Vec<LaneId>,
}
101
/// Severity bucket that `classify_severity` derives from the share of
/// configured lanes that share a partition with at least one other lane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollisionSeverity {
    /// No lane collides (also returned when no lanes are configured).
    Clean,
    /// At least one lane collides, but fewer than 5% of all lanes do.
    Watch,
    /// Between 5% and 15% of lanes collide, both bounds inclusive.
    Elevated,
    /// More than 15% of lanes collide.
    Remediate,
}
115
116#[derive(Debug, Clone)]
118pub struct PartitionCollisionsReport {
119 pub partitions: u16,
120 pub total_lanes: usize,
121 pub colliding_lanes: usize,
122 pub severity: CollisionSeverity,
123 pub entries: Vec<LanePartition>,
126}
127
128impl PartitionCollisionsReport {
129 pub fn compute(lanes: &[LaneId], config: &PartitionConfig) -> Self {
140 let mut by_partition: std::collections::BTreeMap<u16, Vec<LaneId>> =
142 std::collections::BTreeMap::new();
143 for lane in lanes {
144 let p = solo_partition(lane, config);
145 by_partition.entry(p.index).or_default().push(lane.clone());
146 }
147
148 let mut entries: Vec<LanePartition> = Vec::with_capacity(lanes.len());
165 let mut colliding_lanes = 0usize;
166 for (index, siblings) in &by_partition {
167 let mut sorted_siblings: Vec<LaneId> = siblings.clone();
168 sorted_siblings.sort_by(|a, b| a.as_str().cmp(b.as_str()));
169 for lane in siblings {
170 let mut seen_self = false;
171 let others: Vec<LaneId> = sorted_siblings
172 .iter()
173 .filter(|sib| {
174 if sib.as_str() == lane.as_str() && !seen_self {
175 seen_self = true;
177 false
178 } else {
179 true
180 }
181 })
182 .cloned()
183 .collect();
184 if !others.is_empty() {
185 colliding_lanes += 1;
186 }
187 entries.push(LanePartition {
188 lane: lane.clone(),
189 index: *index,
190 collides_with: others,
191 });
192 }
193 }
194 entries.sort_by(|a, b| {
197 a.index
198 .cmp(&b.index)
199 .then_with(|| a.lane.as_str().cmp(b.lane.as_str()))
200 });
201
202 let severity = classify_severity(colliding_lanes, lanes.len());
203
204 Self {
205 partitions: config.num_flow_partitions,
206 total_lanes: lanes.len(),
207 colliding_lanes,
208 severity,
209 entries,
210 }
211 }
212
213 pub fn format_plain(&self) -> String {
217 let mut out = String::new();
218 out.push_str(&format!(
219 "FlowFabric partition-collisions probe (RFC-011 §5.6)\n\
220 \n\
221 num_flow_partitions: {partitions}\n\
222 lanes configured: {total}\n\
223 lanes colliding: {colliding} ({pct:.1}%)\n\
224 severity: {severity:?}\n\
225 \n",
226 partitions = self.partitions,
227 total = self.total_lanes,
228 colliding = self.colliding_lanes,
229 pct = if self.total_lanes == 0 {
230 0.0
231 } else {
232 100.0 * self.colliding_lanes as f64 / self.total_lanes as f64
233 },
234 severity = self.severity,
235 ));
236
237 let lane_width = self
242 .entries
243 .iter()
244 .map(|e| e.lane.as_str().len())
245 .max()
246 .unwrap_or(0)
247 .max(16);
248 out.push_str(&format!(
249 "{:>9} | {:<width$} | collides_with\n",
250 "partition",
251 "lane",
252 width = lane_width,
253 ));
254 out.push_str(&format!(
255 "{} | {} | {}\n",
256 "-".repeat(9),
257 "-".repeat(lane_width),
258 "-".repeat(40),
259 ));
260 for entry in &self.entries {
261 let collides = if entry.collides_with.is_empty() {
262 "—".to_string()
263 } else {
264 entry
265 .collides_with
266 .iter()
267 .map(|l| l.as_str().to_string())
268 .collect::<Vec<_>>()
269 .join(", ")
270 };
271 out.push_str(&format!(
272 "{:>9} | {:<width$} | {}\n",
273 entry.index,
274 entry.lane.as_str(),
275 collides,
276 width = lane_width,
277 ));
278 }
279
280 if self.colliding_lanes > 0 {
281 out.push('\n');
285 out.push_str("Remediation (see docs/rfc011-operator-runbook.md §Partition-collision observability):\n");
286 out.push_str(" 1. Rename a colliding lane to hash differently (cheapest).\n");
287 out.push_str(" 2. Bump FF_FLOW_PARTITIONS to halve collision probability (requires clean state).\n");
288 out.push_str(" 3. Install a custom SoloPartitioner via solo_partition_with (advanced; requires fork).\n");
289 }
290 out
291 }
292}
293
294fn classify_severity(colliding: usize, total: usize) -> CollisionSeverity {
295 if colliding == 0 {
296 return CollisionSeverity::Clean;
297 }
298 if total == 0 {
299 return CollisionSeverity::Clean;
300 }
301 let colliding_bp = colliding.saturating_mul(100); let five_pct = total.saturating_mul(5);
310 let fifteen_pct = total.saturating_mul(15);
311 if colliding_bp < five_pct {
312 CollisionSeverity::Watch
313 } else if colliding_bp <= fifteen_pct {
314 CollisionSeverity::Elevated
315 } else {
316 CollisionSeverity::Remediate
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Config with `num_flow` flow partitions and fixed budget/quota counts.
    fn cfg(num_flow: u16) -> PartitionConfig {
        PartitionConfig {
            num_flow_partitions: num_flow,
            num_budget_partitions: 32,
            num_quota_partitions: 32,
        }
    }

    /// Builds a lane id, panicking on names `LaneId::try_new` rejects.
    fn lane(name: &str) -> LaneId {
        LaneId::try_new(name).expect("valid lane id")
    }

    #[test]
    fn zero_lanes_is_clean() {
        let r = PartitionCollisionsReport::compute(&[], &cfg(256));
        assert_eq!(r.total_lanes, 0);
        assert_eq!(r.colliding_lanes, 0);
        assert_eq!(r.severity, CollisionSeverity::Clean);
        assert!(r.entries.is_empty());
    }

    #[test]
    fn single_lane_is_clean() {
        let lanes = vec![lane("default")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        assert_eq!(r.colliding_lanes, 0);
        assert_eq!(r.severity, CollisionSeverity::Clean);
        assert_eq!(r.entries.len(), 1);
        assert!(r.entries[0].collides_with.is_empty());
    }

    #[test]
    fn forced_collision_via_tiny_partition_count() {
        // With a single partition every lane must share index 0.
        let lanes = vec![lane("a"), lane("b"), lane("c")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(1));
        assert_eq!(r.colliding_lanes, 3);
        assert_eq!(r.severity, CollisionSeverity::Remediate);
        for entry in &r.entries {
            assert_eq!(entry.index, 0);
            assert_eq!(entry.collides_with.len(), 2);
        }
    }

    #[test]
    fn severity_thresholds() {
        assert_eq!(classify_severity(0, 100), CollisionSeverity::Clean);
        assert_eq!(classify_severity(4, 100), CollisionSeverity::Watch);
        assert_eq!(classify_severity(10, 100), CollisionSeverity::Elevated);
        assert_eq!(classify_severity(20, 100), CollisionSeverity::Remediate);
        // Both edges of the Elevated band are inclusive.
        assert_eq!(classify_severity(5, 100), CollisionSeverity::Elevated);
        assert_eq!(classify_severity(15, 100), CollisionSeverity::Elevated);
        assert_eq!(classify_severity(16, 100), CollisionSeverity::Remediate);
    }

    #[test]
    fn entries_sorted_deterministically() {
        let lanes = vec![lane("zzz"), lane("aaa"), lane("mmm")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        for pair in r.entries.windows(2) {
            let a = &pair[0];
            let b = &pair[1];
            assert!(
                a.index < b.index
                    || (a.index == b.index && a.lane.as_str() <= b.lane.as_str()),
                "entries not sorted: {a:?} before {b:?}"
            );
        }
    }

    #[test]
    fn format_plain_clean_deployment() {
        let lanes = vec![lane("default")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        let out = r.format_plain();
        assert!(out.contains("num_flow_partitions: 256"));
        assert!(out.contains("lanes configured: 1"));
        assert!(out.contains("lanes colliding: 0"));
        assert!(out.contains("Clean"));
        assert!(out.contains("default"));
        assert!(!out.contains("Remediation"));
    }

    #[test]
    fn format_plain_adapts_width_to_long_lane_name() {
        let long = "x".repeat(40);
        let lanes = vec![lane(&long), lane("short")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
        let out = r.format_plain();
        // Table rows start with a digit or with the padding of the
        // right-aligned partition column.
        for line in out
            .lines()
            .filter(|l| l.starts_with(|c: char| c.is_ascii_digit() || c == ' '))
        {
            if let Some(middle) = line.split('|').nth(1) {
                let middle_trim_right = middle.trim_end();
                if middle_trim_right.contains(&long) {
                    assert!(
                        middle.len() > long.len(),
                        "row middle too narrow for long lane: {middle:?}"
                    );
                }
            }
        }
        assert!(out.contains(&long));
    }

    #[test]
    fn format_plain_forced_collision_includes_remediation() {
        let lanes = vec![lane("a"), lane("b")];
        let r = PartitionCollisionsReport::compute(&lanes, &cfg(1));
        let out = r.format_plain();
        assert!(out.contains("Remediate"));
        assert!(out.contains("Remediation"));
        assert!(out.contains("FF_FLOW_PARTITIONS"));
        assert!(out.contains("SoloPartitioner"));

        // A bare `contains("a")` is satisfied by the header text alone, so
        // check the actual table rows: each lane must list the other one in
        // its collides_with column.
        let rows: Vec<Vec<&str>> = out
            .lines()
            .map(|line| line.split('|').map(str::trim).collect())
            .collect();
        assert!(
            rows.iter().any(|cols| cols.as_slice() == ["0", "a", "b"]),
            "row for lane a colliding with b missing in: {out:?}"
        );
        assert!(
            rows.iter().any(|cols| cols.as_slice() == ["0", "b", "a"]),
            "row for lane b colliding with a missing in: {out:?}"
        );

        assert!(
            out.contains("\n 1. Rename"),
            "remediation step 1 missing two-space indent in: {out:?}"
        );
        assert!(
            out.contains("\n 2. Bump FF_FLOW_PARTITIONS"),
            "remediation step 2 missing two-space indent in: {out:?}"
        );
        assert!(
            out.contains("\n 3. Install a custom SoloPartitioner"),
            "remediation step 3 missing two-space indent in: {out:?}"
        );
    }
}