Skip to main content

phantom_protocol/transport/
scheduler.rs

1//! Phantom Transport - Multi-Path Scheduler
2//!
3//! Round-robin and low-latency scheduling across multiple transport legs.
4
5use crate::transport::types::{LegType, SchedulerMode};
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9
10pub struct PathInfo {
11    pub leg_type: LegType,
12    pub rtt_ms: u32,
13    pub loss_percent: u8,
14    pub active: bool,
15    pub bytes_sent: u64,
16}
17
18impl PathInfo {
19    pub fn new(leg_type: LegType) -> Self {
20        Self {
21            leg_type,
22            rtt_ms: 150, // Default estimate
23            loss_percent: 0,
24            active: true,
25            bytes_sent: 0,
26        }
27    }
28}
29
30pub struct Scheduler {
31    mode: RwLock<SchedulerMode>,
32    paths: RwLock<HashMap<LegType, PathInfo>>,
33    #[allow(dead_code)]
34    rr_counter: AtomicU32,
35    total_bytes: AtomicU64,
36}
37
38impl Scheduler {
39    pub fn new(mode: SchedulerMode) -> Self {
40        Self {
41            mode: RwLock::new(mode),
42            paths: RwLock::new(HashMap::new()),
43            rr_counter: AtomicU32::new(0),
44            total_bytes: AtomicU64::new(0),
45        }
46    }
47
48    pub fn register_path(&self, leg_type: LegType) {
49        let mut paths = self.paths.write();
50        paths
51            .entry(leg_type)
52            .or_insert_with(|| PathInfo::new(leg_type));
53    }
54
55    pub fn set_path_available(&self, leg_type: LegType, available: bool) {
56        let mut paths = self.paths.write();
57        if let Some(path) = paths.get_mut(&leg_type) {
58            path.active = available;
59        }
60    }
61
62    pub fn select_paths(&self, is_priority: bool) -> Vec<LegType> {
63        let paths = self.paths.read();
64        let mode = self.mode.read();
65
66        let mut available: Vec<_> = paths.iter().filter(|(_, p)| p.active).collect();
67
68        if available.is_empty() {
69            return Vec::new();
70        }
71
72        if is_priority {
73            // Pick the single best path (lowest RTT)
74            available.sort_by_key(|(_, p)| p.rtt_ms);
75            return vec![*available[0].0];
76        }
77
78        match *mode {
79            SchedulerMode::LowLatency => {
80                available.sort_by_key(|(_, p)| p.rtt_ms);
81                vec![*available[0].0]
82            }
83            SchedulerMode::HighThroughput => {
84                // Return all active paths for multi-path bonding
85                available.iter().map(|(t, _)| **t).collect()
86            }
87            SchedulerMode::Reliability | SchedulerMode::Stealth => {
88                // Duplicate across all paths? No, just pick two best
89                available.sort_by_key(|(_, p)| p.rtt_ms);
90                available.iter().take(2).map(|(t, _)| **t).collect()
91            }
92        }
93    }
94
95    pub fn update_rtt(&self, leg_type: LegType, rtt: u32) {
96        let mut paths = self.paths.write();
97        if let Some(path) = paths.get_mut(&leg_type) {
98            path.rtt_ms = rtt;
99        }
100    }
101
102    pub fn record_sent(&self, leg_type: LegType, bytes: u64) {
103        let mut paths = self.paths.write();
104        if let Some(path) = paths.get_mut(&leg_type) {
105            path.bytes_sent += bytes;
106        }
107        self.total_bytes.fetch_add(bytes, Ordering::Relaxed);
108    }
109}