#![allow(dead_code)]
use std::collections::HashMap;
use std::net::IpAddr;
use std::time::{Duration, Instant};
/// Strategy used to pick the link (or links) that carry each packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BondingMode {
    /// Rotate through the links that are not down, one packet per link in turn.
    RoundRobin,
    /// Keep traffic on one active link and switch only when that link goes down.
    ActiveBackup,
    /// Spread packets over the links that are not down in proportion to their weight.
    WeightedBalance,
    /// Send every packet on every link that is not down.
    Broadcast,
}

impl std::fmt::Display for BondingMode {
    /// Writes the mode's lowercase snake_case name (for example `round_robin`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match *self {
            BondingMode::RoundRobin => "round_robin",
            BondingMode::ActiveBackup => "active_backup",
            BondingMode::WeightedBalance => "weighted_balance",
            BondingMode::Broadcast => "broadcast",
        })
    }
}
/// Health classification assigned to a link from its reported loss and RTT.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LinkHealth {
    /// Loss and RTT are both below the degraded thresholds.
    Healthy,
    /// Loss or RTT crossed a degraded threshold, but the link is still usable.
    Degraded,
    /// The link failed enough consecutive reports to be excluded from routing.
    Down,
}

impl std::fmt::Display for LinkHealth {
    /// Writes the lowercase name of the health state (for example `healthy`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match *self {
            LinkHealth::Healthy => "healthy",
            LinkHealth::Degraded => "degraded",
            LinkHealth::Down => "down",
        })
    }
}
/// Static, per-link settings supplied when a link is added to a bonding group.
#[derive(Debug, Clone)]
pub struct LinkConfig {
/// Identifier of the link; the bonding group uses it as the lookup key.
pub id: String,
/// Local IP address associated with the link (stored only; not read by the
/// routing code in this file).
pub local_addr: IpAddr,
/// Relative share of traffic in weighted-balance mode.
pub weight: u32,
/// Maximum bandwidth in bits per second; summed into the aggregate statistics.
pub max_bandwidth_bps: u64,
/// Interval between health checks (stored only; not read by the code in this
/// file — presumably consumed by an external prober).
pub health_check_interval: Duration,
/// Number of consecutive high-loss reports required before the link is marked down.
pub failure_threshold: u32,
}
/// Runtime state tracked for one link of a bonding group.
#[derive(Debug, Clone)]
pub struct LinkState {
    /// Configuration the link was registered with.
    pub config: LinkConfig,
    /// Current health classification.
    pub health: LinkHealth,
    /// Cumulative number of packets reported as sent.
    pub packets_sent: u64,
    /// Cumulative bytes sent (initialised to zero; not updated by the code in this file).
    pub bytes_sent: u64,
    /// Cumulative number of packets reported as lost.
    pub packets_lost: u64,
    /// Round-trip time from the most recent health report.
    pub rtt: Duration,
    /// When the most recent health report was recorded, if any.
    pub last_health_check: Option<Instant>,
    /// Number of consecutive reports whose loss rate reached the "down" threshold.
    pub consecutive_failures: u32,
}

impl LinkState {
    /// Creates the initial state for a freshly registered link: healthy, all
    /// counters at zero, a 1 ms starting RTT, and no health check recorded yet.
    fn new(config: LinkConfig) -> Self {
        let initial_rtt = Duration::from_millis(1);
        LinkState {
            health: LinkHealth::Healthy,
            rtt: initial_rtt,
            last_health_check: None,
            packets_sent: 0,
            bytes_sent: 0,
            packets_lost: 0,
            consecutive_failures: 0,
            config,
        }
    }

    /// Returns the cumulative loss ratio (`packets_lost / packets_sent`),
    /// or `0.0` while no packets have been reported as sent.
    #[allow(clippy::cast_precision_loss)]
    fn loss_rate(&self) -> f64 {
        match self.packets_sent {
            0 => 0.0,
            sent => self.packets_lost as f64 / sent as f64,
        }
    }
}
/// Group-wide settings: routing mode plus the thresholds used to classify link health.
#[derive(Debug, Clone)]
pub struct BondingConfig {
    /// Routing strategy applied to every packet.
    pub mode: BondingMode,
    /// Loss ratio at or above which a link is considered degraded.
    pub degraded_loss_threshold: f64,
    /// Loss ratio at or above which a health report counts as a failure.
    pub down_loss_threshold: f64,
    /// RTT above which a link is considered degraded.
    pub degraded_rtt_threshold: Duration,
    /// Minimum number of healthy links required for the group to be operational.
    pub min_healthy_links: usize,
}

impl Default for BondingConfig {
    /// Weighted balancing, 1% degraded loss, 10% down loss, 50 ms degraded RTT,
    /// and at least one healthy link required.
    fn default() -> Self {
        let degraded_rtt_threshold = Duration::from_millis(50);
        BondingConfig {
            min_healthy_links: 1,
            degraded_rtt_threshold,
            down_loss_threshold: 0.10,
            degraded_loss_threshold: 0.01,
            mode: BondingMode::WeightedBalance,
        }
    }
}
/// Result of routing one packet: which link(s) should carry it.
#[derive(Debug, Clone)]
pub struct RouteDecision {
/// Identifiers of the selected links; empty when no link could be selected.
pub link_ids: Vec<String>,
/// Set to `true` by broadcast routing and `false` by every other mode.
pub is_redundant: bool,
}
/// Point-in-time summary of a bonding group, as produced by `BondingGroup::stats`.
#[derive(Debug, Clone)]
pub struct BondingStats {
/// Number of links registered in the group.
pub total_links: usize,
/// Number of links currently classified as healthy.
pub healthy_links: usize,
/// Number of links currently classified as degraded.
pub degraded_links: usize,
/// Number of links currently classified as down.
pub down_links: usize,
/// Sum of `max_bandwidth_bps` over all links that are not down.
pub aggregate_bandwidth_bps: u64,
/// Number of routing requests handled so far.
pub total_packets_routed: u64,
/// Routing mode the group is configured with.
pub mode: BondingMode,
}
/// A set of links bonded together and routed over according to a `BondingMode`.
pub struct BondingGroup {
// Group-wide mode and health thresholds.
config: BondingConfig,
// Per-link runtime state, keyed by link id.
links: HashMap<String, LinkState>,
// Link ids in the order they were added; round-robin walks this list.
link_order: Vec<String>,
// Round-robin cursor; advanced (with wrap-around) after each selection.
rr_index: usize,
// Link currently pinned for active-backup routing, if any.
active_link: Option<String>,
// Number of `route_packet` calls; also drives the weighted-balance slot.
total_routed: u64,
}
impl BondingGroup {
/// Creates an empty bonding group that routes according to `config`.
///
/// The group starts with no links, no active link, and all counters at zero.
#[must_use]
pub fn new(config: BondingConfig) -> Self {
    let links = HashMap::new();
    let link_order = Vec::new();
    Self {
        config,
        links,
        link_order,
        active_link: None,
        rr_index: 0,
        total_routed: 0,
    }
}
/// Registers a link with the group, or replaces the link that already uses
/// the same id.
///
/// A new link starts out healthy with zeroed counters. When `config.id` is
/// already registered, its state is reset to a fresh one but the link keeps
/// its existing position in the round-robin order; the id is never recorded
/// twice, so a re-added link does not receive a double share of round-robin
/// traffic. The first link ever added (or the first one added while no link
/// is active) becomes the active link.
pub fn add_link(&mut self, config: LinkConfig) {
    let id = config.id.clone();
    // `insert` returns the previous value when the key already existed, so
    // `is_none()` tells us whether this id is new to the group.
    let is_new = self
        .links
        .insert(id.clone(), LinkState::new(config))
        .is_none();
    if is_new {
        self.link_order.push(id.clone());
    }
    if self.active_link.is_none() {
        self.active_link = Some(id);
    }
}
/// Removes the link identified by `link_id` from the group.
///
/// Returns `true` when the link existed and was removed, `false` otherwise.
/// If the removed link was the active one, the active link is re-selected
/// from the remaining healthy links (and becomes `None` if there are none).
pub fn remove_link(&mut self, link_id: &str) -> bool {
    // Guard clause: nothing to clean up when the id is unknown.
    if self.links.remove(link_id).is_none() {
        return false;
    }

    self.link_order
        .retain(|existing| existing.as_str() != link_id);

    let removed_was_active =
        matches!(self.active_link.as_deref(), Some(current) if current == link_id);
    if removed_was_active {
        self.active_link = self.find_healthy_link();
    }
    true
}
/// Records a health report for `link_id` and re-classifies the link.
///
/// `packets_sent` and `packets_lost` are added to the link's cumulative
/// counters (saturating at `u64::MAX` instead of overflowing), `rtt` replaces
/// the stored RTT, and the check time is set to now. Classification then uses
/// the cumulative loss ratio:
///
/// * loss at or above the "down" threshold counts as one more consecutive
///   failure; the link is marked down once the count reaches its
///   `failure_threshold` (until then its previous health is kept);
/// * otherwise, loss at or above the degraded threshold, or RTT above the
///   degraded RTT threshold, marks it degraded and clears the failure count;
/// * otherwise it is healthy and the failure count is cleared.
///
/// In active-backup mode, if the active link is down after the update, the
/// active link is re-selected from the healthy links (possibly to `None`).
/// Reports for unknown link ids are ignored.
pub fn report_link_health(
    &mut self,
    link_id: &str,
    rtt: Duration,
    packets_sent: u64,
    packets_lost: u64,
) {
    let Some(link) = self.links.get_mut(link_id) else {
        return;
    };

    link.rtt = rtt;
    // Saturate rather than overflow: these counters only ever grow, and a
    // wrapped counter would make the loss ratio meaningless.
    link.packets_sent = link.packets_sent.saturating_add(packets_sent);
    link.packets_lost = link.packets_lost.saturating_add(packets_lost);
    link.last_health_check = Some(Instant::now());

    let loss = link.loss_rate();
    if loss >= self.config.down_loss_threshold {
        link.consecutive_failures = link.consecutive_failures.saturating_add(1);
        if link.consecutive_failures >= link.config.failure_threshold {
            link.health = LinkHealth::Down;
        }
    } else if loss >= self.config.degraded_loss_threshold
        || rtt > self.config.degraded_rtt_threshold
    {
        link.health = LinkHealth::Degraded;
        link.consecutive_failures = 0;
    } else {
        link.health = LinkHealth::Healthy;
        link.consecutive_failures = 0;
    }

    // Failover is only relevant when a single active link carries the traffic.
    if self.config.mode != BondingMode::ActiveBackup {
        return;
    }
    let active_is_down = self
        .active_link
        .as_deref()
        .and_then(|id| self.links.get(id))
        .is_some_and(|state| state.health == LinkHealth::Down);
    if active_is_down {
        self.active_link = self.find_healthy_link();
    }
}
/// Returns the id of the healthy link with the lowest RTT, or `None` when no
/// link is currently healthy.
///
/// Links are examined in the order they were added, so when several healthy
/// links share the lowest RTT the earliest-added one is chosen. (Iterating
/// the hash map directly would break such ties arbitrarily.)
fn find_healthy_link(&self) -> Option<String> {
    self.link_order
        .iter()
        .filter_map(|id| self.links.get(id).map(|state| (id, state)))
        .filter(|(_, state)| state.health == LinkHealth::Healthy)
        // `min_by_key` keeps the first of several equal minima.
        .min_by_key(|(_, state)| state.rtt)
        .map(|(id, _)| id.clone())
}
/// Chooses the link(s) for one packet according to the configured mode.
///
/// Every call increments the routed-packet counter, including calls that end
/// up with no link to offer. `_packet_size` is currently unused.
pub fn route_packet(&mut self, _packet_size: usize) -> RouteDecision {
    self.total_routed += 1;
    let mode = self.config.mode;
    match mode {
        BondingMode::Broadcast => self.route_broadcast(),
        BondingMode::WeightedBalance => self.route_weighted(),
        BondingMode::ActiveBackup => self.route_active_backup(),
        BondingMode::RoundRobin => self.route_round_robin(),
    }
}
/// Picks the next link in insertion order among the links that are not down.
///
/// The cursor advances (wrapping on overflow) only when a link was actually
/// selected; with no usable link the decision is empty.
fn route_round_robin(&mut self) -> RouteDecision {
    // Borrow the candidate ids; only the selected one needs to be cloned.
    let candidates: Vec<&String> = self
        .link_order
        .iter()
        .filter(|id| {
            matches!(
                self.links.get(id.as_str()),
                Some(state) if state.health != LinkHealth::Down
            )
        })
        .collect();

    let link_ids = if candidates.is_empty() {
        Vec::new()
    } else {
        let position = self.rr_index % candidates.len();
        let chosen = candidates[position].clone();
        self.rr_index = self.rr_index.wrapping_add(1);
        vec![chosen]
    };

    RouteDecision {
        link_ids,
        is_redundant: false,
    }
}
/// Routes on the active link, falling back to the best healthy link when no
/// active link is set. The decision is empty if neither exists.
fn route_active_backup(&self) -> RouteDecision {
    let chosen = match &self.active_link {
        Some(active) => Some(active.clone()),
        None => self.find_healthy_link(),
    };
    let link_ids = match chosen {
        Some(id) => vec![id],
        None => Vec::new(),
    };
    RouteDecision {
        link_ids,
        is_redundant: false,
    }
}
/// Picks one link among those that are not down, in proportion to its weight.
///
/// The routed-packet counter modulo the total weight selects a slot, and the
/// link whose cumulative weight range contains that slot is chosen. Weights
/// are accumulated in `u64`, so large `u32` weights cannot overflow the total.
/// When every usable link has weight zero, selection falls back to round
/// robin; when no link is usable the decision is empty.
fn route_weighted(&mut self) -> RouteDecision {
    let mut usable_links = 0usize;
    let mut total_weight = 0u64;
    for state in self
        .links
        .values()
        .filter(|state| state.health != LinkHealth::Down)
    {
        usable_links += 1;
        total_weight += u64::from(state.config.weight);
    }

    if usable_links == 0 {
        return RouteDecision {
            link_ids: Vec::new(),
            is_redundant: false,
        };
    }
    if total_weight == 0 {
        return self.route_round_robin();
    }

    let slot = self.total_routed % total_weight;
    let mut cumulative = 0u64;
    // The map is not modified between the two passes, so both see the links
    // in the same order and the cumulative ranges line up with the total.
    for (id, state) in &self.links {
        if state.health == LinkHealth::Down {
            continue;
        }
        cumulative += u64::from(state.config.weight);
        if slot < cumulative {
            return RouteDecision {
                link_ids: vec![id.clone()],
                is_redundant: false,
            };
        }
    }

    // Not reachable: `slot < total_weight`, and `cumulative` ends at
    // `total_weight`, so the loop above always returns.
    RouteDecision {
        link_ids: Vec::new(),
        is_redundant: false,
    }
}
/// Selects every link that is not down and flags the decision as redundant.
fn route_broadcast(&self) -> RouteDecision {
    let link_ids = self
        .links
        .iter()
        .filter_map(|(id, state)| (state.health != LinkHealth::Down).then(|| id.clone()))
        .collect();
    RouteDecision {
        link_ids,
        is_redundant: true,
    }
}
/// Returns a snapshot of the group: link counts per health state, the summed
/// bandwidth of all links that are not down, the routed-packet counter, and
/// the configured mode.
#[must_use]
pub fn stats(&self) -> BondingStats {
    let mut healthy_links = 0usize;
    let mut degraded_links = 0usize;
    let mut down_links = 0usize;
    let mut aggregate_bandwidth_bps = 0u64;

    // One pass over the links gathers all four figures.
    for state in self.links.values() {
        match state.health {
            LinkHealth::Healthy => {
                healthy_links += 1;
                aggregate_bandwidth_bps += state.config.max_bandwidth_bps;
            }
            LinkHealth::Degraded => {
                degraded_links += 1;
                aggregate_bandwidth_bps += state.config.max_bandwidth_bps;
            }
            LinkHealth::Down => down_links += 1,
        }
    }

    BondingStats {
        total_links: self.links.len(),
        healthy_links,
        degraded_links,
        down_links,
        aggregate_bandwidth_bps,
        total_packets_routed: self.total_routed,
        mode: self.config.mode,
    }
}
/// Returns the number of links registered in the group, whatever their health.
#[must_use]
pub fn link_count(&self) -> usize {
self.links.len()
}
/// Returns `true` when the number of healthy links is at least the configured
/// `min_healthy_links`. Degraded links do not count towards the minimum.
#[must_use]
pub fn is_operational(&self) -> bool {
    let mut healthy_links = 0usize;
    for state in self.links.values() {
        if state.health == LinkHealth::Healthy {
            healthy_links += 1;
        }
    }
    self.config.min_healthy_links <= healthy_links
}
}
/// Builds a [`LinkConfig`] with a one-second health-check interval and a
/// failure threshold of three.
///
/// `bandwidth_mbps` is given in megabits per second and converted to bits per
/// second. The conversion saturates at `u64::MAX` instead of overflowing when
/// the input is too large to represent.
#[must_use]
pub fn make_link_config(id: &str, addr: IpAddr, bandwidth_mbps: u64, weight: u32) -> LinkConfig {
    const BITS_PER_MEGABIT: u64 = 1_000_000;
    LinkConfig {
        id: id.to_string(),
        local_addr: addr,
        weight,
        max_bandwidth_bps: bandwidth_mbps.saturating_mul(BITS_PER_MEGABIT),
        health_check_interval: Duration::from_secs(1),
        failure_threshold: 3,
    }
}
// Unit tests for the bonding group.
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
fn test_addr(last_octet: u8) -> IpAddr {
IpAddr::V4(Ipv4Addr::new(192, 168, 1, last_octet))
}
fn make_test_bond(mode: BondingMode) -> BondingGroup {
let config = BondingConfig {
mode,
..Default::default()
};
let mut group = BondingGroup::new(config);
group.add_link(make_link_config("eth0", test_addr(1), 1000, 3));
group.add_link(make_link_config("eth1", test_addr(2), 1000, 1));
group
}
#[test]
fn test_add_remove_links() {
let mut group = make_test_bond(BondingMode::RoundRobin);
assert_eq!(group.link_count(), 2);
group.remove_link("eth0");
assert_eq!(group.link_count(), 1);
assert!(!group.remove_link("nonexistent"));
}
#[test]
fn test_round_robin_distribution() {
let mut group = make_test_bond(BondingMode::RoundRobin);
let d1 = group.route_packet(1000);
let d2 = group.route_packet(1000);
assert_eq!(d1.link_ids.len(), 1);
assert_eq!(d2.link_ids.len(), 1);
assert_ne!(d1.link_ids[0], d2.link_ids[0]);
}
#[test]
fn test_active_backup_single_link() {
let mut group = make_test_bond(BondingMode::ActiveBackup);
let d1 = group.route_packet(1000);
let d2 = group.route_packet(1000);
assert_eq!(
d1.link_ids, d2.link_ids,
"active-backup should use same link"
);
}
#[test]
fn test_broadcast_all_links() {
let mut group = make_test_bond(BondingMode::Broadcast);
let decision = group.route_packet(1000);
assert_eq!(decision.link_ids.len(), 2);
assert!(decision.is_redundant);
}
#[test]
fn test_weighted_balance_respects_weight() {
let mut group = make_test_bond(BondingMode::WeightedBalance);
let mut eth0_count = 0u64;
let mut eth1_count = 0u64;
for _ in 0..400 {
let d = group.route_packet(1000);
if d.link_ids.first().map(|s| s.as_str()) == Some("eth0") {
eth0_count += 1;
} else {
eth1_count += 1;
}
}
assert!(
eth0_count > eth1_count,
"eth0 (weight 3) should get more traffic: {} vs {}",
eth0_count,
eth1_count
);
}
#[test]
fn test_link_health_degrades() {
let mut group = make_test_bond(BondingMode::RoundRobin);
group.report_link_health("eth0", Duration::from_millis(5), 100, 5);
let state = group.links.get("eth0").expect("should succeed in test");
assert_eq!(state.health, LinkHealth::Degraded);
}
#[test]
fn test_link_goes_down() {
let mut group = make_test_bond(BondingMode::RoundRobin);
for _ in 0..3 {
group.report_link_health("eth0", Duration::from_millis(5), 100, 50);
}
let state = group.links.get("eth0").expect("should succeed in test");
assert_eq!(state.health, LinkHealth::Down);
}
#[test]
fn test_active_backup_failover() {
let mut group = make_test_bond(BondingMode::ActiveBackup);
let initial = group.active_link.clone().expect("should succeed in test");
for _ in 0..3 {
group.report_link_health(&initial, Duration::from_millis(5), 100, 50);
}
let d = group.route_packet(1000);
assert_eq!(d.link_ids.len(), 1);
assert_ne!(d.link_ids[0], initial, "should failover to backup");
}
#[test]
fn test_bonding_stats() {
let mut group = make_test_bond(BondingMode::RoundRobin);
group.route_packet(1000);
group.route_packet(1000);
let stats = group.stats();
assert_eq!(stats.total_links, 2);
assert_eq!(stats.healthy_links, 2);
assert_eq!(stats.total_packets_routed, 2);
assert_eq!(stats.aggregate_bandwidth_bps, 2_000_000_000);
}
#[test]
fn test_is_operational() {
let config = BondingConfig {
mode: BondingMode::RoundRobin,
min_healthy_links: 2,
..Default::default()
};
let mut group = BondingGroup::new(config);
group.add_link(make_link_config("eth0", test_addr(1), 1000, 1));
group.add_link(make_link_config("eth1", test_addr(2), 1000, 1));
assert!(group.is_operational());
for _ in 0..3 {
group.report_link_health("eth0", Duration::from_millis(5), 100, 50);
}
assert!(!group.is_operational());
}
#[test]
fn test_bonding_mode_display() {
assert_eq!(format!("{}", BondingMode::RoundRobin), "round_robin");
assert_eq!(format!("{}", BondingMode::ActiveBackup), "active_backup");
assert_eq!(
format!("{}", BondingMode::WeightedBalance),
"weighted_balance"
);
assert_eq!(format!("{}", BondingMode::Broadcast), "broadcast");
}
#[test]
fn test_link_health_display() {
assert_eq!(format!("{}", LinkHealth::Healthy), "healthy");
assert_eq!(format!("{}", LinkHealth::Degraded), "degraded");
assert_eq!(format!("{}", LinkHealth::Down), "down");
}
#[test]
fn test_empty_group_route() {
let config = BondingConfig::default();
let mut group = BondingGroup::new(config);
let d = group.route_packet(1000);
assert!(d.link_ids.is_empty());
}
}