phantom_protocol/transport/
scheduler.rs1use crate::transport::types::{LegType, SchedulerMode};
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9
10pub struct PathInfo {
11 pub leg_type: LegType,
12 pub rtt_ms: u32,
13 pub loss_percent: u8,
14 pub active: bool,
15 pub bytes_sent: u64,
16}
17
18impl PathInfo {
19 pub fn new(leg_type: LegType) -> Self {
20 Self {
21 leg_type,
22 rtt_ms: 150, loss_percent: 0,
24 active: true,
25 bytes_sent: 0,
26 }
27 }
28}
29
30pub struct Scheduler {
31 mode: RwLock<SchedulerMode>,
32 paths: RwLock<HashMap<LegType, PathInfo>>,
33 #[allow(dead_code)]
34 rr_counter: AtomicU32,
35 total_bytes: AtomicU64,
36}
37
38impl Scheduler {
39 pub fn new(mode: SchedulerMode) -> Self {
40 Self {
41 mode: RwLock::new(mode),
42 paths: RwLock::new(HashMap::new()),
43 rr_counter: AtomicU32::new(0),
44 total_bytes: AtomicU64::new(0),
45 }
46 }
47
48 pub fn register_path(&self, leg_type: LegType) {
49 let mut paths = self.paths.write();
50 paths
51 .entry(leg_type)
52 .or_insert_with(|| PathInfo::new(leg_type));
53 }
54
55 pub fn set_path_available(&self, leg_type: LegType, available: bool) {
56 let mut paths = self.paths.write();
57 if let Some(path) = paths.get_mut(&leg_type) {
58 path.active = available;
59 }
60 }
61
62 pub fn select_paths(&self, is_priority: bool) -> Vec<LegType> {
63 let paths = self.paths.read();
64 let mode = self.mode.read();
65
66 let mut available: Vec<_> = paths.iter().filter(|(_, p)| p.active).collect();
67
68 if available.is_empty() {
69 return Vec::new();
70 }
71
72 if is_priority {
73 available.sort_by_key(|(_, p)| p.rtt_ms);
75 return vec![*available[0].0];
76 }
77
78 match *mode {
79 SchedulerMode::LowLatency => {
80 available.sort_by_key(|(_, p)| p.rtt_ms);
81 vec![*available[0].0]
82 }
83 SchedulerMode::HighThroughput => {
84 available.iter().map(|(t, _)| **t).collect()
86 }
87 SchedulerMode::Reliability | SchedulerMode::Stealth => {
88 available.sort_by_key(|(_, p)| p.rtt_ms);
90 available.iter().take(2).map(|(t, _)| **t).collect()
91 }
92 }
93 }
94
95 pub fn update_rtt(&self, leg_type: LegType, rtt: u32) {
96 let mut paths = self.paths.write();
97 if let Some(path) = paths.get_mut(&leg_type) {
98 path.rtt_ms = rtt;
99 }
100 }
101
102 pub fn record_sent(&self, leg_type: LegType, bytes: u64) {
103 let mut paths = self.paths.write();
104 if let Some(path) = paths.get_mut(&leg_type) {
105 path.bytes_sent += bytes;
106 }
107 self.total_bytes.fetch_add(bytes, Ordering::Relaxed);
108 }
109}