flow_guard/strategy/
vegas.rs

1/*
2 * Created by: Cleiton Augusto Correa Bezerra
3 * Project: FlowGuard - Adaptive Backpressure for Rust
4 * Algorithm: Optimized TCP Vegas for Concurrency Control
5 */
6
7use crate::LimitStrategy;
8use parking_lot::RwLock;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::time::Duration;
11
12pub struct VegasStrategy {
13    current_limit: AtomicUsize,
14    base_rtt: RwLock<Duration>,
15    alpha: f64,
16    beta: f64,
17    min_limit: usize,
18    max_limit: usize,
19}
20
21impl VegasStrategy {
22    pub fn new(initial_limit: usize) -> Self {
23        Self {
24            current_limit: AtomicUsize::new(initial_limit),
25            base_rtt: RwLock::new(Duration::from_millis(1000)),
26            alpha: 2.0,
27            beta: 4.0,
28            min_limit: 1,
29            max_limit: initial_limit * 10,
30        }
31    }
32
33    pub fn with_alpha(mut self, alpha: f64) -> Self {
34        self.alpha = alpha;
35        self
36    }
37
38    pub fn with_beta(mut self, beta: f64) -> Self {
39        self.beta = beta;
40        self
41    }
42}
43
44impl LimitStrategy for VegasStrategy {
45    fn current_limit(&self) -> usize {
46        self.current_limit.load(Ordering::Relaxed)
47    }
48
49    fn on_success(&self, latency: Duration) {
50        {
51            let current_base = self.base_rtt.read();
52            if latency < *current_base {
53                drop(current_base);
54                let mut write_base = self.base_rtt.write();
55                if latency < *write_base {
56                    *write_base = latency;
57                }
58            }
59        }
60
61        let base_rtt = *self.base_rtt.read();
62        let limit = self.current_limit.load(Ordering::Relaxed);
63
64        if base_rtt.as_nanos() == 0 || latency.as_nanos() == 0 {
65            return; // Evita divisão por zero
66        }
67
68        let expected_throughput = limit as f64 / base_rtt.as_secs_f64();
69        let actual_throughput = limit as f64 / latency.as_secs_f64();
70        let diff = (expected_throughput - actual_throughput) * base_rtt.as_secs_f64();
71
72        if diff > self.beta {
73            if limit > self.min_limit {
74                self.current_limit.fetch_sub(1, Ordering::Relaxed);
75            }
76        } else if diff < self.alpha && limit < self.max_limit {
77            self.current_limit.fetch_add(1, Ordering::Relaxed);
78        }
79    }
80
81    fn on_error(&self) {
82        let limit = self.current_limit.load(Ordering::Relaxed);
83        if limit > self.min_limit {
84            let new_limit = (limit * 3 / 4).max(self.min_limit);
85            self.current_limit.store(new_limit, Ordering::Relaxed);
86        }
87    }
88}