// vcl_protocol/multipath.rs

//! # VCL Multipath
//!
//! [`MultipathSender`] splits traffic across multiple network interfaces
//! simultaneously (e.g. WiFi + LTE) and [`MultipathReceiver`] reassembles
//! packets on the other side.
//!
//! ## How it works
//!
//! ```text
//! Application → MultipathSender
//!                   │
//!         ┌─────────┼─────────┐
//!         ↓         ↓         ↓
//!     Interface0  Interface1  Interface2
//!     (WiFi)      (LTE)       (Ethernet)
//!         ↓         ↓         ↓
//!         └─────────┼─────────┘
//!                   │
//!             MultipathReceiver
//!                   │
//!             Reordering buffer
//!                   │
//!             Application
//! ```
//!
//! ## Example
//!
//! ```rust
//! use vcl_protocol::multipath::{MultipathSender, MultipathReceiver, PathInfo, SchedulingPolicy};
//!
//! let paths = vec![
//!     PathInfo::new("wifi",     "192.168.1.100", 100, 10),
//!     PathInfo::new("lte",      "10.0.0.50",     50,  30),
//!     PathInfo::new("ethernet", "172.16.0.1",    200, 5),
//! ];
//!
//! let sender = MultipathSender::new(paths, SchedulingPolicy::WeightedRoundRobin);
//! let mut receiver = MultipathReceiver::new();
//!
//! // sender.select_path_index(data.len()) → Some(index) of the path to send on
//! // receiver.add(seq, path_id, data) → Some((path_id, data)) when the packet is in order;
//! // then call receiver.drain_ordered() to flush buffered packets that became deliverable
//! ```
43
44use std::collections::HashMap;
45use std::time::{Duration, Instant};
46use tracing::{debug, info, warn};
47
/// Information about a single network path (interface).
///
/// Combines the static description of a path (name, local address, estimated
/// bandwidth and latency) with the counters that are updated while the path
/// is in use (`packets_sent`, `packets_lost`, `srtt`, `last_used`).
#[derive(Debug, Clone)]
pub struct PathInfo {
    /// Human-readable name (e.g. "wifi", "lte", "eth0").
    pub name: String,
    /// Local IP address bound to this interface.
    pub local_addr: String,
    /// Estimated bandwidth in Mbps.
    pub bandwidth_mbps: u32,
    /// Estimated latency in milliseconds.
    pub latency_ms: u32,
    /// Whether this path is currently active.
    pub active: bool,
    /// Number of packets sent on this path.
    pub packets_sent: u64,
    /// Number of packets lost on this path (estimated).
    pub packets_lost: u64,
    /// Smoothed RTT for this path; `None` until the first RTT sample arrives.
    pub srtt: Option<Duration>,
    /// When this path was last used; `None` until the first packet is sent.
    pub last_used: Option<Instant>,
}

impl PathInfo {
    /// Create a new path with name, local address, bandwidth, and latency.
    ///
    /// The path starts out active, with zeroed counters and no RTT or
    /// last-used information.
    pub fn new(name: &str, local_addr: &str, bandwidth_mbps: u32, latency_ms: u32) -> Self {
        Self {
            name: name.to_string(),
            local_addr: local_addr.to_string(),
            bandwidth_mbps,
            latency_ms,
            active: true,
            packets_sent: 0,
            packets_lost: 0,
            srtt: None,
            last_used: None,
        }
    }

    /// Compute a score for path selection — higher is better.
    ///
    /// Score = `bandwidth_mbps / latency_ms`, scaled by `1 - loss_rate()`.
    /// Inactive paths and paths with `latency_ms == 0` score `0.0` (the latter
    /// avoids a division by zero). Because the loss rate is clamped to
    /// `0.0..=1.0`, the score is never negative.
    pub fn score(&self) -> f64 {
        if !self.active || self.latency_ms == 0 {
            return 0.0;
        }
        let base = f64::from(self.bandwidth_mbps) / f64::from(self.latency_ms);
        base * (1.0 - self.loss_rate())
    }

    /// Packet loss rate for this path (0.0 – 1.0).
    ///
    /// Returns `0.0` while nothing has been sent. The counters are public and
    /// losses can be recorded independently of sends, so the raw ratio may
    /// exceed one; it is clamped to keep the documented range.
    pub fn loss_rate(&self) -> f64 {
        if self.packets_sent == 0 {
            return 0.0;
        }
        let ratio = self.packets_lost as f64 / self.packets_sent as f64;
        ratio.min(1.0)
    }

    /// Update SRTT for this path using RFC 6298 smoothing (alpha = 1/8).
    ///
    /// The first sample is taken as-is; afterwards
    /// `srtt = (7 * srtt + rtt) / 8`.
    pub fn update_srtt(&mut self, rtt: Duration) {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let smoothed = match self.srtt {
            None => rtt,
            Some(prev) => {
                // Multiply before dividing and stay in u128: dividing first
                // loses up to 7 ns per update, and narrowing the nanosecond
                // count to u64 would truncate very large durations.
                let blended = (prev.as_nanos() * 7 + rtt.as_nanos()) / 8;
                // `blended` is a weighted average, so it never exceeds the
                // larger input and its whole-second part fits in a u64.
                let secs = (blended / NANOS_PER_SEC) as u64;
                // The remainder is always below 1_000_000_000 and fits in a u32.
                let subsec_nanos = (blended % NANOS_PER_SEC) as u32;
                Duration::new(secs, subsec_nanos)
            }
        };
        self.srtt = Some(smoothed);
    }

    /// Mark this path as having sent a packet.
    ///
    /// Increments `packets_sent` and stamps `last_used` with the current time.
    pub fn record_sent(&mut self) {
        self.packets_sent += 1;
        self.last_used = Some(Instant::now());
    }

    /// Mark a packet as lost on this path.
    pub fn record_loss(&mut self) {
        self.packets_lost += 1;
    }
}
129
/// Strategy for selecting which path to send a packet on.
///
/// The enum carries no data, so it is cheap to copy and its equality is
/// total; it can also be used as a hash-map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SchedulingPolicy {
    /// Always use the path with the highest score (bandwidth/latency).
    BestPath,
    /// Round-robin across all active paths.
    RoundRobin,
    /// Weighted round-robin — paths with higher bandwidth get more packets.
    WeightedRoundRobin,
    /// Redundant — send every packet on ALL active paths simultaneously.
    /// Highest reliability, highest bandwidth usage.
    Redundant,
    /// Lowest latency — always pick the path with smallest latency_ms.
    LowestLatency,
}
145
146/// Sends packets across multiple paths according to a [`SchedulingPolicy`].
147pub struct MultipathSender {
148    paths: Vec<PathInfo>,
149    policy: SchedulingPolicy,
150    /// Current index for round-robin.
151    rr_index: usize,
152    /// Weighted round-robin counters.
153    rr_weights: Vec<u32>,
154    /// Total packets scheduled.
155    total_scheduled: u64,
156}
157
158impl MultipathSender {
159    /// Create a new sender with the given paths and scheduling policy.
160    ///
161    /// # Panics
162    /// Panics if `paths` is empty.
163    pub fn new(paths: Vec<PathInfo>, policy: SchedulingPolicy) -> Self {
164        assert!(!paths.is_empty(), "MultipathSender requires at least one path");
165        let rr_weights = paths.iter().map(|_| 0).collect();
166        info!(
167            paths = paths.len(),
168            policy = ?policy,
169            "MultipathSender created"
170        );
171        MultipathSender {
172            paths,
173            policy,
174            rr_index: 0,
175            rr_weights,
176            total_scheduled: 0,
177        }
178    }
179
180    /// Select the best path index for the next packet based on the policy.
181    ///
182    /// Returns `None` if no active paths are available.
183    pub fn select_path_index(&mut self, data_len: usize) -> Option<usize> {
184        let active: Vec<usize> = self.paths.iter()
185            .enumerate()
186            .filter(|(_, p)| p.active)
187            .map(|(i, _)| i)
188            .collect();
189
190        if active.is_empty() {
191            warn!("No active paths available");
192            return None;
193        }
194
195        let idx = match &self.policy {
196            SchedulingPolicy::BestPath => {
197                active.iter()
198                    .max_by(|&&a, &&b| {
199                        self.paths[a].score()
200                            .partial_cmp(&self.paths[b].score())
201                            .unwrap()
202                    })
203                    .copied()
204            }
205            SchedulingPolicy::RoundRobin => {
206                let pos = self.rr_index % active.len();
207                self.rr_index += 1;
208                Some(active[pos])
209            }
210            SchedulingPolicy::WeightedRoundRobin => {
211                // Pick path with lowest sent/bandwidth ratio
212                active.iter()
213                    .min_by(|&&a, &&b| {
214                        let ra = self.paths[a].packets_sent as f64
215                            / self.paths[a].bandwidth_mbps.max(1) as f64;
216                        let rb = self.paths[b].packets_sent as f64
217                            / self.paths[b].bandwidth_mbps.max(1) as f64;
218                        ra.partial_cmp(&rb).unwrap()
219                    })
220                    .copied()
221            }
222            SchedulingPolicy::Redundant => {
223                // Return first — caller should use select_all_paths for redundant
224                Some(active[0])
225            }
226            SchedulingPolicy::LowestLatency => {
227                active.iter()
228                    .min_by_key(|&&i| self.paths[i].latency_ms)
229                    .copied()
230            }
231        };
232
233        if let Some(i) = idx {
234            self.paths[i].record_sent();
235            self.total_scheduled += 1;
236            debug!(
237                path = %self.paths[i].name,
238                data_len,
239                policy = ?self.policy,
240                "Path selected"
241            );
242        }
243        idx
244    }
245
246    /// For `Redundant` policy — returns ALL active path indices.
247    ///
248    /// Each path should receive a copy of the packet.
249    pub fn select_all_paths(&mut self) -> Vec<usize> {
250        let active: Vec<usize> = self.paths.iter()
251            .enumerate()
252            .filter(|(_, p)| p.active)
253            .map(|(i, _)| i)
254            .collect();
255
256        for &i in &active {
257            self.paths[i].record_sent();
258        }
259        self.total_scheduled += 1;
260        active
261    }
262
263    /// Get an immutable reference to a path by index.
264    pub fn path(&self, index: usize) -> Option<&PathInfo> {
265        self.paths.get(index)
266    }
267
268    /// Get a mutable reference to a path by index.
269    pub fn path_mut(&mut self, index: usize) -> Option<&mut PathInfo> {
270        self.paths.get_mut(index)
271    }
272
273    /// Returns all paths.
274    pub fn paths(&self) -> &[PathInfo] {
275        &self.paths
276    }
277
278    /// Returns the number of active paths.
279    pub fn active_path_count(&self) -> usize {
280        self.paths.iter().filter(|p| p.active).count()
281    }
282
283    /// Mark a path as inactive (e.g. interface went down).
284    pub fn deactivate_path(&mut self, index: usize) {
285        if let Some(path) = self.paths.get_mut(index) {
286            warn!(name = %path.name, "Path deactivated");
287            path.active = false;
288        }
289    }
290
291    /// Mark a path as active again (e.g. interface came back up).
292    pub fn activate_path(&mut self, index: usize) {
293        if let Some(path) = self.paths.get_mut(index) {
294            info!(name = %path.name, "Path reactivated");
295            path.active = true;
296        }
297    }
298
299    /// Record a packet loss on a specific path.
300    pub fn record_loss(&mut self, index: usize) {
301        if let Some(path) = self.paths.get_mut(index) {
302            path.record_loss();
303        }
304    }
305
306    /// Update RTT estimate for a specific path.
307    pub fn update_rtt(&mut self, index: usize, rtt: Duration) {
308        if let Some(path) = self.paths.get_mut(index) {
309            path.update_srtt(rtt);
310        }
311    }
312
313    /// Total packets scheduled across all paths.
314    pub fn total_scheduled(&self) -> u64 {
315        self.total_scheduled
316    }
317
318    /// Change the scheduling policy at runtime.
319    pub fn set_policy(&mut self, policy: SchedulingPolicy) {
320        info!(policy = ?policy, "Scheduling policy changed");
321        self.policy = policy;
322    }
323
324    /// Returns the current scheduling policy.
325    pub fn policy(&self) -> &SchedulingPolicy {
326        &self.policy
327    }
328}
329
330/// A reordering buffer for packets received on multiple paths.
331///
332/// Packets may arrive out of order when using multipath — this buffer
333/// holds them and releases them in sequence order.
334pub struct MultipathReceiver {
335    /// Pending out-of-order packets: seq → (path_id, data).
336    pending: HashMap<u64, (String, Vec<u8>)>,
337    /// Next expected sequence number.
338    next_seq: u64,
339    /// Maximum number of out-of-order packets to buffer.
340    max_buffer: usize,
341    /// Total packets received.
342    total_received: u64,
343    /// Total packets delivered in order.
344    total_delivered: u64,
345    /// Total duplicate packets dropped.
346    total_duplicates: u64,
347}
348
349impl MultipathReceiver {
350    /// Create a new receiver with default buffer size (256).
351    pub fn new() -> Self {
352        MultipathReceiver {
353            pending: HashMap::new(),
354            next_seq: 0,
355            max_buffer: 256,
356            total_received: 0,
357            total_delivered: 0,
358            total_duplicates: 0,
359        }
360    }
361
362    /// Create a receiver with a custom reorder buffer size.
363    pub fn with_buffer_size(max_buffer: usize) -> Self {
364        MultipathReceiver {
365            pending: HashMap::new(),
366            next_seq: 0,
367            max_buffer,
368            total_received: 0,
369            total_delivered: 0,
370            total_duplicates: 0,
371        }
372    }
373
374    /// Add a received packet.
375    ///
376    /// Returns `Some((path_id, data))` if this completes an in-order sequence,
377    /// or `None` if the packet is buffered waiting for earlier packets.
378    ///
379    /// Duplicate packets (same seq already seen) are silently dropped.
380    pub fn add(&mut self, seq: u64, path_id: &str, data: Vec<u8>) -> Option<(String, Vec<u8>)> {
381        self.total_received += 1;
382
383        // Already delivered
384        if seq < self.next_seq {
385            warn!(seq, path = %path_id, "Duplicate/old multipath packet dropped");
386            self.total_duplicates += 1;
387            return None;
388        }
389
390        // In-order delivery
391        if seq == self.next_seq {
392            self.next_seq += 1;
393            self.total_delivered += 1;
394            debug!(seq, path = %path_id, "In-order multipath packet delivered");
395            return Some((path_id.to_string(), data));
396        }
397
398        // Out of order — buffer it
399        if self.pending.len() >= self.max_buffer {
400            warn!(
401                seq,
402                pending = self.pending.len(),
403                max = self.max_buffer,
404                "Reorder buffer full, dropping packet"
405            );
406            return None;
407        }
408
409        // Check for duplicate in pending
410        if self.pending.contains_key(&seq) {
411            warn!(seq, "Duplicate multipath packet in pending buffer");
412            self.total_duplicates += 1;
413            return None;
414        }
415
416        debug!(seq, path = %path_id, pending = self.pending.len(), "Buffering out-of-order packet");
417        self.pending.insert(seq, (path_id.to_string(), data));
418        None
419    }
420
421    /// Try to drain buffered packets that are now in order.
422    ///
423    /// Call after `add()` returns `Some(...)` to flush any buffered packets.
424    /// Returns packets in sequence order.
425    pub fn drain_ordered(&mut self) -> Vec<(u64, String, Vec<u8>)> {
426        let mut result = Vec::new();
427        while let Some((path_id, data)) = self.pending.remove(&self.next_seq) {
428            debug!(seq = self.next_seq, path = %path_id, "Drained buffered packet");
429            result.push((self.next_seq, path_id, data));
430            self.next_seq += 1;
431            self.total_delivered += 1;
432        }
433        result
434    }
435
436    /// Returns the next expected sequence number.
437    pub fn next_seq(&self) -> u64 {
438        self.next_seq
439    }
440
441    /// Returns the number of packets currently buffered.
442    pub fn pending_count(&self) -> usize {
443        self.pending.len()
444    }
445
446    /// Clear all buffered packets.
447    pub fn clear(&mut self) {
448        let dropped = self.pending.len();
449        if dropped > 0 {
450            warn!(dropped, "Multipath receiver buffer cleared");
451        }
452        self.pending.clear();
453    }
454
455    /// Total packets received (including duplicates and out-of-order).
456    pub fn total_received(&self) -> u64 {
457        self.total_received
458    }
459
460    /// Total packets delivered in order.
461    pub fn total_delivered(&self) -> u64 {
462        self.total_delivered
463    }
464
465    /// Total duplicate packets dropped.
466    pub fn total_duplicates(&self) -> u64 {
467        self.total_duplicates
468    }
469}
470
471impl Default for MultipathReceiver {
472    fn default() -> Self {
473        Self::new()
474    }
475}
476
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: WiFi (100 Mbps / 10 ms) and LTE (50 Mbps / 30 ms).
    fn two_paths() -> Vec<PathInfo> {
        let wifi = PathInfo::new("wifi", "192.168.1.100", 100, 10);
        let lte = PathInfo::new("lte", "10.0.0.50", 50, 30);
        vec![wifi, lte]
    }

    /// Fixture: the two paths above plus Ethernet (200 Mbps / 5 ms).
    fn three_paths() -> Vec<PathInfo> {
        let mut paths = two_paths();
        paths.push(PathInfo::new("ethernet", "172.16.0.1", 200, 5));
        paths
    }

    // ─── PathInfo ─────────────────────────────────────────────────────────────

    #[test]
    fn test_path_info_new() {
        let path = PathInfo::new("wifi", "192.168.1.1", 100, 10);
        assert_eq!(path.name, "wifi");
        assert_eq!(path.bandwidth_mbps, 100);
        assert_eq!(path.latency_ms, 10);
        assert!(path.active);
        assert_eq!(path.loss_rate(), 0.0);
    }

    #[test]
    fn test_path_score() {
        let fast = PathInfo::new("fast", "1.1.1.1", 100, 10);
        let slow = PathInfo::new("slow", "2.2.2.2", 10, 100);
        assert!(fast.score() > slow.score());
    }

    #[test]
    fn test_path_score_inactive() {
        let mut path = PathInfo::new("wifi", "1.1.1.1", 100, 10);
        path.active = false;
        assert_eq!(path.score(), 0.0);
    }

    #[test]
    fn test_path_loss_rate() {
        let mut path = PathInfo::new("wifi", "1.1.1.1", 100, 10);
        path.packets_sent = 100;
        path.packets_lost = 10;
        let deviation = (path.loss_rate() - 0.1).abs();
        assert!(deviation < f64::EPSILON);
    }

    #[test]
    fn test_path_update_srtt() {
        let mut path = PathInfo::new("wifi", "1.1.1.1", 100, 10);
        assert!(path.srtt.is_none());

        // The first sample initializes the estimate.
        path.update_srtt(Duration::from_millis(20));
        assert!(path.srtt.is_some());

        // A lower sample must pull the estimate below the first one.
        path.update_srtt(Duration::from_millis(10));
        let smoothed = path.srtt.unwrap();
        assert!(smoothed < Duration::from_millis(20));
    }

    #[test]
    fn test_path_record_sent() {
        let mut path = PathInfo::new("wifi", "1.1.1.1", 100, 10);
        path.record_sent();
        assert_eq!(path.packets_sent, 1);
        assert!(path.last_used.is_some());
    }

    // ─── MultipathSender ──────────────────────────────────────────────────────

    #[test]
    fn test_sender_best_path() {
        let mut sender = MultipathSender::new(three_paths(), SchedulingPolicy::BestPath);
        // Ethernet scores 200 / 5 = 40, the highest of the three.
        let chosen = sender.select_path_index(100).unwrap();
        assert_eq!(sender.paths()[chosen].name, "ethernet");
    }

    #[test]
    fn test_sender_lowest_latency() {
        let mut sender = MultipathSender::new(three_paths(), SchedulingPolicy::LowestLatency);
        let chosen = sender.select_path_index(100).unwrap();
        // Ethernet has the smallest latency (5 ms).
        assert_eq!(sender.paths()[chosen].name, "ethernet");
    }

    #[test]
    fn test_sender_round_robin() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::RoundRobin);
        let first = sender.select_path_index(100).unwrap();
        let second = sender.select_path_index(100).unwrap();
        let third = sender.select_path_index(100).unwrap();
        // With two paths the selection alternates and wraps around.
        assert_ne!(first, second);
        assert_eq!(first, third);
    }

    #[test]
    fn test_sender_weighted_round_robin() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::WeightedRoundRobin);
        // WiFi has twice the bandwidth of LTE, so it should be picked more often.
        let rounds = 20;
        let wifi_picks = (0..rounds)
            .filter(|_| {
                let chosen = sender.select_path_index(100).unwrap();
                sender.paths()[chosen].name == "wifi"
            })
            .count();
        let lte_picks = rounds - wifi_picks;
        assert!(wifi_picks > lte_picks);
    }

    #[test]
    fn test_sender_redundant_all_paths() {
        let mut sender = MultipathSender::new(three_paths(), SchedulingPolicy::Redundant);
        let all = sender.select_all_paths();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn test_sender_deactivate_path() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
        assert_eq!(sender.active_path_count(), 2);

        sender.deactivate_path(0);
        assert_eq!(sender.active_path_count(), 1);

        // Only LTE is left to choose from.
        let chosen = sender.select_path_index(100).unwrap();
        assert_eq!(sender.paths()[chosen].name, "lte");
    }

    #[test]
    fn test_sender_activate_path() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
        sender.deactivate_path(0);
        assert_eq!(sender.active_path_count(), 1);
        sender.activate_path(0);
        assert_eq!(sender.active_path_count(), 2);
    }

    #[test]
    fn test_sender_no_active_paths() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
        for index in 0..2 {
            sender.deactivate_path(index);
        }
        assert!(sender.select_path_index(100).is_none());
    }

    #[test]
    fn test_sender_record_loss() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
        sender.select_path_index(100);
        sender.record_loss(0);
        assert_eq!(sender.paths()[0].packets_lost, 1);
    }

    #[test]
    fn test_sender_update_rtt() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
        sender.update_rtt(0, Duration::from_millis(15));
        assert!(sender.paths()[0].srtt.is_some());
    }

    #[test]
    fn test_sender_set_policy() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::BestPath);
        assert_eq!(sender.policy(), &SchedulingPolicy::BestPath);
        sender.set_policy(SchedulingPolicy::RoundRobin);
        assert_eq!(sender.policy(), &SchedulingPolicy::RoundRobin);
    }

    #[test]
    fn test_sender_total_scheduled() {
        let mut sender = MultipathSender::new(two_paths(), SchedulingPolicy::RoundRobin);
        for _ in 0..3 {
            sender.select_path_index(100);
        }
        assert_eq!(sender.total_scheduled(), 3);
    }

    // ─── MultipathReceiver ────────────────────────────────────────────────────

    #[test]
    fn test_receiver_in_order() {
        let mut receiver = MultipathReceiver::new();
        let delivered = receiver.add(0, "wifi", b"hello".to_vec());
        assert!(delivered.is_some());

        let (path, payload) = delivered.unwrap();
        assert_eq!(path, "wifi");
        assert_eq!(payload, b"hello");
        assert_eq!(receiver.next_seq(), 1);
    }

    #[test]
    fn test_receiver_out_of_order() {
        let mut receiver = MultipathReceiver::new();

        // seq=1 shows up ahead of seq=0 and has to wait in the buffer.
        let early = receiver.add(1, "lte", b"second".to_vec());
        assert!(early.is_none());
        assert_eq!(receiver.pending_count(), 1);

        // seq=0 is the expected packet and is handed back immediately.
        let expected = receiver.add(0, "wifi", b"first".to_vec());
        assert!(expected.is_some());
        let (_, payload) = expected.unwrap();
        assert_eq!(payload, b"first");

        // Draining now releases the buffered seq=1.
        let released = receiver.drain_ordered();
        assert_eq!(released.len(), 1);
        let (seq, _, buffered_payload) = &released[0];
        assert_eq!(*seq, 1);
        assert_eq!(*buffered_payload, b"second");
        assert_eq!(receiver.next_seq(), 2);
        assert_eq!(receiver.pending_count(), 0);
    }

    #[test]
    fn test_receiver_duplicate() {
        let mut receiver = MultipathReceiver::new();
        receiver.add(0, "wifi", b"first".to_vec());
        // The same seq arriving over another path (redundant mode) is dropped.
        let repeated = receiver.add(0, "lte", b"first".to_vec());
        assert!(repeated.is_none());
        assert_eq!(receiver.total_duplicates(), 1);
    }

    #[test]
    fn test_receiver_drain_multiple() {
        let mut receiver = MultipathReceiver::new();
        receiver.add(3, "wifi", b"d".to_vec());
        receiver.add(2, "lte", b"c".to_vec());
        receiver.add(1, "wifi", b"b".to_vec());
        receiver.add(0, "lte", b"a".to_vec());

        // After seq=0 was delivered, 1, 2 and 3 become drainable.
        let released = receiver.drain_ordered();
        assert_eq!(released.len(), 3);
        assert_eq!(receiver.next_seq(), 4);
        assert_eq!(receiver.pending_count(), 0);
    }

    #[test]
    fn test_receiver_buffer_full() {
        let mut receiver = MultipathReceiver::with_buffer_size(2);
        receiver.add(1, "wifi", b"b".to_vec());
        receiver.add(2, "wifi", b"c".to_vec());

        // Both slots are taken, so seq=3 must be dropped.
        let overflow = receiver.add(3, "wifi", b"d".to_vec());
        assert!(overflow.is_none());
        assert_eq!(receiver.pending_count(), 2);
    }

    #[test]
    fn test_receiver_clear() {
        let mut receiver = MultipathReceiver::new();
        receiver.add(1, "wifi", b"b".to_vec());
        receiver.add(2, "wifi", b"c".to_vec());
        assert_eq!(receiver.pending_count(), 2);

        receiver.clear();
        assert_eq!(receiver.pending_count(), 0);
    }

    #[test]
    fn test_receiver_stats() {
        let mut receiver = MultipathReceiver::new();
        receiver.add(0, "wifi", b"a".to_vec());
        receiver.add(1, "lte", b"b".to_vec());
        // seq=0 a second time counts as a duplicate.
        receiver.add(0, "eth", b"a".to_vec());

        assert_eq!(receiver.total_received(), 3);
        assert_eq!(receiver.total_delivered(), 2);
        assert_eq!(receiver.total_duplicates(), 1);
    }

    #[test]
    fn test_receiver_default() {
        let receiver = MultipathReceiver::default();
        assert_eq!(receiver.next_seq(), 0);
        assert_eq!(receiver.pending_count(), 0);
    }
}