phantom_protocol/transport/
fallback.rs1use crate::transport::scheduler::Scheduler;
7use parking_lot::RwLock;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::Instant;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum TransportMode {
15 Turbo,
17 Reliable,
19 Stealth,
21}
22
23#[derive(Debug, Clone)]
25pub struct FallbackTrigger {
26 pub max_rtt: u32,
27 pub max_loss: u8,
28 pub failure_threshold: u32,
29}
30
31impl Default for FallbackTrigger {
32 fn default() -> Self {
33 Self {
34 max_rtt: 500,
35 max_loss: 10,
36 failure_threshold: 3,
37 }
38 }
39}
40
41#[derive(Debug, Default)]
42pub struct FallbackMetrics {
43 pub packets_sent: AtomicU64,
44 pub packets_acked: AtomicU64,
45 pub connection_failures: AtomicU32,
46 pub last_success_ms: AtomicU64,
47}
48
49impl FallbackMetrics {
50 pub fn record_sent(&self) {
51 self.packets_sent.fetch_add(1, Ordering::Relaxed);
52 }
53
54 pub fn record_success(&self) {
55 self.packets_acked.fetch_add(1, Ordering::Relaxed);
56 self.connection_failures.store(0, Ordering::Relaxed);
57 let now = std::time::SystemTime::now()
58 .duration_since(std::time::UNIX_EPOCH)
59 .map(|d| d.as_millis() as u64)
60 .unwrap_or_default();
61 self.last_success_ms.store(now, Ordering::Relaxed);
62 }
63
64 pub fn record_failure(&self) {
65 self.connection_failures.fetch_add(1, Ordering::Relaxed);
66 }
67}
68
69pub struct FallbackStateMachine {
71 current_mode: RwLock<TransportMode>,
73 trigger: FallbackTrigger,
75 metrics: FallbackMetrics,
77 last_change: RwLock<Instant>,
79 last_probe: RwLock<Instant>,
81 best_mode: TransportMode,
83 #[allow(dead_code)]
85 scheduler: Option<Arc<Scheduler>>,
86}
87
88impl FallbackStateMachine {
89 pub fn with_defaults() -> Self {
90 Self::new(FallbackTrigger::default())
91 }
92
93 pub fn new(trigger: FallbackTrigger) -> Self {
94 Self {
95 current_mode: RwLock::new(TransportMode::Turbo),
96 best_mode: TransportMode::Turbo,
97 trigger,
98 metrics: FallbackMetrics::default(),
99 last_change: RwLock::new(Instant::now()),
100 last_probe: RwLock::new(Instant::now()),
101 scheduler: None,
102 }
103 }
104
105 pub fn metrics(&self) -> &FallbackMetrics {
106 &self.metrics
107 }
108
109 pub fn current_mode(&self) -> TransportMode {
110 *self.current_mode.read()
111 }
112
113 pub fn check_and_fallback(&self) -> bool {
114 let failures = self.metrics.connection_failures.load(Ordering::Relaxed);
115 if failures >= self.trigger.failure_threshold {
116 self.degrade();
117 return true;
118 }
119 false
120 }
121
122 pub fn record_failure(&self) {
123 self.metrics.record_failure();
124 let _ = self.check_and_fallback();
125 }
126
127 fn degrade(&self) {
128 let mut mode = self.current_mode.write();
129 let new_mode = match *mode {
130 TransportMode::Turbo => TransportMode::Reliable,
131 TransportMode::Reliable => TransportMode::Stealth,
132 TransportMode::Stealth => TransportMode::Stealth,
133 };
134
135 if new_mode != *mode {
136 log::warn!("Transport degradation: {:?} -> {:?}", *mode, new_mode);
137 *mode = new_mode;
138 *self.last_change.write() = Instant::now();
139 }
140 }
141
142 pub fn upgrade(&self) {
143 let mut mode = self.current_mode.write();
144 if *mode != self.best_mode {
145 log::info!("Transport healing: {:?} -> {:?}", *mode, self.best_mode);
146 *mode = self.best_mode;
147 *self.last_change.write() = Instant::now();
148 self.metrics.connection_failures.store(0, Ordering::Relaxed);
150 }
151 }
152
153 pub fn should_probe(&self) -> bool {
154 let mode = self.current_mode.read();
155 if *mode == self.best_mode {
156 return false;
157 }
158
159 let last_probe = self.last_probe.read();
160 let last_change = self.last_change.read();
161
162 last_probe.elapsed() > std::time::Duration::from_secs(30)
164 && last_change.elapsed() > std::time::Duration::from_secs(30)
165 }
166
167 pub fn record_probe(&self) {
168 *self.last_probe.write() = Instant::now();
169 }
170}
171
172#[cfg(test)]
173mod tests {
174 use super::*;
175
176 #[test]
177 fn test_fallback_cycle() {
178 let fsm = FallbackStateMachine::with_defaults();
179 assert_eq!(fsm.current_mode(), TransportMode::Turbo);
180
181 fsm.degrade();
183 assert_eq!(fsm.current_mode(), TransportMode::Reliable);
184
185 fsm.degrade();
187 assert_eq!(fsm.current_mode(), TransportMode::Stealth);
188
189 fsm.upgrade();
191 assert_eq!(fsm.current_mode(), TransportMode::Turbo);
192 }
193
194 #[test]
195 fn test_should_probe() {
196 let fsm = FallbackStateMachine::with_defaults();
197
198 assert!(!fsm.should_probe());
200
201 fsm.degrade();
202 assert_eq!(fsm.current_mode(), TransportMode::Reliable);
203
204 assert!(!fsm.should_probe());
206
207 fsm.record_probe();
210 assert!(!fsm.should_probe());
211 }
212}