flow_guard/strategy/
vegas.rs1use crate::LimitStrategy;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::time::Duration;
9use parking_lot::RwLock;
10use tracing; pub struct VegasStrategy {
13 current_limit: AtomicUsize,
14 base_rtt: RwLock<Duration>,
15 alpha: usize,
16 beta: usize,
17 min_limit: usize,
18 max_limit: usize,
19}
20
21impl VegasStrategy {
22 pub fn new(initial_limit: usize, min_limit: usize, max_limit: usize) -> Self {
23 Self {
24 current_limit: AtomicUsize::new(initial_limit),
25 base_rtt: RwLock::new(Duration::from_secs(3600)),
26 alpha: 3,
27 beta: 6,
28 min_limit,
29 max_limit,
30 }
31 }
32}
33
34impl LimitStrategy for VegasStrategy {
35 fn current_limit(&self) -> usize {
36 self.current_limit.load(Ordering::Relaxed)
37 }
38
39 fn on_success(&self, latency: Duration) {
40 {
42 let current_base = self.base_rtt.read();
43 if latency < *current_base {
44 drop(current_base); let mut write_base = self.base_rtt.write();
46 if latency < *write_base {
47 *write_base = latency;
48 tracing::debug!(target: "flow_guard", "New Base RTT discovered: {:?}", latency);
49 }
50 }
51 }
52
53 let base_rtt = *self.base_rtt.read();
54 let limit = self.current_limit.load(Ordering::Relaxed);
55
56 let expected = (limit as f64 * base_rtt.as_secs_f64()) / latency.as_secs_f64();
58 let diff = limit as f64 - expected;
59
60 if diff < self.alpha as f64 {
61 if limit < self.max_limit {
62 let new_limit = self.current_limit.fetch_add(1, Ordering::Relaxed) + 1;
63 tracing::trace!(target: "flow_guard", "Limit increased: {}", new_limit);
64 }
65 } else if diff > self.beta as f64 {
66 if limit > self.min_limit {
67 let new_limit = self.current_limit.fetch_sub(1, Ordering::Relaxed) - 1;
68 tracing::warn!(target: "flow_guard", "Congestion detected! Limit decreased: {}", new_limit);
69 }
70 }
71 }
72
73 fn on_error(&self) {
74 let limit = self.current_limit.load(Ordering::Relaxed);
75 if limit > self.min_limit {
76 let new_limit = (limit / 2).max(self.min_limit);
77 self.current_limit.store(new_limit, Ordering::Relaxed);
78 tracing::error!(target: "flow_guard", "Application error! Backing off to limit: {}", new_limit);
79 }
80 }
81}