flow_guard/strategy/
vegas.rs1use crate::LimitStrategy;
8use parking_lot::RwLock;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::time::Duration;
11
12pub struct VegasStrategy {
13 current_limit: AtomicUsize,
14 base_rtt: RwLock<Duration>,
15 alpha: f64,
16 beta: f64,
17 min_limit: usize,
18 max_limit: usize,
19}
20
21impl VegasStrategy {
22 pub fn new(initial_limit: usize) -> Self {
23 Self {
24 current_limit: AtomicUsize::new(initial_limit),
25 base_rtt: RwLock::new(Duration::from_millis(1000)),
26 alpha: 2.0,
27 beta: 4.0,
28 min_limit: 1,
29 max_limit: initial_limit * 10,
30 }
31 }
32
33 pub fn with_alpha(mut self, alpha: f64) -> Self {
34 self.alpha = alpha;
35 self
36 }
37
38 pub fn with_beta(mut self, beta: f64) -> Self {
39 self.beta = beta;
40 self
41 }
42}
43
44impl LimitStrategy for VegasStrategy {
45 fn current_limit(&self) -> usize {
46 self.current_limit.load(Ordering::Relaxed)
47 }
48
49 fn on_success(&self, latency: Duration) {
50 {
51 let current_base = self.base_rtt.read();
52 if latency < *current_base {
53 drop(current_base);
54 let mut write_base = self.base_rtt.write();
55 if latency < *write_base {
56 *write_base = latency;
57 }
58 }
59 }
60
61 let base_rtt = *self.base_rtt.read();
62 let limit = self.current_limit.load(Ordering::Relaxed);
63
64 if base_rtt.as_nanos() == 0 || latency.as_nanos() == 0 {
65 return; }
67
68 let expected_throughput = limit as f64 / base_rtt.as_secs_f64();
69 let actual_throughput = limit as f64 / latency.as_secs_f64();
70 let diff = (expected_throughput - actual_throughput) * base_rtt.as_secs_f64();
71
72 if diff > self.beta {
73 if limit > self.min_limit {
74 self.current_limit.fetch_sub(1, Ordering::Relaxed);
75 }
76 } else if diff < self.alpha && limit < self.max_limit {
77 self.current_limit.fetch_add(1, Ordering::Relaxed);
78 }
79 }
80
81 fn on_error(&self) {
82 let limit = self.current_limit.load(Ordering::Relaxed);
83 if limit > self.min_limit {
84 let new_limit = (limit * 3 / 4).max(self.min_limit);
85 self.current_limit.store(new_limit, Ordering::Relaxed);
86 }
87 }
88}