1//! Lock-free, cache-line-aligned core types for building high-performance
2//! UDP relay and tunnel servers.
3//!
4//! # Overview
5//!
6//! This crate provides the shared-state primitives you need when forwarding
7//! UDP packets between many clients at high throughput:
8//!
9//! * [`TunnelClient`] — per-client atomic metrics (bandwidth, latency,
10//!   packet loss, priority scoring) designed for `Arc<TunnelClient>` in a
11//!   [`DashMap`].
12//! * [`QualityAnalyzer`] — lightweight packet loss tracker with automatic
13//!   counter halving so counters never overflow.
14//! * [`validate_address`] — rejects loopback, unspecified, broadcast,
15//!   multicast, and port-0 addresses.
16//! * [`create_dashmap_with_capacity`] — creates a [`DashMap`] with a shard
17//!   count tuned to the CPU count.
18//!
19//! # Quick start
20//!
21//! ```rust
22//! use std::sync::Arc;
23//! use udp_relay_core::{TunnelClient, create_dashmap_with_capacity};
24//!
25//! let clients = create_dashmap_with_capacity::<u32, Arc<TunnelClient>>(200);
26//!
27//! let addr = "1.2.3.4:5678".parse().unwrap();
28//! let client = Arc::new(TunnelClient::new_with_endpoint(addr, 30));
29//! clients.insert(42, client.clone());
30//!
31//! let now = TunnelClient::current_timestamp();
32//! client.update_stats(512, 0, now);
33//! client.set_last_receive_tick_at(now);
34//!
35//! if client.is_timed_out() {
36//!     clients.remove(&42);
37//! }
38//! ```
39
40use std::net::{IpAddr, SocketAddr};
41use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
42
43use dashmap::DashMap;
44
/// A UDP tunnel client with lock-free atomic metrics.
///
/// Designed to live behind `Arc` in a `DashMap`, so all fields use bare atomics
/// (no `Arc` wrappers). Not `Clone` — share via `Arc<TunnelClient>`.
///
/// Fields are ordered so the hot-path data (packet receive + stats) sits on the
/// first cache line, and cold/periodic data sits on the second.
#[repr(C, align(128))]
pub struct TunnelClient {
    // --- cache line 0: hot path (packet receive + bandwidth) ---
    // Unix-seconds timestamp of the most recent received packet.
    last_receive_tick: AtomicU64,
    // Total packets recorded via `update_stats`.
    packet_count: AtomicU64,
    // Cumulative bytes received / sent since construction (or last reset).
    bytes_received: AtomicU64,
    bytes_sent: AtomicU64,
    // EWMA of (in + out) bytes per second, updated at most once per second.
    bandwidth_estimate: AtomicU64,
    // Unix-seconds timestamp of the last bandwidth recalculation.
    last_bandwidth_calc: AtomicU64,
    // Snapshot of bytes_received + bytes_sent at the last recalculation.
    last_bandwidth_bytes: AtomicU64,
    // Inactivity window in seconds; immutable after construction.
    timeout_seconds: u64,

    // --- cache line 1: cold / periodic ---
    // Cached priority score (lower is better); recomputed lazily.
    priority_score: AtomicU64,
    // Unix-seconds timestamp when the connection was established / last reset.
    connection_start: AtomicU64,
    // 0/1 flag: 1 means metrics changed and priority_score is stale.
    priority_dirty: AtomicU32,
    // Round-trip latency in milliseconds, as reported via `set_latency`.
    latency_ms: AtomicU32,
    // Packet loss rate in permille (0–1000), clamped on write.
    packet_loss_rate: AtomicU32,
    // 0/1 flag: 1 means estimated bandwidth is below 100 KB/s.
    is_slow_connection: AtomicU32,
    /// Remote endpoint this client forwards to, if known at construction.
    pub remote_ep: Option<SocketAddr>,
}
73
74impl TunnelClient {
75    /// Create a new client without a remote endpoint.
76    ///
77    /// The client will time out after `timeout_seconds` of inactivity.
78    #[must_use]
79    pub fn new(timeout_seconds: u64) -> Self {
80        let now = Self::current_timestamp();
81        Self {
82            last_receive_tick: AtomicU64::new(now),
83            timeout_seconds,
84            packet_count: AtomicU64::new(0),
85            bytes_received: AtomicU64::new(0),
86            bytes_sent: AtomicU64::new(0),
87            bandwidth_estimate: AtomicU64::new(0),
88            last_bandwidth_calc: AtomicU64::new(now),
89            last_bandwidth_bytes: AtomicU64::new(0),
90            priority_score: AtomicU64::new(1000),
91            priority_dirty: AtomicU32::new(0),
92            latency_ms: AtomicU32::new(0),
93            packet_loss_rate: AtomicU32::new(0),
94            connection_start: AtomicU64::new(now),
95            is_slow_connection: AtomicU32::new(0),
96            remote_ep: None,
97        }
98    }
99
100    /// Create a new client bound to the given remote endpoint.
101    ///
102    /// The client will time out after `timeout_seconds` of inactivity.
103    #[must_use]
104    pub fn new_with_endpoint(addr: SocketAddr, timeout_seconds: u64) -> Self {
105        let now = Self::current_timestamp();
106        Self {
107            last_receive_tick: AtomicU64::new(now),
108            timeout_seconds,
109            packet_count: AtomicU64::new(0),
110            bytes_received: AtomicU64::new(0),
111            bytes_sent: AtomicU64::new(0),
112            bandwidth_estimate: AtomicU64::new(0),
113            last_bandwidth_calc: AtomicU64::new(now),
114            last_bandwidth_bytes: AtomicU64::new(0),
115            priority_score: AtomicU64::new(1000),
116            priority_dirty: AtomicU32::new(0),
117            latency_ms: AtomicU32::new(0),
118            packet_loss_rate: AtomicU32::new(0),
119            connection_start: AtomicU64::new(now),
120            is_slow_connection: AtomicU32::new(0),
121            remote_ep: Some(addr),
122        }
123    }
124
125    /// Mark the client as having received a packet at `now` (unix seconds).
126    /// Call with a cached timestamp from the event loop — avoids a syscall.
127    #[inline]
128    pub fn set_last_receive_tick_at(&self, now: u64) {
129        self.last_receive_tick.store(now, Ordering::Release);
130    }
131
132    /// Mark the client as having received a packet right now.
133    /// Convenience wrapper that fetches the clock; prefer `set_last_receive_tick_at`
134    /// on the hot path.
135    #[inline]
136    pub fn set_last_receive_tick(&self) {
137        self.set_last_receive_tick_at(Self::current_timestamp());
138    }
139
140    /// Returns `true` if no packet has been received within the configured
141    /// timeout window.
142    #[inline]
143    #[must_use]
144    pub fn is_timed_out(&self) -> bool {
145        let current = Self::current_timestamp();
146        let last = self.last_receive_tick.load(Ordering::Acquire);
147        current.saturating_sub(last) >= self.timeout_seconds
148    }
149
150    /// Record packet bytes. `now` should be a cached unix-seconds timestamp
151    /// from the event loop — avoids a per-packet syscall.
152    #[inline]
153    pub fn update_stats(&self, bytes_in: usize, bytes_out: usize, now: u64) {
154        self.packet_count.fetch_add(1, Ordering::Relaxed);
155        self.bytes_received
156            .fetch_add(bytes_in as u64, Ordering::Relaxed);
157        self.bytes_sent
158            .fetch_add(bytes_out as u64, Ordering::Relaxed);
159
160        let last_calc = self.last_bandwidth_calc.load(Ordering::Relaxed);
161
162        if now > last_calc {
163            let time_delta = now - last_calc;
164            if time_delta >= 1 {
165                // CAS ensures only one thread wins; losers skip the recalculation.
166                if self
167                    .last_bandwidth_calc
168                    .compare_exchange(last_calc, now, Ordering::AcqRel, Ordering::Relaxed)
169                    .is_err()
170                {
171                    return;
172                }
173
174                let current_total = self.bytes_received.load(Ordering::Relaxed)
175                    + self.bytes_sent.load(Ordering::Relaxed);
176                let previous_total =
177                    self.last_bandwidth_bytes
178                        .swap(current_total, Ordering::Relaxed);
179                let delta_bytes = current_total.saturating_sub(previous_total);
180                let bandwidth = delta_bytes / time_delta;
181
182                let _ = self.bandwidth_estimate.fetch_update(
183                    Ordering::Relaxed,
184                    Ordering::Relaxed,
185                    |old_estimate| Some((old_estimate * 7 + bandwidth * 3) / 10),
186                );
187
188                self.mark_priority_dirty();
189            }
190        }
191    }
192
193    #[inline]
194    fn mark_priority_dirty(&self) {
195        self.priority_dirty.store(1, Ordering::Relaxed);
196    }
197
198    fn recompute_priority(&self) {
199        let now = Self::current_timestamp();
200        let bandwidth = self.bandwidth_estimate.load(Ordering::Relaxed);
201        let latency = u64::from(self.latency_ms.load(Ordering::Relaxed));
202        let loss_rate = u64::from(self.packet_loss_rate.load(Ordering::Relaxed));
203        let connection_age = now.saturating_sub(self.connection_start.load(Ordering::Relaxed));
204
205        // Approximate 1_000_000 / (bandwidth + 1000) using shifts:
206        //   ~1_048_576 / (bandwidth + 1024) via right-shift by 20 of the numerator.
207        // Avoids a hardware `div` on the hot path.
208        let bw_denom = bandwidth.max(100) + 1024;
209        let bandwidth_score = (1 << 20) / bw_denom;
210        let latency_score = latency.min(500);
211        let loss_score = loss_rate;
212        let age_score = (connection_age / 10).min(100);
213
214        let priority = bandwidth_score
215            .saturating_add(latency_score)
216            .saturating_add(loss_score)
217            .saturating_add(age_score);
218
219        self.priority_score.store(priority, Ordering::Relaxed);
220
221        let is_slow = u32::from(bandwidth < 100_000);
222        self.is_slow_connection.store(is_slow, Ordering::Relaxed);
223    }
224
225    /// Returns the priority score (lower is better), recomputing lazily only
226    /// when metrics have changed since the last read.
227    #[inline]
228    #[must_use]
229    pub fn get_priority(&self) -> u64 {
230        if self.priority_dirty.swap(0, Ordering::Relaxed) != 0 {
231            self.recompute_priority();
232        }
233        self.priority_score.load(Ordering::Relaxed)
234    }
235
236    /// Record the measured round-trip latency in milliseconds.
237    ///
238    /// Marks the priority score as dirty so it will be recomputed on the
239    /// next call to [`Self::get_priority`].
240    #[inline]
241    pub fn set_latency(&self, latency_ms: u32) {
242        self.latency_ms.store(latency_ms, Ordering::Relaxed);
243        self.mark_priority_dirty();
244    }
245
246    /// Record the measured packet loss rate (0–1000, i.e. 0.0%–100.0%).
247    ///
248    /// Values above 1000 are clamped. Marks the priority score as dirty.
249    #[inline]
250    pub fn set_packet_loss_rate(&self, rate: u32) {
251        self.packet_loss_rate
252            .store(rate.min(1000), Ordering::Relaxed);
253        self.mark_priority_dirty();
254    }
255
256    /// Returns `true` if the estimated bandwidth is below 100 KB/s.
257    ///
258    /// Updated as a side-effect of [`Self::get_priority`] (lazy recompute).
259    #[inline]
260    #[must_use]
261    pub fn is_slow_connection(&self) -> bool {
262        self.is_slow_connection.load(Ordering::Relaxed) == 1
263    }
264
265    /// Returns the current unix timestamp in seconds using `coarsetime`.
266    ///
267    /// Uses `CLOCK_MONOTONIC_COARSE` on Linux (~10-25x faster than `SystemTime::now()`).
268    /// For the absolute fastest path, call [`Self::update_clock`] once per event-loop
269    /// iteration and use [`Self::recent_timestamp`] everywhere else.
270    #[inline]
271    #[must_use]
272    pub fn current_timestamp() -> u64 {
273        coarsetime::Clock::now_since_epoch().as_secs()
274    }
275
276    /// Returns the most recently cached unix timestamp in seconds.
277    ///
278    /// Zero overhead — a single atomic load. Requires [`Self::update_clock`] to be
279    /// called periodically (e.g. once per event-loop tick).
280    #[inline]
281    #[must_use]
282    pub fn recent_timestamp() -> u64 {
283        coarsetime::Clock::recent_since_epoch().as_secs()
284    }
285
286    /// Refresh the cached clock value.  Call once per event-loop iteration so
287    /// that [`Self::recent_timestamp`] stays fresh.
288    #[inline]
289    pub fn update_clock() {
290        coarsetime::Clock::update();
291    }
292
293    /// Reset all traffic counters and the bandwidth estimate to zero.
294    ///
295    /// Also resets the connection start time and marks priority as dirty.
296    pub fn reset_stats(&self) {
297        let now = Self::current_timestamp();
298        self.packet_count.store(0, Ordering::Relaxed);
299        self.bytes_received.store(0, Ordering::Relaxed);
300        self.bytes_sent.store(0, Ordering::Relaxed);
301        self.bandwidth_estimate.store(0, Ordering::Relaxed);
302        self.last_bandwidth_calc.store(now, Ordering::Relaxed);
303        self.last_bandwidth_bytes.store(0, Ordering::Relaxed);
304        self.connection_start.store(now, Ordering::Relaxed);
305        self.mark_priority_dirty();
306    }
307}
308
/// Lock-free packet loss tracker with periodic counter halving to avoid overflow.
///
/// Uses atomics so it can be shared across threads behind `Arc` without a mutex.
pub struct QualityAnalyzer {
    // Number of packets recorded as lost; halved together with total_count.
    loss_count: AtomicU32,
    // Total packets recorded; halved once it exceeds 10_000 to keep the
    // loss/total ratio stable while preventing unbounded growth.
    total_count: AtomicU32,
}
316
317impl QualityAnalyzer {
318    /// Create a new analyzer with zeroed counters.
319    #[must_use]
320    pub fn new() -> Self {
321        Self {
322            loss_count: AtomicU32::new(0),
323            total_count: AtomicU32::new(0),
324        }
325    }
326
327    /// Record a packet. Pass `true` if the packet was lost.
328    ///
329    /// Thread-safe: multiple threads may call this concurrently.
330    /// The halving step uses a CAS loop so only one thread performs
331    /// the reduction.
332    #[inline]
333    pub fn record_packet(&self, lost: bool) {
334        if lost {
335            self.loss_count.fetch_add(1, Ordering::Relaxed);
336        }
337        let prev = self.total_count.fetch_add(1, Ordering::Relaxed);
338        if prev + 1 > 10_000 {
339            let did_halve = self.total_count.fetch_update(
340                Ordering::Relaxed,
341                Ordering::Relaxed,
342                |t| if t > 10_000 { Some(t / 2) } else { None },
343            ).is_ok();
344
345            if did_halve {
346                let _ = self.loss_count.fetch_update(
347                    Ordering::Relaxed,
348                    Ordering::Relaxed,
349                    |l| Some(l / 2),
350                );
351            }
352        }
353    }
354
355    /// Returns packet loss rate as 0–1000 (i.e. 0.0%–100.0%).
356    #[inline]
357    #[must_use]
358    pub fn get_packet_loss_rate(&self) -> u32 {
359        // Read loss *before* total: if a halving interleaves between the two
360        // loads we overestimate rather than underestimate — safer for QoS.
361        let loss = self.loss_count.load(Ordering::Relaxed);
362        let total = self.total_count.load(Ordering::Relaxed);
363        if total == 0 {
364            return 0;
365        }
366        ((u64::from(loss) * 1000) / u64::from(total)).min(1000) as u32
367    }
368}
369
370impl Default for QualityAnalyzer {
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
/// Validate that a socket address is routable (not loopback/unspecified/broadcast/multicast)
/// and has a non-zero port.
#[inline]
#[must_use]
pub fn validate_address(addr: &SocketAddr) -> bool {
    // A zero port is never a valid destination, regardless of the IP.
    if addr.port() == 0 {
        return false;
    }
    // Reject any address class that cannot be a unicast relay target.
    match addr.ip() {
        IpAddr::V4(ip) => {
            !(ip.is_loopback() || ip.is_unspecified() || ip.is_broadcast() || ip.is_multicast())
        }
        IpAddr::V6(ip) => !(ip.is_loopback() || ip.is_unspecified() || ip.is_multicast()),
    }
}
389
390/// Create a [`DashMap`] with a shard count tuned to the CPU count.
391///
392/// Uses 4× the CPU count (rounded to next power-of-two, clamped 4..=256) so
393/// `DashMap` can use bitmask-based shard selection.
394pub fn create_dashmap_with_capacity<K, V>(capacity: usize) -> DashMap<K, V>
395where
396    K: Eq + std::hash::Hash,
397{
398    #[allow(clippy::redundant_closure_for_method_calls)]
399    let detected_cpus = std::thread::available_parallelism()
400        .map(|n| n.get())
401        .unwrap_or(1);
402
403    let shard_amount = (detected_cpus * 4).next_power_of_two().clamp(4, 256);
404
405    tracing::debug!(
406        "Creating DashMap with {} shards for {} CPUs (capacity: {})",
407        shard_amount,
408        detected_cpus,
409        capacity
410    );
411
412    DashMap::with_capacity_and_shard_amount(capacity, shard_amount)
413}
414
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, Ipv6Addr};
    use std::sync::Arc;

    // ──────────────────────────────────────────────
    // TunnelClient — construction
    // ──────────────────────────────────────────────

    #[test]
    fn new_with_endpoint_stores_address() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 9000);
        let client = TunnelClient::new_with_endpoint(addr, 30);
        assert_eq!(client.remote_ep, Some(addr));
    }

    #[test]
    fn new_without_endpoint_is_none() {
        let client = TunnelClient::new(30);
        assert_eq!(client.remote_ep, None);
    }

    // ──────────────────────────────────────────────
    // TunnelClient — timeout
    // ──────────────────────────────────────────────

    #[test]
    fn tunnel_client_timeout() {
        // A zero-second window means the client is timed out immediately.
        let client = TunnelClient::new(0);
        assert!(client.is_timed_out());
    }

    #[test]
    fn tunnel_client_not_timed_out() {
        let client = TunnelClient::new(60);
        assert!(!client.is_timed_out());
    }

    #[test]
    fn set_last_receive_tick_resets_timeout() {
        // `client` has a zero timeout so it can never leave the timed-out
        // state; `client2` demonstrates that a fresh tick keeps a client
        // with a real window alive.
        let client = TunnelClient::new(0);
        assert!(client.is_timed_out());
        client.set_last_receive_tick();
        let client2 = TunnelClient::new(60);
        client2.set_last_receive_tick_at(TunnelClient::current_timestamp());
        assert!(!client2.is_timed_out());
    }

    #[test]
    fn set_last_receive_tick_at_with_old_timestamp_times_out() {
        let client = TunnelClient::new(5);
        // Unix epoch (0) is far older than any 5-second window.
        client.set_last_receive_tick_at(0);
        assert!(client.is_timed_out());
    }

    // ──────────────────────────────────────────────
    // TunnelClient — update_stats
    // ──────────────────────────────────────────────

    #[test]
    fn update_stats_increments_counters() {
        let client = TunnelClient::new(60);
        let now = TunnelClient::current_timestamp();
        client.update_stats(100, 50, now);
        assert_eq!(client.packet_count.load(Ordering::Relaxed), 1);
        assert_eq!(client.bytes_received.load(Ordering::Relaxed), 100);
        assert_eq!(client.bytes_sent.load(Ordering::Relaxed), 50);
    }

    #[test]
    fn update_stats_accumulates_multiple_packets() {
        let client = TunnelClient::new(60);
        let now = TunnelClient::current_timestamp();
        client.update_stats(100, 50, now);
        client.update_stats(200, 75, now);
        client.update_stats(300, 25, now);
        assert_eq!(client.packet_count.load(Ordering::Relaxed), 3);
        assert_eq!(client.bytes_received.load(Ordering::Relaxed), 600);
        assert_eq!(client.bytes_sent.load(Ordering::Relaxed), 150);
    }

    #[test]
    fn update_stats_computes_bandwidth_after_time_delta() {
        let client = TunnelClient::new(60);
        let t0 = TunnelClient::current_timestamp();
        client.update_stats(5000, 5000, t0);

        // Advancing the supplied clock by 2 s triggers the EWMA update.
        let t1 = t0 + 2;
        client.update_stats(5000, 5000, t1);

        let bw = client.bandwidth_estimate.load(Ordering::Relaxed);
        assert!(bw > 0, "bandwidth should be non-zero after time delta, got {bw}");
    }

    #[test]
    fn update_stats_skips_bandwidth_when_now_equals_last_calc() {
        let client = TunnelClient::new(60);
        let now = TunnelClient::current_timestamp();
        client.update_stats(1000, 1000, now);
        client.update_stats(1000, 1000, now);
        let bw = client.bandwidth_estimate.load(Ordering::Relaxed);
        assert_eq!(bw, 0, "bandwidth should not update when time_delta == 0");
    }

    #[test]
    fn update_stats_skips_bandwidth_when_now_is_in_the_past() {
        let client = TunnelClient::new(60);
        let now = TunnelClient::current_timestamp();
        client.update_stats(1000, 1000, now);
        // A timestamp earlier than last_bandwidth_calc must be ignored.
        client.update_stats(1000, 1000, now.saturating_sub(10));
        let bw = client.bandwidth_estimate.load(Ordering::Relaxed);
        assert_eq!(bw, 0, "bandwidth should not update when now < last_calc");
    }

    // ──────────────────────────────────────────────
    // TunnelClient — priority, latency, loss, slow
    // ──────────────────────────────────────────────

    #[test]
    fn priority_is_lazy() {
        let client = TunnelClient::new(60);
        let initial = client.get_priority();
        assert!(initial > 0);
        client.set_latency(100);
        // Reading the raw field (not get_priority) must still show the stale
        // score: recompute happens only on the next get_priority call.
        let score_before_read = client.priority_score.load(Ordering::Relaxed);
        let score_after_read = client.get_priority();
        assert_eq!(score_before_read, initial);
        assert_ne!(score_after_read, initial);
    }

    #[test]
    fn set_latency_stores_value() {
        let client = TunnelClient::new(60);
        client.set_latency(42);
        assert_eq!(client.latency_ms.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn set_packet_loss_rate_clamps_to_1000() {
        let client = TunnelClient::new(60);
        client.set_packet_loss_rate(9999);
        assert_eq!(client.packet_loss_rate.load(Ordering::Relaxed), 1000);
    }

    #[test]
    fn set_packet_loss_rate_stores_valid_value() {
        let client = TunnelClient::new(60);
        client.set_packet_loss_rate(250);
        assert_eq!(client.packet_loss_rate.load(Ordering::Relaxed), 250);
    }

    #[test]
    fn high_latency_increases_priority_score() {
        let client = TunnelClient::new(60);
        let low = {
            client.set_latency(0);
            client.get_priority()
        };
        let high = {
            client.set_latency(400);
            client.get_priority()
        };
        assert!(high > low, "higher latency should increase priority score (lower is better)");
    }

    #[test]
    fn is_slow_connection_reflects_bandwidth() {
        let client = TunnelClient::new(60);
        // set_latency marks priority dirty; get_priority then refreshes the
        // slow flag. Bandwidth is 0 (< 100 KB/s) so the client is slow.
        client.set_latency(0);
        let _ = client.get_priority();
        assert!(client.is_slow_connection());
    }

    // ──────────────────────────────────────────────
    // TunnelClient — reset_stats
    // ──────────────────────────────────────────────

    #[test]
    fn reset_stats_zeroes_counters() {
        let client = TunnelClient::new(60);
        let now = TunnelClient::current_timestamp();
        client.update_stats(1000, 500, now);
        client.update_stats(1000, 500, now + 2);
        client.reset_stats();

        assert_eq!(client.packet_count.load(Ordering::Relaxed), 0);
        assert_eq!(client.bytes_received.load(Ordering::Relaxed), 0);
        assert_eq!(client.bytes_sent.load(Ordering::Relaxed), 0);
        assert_eq!(client.bandwidth_estimate.load(Ordering::Relaxed), 0);
        assert_eq!(client.last_bandwidth_bytes.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn reset_stats_marks_priority_dirty() {
        let client = TunnelClient::new(60);
        let _ = client.get_priority(); // clear dirty flag
        client.reset_stats();
        assert_eq!(
            client.priority_dirty.load(Ordering::Relaxed),
            1,
            "reset_stats should mark priority as dirty"
        );
    }

    // ──────────────────────────────────────────────
    // TunnelClient — timestamps
    // ──────────────────────────────────────────────

    #[test]
    fn recent_timestamp_works() {
        TunnelClient::update_clock();
        let recent = TunnelClient::recent_timestamp();
        let now = TunnelClient::current_timestamp();
        // 1-second tolerance: the two clocks may straddle a second boundary.
        assert!(now.abs_diff(recent) <= 1);
    }

    #[test]
    fn current_timestamp_is_plausible() {
        let ts = TunnelClient::current_timestamp();
        // Should be after 2024-01-01 and before 2100-01-01
        assert!(ts > 1_704_067_200, "timestamp too small: {ts}");
        assert!(ts < 4_102_444_800, "timestamp too large: {ts}");
    }

    // ──────────────────────────────────────────────
    // TunnelClient — thread safety
    // ──────────────────────────────────────────────

    #[test]
    fn tunnel_client_is_send_and_sync() {
        // Compile-time check: this only builds if the bounds hold.
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<TunnelClient>();
    }

    #[test]
    fn tunnel_client_concurrent_update_stats() {
        let client = Arc::new(TunnelClient::new(60));
        let threads: Vec<_> = (0..4)
            .map(|i| {
                let c = Arc::clone(&client);
                std::thread::spawn(move || {
                    let base = TunnelClient::current_timestamp() + i;
                    for j in 0..1000 {
                        c.update_stats(100, 50, base + j);
                    }
                })
            })
            .collect();

        for t in threads {
            t.join().unwrap();
        }

        // Counters use fetch_add, so no updates may be lost under contention.
        let total_packets = client.packet_count.load(Ordering::Relaxed);
        assert_eq!(total_packets, 4000, "all packets should be counted");

        let total_in = client.bytes_received.load(Ordering::Relaxed);
        assert_eq!(total_in, 400_000, "all bytes_in should be counted");
    }

    // ──────────────────────────────────────────────
    // QualityAnalyzer
    // ──────────────────────────────────────────────

    #[test]
    fn quality_analyzer_no_loss() {
        let qa = QualityAnalyzer::new();
        for _ in 0..100 {
            qa.record_packet(false);
        }
        assert_eq!(qa.get_packet_loss_rate(), 0);
    }

    #[test]
    fn quality_analyzer_50_percent_loss() {
        let qa = QualityAnalyzer::new();
        for i in 0..100 {
            qa.record_packet(i % 2 == 0);
        }
        assert_eq!(qa.get_packet_loss_rate(), 500);
    }

    #[test]
    fn quality_analyzer_100_percent_loss() {
        let qa = QualityAnalyzer::new();
        for _ in 0..100 {
            qa.record_packet(true);
        }
        assert_eq!(qa.get_packet_loss_rate(), 1000);
    }

    #[test]
    fn quality_analyzer_empty_returns_zero() {
        // No recorded packets must not divide by zero.
        let qa = QualityAnalyzer::new();
        assert_eq!(qa.get_packet_loss_rate(), 0);
    }

    #[test]
    fn quality_analyzer_halving() {
        let qa = QualityAnalyzer::new();
        // One packet past the 10_000 threshold triggers the halving.
        for _ in 0..10_001 {
            qa.record_packet(false);
        }
        let total = qa.total_count.load(Ordering::Relaxed);
        assert!(total <= 5_001, "expected halving, got total={total}");
    }

    #[test]
    fn quality_analyzer_halving_preserves_rate() {
        let qa = QualityAnalyzer::new();
        // ~25% loss: every 4th packet is lost
        for i in 0..10_001 {
            qa.record_packet(i % 4 == 0);
        }
        let rate = qa.get_packet_loss_rate();
        // After halving, the ratio should remain approximately 250 (25%)
        // Allow some tolerance since halving is integer division
        assert!(
            (200..=300).contains(&rate),
            "loss rate should be ~250 after halving, got {rate}"
        );
    }

    #[test]
    fn quality_analyzer_default_trait() {
        let qa = QualityAnalyzer::default();
        assert_eq!(qa.get_packet_loss_rate(), 0);
        assert_eq!(qa.total_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn quality_analyzer_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<QualityAnalyzer>();
    }

    #[test]
    fn quality_analyzer_concurrent_recording() {
        let qa = Arc::new(QualityAnalyzer::new());
        let threads: Vec<_> = (0..4)
            .map(|_| {
                let q = Arc::clone(&qa);
                std::thread::spawn(move || {
                    for i in 0..2500 {
                        q.record_packet(i % 2 == 0);
                    }
                })
            })
            .collect();

        for t in threads {
            t.join().unwrap();
        }

        let rate = qa.get_packet_loss_rate();
        // 50% loss across all threads, allow tolerance due to halving
        assert!(
            (400..=600).contains(&rate),
            "concurrent 50% loss rate should be ~500, got {rate}"
        );
    }

    // ──────────────────────────────────────────────
    // validate_address — IPv4
    // ──────────────────────────────────────────────

    #[test]
    fn validate_address_rejects_loopback() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234);
        assert!(!validate_address(&addr));
    }

    #[test]
    fn validate_address_rejects_zero_port() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 0);
        assert!(!validate_address(&addr));
    }

    #[test]
    fn validate_address_rejects_unspecified() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234);
        assert!(!validate_address(&addr));
    }

    #[test]
    fn validate_address_rejects_broadcast() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 1234);
        assert!(!validate_address(&addr));
    }

    #[test]
    fn validate_address_rejects_multicast_v4() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(224, 0, 0, 1)), 1234);
        assert!(!validate_address(&addr));
    }

    #[test]
    fn validate_address_accepts_valid() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 1234);
        assert!(validate_address(&addr));
    }

    #[test]
    fn validate_address_accepts_private_v4() {
        // Private ranges are deliberately allowed — only loopback,
        // unspecified, broadcast, and multicast are rejected.
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
        assert!(validate_address(&addr));
    }

    // ──────────────────────────────────────────────
    // validate_address — IPv6
    // ──────────────────────────────────────────────

    #[test]
    fn validate_address_rejects_ipv6_loopback() {
        let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 1234);
        assert!(!validate_address(&addr));
    }

    #[test]
    fn validate_address_rejects_ipv6_unspecified() {
        let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 1234);
        assert!(!validate_address(&addr));
    }

    #[test]
    fn validate_address_rejects_ipv6_multicast() {
        // ff02::1 is a well-known multicast address
        let addr = SocketAddr::new(
            IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 1)),
            1234,
        );
        assert!(!validate_address(&addr));
    }

    #[test]
    fn validate_address_accepts_valid_ipv6() {
        // 2001:db8::/32 is the documentation prefix — fine as a test value.
        let addr = SocketAddr::new(
            IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
            443,
        );
        assert!(validate_address(&addr));
    }

    #[test]
    fn validate_address_rejects_ipv6_zero_port() {
        let addr = SocketAddr::new(
            IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
            0,
        );
        assert!(!validate_address(&addr));
    }

    // ──────────────────────────────────────────────
    // DashMap creation
    // ──────────────────────────────────────────────

    #[test]
    fn dashmap_creation() {
        let map: DashMap<u32, String> = create_dashmap_with_capacity(100);
        map.insert(1, "test".to_string());
        assert_eq!(map.len(), 1);
    }

    #[test]
    fn dashmap_creation_zero_capacity() {
        // Zero requested capacity must still yield a usable map.
        let map: DashMap<u32, u32> = create_dashmap_with_capacity(0);
        map.insert(1, 42);
        assert_eq!(*map.get(&1).unwrap(), 42);
    }
}