1use std::collections::HashMap;
45use std::time::{Duration, Instant};
46use tracing::{debug, info, warn};
47
48#[derive(Debug, Clone)]
50pub struct PathInfo {
51 pub name: String,
53 pub local_addr: String,
55 pub bandwidth_mbps: u32,
57 pub latency_ms: u32,
59 pub active: bool,
61 pub packets_sent: u64,
63 pub packets_lost: u64,
65 pub srtt: Option<Duration>,
67 pub last_used: Option<Instant>,
69}
70
71impl PathInfo {
72 pub fn new(name: &str, local_addr: &str, bandwidth_mbps: u32, latency_ms: u32) -> Self {
74 PathInfo {
75 name: name.to_string(),
76 local_addr: local_addr.to_string(),
77 bandwidth_mbps,
78 latency_ms,
79 active: true,
80 packets_sent: 0,
81 packets_lost: 0,
82 srtt: None,
83 last_used: None,
84 }
85 }
86
87 pub fn score(&self) -> f64 {
90 if !self.active || self.latency_ms == 0 {
91 return 0.0;
92 }
93 let base = self.bandwidth_mbps as f64 / self.latency_ms as f64;
94 let loss_penalty = 1.0 - self.loss_rate();
95 base * loss_penalty
96 }
97
98 pub fn loss_rate(&self) -> f64 {
100 if self.packets_sent == 0 {
101 return 0.0;
102 }
103 self.packets_lost as f64 / self.packets_sent as f64
104 }
105
106 pub fn update_srtt(&mut self, rtt: Duration) {
108 self.srtt = Some(match self.srtt {
109 None => rtt,
110 Some(srtt) => {
111 let srtt_ns = srtt.as_nanos() as u64;
112 let rtt_ns = rtt.as_nanos() as u64;
113 Duration::from_nanos(srtt_ns / 8 * 7 + rtt_ns / 8)
114 }
115 });
116 }
117
118 pub fn record_sent(&mut self) {
120 self.packets_sent += 1;
121 self.last_used = Some(Instant::now());
122 }
123
124 pub fn record_loss(&mut self) {
126 self.packets_lost += 1;
127 }
128}
129
130#[derive(Debug, Clone, PartialEq)]
132pub enum SchedulingPolicy {
133 BestPath,
135 RoundRobin,
137 WeightedRoundRobin,
139 Redundant,
142 LowestLatency,
144}
145
146pub struct MultipathSender {
148 paths: Vec<PathInfo>,
149 policy: SchedulingPolicy,
150 rr_index: usize,
152 rr_weights: Vec<u32>,
154 total_scheduled: u64,
156}
157
158impl MultipathSender {
159 pub fn new(paths: Vec<PathInfo>, policy: SchedulingPolicy) -> Self {
164 assert!(!paths.is_empty(), "MultipathSender requires at least one path");
165 let rr_weights = paths.iter().map(|_| 0).collect();
166 info!(
167 paths = paths.len(),
168 policy = ?policy,
169 "MultipathSender created"
170 );
171 MultipathSender {
172 paths,
173 policy,
174 rr_index: 0,
175 rr_weights,
176 total_scheduled: 0,
177 }
178 }
179
180 pub fn select_path_index(&mut self, data_len: usize) -> Option<usize> {
184 let active: Vec<usize> = self.paths.iter()
185 .enumerate()
186 .filter(|(_, p)| p.active)
187 .map(|(i, _)| i)
188 .collect();
189
190 if active.is_empty() {
191 warn!("No active paths available");
192 return None;
193 }
194
195 let idx = match &self.policy {
196 SchedulingPolicy::BestPath => {
197 active.iter()
198 .max_by(|&&a, &&b| {
199 self.paths[a].score()
200 .partial_cmp(&self.paths[b].score())
201 .unwrap()
202 })
203 .copied()
204 }
205 SchedulingPolicy::RoundRobin => {
206 let pos = self.rr_index % active.len();
207 self.rr_index += 1;
208 Some(active[pos])
209 }
210 SchedulingPolicy::WeightedRoundRobin => {
211 active.iter()
213 .min_by(|&&a, &&b| {
214 let ra = self.paths[a].packets_sent as f64
215 / self.paths[a].bandwidth_mbps.max(1) as f64;
216 let rb = self.paths[b].packets_sent as f64
217 / self.paths[b].bandwidth_mbps.max(1) as f64;
218 ra.partial_cmp(&rb).unwrap()
219 })
220 .copied()
221 }
222 SchedulingPolicy::Redundant => {
223 Some(active[0])
225 }
226 SchedulingPolicy::LowestLatency => {
227 active.iter()
228 .min_by_key(|&&i| self.paths[i].latency_ms)
229 .copied()
230 }
231 };
232
233 if let Some(i) = idx {
234 self.paths[i].record_sent();
235 self.total_scheduled += 1;
236 debug!(
237 path = %self.paths[i].name,
238 data_len,
239 policy = ?self.policy,
240 "Path selected"
241 );
242 }
243 idx
244 }
245
246 pub fn select_all_paths(&mut self) -> Vec<usize> {
250 let active: Vec<usize> = self.paths.iter()
251 .enumerate()
252 .filter(|(_, p)| p.active)
253 .map(|(i, _)| i)
254 .collect();
255
256 for &i in &active {
257 self.paths[i].record_sent();
258 }
259 self.total_scheduled += 1;
260 active
261 }
262
263 pub fn path(&self, index: usize) -> Option<&PathInfo> {
265 self.paths.get(index)
266 }
267
268 pub fn path_mut(&mut self, index: usize) -> Option<&mut PathInfo> {
270 self.paths.get_mut(index)
271 }
272
273 pub fn paths(&self) -> &[PathInfo] {
275 &self.paths
276 }
277
278 pub fn active_path_count(&self) -> usize {
280 self.paths.iter().filter(|p| p.active).count()
281 }
282
283 pub fn deactivate_path(&mut self, index: usize) {
285 if let Some(path) = self.paths.get_mut(index) {
286 warn!(name = %path.name, "Path deactivated");
287 path.active = false;
288 }
289 }
290
291 pub fn activate_path(&mut self, index: usize) {
293 if let Some(path) = self.paths.get_mut(index) {
294 info!(name = %path.name, "Path reactivated");
295 path.active = true;
296 }
297 }
298
299 pub fn record_loss(&mut self, index: usize) {
301 if let Some(path) = self.paths.get_mut(index) {
302 path.record_loss();
303 }
304 }
305
306 pub fn update_rtt(&mut self, index: usize, rtt: Duration) {
308 if let Some(path) = self.paths.get_mut(index) {
309 path.update_srtt(rtt);
310 }
311 }
312
313 pub fn total_scheduled(&self) -> u64 {
315 self.total_scheduled
316 }
317
318 pub fn set_policy(&mut self, policy: SchedulingPolicy) {
320 info!(policy = ?policy, "Scheduling policy changed");
321 self.policy = policy;
322 }
323
324 pub fn policy(&self) -> &SchedulingPolicy {
326 &self.policy
327 }
328}
329
330pub struct MultipathReceiver {
335 pending: HashMap<u64, (String, Vec<u8>)>,
337 next_seq: u64,
339 max_buffer: usize,
341 total_received: u64,
343 total_delivered: u64,
345 total_duplicates: u64,
347}
348
349impl MultipathReceiver {
350 pub fn new() -> Self {
352 MultipathReceiver {
353 pending: HashMap::new(),
354 next_seq: 0,
355 max_buffer: 256,
356 total_received: 0,
357 total_delivered: 0,
358 total_duplicates: 0,
359 }
360 }
361
362 pub fn with_buffer_size(max_buffer: usize) -> Self {
364 MultipathReceiver {
365 pending: HashMap::new(),
366 next_seq: 0,
367 max_buffer,
368 total_received: 0,
369 total_delivered: 0,
370 total_duplicates: 0,
371 }
372 }
373
374 pub fn add(&mut self, seq: u64, path_id: &str, data: Vec<u8>) -> Option<(String, Vec<u8>)> {
381 self.total_received += 1;
382
383 if seq < self.next_seq {
385 warn!(seq, path = %path_id, "Duplicate/old multipath packet dropped");
386 self.total_duplicates += 1;
387 return None;
388 }
389
390 if seq == self.next_seq {
392 self.next_seq += 1;
393 self.total_delivered += 1;
394 debug!(seq, path = %path_id, "In-order multipath packet delivered");
395 return Some((path_id.to_string(), data));
396 }
397
398 if self.pending.len() >= self.max_buffer {
400 warn!(
401 seq,
402 pending = self.pending.len(),
403 max = self.max_buffer,
404 "Reorder buffer full, dropping packet"
405 );
406 return None;
407 }
408
409 if self.pending.contains_key(&seq) {
411 warn!(seq, "Duplicate multipath packet in pending buffer");
412 self.total_duplicates += 1;
413 return None;
414 }
415
416 debug!(seq, path = %path_id, pending = self.pending.len(), "Buffering out-of-order packet");
417 self.pending.insert(seq, (path_id.to_string(), data));
418 None
419 }
420
421 pub fn drain_ordered(&mut self) -> Vec<(u64, String, Vec<u8>)> {
426 let mut result = Vec::new();
427 while let Some((path_id, data)) = self.pending.remove(&self.next_seq) {
428 debug!(seq = self.next_seq, path = %path_id, "Drained buffered packet");
429 result.push((self.next_seq, path_id, data));
430 self.next_seq += 1;
431 self.total_delivered += 1;
432 }
433 result
434 }
435
436 pub fn next_seq(&self) -> u64 {
438 self.next_seq
439 }
440
441 pub fn pending_count(&self) -> usize {
443 self.pending.len()
444 }
445
446 pub fn clear(&mut self) {
448 let dropped = self.pending.len();
449 if dropped > 0 {
450 warn!(dropped, "Multipath receiver buffer cleared");
451 }
452 self.pending.clear();
453 }
454
455 pub fn total_received(&self) -> u64 {
457 self.total_received
458 }
459
460 pub fn total_delivered(&self) -> u64 {
462 self.total_delivered
463 }
464
465 pub fn total_duplicates(&self) -> u64 {
467 self.total_duplicates
468 }
469}
470
471impl Default for MultipathReceiver {
472 fn default() -> Self {
473 Self::new()
474 }
475}
476
477#[cfg(test)]
478mod tests {
479 use super::*;
480
481 fn two_paths() -> Vec<PathInfo> {
482 vec![
483 PathInfo::new("wifi", "192.168.1.100", 100, 10),
484 PathInfo::new("lte", "10.0.0.50", 50, 30),
485 ]
486 }
487
488 fn three_paths() -> Vec<PathInfo> {
489 vec![
490 PathInfo::new("wifi", "192.168.1.100", 100, 10),
491 PathInfo::new("lte", "10.0.0.50", 50, 30),
492 PathInfo::new("ethernet", "172.16.0.1", 200, 5),
493 ]
494 }
495
496 #[test]
499 fn test_path_info_new() {
500 let p = PathInfo::new("wifi", "192.168.1.1", 100, 10);
501 assert_eq!(p.name, "wifi");
502 assert_eq!(p.bandwidth_mbps, 100);
503 assert_eq!(p.latency_ms, 10);
504 assert!(p.active);
505 assert_eq!(p.loss_rate(), 0.0);
506 }
507
508 #[test]
509 fn test_path_score() {
510 let p = PathInfo::new("fast", "1.1.1.1", 100, 10);
511 let slow = PathInfo::new("slow", "2.2.2.2", 10, 100);
512 assert!(p.score() > slow.score());
513 }
514
515 #[test]
516 fn test_path_score_inactive() {
517 let mut p = PathInfo::new("wifi", "1.1.1.1", 100, 10);
518 p.active = false;
519 assert_eq!(p.score(), 0.0);
520 }
521
522 #[test]
523 fn test_path_loss_rate() {
524 let mut p = PathInfo::new("wifi", "1.1.1.1", 100, 10);
525 p.packets_sent = 100;
526 p.packets_lost = 10;
527 assert!((p.loss_rate() - 0.1).abs() < f64::EPSILON);
528 }
529
530 #[test]
531 fn test_path_update_srtt() {
532 let mut p = PathInfo::new("wifi", "1.1.1.1", 100, 10);
533 assert!(p.srtt.is_none());
534 p.update_srtt(Duration::from_millis(20));
535 assert!(p.srtt.is_some());
536 p.update_srtt(Duration::from_millis(10));
537 assert!(p.srtt.unwrap() < Duration::from_millis(20));
538 }
539
540 #[test]
541 fn test_path_record_sent() {
542 let mut p = PathInfo::new("wifi", "1.1.1.1", 100, 10);
543 p.record_sent();
544 assert_eq!(p.packets_sent, 1);
545 assert!(p.last_used.is_some());
546 }
547
548 #[test]
551 fn test_sender_best_path() {
552 let mut s = MultipathSender::new(three_paths(), SchedulingPolicy::BestPath);
553 let idx = s.select_path_index(100).unwrap();
555 assert_eq!(s.paths()[idx].name, "ethernet");
556 }
557
558 #[test]
559 fn test_sender_lowest_latency() {
560 let mut s = MultipathSender::new(three_paths(), SchedulingPolicy::LowestLatency);
561 let idx = s.select_path_index(100).unwrap();
562 assert_eq!(s.paths()[idx].name, "ethernet"); }
564
565 #[test]
566 fn test_sender_round_robin() {
567 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::RoundRobin);
568 let i0 = s.select_path_index(100).unwrap();
569 let i1 = s.select_path_index(100).unwrap();
570 let i2 = s.select_path_index(100).unwrap();
571 assert_ne!(i0, i1);
573 assert_eq!(i0, i2);
574 }
575
576 #[test]
577 fn test_sender_weighted_round_robin() {
578 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::WeightedRoundRobin);
579 let mut wifi_count = 0;
581 let mut lte_count = 0;
582 for _ in 0..20 {
583 let idx = s.select_path_index(100).unwrap();
584 if s.paths()[idx].name == "wifi" {
585 wifi_count += 1;
586 } else {
587 lte_count += 1;
588 }
589 }
590 assert!(wifi_count > lte_count);
591 }
592
593 #[test]
594 fn test_sender_redundant_all_paths() {
595 let mut s = MultipathSender::new(three_paths(), SchedulingPolicy::Redundant);
596 let indices = s.select_all_paths();
597 assert_eq!(indices.len(), 3);
598 }
599
600 #[test]
601 fn test_sender_deactivate_path() {
602 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
603 assert_eq!(s.active_path_count(), 2);
604 s.deactivate_path(0);
605 assert_eq!(s.active_path_count(), 1);
606 let idx = s.select_path_index(100).unwrap();
607 assert_eq!(s.paths()[idx].name, "lte");
608 }
609
610 #[test]
611 fn test_sender_activate_path() {
612 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
613 s.deactivate_path(0);
614 assert_eq!(s.active_path_count(), 1);
615 s.activate_path(0);
616 assert_eq!(s.active_path_count(), 2);
617 }
618
619 #[test]
620 fn test_sender_no_active_paths() {
621 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
622 s.deactivate_path(0);
623 s.deactivate_path(1);
624 assert!(s.select_path_index(100).is_none());
625 }
626
627 #[test]
628 fn test_sender_record_loss() {
629 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
630 s.select_path_index(100);
631 s.record_loss(0);
632 assert_eq!(s.paths()[0].packets_lost, 1);
633 }
634
635 #[test]
636 fn test_sender_update_rtt() {
637 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
638 s.update_rtt(0, Duration::from_millis(15));
639 assert!(s.paths()[0].srtt.is_some());
640 }
641
642 #[test]
643 fn test_sender_set_policy() {
644 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
645 assert_eq!(s.policy(), &SchedulingPolicy::BestPath);
646 s.set_policy(SchedulingPolicy::RoundRobin);
647 assert_eq!(s.policy(), &SchedulingPolicy::RoundRobin);
648 }
649
650 #[test]
651 fn test_sender_total_scheduled() {
652 let mut s = MultipathSender::new(two_paths(), SchedulingPolicy::RoundRobin);
653 s.select_path_index(100);
654 s.select_path_index(100);
655 s.select_path_index(100);
656 assert_eq!(s.total_scheduled(), 3);
657 }
658
659 #[test]
662 fn test_receiver_in_order() {
663 let mut r = MultipathReceiver::new();
664 let result = r.add(0, "wifi", b"hello".to_vec());
665 assert!(result.is_some());
666 let (path, data) = result.unwrap();
667 assert_eq!(path, "wifi");
668 assert_eq!(data, b"hello");
669 assert_eq!(r.next_seq(), 1);
670 }
671
672 #[test]
673 fn test_receiver_out_of_order() {
674 let mut r = MultipathReceiver::new();
675 let r1 = r.add(1, "lte", b"second".to_vec());
677 assert!(r1.is_none()); assert_eq!(r.pending_count(), 1);
679
680 let r0 = r.add(0, "wifi", b"first".to_vec());
681 assert!(r0.is_some()); let (_, data) = r0.unwrap();
683 assert_eq!(data, b"first");
684
685 let drained = r.drain_ordered();
687 assert_eq!(drained.len(), 1);
688 assert_eq!(drained[0].0, 1);
689 assert_eq!(drained[0].2, b"second");
690 assert_eq!(r.next_seq(), 2);
691 assert_eq!(r.pending_count(), 0);
692 }
693
694 #[test]
695 fn test_receiver_duplicate() {
696 let mut r = MultipathReceiver::new();
697 r.add(0, "wifi", b"first".to_vec());
698 let dup = r.add(0, "lte", b"first".to_vec());
700 assert!(dup.is_none());
701 assert_eq!(r.total_duplicates(), 1);
702 }
703
704 #[test]
705 fn test_receiver_drain_multiple() {
706 let mut r = MultipathReceiver::new();
707 r.add(3, "wifi", b"d".to_vec());
708 r.add(2, "lte", b"c".to_vec());
709 r.add(1, "wifi", b"b".to_vec());
710 r.add(0, "lte", b"a".to_vec());
711
712 let drained = r.drain_ordered();
714 assert_eq!(drained.len(), 3); assert_eq!(r.next_seq(), 4);
716 assert_eq!(r.pending_count(), 0);
717 }
718
719 #[test]
720 fn test_receiver_buffer_full() {
721 let mut r = MultipathReceiver::with_buffer_size(2);
722 r.add(1, "wifi", b"b".to_vec());
723 r.add(2, "wifi", b"c".to_vec());
724 let result = r.add(3, "wifi", b"d".to_vec());
726 assert!(result.is_none());
727 assert_eq!(r.pending_count(), 2); }
729
730 #[test]
731 fn test_receiver_clear() {
732 let mut r = MultipathReceiver::new();
733 r.add(1, "wifi", b"b".to_vec());
734 r.add(2, "wifi", b"c".to_vec());
735 assert_eq!(r.pending_count(), 2);
736 r.clear();
737 assert_eq!(r.pending_count(), 0);
738 }
739
740 #[test]
741 fn test_receiver_stats() {
742 let mut r = MultipathReceiver::new();
743 r.add(0, "wifi", b"a".to_vec());
744 r.add(1, "lte", b"b".to_vec());
745 r.add(0, "eth", b"a".to_vec()); assert_eq!(r.total_received(), 3);
747 assert_eq!(r.total_delivered(), 2);
748 assert_eq!(r.total_duplicates(), 1);
749 }
750
751 #[test]
752 fn test_receiver_default() {
753 let r = MultipathReceiver::default();
754 assert_eq!(r.next_seq(), 0);
755 assert_eq!(r.pending_count(), 0);
756 }
757}