// triglav/multipath/scheduler.rs

1//! Intelligent packet scheduler for multi-path distribution.
2//!
3//! Implements multiple scheduling strategies:
4//! - Weighted round-robin
5//! - Lowest latency first
6//! - Lowest loss first
7//! - Adaptive (combines multiple signals)
8//! - Redundant (send on multiple paths)
9//! - EffectiveThroughput (optimized for maximum throughput)
10//! - LatencyAware (prevents high-latency blocking)
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18
19use super::throughput::{EffectiveThroughput, ThroughputConfig};
20use super::Uplink;
21
/// Scheduling strategy.
///
/// On-the-wire names are the snake_case form of each variant
/// (e.g. `weighted_round_robin`), per the `serde(rename_all)` attribute below.
/// The default strategy is [`SchedulingStrategy::Adaptive`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SchedulingStrategy {
    /// Weighted round-robin based on configured weights.
    WeightedRoundRobin,
    /// Always choose lowest latency uplink.
    LowestLatency,
    /// Always choose lowest loss uplink.
    LowestLoss,
    /// Adaptive selection based on multiple factors.
    #[default]
    Adaptive,
    /// Send on all uplinks for redundancy.
    Redundant,
    /// Send on primary, failover to secondary.
    PrimaryBackup,
    /// Load balance based on available bandwidth.
    BandwidthProportional,
    /// ECMP-aware selection using flow hash for path stickiness.
    /// Packets with the same flow hash use the same uplink.
    EcmpAware,
    /// Effective throughput: combines bandwidth, latency, and loss for true throughput.
    /// Best for maximizing actual data transfer speed.
    EffectiveThroughput,
    /// Latency-aware: prevents high-latency paths from blocking transfers.
    /// Considers transfer size to pick the fastest path for that specific transfer.
    LatencyAware,
    /// Size-based: automatically chooses strategy based on transfer size.
    /// Small transfers prefer low latency, large transfers prefer high bandwidth.
    SizeBased,
}
54
/// Scheduler configuration.
///
/// All fields have serde defaults, so a config deserialized from an empty
/// document matches [`SchedulerConfig::default()`].
///
/// Note: the adaptive score is a plain weighted sum of the `*_weight`
/// fields; the weights are not normalized, so they need not sum to 1.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerConfig {
    /// Scheduling strategy.
    #[serde(default)]
    pub strategy: SchedulingStrategy,

    /// Minimum RTT difference to prefer one uplink (ms).
    #[serde(default = "default_rtt_threshold")]
    pub rtt_threshold_ms: u32,

    /// Minimum loss difference to prefer one uplink (%).
    #[serde(default = "default_loss_threshold")]
    pub loss_threshold_percent: f32,

    /// Weight for RTT in adaptive scoring (0-1).
    #[serde(default = "default_rtt_weight")]
    pub rtt_weight: f32,

    /// Weight for loss in adaptive scoring (0-1).
    #[serde(default = "default_loss_weight")]
    pub loss_weight: f32,

    /// Weight for bandwidth in adaptive scoring (0-1).
    #[serde(default = "default_bw_weight")]
    pub bandwidth_weight: f32,

    /// Weight for NAT penalty in adaptive scoring (0-1).
    /// Higher values penalize NATted uplinks more heavily.
    #[serde(default = "default_nat_weight")]
    pub nat_penalty_weight: f32,

    /// Enable path stickiness (prefer same path for related packets).
    #[serde(default = "default_sticky")]
    pub sticky_paths: bool,

    /// Stickiness timeout. A flow's pinned uplink is forgotten if the flow
    /// has not been scheduled for this long.
    #[serde(default = "default_sticky_timeout", with = "humantime_serde")]
    pub sticky_timeout: Duration,

    /// Enable proactive probing of backup paths.
    #[serde(default = "default_probe")]
    pub probe_backup_paths: bool,

    /// Probe interval for backup paths.
    #[serde(default = "default_probe_interval", with = "humantime_serde")]
    pub probe_interval: Duration,

    /// Throughput optimization configuration.
    #[serde(default)]
    pub throughput: ThroughputConfig,

    /// Maximum acceptable latency before heavy penalty (ms).
    /// Used by LatencyAware and EffectiveThroughput strategies.
    #[serde(default = "default_max_latency")]
    pub max_acceptable_latency_ms: u32,

    /// Threshold size for size-based strategy (bytes).
    /// Transfers smaller than this prefer latency; larger prefer bandwidth.
    #[serde(default = "default_size_threshold")]
    pub size_threshold_bytes: u64,

    /// Enable effective throughput scoring for all strategies.
    /// When true, even Adaptive strategy uses throughput-aware scoring.
    #[serde(default = "default_throughput_aware")]
    pub throughput_aware: bool,

    /// Weight for effective throughput in scoring (0-1).
    /// Higher values prioritize throughput over raw metrics.
    #[serde(default = "default_effective_throughput_weight")]
    pub effective_throughput_weight: f32,

    /// Enable latency-blocking prevention.
    /// Prevents high-latency paths from being selected when faster paths exist.
    #[serde(default = "default_prevent_latency_blocking")]
    pub prevent_latency_blocking: bool,

    /// Latency blocking threshold ratio.
    /// If an uplink's latency exceeds best_latency * ratio, it's blocked.
    #[serde(default = "default_latency_blocking_ratio")]
    pub latency_blocking_ratio: f32,
}
137
// Serde default helpers. Each one mirrors the documented default of the
// corresponding `SchedulerConfig` field; keep the two in sync.
fn default_rtt_threshold() -> u32 { 10 }
fn default_loss_threshold() -> f32 { 2.0 }
fn default_rtt_weight() -> f32 { 0.25 }
fn default_loss_weight() -> f32 { 0.25 }
fn default_bw_weight() -> f32 { 0.35 }
fn default_nat_weight() -> f32 { 0.05 }
fn default_sticky() -> bool { true }
fn default_sticky_timeout() -> Duration { Duration::from_secs(5) }
fn default_probe() -> bool { true }
fn default_probe_interval() -> Duration { Duration::from_secs(1) }
fn default_max_latency() -> u32 { 500 }
fn default_size_threshold() -> u64 { 65_536 } // 64 KiB
fn default_throughput_aware() -> bool { true }
fn default_effective_throughput_weight() -> f32 { 0.10 }
fn default_prevent_latency_blocking() -> bool { true }
fn default_latency_blocking_ratio() -> f32 { 10.0 }
186
187impl Default for SchedulerConfig {
188    fn default() -> Self {
189        Self {
190            strategy: SchedulingStrategy::default(),
191            rtt_threshold_ms: default_rtt_threshold(),
192            loss_threshold_percent: default_loss_threshold(),
193            rtt_weight: default_rtt_weight(),
194            loss_weight: default_loss_weight(),
195            bandwidth_weight: default_bw_weight(),
196            nat_penalty_weight: default_nat_weight(),
197            sticky_paths: default_sticky(),
198            sticky_timeout: default_sticky_timeout(),
199            probe_backup_paths: default_probe(),
200            probe_interval: default_probe_interval(),
201            throughput: ThroughputConfig::default(),
202            max_acceptable_latency_ms: default_max_latency(),
203            size_threshold_bytes: default_size_threshold(),
204            throughput_aware: default_throughput_aware(),
205            effective_throughput_weight: default_effective_throughput_weight(),
206            prevent_latency_blocking: default_prevent_latency_blocking(),
207            latency_blocking_ratio: default_latency_blocking_ratio(),
208        }
209    }
210}
211
/// State for weighted round-robin.
#[derive(Debug, Default)]
struct WrrState {
    // Index of the uplink examined on the previous pick; selection resumes
    // from the next slot (see `Scheduler::select_wrr`).
    current_index: usize,
    // Current weight threshold: uplinks whose configured weight is >= this
    // value are eligible on the current pass.
    current_weight: u32,
}
218
/// Tracks which uplink each flow was last pinned to, for sticky-path routing.
#[derive(Debug)]
struct PathStickiness {
    /// Maps flow id -> (uplink numeric id, time of last use).
    flows: HashMap<u64, (u16, Instant)>,
}

impl PathStickiness {
    /// Create an empty stickiness table.
    fn new() -> Self {
        PathStickiness { flows: HashMap::new() }
    }

    /// Return the pinned uplink for `flow_id`, if its entry is younger
    /// than `timeout`. Stale entries are ignored (but not removed here).
    fn get(&self, flow_id: u64, timeout: Duration) -> Option<u16> {
        match self.flows.get(&flow_id) {
            Some((uplink, last)) if last.elapsed() < timeout => Some(*uplink),
            _ => None,
        }
    }

    /// Pin `flow_id` to `uplink_id`, refreshing the entry's timestamp.
    fn set(&mut self, flow_id: u64, uplink_id: u16) {
        self.flows.insert(flow_id, (uplink_id, Instant::now()));
    }

    /// Drop every entry that has not been refreshed within `timeout`.
    fn cleanup(&mut self, timeout: Duration) {
        self.flows.retain(|_, entry| entry.1.elapsed() < timeout);
    }
}
251
/// Packet scheduler.
///
/// All mutable state lives behind `RwLock`s, so every method takes `&self`.
pub struct Scheduler {
    config: SchedulerConfig,
    /// Rotating state for the weighted round-robin strategy.
    wrr_state: RwLock<WrrState>,
    /// Flow -> uplink pinning for sticky paths.
    stickiness: RwLock<PathStickiness>,
    /// Last probe time per uplink id (see `needs_probe`/`record_probe`).
    last_probe: RwLock<HashMap<u16, Instant>>,
    /// Cached effective throughput scores, keyed by uplink id,
    /// each paired with the instant it was computed.
    throughput_cache: RwLock<HashMap<u16, (Instant, EffectiveThroughput)>>,
    /// Cache TTL for `throughput_cache` entries.
    cache_ttl: Duration,
}
263
impl Scheduler {
    /// Create a new scheduler with the given configuration.
    pub fn new(config: SchedulerConfig) -> Self {
        Self {
            config,
            wrr_state: RwLock::new(WrrState::default()),
            stickiness: RwLock::new(PathStickiness::new()),
            last_probe: RwLock::new(HashMap::new()),
            throughput_cache: RwLock::new(HashMap::new()),
            // Effective-throughput results are recomputed at most once per
            // 100ms per uplink (see `calculate_effective_throughput`).
            cache_ttl: Duration::from_millis(100),
        }
    }

    /// Get configuration.
    pub fn config(&self) -> &SchedulerConfig {
        &self.config
    }

    /// Select uplink(s) for a packet.
    ///
    /// Returns a list of uplink IDs in priority order.
    /// For most strategies, this returns a single uplink.
    /// For Redundant strategy, returns multiple uplinks.
    ///
    /// Equivalent to [`Self::select_for_size`] with an unknown size.
    pub fn select(&self, uplinks: &[Arc<Uplink>], flow_id: Option<u64>) -> Vec<u16> {
        self.select_for_size(uplinks, flow_id, None)
    }

    /// Select uplink(s) for a packet of known size.
    /// This enables size-aware scheduling to optimize for actual transfer time.
    ///
    /// Selection pipeline: latency-blocking filter -> sticky-flow
    /// short-circuit -> per-strategy selection -> stickiness update.
    /// Returns an empty vec when no uplink is usable.
    pub fn select_for_size(
        &self,
        uplinks: &[Arc<Uplink>],
        flow_id: Option<u64>,
        size_bytes: Option<u64>,
    ) -> Vec<u16> {
        // Filter to usable uplinks, applying latency blocking prevention if enabled
        let usable: Vec<_> = if self.config.prevent_latency_blocking {
            self.filter_latency_blocked(uplinks)
        } else {
            uplinks.iter().filter(|u| u.is_usable()).collect()
        };

        if usable.is_empty() {
            return vec![];
        }

        // Check stickiness first: a fresh sticky mapping overrides the
        // configured strategy, as long as that uplink is still usable.
        if self.config.sticky_paths {
            if let Some(flow) = flow_id {
                let sticky = self.stickiness.read().get(flow, self.config.sticky_timeout);
                if let Some(sticky_uplink) = sticky {
                    if usable.iter().any(|u| u.numeric_id() == sticky_uplink) {
                        return vec![sticky_uplink];
                    }
                }
            }
        }

        let selected = match self.config.strategy {
            SchedulingStrategy::WeightedRoundRobin => self.select_wrr(&usable),
            SchedulingStrategy::LowestLatency => Self::select_lowest_latency(&usable),
            SchedulingStrategy::LowestLoss => Self::select_lowest_loss(&usable),
            SchedulingStrategy::Adaptive => self.select_adaptive(&usable),
            SchedulingStrategy::Redundant => Self::select_redundant(&usable),
            SchedulingStrategy::PrimaryBackup => Self::select_primary_backup(&usable),
            SchedulingStrategy::BandwidthProportional => {
                self.select_bandwidth_proportional(&usable)
            }
            SchedulingStrategy::EcmpAware => Self::select_ecmp_aware(&usable, flow_id),
            SchedulingStrategy::EffectiveThroughput => self.select_effective_throughput(&usable),
            SchedulingStrategy::LatencyAware => self.select_latency_aware(&usable, size_bytes),
            SchedulingStrategy::SizeBased => self.select_size_based(&usable, size_bytes),
        };

        // Update stickiness: pin this flow to the primary (first) selection.
        if self.config.sticky_paths && !selected.is_empty() {
            if let Some(flow) = flow_id {
                self.stickiness.write().set(flow, selected[0]);
            }
        }

        selected
    }

    /// Filter out uplinks that would cause latency blocking.
    /// An uplink is blocked if its RTT exceeds best_rtt * blocking_ratio.
    fn filter_latency_blocked<'a>(&self, uplinks: &'a [Arc<Uplink>]) -> Vec<&'a Arc<Uplink>> {
        let usable: Vec<_> = uplinks.iter().filter(|u| u.is_usable()).collect();

        if usable.is_empty() {
            return usable;
        }

        // Find minimum RTT
        let min_rtt = usable
            .iter()
            .map(|u| u.rtt())
            .min()
            .unwrap_or(Duration::ZERO);

        // A zero minimum RTT disables filtering entirely — presumably a
        // zero RTT means "not yet measured". TODO confirm Uplink::rtt()
        // semantics for unmeasured paths.
        if min_rtt == Duration::ZERO {
            return usable;
        }

        let threshold = Duration::from_secs_f64(
            min_rtt.as_secs_f64() * self.config.latency_blocking_ratio as f64,
        );

        usable
            .into_iter()
            .filter(|u| u.rtt() <= threshold)
            .collect()
    }

    /// Weighted round-robin selection.
    ///
    /// Interleaved-WRR variant: `current_weight` is a falling threshold
    /// that is lowered on each full pass over the uplink list, so uplinks
    /// with a higher configured weight qualify on more passes than uplinks
    /// with a lower weight.
    fn select_wrr(&self, uplinks: &[&Arc<Uplink>]) -> Vec<u16> {
        if uplinks.is_empty() {
            return vec![];
        }

        let mut state = self.wrr_state.write();

        // Find uplink with remaining weight
        let max_weight: u32 = uplinks.iter().map(|u| u.config().weight).max().unwrap_or(1);

        loop {
            state.current_index = (state.current_index + 1) % uplinks.len();

            // On each wrap-around, lower the threshold; once it reaches
            // zero, reset it back to the maximum configured weight.
            if state.current_index == 0 {
                if state.current_weight == 0 {
                    state.current_weight = max_weight;
                } else {
                    state.current_weight -= 1;
                }
            }

            let uplink = &uplinks[state.current_index];
            if uplink.config().weight >= state.current_weight && uplink.can_send() {
                return vec![uplink.numeric_id()];
            }

            // Safety: prevent infinite loop
            if state.current_weight == 0 && state.current_index == 0 {
                break;
            }
        }

        // Fallback to first usable
        // NOTE(review): this fallback does not re-check `can_send()`; it
        // returns the first uplink in the slice unconditionally.
        uplinks
            .first()
            .map(|u| vec![u.numeric_id()])
            .unwrap_or_default()
    }

    /// Select lowest latency uplink (among those that can currently send).
    fn select_lowest_latency(uplinks: &[&Arc<Uplink>]) -> Vec<u16> {
        uplinks
            .iter()
            .filter(|u| u.can_send())
            .min_by_key(|u| u.rtt())
            .map(|u| vec![u.numeric_id()])
            .unwrap_or_default()
    }

    /// Select lowest loss uplink (among those that can currently send).
    /// NaN loss ratios compare as equal, so ties/NaNs keep list order.
    fn select_lowest_loss(uplinks: &[&Arc<Uplink>]) -> Vec<u16> {
        uplinks
            .iter()
            .filter(|u| u.can_send())
            .min_by(|a, b| {
                a.loss_ratio()
                    .partial_cmp(&b.loss_ratio())
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .map(|u| vec![u.numeric_id()])
            .unwrap_or_default()
    }

    /// Adaptive selection based on multiple factors.
    /// When throughput_aware is enabled, also includes effective throughput scoring.
    ///
    /// Each component score is in 0-1 (higher is better); the total is an
    /// unnormalized weighted sum and the highest-scoring uplink wins.
    fn select_adaptive(&self, uplinks: &[&Arc<Uplink>]) -> Vec<u16> {
        // Calculate scores for each uplink
        let mut scored: Vec<_> = uplinks
            .iter()
            .filter(|u| u.can_send())
            .map(|u| {
                let rtt_score = Self::rtt_score(u);
                let loss_score = Self::loss_score(u);
                let bw_score = Self::bandwidth_score(u, uplinks);
                let nat_score = Self::nat_score(u);

                let mut total_score = rtt_score * self.config.rtt_weight
                    + loss_score * self.config.loss_weight
                    + bw_score * self.config.bandwidth_weight
                    + nat_score * self.config.nat_penalty_weight;

                // Add effective throughput component if enabled
                if self.config.throughput_aware {
                    let eff_throughput = self.calculate_effective_throughput(u);
                    total_score +=
                        eff_throughput.score as f32 * self.config.effective_throughput_weight;
                }

                (u.numeric_id(), total_score)
            })
            .collect();

        // Sort by score (highest first)
        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        scored.into_iter().map(|(id, _)| id).take(1).collect()
    }

    /// Select based on effective throughput (combines bandwidth, latency, loss).
    /// Highest throughput score wins.
    fn select_effective_throughput(&self, uplinks: &[&Arc<Uplink>]) -> Vec<u16> {
        let mut scored: Vec<_> = uplinks
            .iter()
            .filter(|u| u.can_send())
            .map(|u| {
                let throughput = self.calculate_effective_throughput(u);
                (u.numeric_id(), throughput.score)
            })
            .collect();

        scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        scored.into_iter().map(|(id, _)| id).take(1).collect()
    }

    /// Select with latency awareness - considers transfer size to pick fastest path.
    ///
    /// When no size is given, the configured size threshold is used as a
    /// representative transfer size.
    fn select_latency_aware(&self, uplinks: &[&Arc<Uplink>], size_bytes: Option<u64>) -> Vec<u16> {
        let size = size_bytes.unwrap_or(self.config.size_threshold_bytes);

        let mut scored: Vec<_> = uplinks
            .iter()
            .filter(|u| u.can_send())
            .map(|u| {
                let throughput = self.calculate_effective_throughput(u);
                let transfer_time = throughput.transfer_time(size);
                (u.numeric_id(), transfer_time)
            })
            .collect();

        // Sort by transfer time (lowest first)
        scored.sort_by(|a, b| a.1.cmp(&b.1));

        scored.into_iter().map(|(id, _)| id).take(1).collect()
    }

    /// Size-based strategy: small transfers prefer latency, large prefer bandwidth.
    ///
    /// An unknown size defaults to the threshold itself, which falls into
    /// the "large transfer" branch below (threshold is not strictly less
    /// than itself).
    fn select_size_based(&self, uplinks: &[&Arc<Uplink>], size_bytes: Option<u64>) -> Vec<u16> {
        let size = size_bytes.unwrap_or(self.config.size_threshold_bytes);

        if size < self.config.size_threshold_bytes {
            // Small transfer: prefer lowest latency
            Self::select_lowest_latency(uplinks)
        } else {
            // Large transfer: use effective throughput
            self.select_effective_throughput(uplinks)
        }
    }

    /// Calculate effective throughput for an uplink.
    ///
    /// Results are memoized per uplink id for `cache_ttl`. The read-then-
    /// write locking is not atomic: concurrent callers may both compute and
    /// insert; the last write wins, which is harmless for a cache.
    fn calculate_effective_throughput(&self, uplink: &Uplink) -> EffectiveThroughput {
        let uplink_id = uplink.numeric_id();

        // Check cache first
        {
            let cache = self.throughput_cache.read();
            if let Some((cached_at, throughput)) = cache.get(&uplink_id) {
                if cached_at.elapsed() < self.cache_ttl {
                    return *throughput;
                }
            }
        }

        // Calculate fresh
        let metrics = uplink.quality_metrics();
        let throughput = EffectiveThroughput::calculate(
            uplink.bandwidth().bytes_per_sec,
            uplink.rtt(),
            uplink.loss_ratio(),
            metrics.jitter,
            &self.config.throughput,
        );

        // Update cache
        {
            let mut cache = self.throughput_cache.write();
            cache.insert(uplink_id, (Instant::now(), throughput));
        }

        throughput
    }

    /// Redundant selection (all usable uplinks).
    fn select_redundant(uplinks: &[&Arc<Uplink>]) -> Vec<u16> {
        uplinks
            .iter()
            .filter(|u| u.can_send())
            .map(|u| u.numeric_id())
            .collect()
    }

    /// Primary-backup selection.
    ///
    /// Returns up to two sendable uplinks: result[0] is the primary
    /// (highest priority score), result[1] the hot standby.
    fn select_primary_backup(uplinks: &[&Arc<Uplink>]) -> Vec<u16> {
        // Sort by priority (highest first)
        let mut sorted: Vec<_> = uplinks.iter().collect();
        sorted.sort_by_key(|u| std::cmp::Reverse(u.priority_score()));

        // Return first usable, with backup
        let mut result = Vec::new();
        for uplink in sorted {
            if uplink.can_send() {
                result.push(uplink.numeric_id());
                if result.len() >= 2 {
                    break;
                }
            }
        }
        result
    }

    /// Bandwidth-proportional selection.
    ///
    /// Picks a sendable uplink at random, with probability proportional to
    /// its share of the total reported bandwidth. Falls back to WRR when no
    /// bandwidth has been measured yet.
    fn select_bandwidth_proportional(&self, uplinks: &[&Arc<Uplink>]) -> Vec<u16> {
        // Select based on available bandwidth ratio
        let total_bw: f64 = uplinks
            .iter()
            .filter(|u| u.can_send())
            .map(|u| u.bandwidth().bytes_per_sec)
            .sum();

        if total_bw == 0.0 {
            return self.select_wrr(uplinks);
        }

        // Use randomized selection weighted by bandwidth.
        // rand::random::<f64>() yields a value in [0, 1).
        let r: f64 = rand::random();
        let mut cumulative = 0.0;

        for uplink in uplinks.iter().filter(|u| u.can_send()) {
            cumulative += uplink.bandwidth().bytes_per_sec / total_bw;
            if r <= cumulative {
                return vec![uplink.numeric_id()];
            }
        }

        // Floating-point rounding can leave cumulative just below r;
        // fall back to the first uplink in that unlikely case.
        // NOTE(review): this fallback does not re-check `can_send()`.
        uplinks
            .first()
            .map(|u| vec![u.numeric_id()])
            .unwrap_or_default()
    }

    /// ECMP-aware selection using flow hash for consistent path selection.
    ///
    /// This strategy uses the flow ID (which can be derived from a flow hash)
    /// to consistently select the same uplink for packets belonging to the same flow.
    /// This mimics ECMP router behavior where packets with the same 5-tuple hash
    /// traverse the same path.
    ///
    /// Note: the mapping is only stable while the set of sendable uplinks
    /// is unchanged, since it indexes `flow % sendable.len()`.
    fn select_ecmp_aware(uplinks: &[&Arc<Uplink>], flow_id: Option<u64>) -> Vec<u16> {
        let sendable: Vec<_> = uplinks.iter().filter(|u| u.can_send()).collect();

        if sendable.is_empty() {
            return vec![];
        }

        // Use flow_id to select uplink (consistent hashing)
        if let Some(flow) = flow_id {
            // Use the flow ID to select an uplink index
            // This ensures packets with the same flow use the same uplink
            let index = (flow as usize) % sendable.len();
            return vec![sendable[index].numeric_id()];
        }

        // Fallback: select based on uplink with best combined score
        // (highest priority_score wins).
        sendable
            .iter()
            .max_by(|a, b| {
                let score_a = a.priority_score();
                let score_b = b.priority_score();
                score_a.cmp(&score_b)
            })
            .map(|u| vec![u.numeric_id()])
            .unwrap_or_default()
    }

    /// Calculate RTT score (0-1, higher is better).
    /// Yields 1.0 at 0ms and 0.5 at 50ms, decaying toward 0 as RTT grows.
    fn rtt_score(uplink: &Uplink) -> f32 {
        let rtt = uplink.rtt().as_secs_f32() * 1000.0; // ms
        // Score decreases with RTT
        1.0 / (1.0 + rtt / 50.0)
    }

    /// Calculate loss score (0-1, higher is better).
    /// The loss ratio is clamped to at most 1.0 before inversion.
    fn loss_score(uplink: &Uplink) -> f32 {
        let loss = uplink.loss_ratio() as f32;
        1.0 - loss.min(1.0)
    }

    /// Calculate bandwidth score (0-1, higher is better).
    /// Scores each uplink relative to the highest-bandwidth uplink in `all`;
    /// when nothing has measured bandwidth yet, everything scores 0.5.
    fn bandwidth_score(uplink: &Uplink, all: &[&Arc<Uplink>]) -> f32 {
        let bw = uplink.bandwidth().bytes_per_sec;
        let max_bw: f64 = all
            .iter()
            .map(|u| u.bandwidth().bytes_per_sec)
            .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
            .unwrap_or(1.0);

        if max_bw == 0.0 {
            0.5
        } else {
            (bw / max_bw) as f32
        }
    }

    /// Calculate NAT score (0-1, higher is better = not NATted).
    /// Non-NATted uplinks score 1.0, NATted uplinks score lower based on NAT type.
    fn nat_score(uplink: &Uplink) -> f32 {
        if !uplink.is_natted() {
            return 1.0;
        }

        // Score based on NAT type (some NAT types are more problematic than others)
        match uplink.nat_type() {
            super::nat::NatType::None => 1.0,
            super::nat::NatType::Unknown => 0.7, // Unknown might not be NAT
            super::nat::NatType::FullCone => 0.8, // Most permissive NAT
            super::nat::NatType::RestrictedCone => 0.6,
            super::nat::NatType::PortRestrictedCone => 0.4,
            super::nat::NatType::Symmetric => 0.2, // Most restrictive NAT
        }
    }

    /// Check if an uplink needs probing.
    ///
    /// Always false when backup probing is disabled; true for uplinks that
    /// were never probed or whose last probe is older than the interval.
    pub fn needs_probe(&self, uplink: &Uplink) -> bool {
        if !self.config.probe_backup_paths {
            return false;
        }

        let probes = self.last_probe.read();
        match probes.get(&uplink.numeric_id()) {
            Some(last) => last.elapsed() >= self.config.probe_interval,
            None => true,
        }
    }

    /// Record that an uplink was probed (resets its probe timer to now).
    pub fn record_probe(&self, uplink_id: u16) {
        self.last_probe.write().insert(uplink_id, Instant::now());
    }

    /// Cleanup stale state. Intended to be called periodically; drops
    /// expired sticky flows, probe records older than 10 probe intervals,
    /// and throughput cache entries older than 10 cache TTLs.
    pub fn cleanup(&self) {
        self.stickiness.write().cleanup(self.config.sticky_timeout);

        // Cleanup old probe records
        let timeout = self.config.probe_interval * 10;
        self.last_probe
            .write()
            .retain(|_, last| last.elapsed() < timeout);

        // Cleanup throughput cache
        self.throughput_cache
            .write()
            .retain(|_, (cached_at, _)| cached_at.elapsed() < self.cache_ttl * 10);
    }

    /// Get uplinks that should be probed (usable and due per `needs_probe`).
    pub fn uplinks_to_probe(&self, uplinks: &[Arc<Uplink>]) -> Vec<u16> {
        uplinks
            .iter()
            .filter(|u| u.is_usable() && self.needs_probe(u))
            .map(|u| u.numeric_id())
            .collect()
    }
}
740
741// Intentionally abbreviated Debug output - internal state not useful for debugging
742#[allow(clippy::missing_fields_in_debug)]
743impl std::fmt::Debug for Scheduler {
744    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
745        f.debug_struct("Scheduler")
746            .field("strategy", &self.config.strategy)
747            .finish()
748    }
749}
750
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_scheduler_creation() {
        // A default config must select the Adaptive strategy.
        let scheduler = Scheduler::new(SchedulerConfig::default());
        assert_eq!(scheduler.config().strategy, SchedulingStrategy::Adaptive);
    }

    #[test]
    fn test_empty_uplinks() {
        // With no uplinks at all, selection must yield nothing.
        let scheduler = Scheduler::new(SchedulerConfig::default());
        assert!(scheduler.select(&[], None).is_empty());
    }
}