use ff_core::partition::{solo_partition, PartitionConfig};
use ff_core::types::LaneId;
pub fn load_probe_inputs() -> Result<(Vec<LaneId>, PartitionConfig), String> {
let raw = std::env::var("FF_LANES").unwrap_or_else(|_| "default".to_string());
let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut lanes: Vec<LaneId> = Vec::new();
for token in raw.split(',') {
let trimmed = token.trim();
if trimmed.is_empty() {
continue;
}
let lane = LaneId::try_new(trimmed).map_err(|e| {
format!("FF_LANES: invalid lane name '{trimmed}': {e}")
})?;
if !seen.insert(lane.as_str().to_string()) {
return Err(format!(
"FF_LANES: duplicate lane name '{trimmed}' — remove one of the entries"
));
}
lanes.push(lane);
}
if lanes.is_empty() {
return Err(
"FF_LANES: at least one non-empty lane name is required".to_string(),
);
}
let num_flow_partitions = parse_u16_positive("FF_FLOW_PARTITIONS", 256)?;
let num_budget_partitions = parse_u16_positive("FF_BUDGET_PARTITIONS", 32)?;
let num_quota_partitions = parse_u16_positive("FF_QUOTA_PARTITIONS", 32)?;
Ok((
lanes,
PartitionConfig {
num_flow_partitions,
num_budget_partitions,
num_quota_partitions,
},
))
}
fn parse_u16_positive(var: &str, default: u16) -> Result<u16, String> {
match std::env::var(var) {
Ok(s) => {
let n: u16 = s.parse().map_err(|_| {
format!("{var}: '{s}' is not a valid u16 (1-65535)")
})?;
if n == 0 {
return Err(format!("{var}: must be > 0"));
}
Ok(n)
}
Err(_) => Ok(default),
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LanePartition {
pub lane: LaneId,
pub index: u16,
pub collides_with: Vec<LaneId>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollisionSeverity {
Clean,
Watch,
Elevated,
Remediate,
}
#[derive(Debug, Clone)]
pub struct PartitionCollisionsReport {
pub partitions: u16,
pub total_lanes: usize,
pub colliding_lanes: usize,
pub severity: CollisionSeverity,
pub entries: Vec<LanePartition>,
}
impl PartitionCollisionsReport {
pub fn compute(lanes: &[LaneId], config: &PartitionConfig) -> Self {
let mut by_partition: std::collections::BTreeMap<u16, Vec<LaneId>> =
std::collections::BTreeMap::new();
for lane in lanes {
let p = solo_partition(lane, config);
by_partition.entry(p.index).or_default().push(lane.clone());
}
let mut entries: Vec<LanePartition> = Vec::with_capacity(lanes.len());
let mut colliding_lanes = 0usize;
for (index, siblings) in &by_partition {
let mut sorted_siblings: Vec<LaneId> = siblings.clone();
sorted_siblings.sort_by(|a, b| a.as_str().cmp(b.as_str()));
for lane in siblings {
let mut seen_self = false;
let others: Vec<LaneId> = sorted_siblings
.iter()
.filter(|sib| {
if sib.as_str() == lane.as_str() && !seen_self {
seen_self = true;
false
} else {
true
}
})
.cloned()
.collect();
if !others.is_empty() {
colliding_lanes += 1;
}
entries.push(LanePartition {
lane: lane.clone(),
index: *index,
collides_with: others,
});
}
}
entries.sort_by(|a, b| {
a.index
.cmp(&b.index)
.then_with(|| a.lane.as_str().cmp(b.lane.as_str()))
});
let severity = classify_severity(colliding_lanes, lanes.len());
Self {
partitions: config.num_flow_partitions,
total_lanes: lanes.len(),
colliding_lanes,
severity,
entries,
}
}
pub fn format_plain(&self) -> String {
let mut out = String::new();
out.push_str(&format!(
"FlowFabric partition-collisions probe (RFC-011 §5.6)\n\
\n\
num_flow_partitions: {partitions}\n\
lanes configured: {total}\n\
lanes colliding: {colliding} ({pct:.1}%)\n\
severity: {severity:?}\n\
\n",
partitions = self.partitions,
total = self.total_lanes,
colliding = self.colliding_lanes,
pct = if self.total_lanes == 0 {
0.0
} else {
100.0 * self.colliding_lanes as f64 / self.total_lanes as f64
},
severity = self.severity,
));
let lane_width = self
.entries
.iter()
.map(|e| e.lane.as_str().len())
.max()
.unwrap_or(0)
.max(16);
out.push_str(&format!(
"{:>9} | {:<width$} | collides_with\n",
"partition",
"lane",
width = lane_width,
));
out.push_str(&format!(
"{} | {} | {}\n",
"-".repeat(9),
"-".repeat(lane_width),
"-".repeat(40),
));
for entry in &self.entries {
let collides = if entry.collides_with.is_empty() {
"—".to_string()
} else {
entry
.collides_with
.iter()
.map(|l| l.as_str().to_string())
.collect::<Vec<_>>()
.join(", ")
};
out.push_str(&format!(
"{:>9} | {:<width$} | {}\n",
entry.index,
entry.lane.as_str(),
collides,
width = lane_width,
));
}
if self.colliding_lanes > 0 {
out.push('\n');
out.push_str("Remediation (see docs/rfc011-operator-runbook.md §Partition-collision observability):\n");
out.push_str(" 1. Rename a colliding lane to hash differently (cheapest).\n");
out.push_str(" 2. Bump FF_FLOW_PARTITIONS to halve collision probability (requires clean state).\n");
out.push_str(" 3. Install a custom SoloPartitioner via solo_partition_with (advanced; requires fork).\n");
}
out
}
}
fn classify_severity(colliding: usize, total: usize) -> CollisionSeverity {
if colliding == 0 {
return CollisionSeverity::Clean;
}
if total == 0 {
return CollisionSeverity::Clean;
}
let colliding_bp = colliding.saturating_mul(100); let five_pct = total.saturating_mul(5);
let fifteen_pct = total.saturating_mul(15);
if colliding_bp < five_pct {
CollisionSeverity::Watch
} else if colliding_bp <= fifteen_pct {
CollisionSeverity::Elevated
} else {
CollisionSeverity::Remediate
}
}
#[cfg(test)]
mod tests {
use super::*;
fn cfg(num_flow: u16) -> PartitionConfig {
PartitionConfig {
num_flow_partitions: num_flow,
num_budget_partitions: 32,
num_quota_partitions: 32,
}
}
fn lane(name: &str) -> LaneId {
LaneId::try_new(name).expect("valid lane id")
}
#[test]
fn zero_lanes_is_clean() {
let r = PartitionCollisionsReport::compute(&[], &cfg(256));
assert_eq!(r.total_lanes, 0);
assert_eq!(r.colliding_lanes, 0);
assert_eq!(r.severity, CollisionSeverity::Clean);
assert!(r.entries.is_empty());
}
#[test]
fn single_lane_is_clean() {
let lanes = vec![lane("default")];
let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
assert_eq!(r.colliding_lanes, 0);
assert_eq!(r.severity, CollisionSeverity::Clean);
assert_eq!(r.entries.len(), 1);
assert!(r.entries[0].collides_with.is_empty());
}
#[test]
fn forced_collision_via_tiny_partition_count() {
let lanes = vec![lane("a"), lane("b"), lane("c")];
let r = PartitionCollisionsReport::compute(&lanes, &cfg(1));
assert_eq!(r.colliding_lanes, 3);
assert_eq!(r.severity, CollisionSeverity::Remediate);
for entry in &r.entries {
assert_eq!(entry.index, 0);
assert_eq!(entry.collides_with.len(), 2);
}
}
#[test]
fn severity_thresholds() {
assert_eq!(classify_severity(0, 100), CollisionSeverity::Clean);
assert_eq!(classify_severity(4, 100), CollisionSeverity::Watch);
assert_eq!(classify_severity(10, 100), CollisionSeverity::Elevated);
assert_eq!(classify_severity(20, 100), CollisionSeverity::Remediate);
assert_eq!(classify_severity(5, 100), CollisionSeverity::Elevated);
assert_eq!(classify_severity(15, 100), CollisionSeverity::Elevated);
assert_eq!(classify_severity(16, 100), CollisionSeverity::Remediate);
}
#[test]
fn entries_sorted_deterministically() {
let lanes = vec![lane("zzz"), lane("aaa"), lane("mmm")];
let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
for pair in r.entries.windows(2) {
let a = &pair[0];
let b = &pair[1];
assert!(
a.index < b.index
|| (a.index == b.index && a.lane.as_str() <= b.lane.as_str()),
"entries not sorted: {a:?} before {b:?}"
);
}
}
#[test]
fn format_plain_clean_deployment() {
let lanes = vec![lane("default")];
let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
let out = r.format_plain();
assert!(out.contains("num_flow_partitions: 256"));
assert!(out.contains("lanes configured: 1"));
assert!(out.contains("lanes colliding: 0"));
assert!(out.contains("Clean"));
assert!(out.contains("default"));
assert!(!out.contains("Remediation"));
}
#[test]
fn format_plain_adapts_width_to_long_lane_name() {
let long = "x".repeat(40);
let lanes = vec![lane(&long), lane("short")];
let r = PartitionCollisionsReport::compute(&lanes, &cfg(256));
let out = r.format_plain();
for line in out.lines().filter(|l| l.starts_with(|c: char| c.is_ascii_digit() || c == ' ')) {
if let Some(middle) = line.split('|').nth(1) {
let middle_trim_right = middle.trim_end();
if middle_trim_right.contains(&long) {
assert!(
middle.len() > long.len(),
"row middle too narrow for long lane: {middle:?}"
);
}
}
}
assert!(out.contains(&long));
}
#[test]
fn format_plain_forced_collision_includes_remediation() {
let lanes = vec![lane("a"), lane("b")];
let r = PartitionCollisionsReport::compute(&lanes, &cfg(1));
let out = r.format_plain();
assert!(out.contains("Remediate"));
assert!(out.contains("Remediation"));
assert!(out.contains("FF_FLOW_PARTITIONS"));
assert!(out.contains("SoloPartitioner"));
assert!(out.contains("a") && out.contains("b"));
assert!(
out.contains("\n 1. Rename"),
"remediation step 1 missing two-space indent in: {out:?}"
);
assert!(
out.contains("\n 2. Bump FF_FLOW_PARTITIONS"),
"remediation step 2 missing two-space indent in: {out:?}"
);
assert!(
out.contains("\n 3. Install a custom SoloPartitioner"),
"remediation step 3 missing two-space indent in: {out:?}"
);
}
}