#![allow(dead_code)]
use std::collections::{BTreeMap, HashMap};
use std::net::SocketAddr;
use std::time::{Duration, Instant};
/// Policy that decides which path(s) carry each scheduled packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingStrategy {
    /// Every usable path receives a copy of each packet.
    Redundant,
    /// Usable paths take turns, one path per packet.
    RoundRobin,
    /// The usable path with the highest bandwidth estimate carries the packet.
    BandwidthWeighted,
    /// The primary path carries the packet while it is usable; otherwise the
    /// first usable path does.
    ActiveStandby,
}

impl SchedulingStrategy {
    /// Returns the lowercase, hyphen-separated identifier of this strategy.
    #[must_use]
    pub const fn name(&self) -> &'static str {
        match *self {
            Self::ActiveStandby => "active-standby",
            Self::BandwidthWeighted => "bandwidth-weighted",
            Self::RoundRobin => "round-robin",
            Self::Redundant => "redundant",
        }
    }
}
/// Operational state of a network path, stored in `PathHealth::status`.
///
/// `PathHandle::is_usable` treats only `Up` and `Degraded` as usable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PathStatus {
/// Usable. `PathHealth::update_loss` assigns this while the smoothed loss
/// rate is at most 0.5.
Up,
/// Still usable. `PathHealth::update_loss` assigns this once the smoothed
/// loss rate exceeds 0.5.
Degraded,
/// Not usable. None of the non-test code visible in this file assigns it.
Down,
/// Not usable. None of the code visible in this file assigns it.
Standby,
}
/// Smoothed health metrics for a single network path.
#[derive(Debug, Clone)]
pub struct PathHealth {
    /// Smoothed round-trip time (exponentially weighted, gain 1/8).
    pub rtt: Duration,
    /// Smoothed mean deviation of RTT samples (exponentially weighted, gain 1/4).
    pub rtt_variance: Duration,
    /// One-way delay estimate. No method in this type modifies it.
    pub one_way_delay: Duration,
    /// Smoothed packet-loss rate; stays within `0.0..=1.0` when only
    /// [`PathHealth::update_loss`] writes it.
    pub loss_rate: f64,
    /// Bandwidth estimate. No method in this type modifies it.
    pub bandwidth_bps: f64,
    /// Jitter estimate; `quality_score` scales it against a 10 000 ceiling.
    pub jitter_us: f64,
    /// Current operational state of the path.
    pub status: PathStatus,
    /// Time of the most recent `update_rtt` / `update_loss` call (or of construction).
    pub last_updated: Instant,
}

impl Default for PathHealth {
    /// A healthy path: 10 ms RTT, 1 ms RTT deviation, 5 ms one-way delay,
    /// no loss, no jitter, 10 000 000 bandwidth units, status `Up`.
    fn default() -> Self {
        Self {
            rtt: Duration::from_millis(10),
            rtt_variance: Duration::from_millis(1),
            one_way_delay: Duration::from_millis(5),
            loss_rate: 0.0,
            bandwidth_bps: 10_000_000.0,
            jitter_us: 0.0,
            status: PathStatus::Up,
            last_updated: Instant::now(),
        }
    }
}

impl PathHealth {
    /// Combines RTT, loss and jitter into a score in `0.0..=1.0` (higher is better).
    ///
    /// Each metric is normalised to a `0..=1` penalty (RTT against 200 ms in
    /// whole milliseconds, loss as-is, jitter against 10 000) and weighted
    /// 0.4 / 0.4 / 0.2 respectively.
    #[must_use]
    pub fn quality_score(&self) -> f64 {
        let rtt_penalty = (self.rtt.as_millis() as f64 / 200.0).clamp(0.0, 1.0);
        let loss_penalty = self.loss_rate.clamp(0.0, 1.0);
        let jitter_penalty = (self.jitter_us / 10_000.0).clamp(0.0, 1.0);
        let raw = 1.0 - (rtt_penalty * 0.4 + loss_penalty * 0.4 + jitter_penalty * 0.2);
        raw.clamp(0.0, 1.0)
    }

    /// Folds one RTT sample into the smoothed RTT and its mean deviation.
    ///
    /// Uses the RFC 6298 gains (1/8 for the RTT, 1/4 for the deviation).
    pub fn update_rtt(&mut self, sample: Duration) {
        // RFC 6298 section 2.3: the deviation must be measured against the
        // smoothed RTT as it was BEFORE this sample is folded in; measuring
        // against the updated value systematically under-estimates variance.
        let deviation = if sample > self.rtt {
            sample - self.rtt
        } else {
            self.rtt - sample
        };
        self.rtt_variance = self.rtt_variance - (self.rtt_variance / 4) + deviation / 4;
        self.rtt = self.rtt - (self.rtt / 8) + sample / 8;
        self.last_updated = Instant::now();
    }

    /// Folds one delivery outcome into the smoothed loss rate (gain 1/8) and
    /// re-derives `status` from it.
    ///
    /// The status becomes `Degraded` when the smoothed rate exceeds 0.5 and
    /// `Up` otherwise.
    // NOTE(review): this overwrites whatever status was set before, including
    // `Down` and `Standby` — confirm that is intended.
    pub fn update_loss(&mut self, lost: bool) {
        let sample = if lost { 1.0 } else { 0.0 };
        self.loss_rate = 0.875 * self.loss_rate + 0.125 * sample;
        self.status = if self.loss_rate > 0.5 {
            PathStatus::Degraded
        } else {
            PathStatus::Up
        };
        self.last_updated = Instant::now();
    }
}
/// One registered network path: its addressing, health and traffic counters.
#[derive(Debug, Clone)]
pub struct PathHandle {
    /// Caller-chosen identifier of the path.
    pub id: u32,
    /// Local socket address of the path.
    pub local_addr: SocketAddr,
    /// Remote socket address packets are sent to.
    pub remote_addr: SocketAddr,
    /// Free-form label for the path.
    pub label: String,
    /// Whether this is the primary path; `new` sets it for id 0 only.
    pub is_primary: bool,
    /// Health metrics for the path.
    pub health: PathHealth,
    /// Total payload bytes accounted to this path.
    pub bytes_sent: u64,
    /// Total packets accounted to this path.
    pub packets_sent: u64,
}

impl PathHandle {
    /// Creates a path with default health and zeroed counters.
    ///
    /// The path is marked primary exactly when `id` is 0.
    #[must_use]
    pub fn new(
        id: u32,
        local_addr: SocketAddr,
        remote_addr: SocketAddr,
        label: impl Into<String>,
    ) -> Self {
        let is_primary = id == 0;
        let label = label.into();
        Self {
            id,
            local_addr,
            remote_addr,
            label,
            is_primary,
            health: PathHealth::default(),
            bytes_sent: 0,
            packets_sent: 0,
        }
    }

    /// Returns `true` when the path status is `Up` or `Degraded`.
    #[must_use]
    pub fn is_usable(&self) -> bool {
        match self.health.status {
            PathStatus::Up | PathStatus::Degraded => true,
            PathStatus::Down | PathStatus::Standby => false,
        }
    }
}
/// Tunables shared by the multipath sender and receiver.
#[derive(Debug, Clone)]
pub struct MultipathConfig {
    /// How the sender picks paths for each packet.
    pub strategy: SchedulingStrategy,
    /// Number of sequence numbers the receiver remembers for de-duplication.
    pub dedup_window: u32,
    /// Minimum time the receiver holds a packet before releasing it.
    pub merge_delay: Duration,
    /// Probe interval. Not consulted by the code visible in this file.
    pub probe_interval: Duration,
    /// Loss threshold for "degraded". Not consulted by the code visible in this file.
    pub degraded_loss_threshold: f64,
    /// Loss threshold for "down". Not consulted by the code visible in this file.
    pub down_loss_threshold: f64,
}

impl Default for MultipathConfig {
    /// Redundant scheduling, a 512-entry dedup window, 30 ms merge delay,
    /// 200 ms probe interval and loss thresholds of 0.05 / 0.30.
    fn default() -> Self {
        let merge_delay = Duration::from_millis(30);
        let probe_interval = Duration::from_millis(200);
        Self {
            strategy: SchedulingStrategy::Redundant,
            dedup_window: 512,
            merge_delay,
            probe_interval,
            degraded_loss_threshold: 0.05,
            down_loss_threshold: 0.30,
        }
    }
}

impl MultipathConfig {
    /// Equivalent to [`MultipathConfig::default`].
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Builder-style setter: returns the configuration with `strategy` replaced by `s`.
    #[must_use]
    pub const fn with_strategy(mut self, s: SchedulingStrategy) -> Self {
        self.strategy = s;
        self
    }
}
/// Sender half of the multipath layer: owns the registered paths and picks,
/// per packet, which of them carry it.
pub struct MultipathSender {
    /// Scheduling configuration; only `strategy` is read by this type.
    config: MultipathConfig,
    /// Registered paths, in insertion order.
    paths: Vec<PathHandle>,
    /// Round-robin cursor into the list of currently usable paths.
    rr_index: usize,
    /// Number of `schedule` calls made so far.
    total_dispatched: u64,
    /// Sequence number the next `schedule` call will consume (wraps on overflow).
    next_seq: u32,
}

impl MultipathSender {
    /// Creates a sender with no paths registered.
    #[must_use]
    pub fn new(config: MultipathConfig) -> Self {
        Self {
            next_seq: 0,
            total_dispatched: 0,
            rr_index: 0,
            paths: Vec::new(),
            config,
        }
    }

    /// Appends `path` to the set of registered paths. Ids are not checked for uniqueness.
    pub fn add_path(&mut self, path: PathHandle) {
        self.paths.push(path);
    }

    /// Removes every registered path whose id equals `path_id`.
    pub fn remove_path(&mut self, path_id: u32) {
        self.paths.retain(|candidate| candidate.id != path_id);
    }

    /// Number of registered paths, usable or not.
    #[must_use]
    pub fn path_count(&self) -> usize {
        self.paths.len()
    }

    /// Number of registered paths that are currently usable.
    #[must_use]
    pub fn usable_path_count(&self) -> usize {
        self.paths.iter().filter(|p| p.is_usable()).count()
    }

    /// Chooses the path(s) for one packet of `payload_len` bytes.
    ///
    /// Returns one `(path id, remote address)` pair per selected path and
    /// adds the packet to each selected path's counters. The result is empty
    /// when no path is usable. The sequence number and the dispatch counter
    /// advance on every call, including calls that select nothing.
    pub fn schedule(&mut self, payload_len: usize) -> Vec<(u32, SocketAddr)> {
        // The consumed sequence number is not used for anything yet.
        let _seq = self.next_seq;
        self.next_seq = self.next_seq.wrapping_add(1);
        self.total_dispatched += 1;

        // Indices (into `self.paths`) of the paths that may carry traffic now.
        let mut usable: Vec<usize> = Vec::new();
        for (idx, path) in self.paths.iter().enumerate() {
            if path.is_usable() {
                usable.push(idx);
            }
        }
        if usable.is_empty() {
            return Vec::new();
        }

        // Step 1: decide which path indices carry this packet.
        let selected: Vec<usize> = match self.config.strategy {
            SchedulingStrategy::Redundant => usable,
            SchedulingStrategy::RoundRobin => {
                let slot = self.rr_index % usable.len();
                self.rr_index = (self.rr_index + 1) % usable.len();
                vec![usable[slot]]
            }
            SchedulingStrategy::BandwidthWeighted => {
                let widest = usable
                    .iter()
                    .copied()
                    .max_by(|&a, &b| {
                        let bw_a = self.paths[a].health.bandwidth_bps;
                        let bw_b = self.paths[b].health.bandwidth_bps;
                        bw_a.partial_cmp(&bw_b).unwrap_or(std::cmp::Ordering::Equal)
                    })
                    .unwrap_or(usable[0]);
                vec![widest]
            }
            SchedulingStrategy::ActiveStandby => {
                let primary = usable.iter().copied().find(|&i| self.paths[i].is_primary);
                vec![primary.unwrap_or(usable[0])]
            }
        };

        // Step 2: account the packet on every selected path and report it.
        selected
            .into_iter()
            .map(|idx| {
                let path = &mut self.paths[idx];
                path.packets_sent += 1;
                path.bytes_sent += payload_len as u64;
                (path.id, path.remote_addr)
            })
            .collect()
    }

    /// Feeds an RTT sample to the first path with id `path_id`; no-op if there is none.
    pub fn update_path_rtt(&mut self, path_id: u32, rtt: Duration) {
        if let Some(pos) = self.paths.iter().position(|p| p.id == path_id) {
            self.paths[pos].health.update_rtt(rtt);
        }
    }

    /// Feeds a delivery outcome to the first path with id `path_id`; no-op if there is none.
    pub fn report_path_loss(&mut self, path_id: u32, lost: bool) {
        if let Some(pos) = self.paths.iter().position(|p| p.id == path_id) {
            self.paths[pos].health.update_loss(lost);
        }
    }

    /// All registered paths, in insertion order.
    #[must_use]
    pub fn paths(&self) -> &[PathHandle] {
        self.paths.as_slice()
    }

    /// The usable path with the highest quality score, or `None` when no path is usable.
    #[must_use]
    pub fn best_path(&self) -> Option<&PathHandle> {
        self.paths
            .iter()
            .filter(|p| p.is_usable())
            .max_by(|lhs, rhs| {
                let score_l = lhs.health.quality_score();
                let score_r = rhs.health.quality_score();
                score_l
                    .partial_cmp(&score_r)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
    }

    /// Number of `schedule` calls made so far.
    #[must_use]
    pub const fn total_dispatched(&self) -> u64 {
        self.total_dispatched
    }
}
/// Receiver half of the multipath layer: drops duplicate packets that arrive
/// over several paths and releases each unique packet after a merge delay.
pub struct MultipathReceiver {
    /// Receiver configuration; `dedup_window` and `merge_delay` are read here.
    config: MultipathConfig,
    /// Remembered sequence numbers and the time each was first accepted.
    dedup_seen: BTreeMap<u32, Instant>,
    /// Keys of `dedup_seen` in arrival order, so the oldest entry can be
    /// evicted in O(1). Numeric order is not arrival order once sequence
    /// numbers wrap around.
    dedup_order: std::collections::VecDeque<u32>,
    /// Accepted payloads that have not been drained yet, keyed by sequence number.
    ready: BTreeMap<u32, Vec<u8>>,
    /// Per path id: (packets received, duplicates among them).
    path_stats: HashMap<u32, (u64, u64)>,
    /// Number of packets handed out by `drain_ready`.
    total_delivered: u64,
    /// Most advanced sequence number accepted so far (see `receive`).
    highest_seq: Option<u32>,
}

impl MultipathReceiver {
    /// Creates a receiver with empty state.
    #[must_use]
    pub fn new(config: MultipathConfig) -> Self {
        Self {
            config,
            dedup_seen: BTreeMap::new(),
            dedup_order: std::collections::VecDeque::new(),
            ready: BTreeMap::new(),
            path_stats: HashMap::new(),
            total_delivered: 0,
            highest_seq: None,
        }
    }

    /// Offers a packet that arrived on `path_id`.
    ///
    /// Returns `true` when `seq` is not in the de-duplication window and the
    /// payload was queued for delivery, `false` when it is a duplicate (the
    /// payload is dropped). Per-path receive and duplicate counters are
    /// updated either way.
    pub fn receive(&mut self, seq: u32, payload: Vec<u8>, path_id: u32) -> bool {
        let stats = self.path_stats.entry(path_id).or_insert((0, 0));
        stats.0 += 1;
        if self.dedup_seen.contains_key(&seq) {
            stats.1 += 1;
            return false;
        }

        // Make room by forgetting the entry that ARRIVED first. Evicting the
        // numerically smallest key would discard the newest entries right
        // after the sequence counter wraps past u32::MAX.
        if self.dedup_seen.len() >= self.config.dedup_window as usize {
            if let Some(oldest_seq) = self.dedup_order.pop_front() {
                self.dedup_seen.remove(&oldest_seq);
            }
        }
        self.dedup_seen.insert(seq, Instant::now());
        self.dedup_order.push_back(seq);
        self.ready.insert(seq, payload);

        // NOTE(review): the forward window of 32768 is half of a 16-bit
        // sequence space although `seq` is 32 bits wide — confirm intent.
        match self.highest_seq {
            None => self.highest_seq = Some(seq),
            Some(h) if seq.wrapping_sub(h) < 32768 => self.highest_seq = Some(seq),
            _ => {}
        }
        true
    }

    /// Removes and returns every queued packet that is due, in ascending
    /// sequence-number order.
    ///
    /// A packet is due once `merge_delay` has elapsed since it was accepted.
    /// A packet whose de-duplication entry has already been evicted has no
    /// recorded arrival time left and is released immediately, so it can
    /// never be stranded in the queue.
    pub fn drain_ready(&mut self) -> Vec<(u32, Vec<u8>)> {
        let delay = self.config.merge_delay;
        let due: Vec<u32> = self
            .ready
            .keys()
            .copied()
            .filter(|seq| {
                self.dedup_seen
                    .get(seq)
                    .map_or(true, |accepted_at| accepted_at.elapsed() >= delay)
            })
            .collect();

        let mut out = Vec::with_capacity(due.len());
        for seq in due {
            if let Some(payload) = self.ready.remove(&seq) {
                out.push((seq, payload));
                self.total_delivered += 1;
            }
        }
        out
    }

    /// `(packets received, duplicates)` for `path_id`, or `None` if nothing
    /// has arrived on that path.
    #[must_use]
    pub fn path_stats(&self, path_id: u32) -> Option<(u64, u64)> {
        self.path_stats.get(&path_id).copied()
    }

    /// Number of packets handed out by `drain_ready` so far.
    #[must_use]
    pub const fn total_delivered(&self) -> u64 {
        self.total_delivered
    }

    /// Number of accepted packets still waiting to be drained.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.ready.len()
    }
}
/// Description of one endpoint a multipath stream can send to.
#[derive(Debug, Clone)]
pub struct MultipathEndpoint {
    /// Endpoint address, stored as given (not parsed here).
    pub address: String,
    /// Scheduling weight; `new` clamps it into `0.0..=1.0`.
    pub weight: f32,
    /// Optional interface name attached via `with_interface`.
    pub interface: Option<String>,
}

impl MultipathEndpoint {
    /// Creates an endpoint with no interface set and `weight` clamped into `0.0..=1.0`.
    #[must_use]
    pub fn new(address: impl Into<String>, weight: f32) -> Self {
        let clamped = weight.clamp(0.0, 1.0);
        Self {
            address: address.into(),
            weight: clamped,
            interface: None,
        }
    }

    /// Builder-style setter: returns the endpoint with `interface` set to `iface`.
    #[must_use]
    pub fn with_interface(self, iface: impl Into<String>) -> Self {
        Self {
            interface: Some(iface.into()),
            ..self
        }
    }
}
/// Path-selection policy used by `MultipathStreamSender::select_path`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MultipathScheduler {
/// Active paths are selected in turn.
RoundRobin,
/// Each active path accrues credit in proportion to its endpoint weight;
/// the path with the most credit is selected and charged one unit.
WeightedRoundRobin,
/// The active path with the lowest smoothed RTT is selected; a path with
/// no positive RTT recorded ranks last.
MinLatency,
/// `select_path` returns the first active path for this policy.
// NOTE(review): presumably callers fan out over `active_path_indices`
// themselves in this mode — confirm against the call sites.
Redundant,
}
/// Redundancy setting carried in `MultipathStreamConfig::redundancy_mode`.
///
/// None of the code visible in this file branches on this value; it is only
/// stored in the stream configuration.
// NOTE(review): variant meanings below are inferred from their names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RedundancyMode {
/// Presumably: no redundancy.
None,
/// Presumably: forward error correction.
Fec,
/// Presumably: packet duplication.
Duplicate,
}
/// Running statistics for one path of a multipath stream.
#[derive(Debug, Clone, Default)]
pub struct PathStats {
    /// Bytes recorded as sent on this path.
    pub bytes_sent: u64,
    /// Bytes recorded as received on this path.
    pub bytes_received: u64,
    /// Smoothed RTT; `0.0` means no sample has been recorded yet.
    pub rtt_ms: f32,
    /// Smoothed packet-loss rate.
    pub packet_loss_rate: f32,
    /// Whether the path is currently eligible for selection.
    pub active: bool,
}

impl PathStats {
    /// Zeroed statistics with `active` set to `true`.
    #[must_use]
    pub fn new_active() -> Self {
        Self {
            active: true,
            ..Self::default()
        }
    }

    /// Records an RTT sample.
    ///
    /// The first sample (while `rtt_ms` is still `0.0`) is taken as-is; later
    /// samples are blended in with weight 1/8.
    pub fn update_rtt(&mut self, sample_ms: f32) {
        let no_sample_yet = self.rtt_ms == 0.0;
        self.rtt_ms = if no_sample_yet {
            sample_ms
        } else {
            self.rtt_ms * 0.875 + sample_ms * 0.125
        };
    }

    /// Blends a loss-rate sample (clamped into `0.0..=1.0`) into the smoothed
    /// loss rate with weight 1/8.
    pub fn update_loss(&mut self, loss_rate: f32) {
        let sample = loss_rate.clamp(0.0, 1.0);
        self.packet_loss_rate = self.packet_loss_rate * 0.875 + sample * 0.125;
    }
}
/// Aggregate statistics for a multipath stream.
#[derive(Debug, Clone, Default)]
pub struct MultipathStats {
    /// Per-path statistics, indexed by path position.
    pub path_stats: Vec<PathStats>,
    /// Bytes recorded as sent across all paths.
    pub total_bytes_sent: u64,
    /// Bytes recorded as received across all paths.
    pub total_bytes_received: u64,
    /// Reordered-packet counter. Not updated by the code visible in this file.
    pub reordered_packets: u64,
    /// Recovered-packet counter. Not updated by the code visible in this file.
    pub recovered_packets: u64,
}

impl MultipathStats {
    /// Zeroed statistics with `count` per-path entries, each marked active.
    #[must_use]
    pub fn for_endpoints(count: usize) -> Self {
        let path_stats = std::iter::repeat_with(PathStats::new_active)
            .take(count)
            .collect();
        Self {
            path_stats,
            ..Self::default()
        }
    }
}
/// Static configuration of a multipath stream.
#[derive(Debug, Clone)]
pub struct MultipathStreamConfig {
    /// Endpoints of the stream; a path is identified by its index in this list.
    pub paths: Vec<MultipathEndpoint>,
    /// Policy used to select a path for each packet.
    pub scheduler: MultipathScheduler,
    /// Redundancy setting carried alongside the scheduler.
    pub redundancy_mode: RedundancyMode,
}

impl MultipathStreamConfig {
    /// Bundles the given endpoints, scheduler and redundancy mode without
    /// validating them.
    #[must_use]
    pub fn new(
        paths: Vec<MultipathEndpoint>,
        scheduler: MultipathScheduler,
        redundancy_mode: RedundancyMode,
    ) -> Self {
        Self {
            redundancy_mode,
            scheduler,
            paths,
        }
    }
}
/// Per-stream path selector that tracks statistics for every configured endpoint.
///
/// Paths are addressed by their index in `MultipathStreamConfig::paths`.
pub struct MultipathStreamSender {
    /// Stream configuration supplied at construction.
    config: MultipathStreamConfig,
    /// Per-path and aggregate statistics.
    stats: MultipathStats,
    /// Monotonic (wrapping) counter driving round-robin selection.
    round_robin_idx: usize,
    /// Accumulated credit per path for weighted round-robin.
    wrr_counters: Vec<f32>,
}

impl MultipathStreamSender {
    /// Creates a sender with every configured path active and all counters zeroed.
    #[must_use]
    pub fn new(config: MultipathStreamConfig) -> Self {
        let path_count = config.paths.len();
        Self {
            stats: MultipathStats::for_endpoints(path_count),
            wrr_counters: vec![0.0; path_count],
            round_robin_idx: 0,
            config,
        }
    }

    /// Picks the index of the path for the next packet according to the
    /// configured scheduler.
    ///
    /// Returns 0 when no path is active, whether or not a path with index 0
    /// exists or is active.
    pub fn select_path(&mut self) -> usize {
        let active = self.active_path_indices();
        if active.is_empty() {
            return 0;
        }
        let first_active = active[0];

        match self.config.scheduler {
            MultipathScheduler::RoundRobin => {
                let chosen = active[self.round_robin_idx % active.len()];
                self.round_robin_idx = self.round_robin_idx.wrapping_add(1);
                chosen
            }
            MultipathScheduler::WeightedRoundRobin => {
                // Floor the total at EPSILON so all-zero weights do not divide by zero.
                let total_weight: f32 = active
                    .iter()
                    .map(|&i| self.config.paths[i].weight)
                    .sum::<f32>()
                    .max(f32::EPSILON);
                // Every active path earns credit proportional to its weight.
                for &i in &active {
                    let share = self.config.paths[i].weight / total_weight;
                    self.wrr_counters[i] += share;
                }
                let credits = &self.wrr_counters;
                let richest = active
                    .iter()
                    .copied()
                    .max_by(|&a, &b| {
                        credits[a]
                            .partial_cmp(&credits[b])
                            .unwrap_or(std::cmp::Ordering::Equal)
                    })
                    .unwrap_or(first_active);
                // The winner pays one unit of credit for being selected.
                self.wrr_counters[richest] -= 1.0;
                richest
            }
            MultipathScheduler::MinLatency => {
                let per_path = &self.stats.path_stats;
                // Paths without a positive RTT on record rank as slowest.
                let effective_rtt = |i: usize| -> f32 {
                    let rtt = per_path[i].rtt_ms;
                    if rtt > 0.0 {
                        rtt
                    } else {
                        f32::MAX
                    }
                };
                active
                    .iter()
                    .copied()
                    .min_by(|&a, &b| {
                        effective_rtt(a)
                            .partial_cmp(&effective_rtt(b))
                            .unwrap_or(std::cmp::Ordering::Equal)
                    })
                    .unwrap_or(first_active)
            }
            MultipathScheduler::Redundant => first_active,
        }
    }

    /// Indices of all paths currently marked active, in ascending order.
    #[must_use]
    pub fn active_path_indices(&self) -> Vec<usize> {
        let mut indices = Vec::new();
        for (idx, path) in self.stats.path_stats.iter().enumerate() {
            if path.active {
                indices.push(idx);
            }
        }
        indices
    }

    /// Records an RTT sample for `path_idx`; out-of-range indices are ignored.
    pub fn update_path_rtt(&mut self, path_idx: usize, rtt_ms: f32) {
        if let Some(entry) = self.stats.path_stats.get_mut(path_idx) {
            entry.update_rtt(rtt_ms);
        }
    }

    /// Records a loss-rate sample for `path_idx` and deactivates the path once
    /// its smoothed loss rate exceeds 0.90. Out-of-range indices are ignored.
    ///
    /// A deactivated path is not reactivated here; use `set_path_active`.
    pub fn update_path_loss(&mut self, path_idx: usize, loss_rate: f32) {
        if let Some(entry) = self.stats.path_stats.get_mut(path_idx) {
            entry.update_loss(loss_rate);
            let too_lossy = entry.packet_loss_rate > 0.90;
            if too_lossy {
                entry.active = false;
            }
        }
    }

    /// Adds `bytes` to the sent counters of `path_idx` and of the stream
    /// (both saturating). Out-of-range indices are ignored entirely.
    pub fn record_bytes_sent(&mut self, path_idx: usize, bytes: u64) {
        let stats = &mut self.stats;
        if let Some(entry) = stats.path_stats.get_mut(path_idx) {
            entry.bytes_sent = entry.bytes_sent.saturating_add(bytes);
            stats.total_bytes_sent = stats.total_bytes_sent.saturating_add(bytes);
        }
    }

    /// Read-only view of the collected statistics.
    #[must_use]
    pub fn stats(&self) -> &MultipathStats {
        &self.stats
    }

    /// Number of paths currently marked active.
    #[must_use]
    pub fn active_path_count(&self) -> usize {
        self.stats
            .path_stats
            .iter()
            .filter(|entry| entry.active)
            .count()
    }

    /// Marks `path_idx` active or inactive; out-of-range indices are ignored.
    pub fn set_path_active(&mut self, path_idx: usize, active: bool) {
        if let Some(entry) = self.stats.path_stats.get_mut(path_idx) {
            entry.active = active;
        }
    }

    /// The configuration this sender was created with.
    #[must_use]
    pub fn config(&self) -> &MultipathStreamConfig {
        &self.config
    }
}
// Unit tests for the multipath sender/receiver and the stream-level sender.
#[cfg(test)]
mod tests {
use super::*;
// Builds a path with the given id, fixed loopback/LAN addresses and the
// label "eth<id>". Path 0 is the primary path (see `PathHandle::new`).
fn make_path(id: u32) -> PathHandle {
PathHandle::new(
id,
"127.0.0.1:5000".parse().expect("valid addr"),
"192.168.1.1:5000".parse().expect("valid addr"),
format!("eth{id}"),
)
}
// Default configuration: redundant scheduling.
fn default_config() -> MultipathConfig {
MultipathConfig::default()
}
// Each strategy reports its fixed textual name.
#[test]
fn test_strategy_names() {
assert_eq!(SchedulingStrategy::Redundant.name(), "redundant");
assert_eq!(SchedulingStrategy::RoundRobin.name(), "round-robin");
assert_eq!(
SchedulingStrategy::BandwidthWeighted.name(),
"bandwidth-weighted"
);
assert_eq!(SchedulingStrategy::ActiveStandby.name(), "active-standby");
}
// The default health yields a score inside (0, 1].
#[test]
fn test_path_health_quality_score_default() {
let h = PathHealth::default();
let score = h.quality_score();
assert!(score > 0.0 && score <= 1.0);
}
// A higher loss rate must lower the quality score.
#[test]
fn test_path_health_quality_decreases_with_loss() {
let h_good = PathHealth::default();
let mut h_bad = PathHealth::default();
h_bad.loss_rate = 0.5;
assert!(h_good.quality_score() > h_bad.quality_score());
}
// An RTT sample different from the current estimate moves the estimate.
#[test]
fn test_path_health_rtt_update() {
let mut h = PathHealth::default();
let old_rtt = h.rtt;
h.update_rtt(Duration::from_millis(100));
assert_ne!(h.rtt, old_rtt);
}
// Reporting a lost packet raises the smoothed loss rate above zero.
#[test]
fn test_path_health_loss_update() {
let mut h = PathHealth::default();
assert_eq!(h.loss_rate, 0.0);
h.update_loss(true);
assert!(h.loss_rate > 0.0);
}
// A freshly created path has status `Up` and is therefore usable.
#[test]
fn test_path_handle_is_usable() {
let p = make_path(0);
assert!(p.is_usable());
}
// Added paths are counted.
#[test]
fn test_sender_add_path() {
let mut sender = MultipathSender::new(default_config());
sender.add_path(make_path(0));
sender.add_path(make_path(1));
assert_eq!(sender.path_count(), 2);
}
// Redundant scheduling dispatches on every usable path.
#[test]
fn test_sender_redundant_strategy() {
let cfg = MultipathConfig::new().with_strategy(SchedulingStrategy::Redundant);
let mut sender = MultipathSender::new(cfg);
sender.add_path(make_path(0));
sender.add_path(make_path(1));
let dispatched = sender.schedule(188);
assert_eq!(dispatched.len(), 2);
}
// Round-robin scheduling picks one path per packet and alternates.
#[test]
fn test_sender_round_robin() {
let cfg = MultipathConfig::new().with_strategy(SchedulingStrategy::RoundRobin);
let mut sender = MultipathSender::new(cfg);
sender.add_path(make_path(0));
sender.add_path(make_path(1));
let d1 = sender.schedule(188);
let d2 = sender.schedule(188);
assert_eq!(d1.len(), 1);
assert_eq!(d2.len(), 1);
assert_ne!(d1[0].0, d2[0].0);
}
// Path 1 has the higher bandwidth estimate, so it must be selected.
#[test]
fn test_sender_bandwidth_weighted() {
let cfg = MultipathConfig::new().with_strategy(SchedulingStrategy::BandwidthWeighted);
let mut sender = MultipathSender::new(cfg);
let mut p0 = make_path(0);
p0.health.bandwidth_bps = 1_000_000.0;
let mut p1 = make_path(1);
p1.health.bandwidth_bps = 10_000_000.0;
sender.add_path(p0);
sender.add_path(p1);
let dispatched = sender.schedule(188);
assert_eq!(dispatched[0].0, 1); }
// Path 0 is primary and usable, so active-standby must select it.
#[test]
fn test_sender_active_standby() {
let cfg = MultipathConfig::new().with_strategy(SchedulingStrategy::ActiveStandby);
let mut sender = MultipathSender::new(cfg);
let p0 = make_path(0); let p1 = make_path(1);
sender.add_path(p0);
sender.add_path(p1);
let dispatched = sender.schedule(188);
assert_eq!(dispatched[0].0, 0); }
// Removing a path by id shrinks the path list.
#[test]
fn test_sender_remove_path() {
let mut sender = MultipathSender::new(default_config());
sender.add_path(make_path(0));
sender.add_path(make_path(1));
sender.remove_path(1);
assert_eq!(sender.path_count(), 1);
}
// Scheduling without any path yields no dispatch targets.
#[test]
fn test_sender_no_paths() {
let mut sender = MultipathSender::new(default_config());
let dispatched = sender.schedule(188);
assert!(dispatched.is_empty());
}
// The first copy of a sequence number is accepted; the second (arriving
// on another path) is rejected as a duplicate.
#[test]
fn test_receiver_dedup() {
let mut rx = MultipathReceiver::new(default_config());
assert!(rx.receive(0, vec![0u8], 0)); assert!(!rx.receive(0, vec![0u8], 1)); }
// After the (1 ms) merge delay has elapsed the packet can be drained.
#[test]
fn test_receiver_drain_after_delay() {
let cfg = MultipathConfig {
merge_delay: Duration::from_millis(1),
..Default::default()
};
let mut rx = MultipathReceiver::new(cfg);
rx.receive(0, vec![42u8], 0);
std::thread::sleep(Duration::from_millis(5));
let drained = rx.drain_ready();
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].0, 0);
}
// Path 1 only ever delivers a duplicate: one packet received, one duplicate.
#[test]
fn test_receiver_path_stats() {
let mut rx = MultipathReceiver::new(default_config());
rx.receive(0, vec![0u8], 0);
rx.receive(0, vec![0u8], 1); let (recv, dups) = rx.path_stats(1).expect("path 1 should have stats");
assert_eq!(recv, 1);
assert_eq!(dups, 1);
}
// Path 0 carries a 50% loss rate, so the loss-free path 1 scores higher.
#[test]
fn test_sender_best_path() {
let mut sender = MultipathSender::new(default_config());
let mut p0 = make_path(0);
p0.health.loss_rate = 0.5; let p1 = make_path(1); sender.add_path(p0);
sender.add_path(p1);
let best = sender.best_path().expect("should have a best path");
assert_eq!(best.id, 1);
}
// An RTT update routed through the sender reaches the path's health.
#[test]
fn test_sender_update_path_rtt() {
let mut sender = MultipathSender::new(default_config());
sender.add_path(make_path(0));
sender.update_path_rtt(0, Duration::from_millis(50));
let p = sender.paths().first().expect("should have path");
assert!(p.health.rtt > Duration::ZERO);
}
// A path marked `Down` is excluded from the usable count.
#[test]
fn test_sender_usable_path_count() {
let mut sender = MultipathSender::new(default_config());
let mut p0 = make_path(0);
p0.health.status = PathStatus::Down;
let p1 = make_path(1);
sender.add_path(p0);
sender.add_path(p1);
assert_eq!(sender.usable_path_count(), 1);
}
// Every `schedule` call increments the dispatch counter.
#[test]
fn test_sender_total_dispatched() {
let cfg = MultipathConfig::new().with_strategy(SchedulingStrategy::RoundRobin);
let mut sender = MultipathSender::new(cfg);
sender.add_path(make_path(0));
sender.schedule(100);
sender.schedule(100);
assert_eq!(sender.total_dispatched(), 2);
}
// Two equally weighted endpoints, no redundancy, with the given scheduler.
fn make_stream_config(scheduler: MultipathScheduler) -> MultipathStreamConfig {
MultipathStreamConfig::new(
vec![
MultipathEndpoint::new("192.168.1.1:5004", 0.5),
MultipathEndpoint::new("192.168.2.1:5004", 0.5),
],
scheduler,
RedundancyMode::None,
)
}
// All configured paths start out active.
#[test]
fn test_stream_sender_starts_active() {
let s = MultipathStreamSender::new(make_stream_config(MultipathScheduler::RoundRobin));
assert_eq!(s.active_path_count(), 2);
}
// With two paths, round-robin alternates and returns to the first path
// on the third selection.
#[test]
fn test_stream_sender_round_robin_alternates() {
let cfg = make_stream_config(MultipathScheduler::RoundRobin);
let mut s = MultipathStreamSender::new(cfg);
let p0 = s.select_path();
let p1 = s.select_path();
assert_ne!(p0, p1);
let p2 = s.select_path();
assert_eq!(p0, p2); }
// Min-latency selection prefers the path with the lower recorded RTT.
#[test]
fn test_stream_sender_min_latency() {
let cfg = make_stream_config(MultipathScheduler::MinLatency);
let mut s = MultipathStreamSender::new(cfg);
s.update_path_rtt(0, 150.0);
s.update_path_rtt(1, 20.0);
let chosen = s.select_path();
assert_eq!(chosen, 1, "should prefer path with lower RTT");
}
// With weights 0.8 / 0.2 the first selection goes to the heavier path 0.
#[test]
fn test_stream_sender_weighted_round_robin() {
let cfg = MultipathStreamConfig::new(
vec![
MultipathEndpoint::new("10.0.0.1:5004", 0.8),
MultipathEndpoint::new("10.0.0.2:5004", 0.2),
],
MultipathScheduler::WeightedRoundRobin,
RedundancyMode::None,
);
let mut s = MultipathStreamSender::new(cfg);
let chosen = s.select_path();
assert_eq!(chosen, 0);
}
// In redundant mode `select_path` still returns a valid path index.
#[test]
fn test_stream_sender_redundant_returns_index() {
let cfg = make_stream_config(MultipathScheduler::Redundant);
let mut s = MultipathStreamSender::new(cfg);
let idx = s.select_path();
assert!(idx < 2);
}
// Both paths are active initially, listed in ascending index order.
#[test]
fn test_stream_sender_active_path_indices() {
let cfg = make_stream_config(MultipathScheduler::Redundant);
let s = MultipathStreamSender::new(cfg);
let indices = s.active_path_indices();
assert_eq!(indices, vec![0, 1]);
}
// With path 0 deactivated, only path 1 can be selected.
#[test]
fn test_stream_sender_deactivate_path() {
let cfg = make_stream_config(MultipathScheduler::RoundRobin);
let mut s = MultipathStreamSender::new(cfg);
s.set_path_active(0, false);
assert_eq!(s.active_path_count(), 1);
let idx = s.select_path();
assert_eq!(idx, 1); }
// An RTT sample is stored in the per-path statistics.
#[test]
fn test_stream_sender_update_rtt() {
let cfg = make_stream_config(MultipathScheduler::RoundRobin);
let mut s = MultipathStreamSender::new(cfg);
s.update_path_rtt(0, 80.0);
assert!(s.stats().path_stats[0].rtt_ms > 0.0);
}
// Forty consecutive 100%-loss samples push the smoothed loss rate past
// the 0.90 deactivation threshold.
#[test]
fn test_stream_sender_high_loss_deactivates_path() {
let cfg = make_stream_config(MultipathScheduler::RoundRobin);
let mut s = MultipathStreamSender::new(cfg);
for _ in 0..40 {
s.update_path_loss(0, 1.0);
}
assert!(!s.stats().path_stats[0].active);
}
// Sent bytes are accounted per path and in the stream total.
#[test]
fn test_stream_sender_record_bytes() {
let cfg = make_stream_config(MultipathScheduler::RoundRobin);
let mut s = MultipathStreamSender::new(cfg);
s.record_bytes_sent(0, 1000);
s.record_bytes_sent(1, 500);
assert_eq!(s.stats().path_stats[0].bytes_sent, 1000);
assert_eq!(s.stats().total_bytes_sent, 1500);
}
// `new_active` yields zeroed metrics with the active flag set.
#[test]
fn test_path_stats_new_active() {
let ps = PathStats::new_active();
assert!(ps.active);
assert_eq!(ps.rtt_ms, 0.0);
assert_eq!(ps.packet_loss_rate, 0.0);
}
// `with_interface` attaches the interface name to the endpoint.
#[test]
fn test_endpoint_with_interface() {
let ep = MultipathEndpoint::new("10.0.0.1:5004", 1.0).with_interface("eth0");
assert_eq!(ep.interface.as_deref(), Some("eth0"));
}
}