flow_guard/strategy/
vegas.rs

1/*
2 * Created by: Cleiton Augusto Correa Bezerra
3 * Project: FlowGuard - Adaptive Backpressure for Rust
4 * Algorithm: Optimized TCP Vegas for Concurrency Control
5 */
6
7use crate::LimitStrategy;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::Duration;
10use parking_lot::RwLock;
11
12pub struct VegasStrategy {
13    current_limit: AtomicUsize,
14    base_rtt: RwLock<Duration>,
15    alpha: f64,
16    beta: f64,
17    min_limit: usize,
18    max_limit: usize,
19}
20
21impl VegasStrategy {
22    pub fn new(initial_limit: usize) -> Self {
23        Self {
24            current_limit: AtomicUsize::new(initial_limit),
25            base_rtt: RwLock::new(Duration::from_millis(1000)),
26            alpha: 2.0,
27            beta: 4.0,
28            min_limit: 1,
29            max_limit: initial_limit * 10,
30        }
31    }
32
33    pub fn with_alpha(mut self, alpha: f64) -> Self {
34        self.alpha = alpha;
35        self
36    }
37
38    pub fn with_beta(mut self, beta: f64) -> Self {
39        self.beta = beta;
40        self
41    }
42}
43
44impl LimitStrategy for VegasStrategy {
45    fn current_limit(&self) -> usize {
46        self.current_limit.load(Ordering::Relaxed)
47    }
48
49    fn on_success(&self, latency: Duration) {
50
51        {
52            let current_base = self.base_rtt.read();
53            if latency < *current_base {
54                drop(current_base);
55                let mut write_base = self.base_rtt.write();
56                if latency < *write_base {
57                    *write_base = latency;
58                }
59            }
60        }
61
62        let base_rtt = *self.base_rtt.read();
63        let limit = self.current_limit.load(Ordering::Relaxed);
64
65
66        if base_rtt.as_nanos() == 0 || latency.as_nanos() == 0 {
67            return; // Evita divisão por zero
68        }
69
70        let expected_throughput = limit as f64 / base_rtt.as_secs_f64();
71        let actual_throughput = limit as f64 / latency.as_secs_f64();
72        let diff = (expected_throughput - actual_throughput) * base_rtt.as_secs_f64();
73
74
75        if diff > self.beta {
76
77            if limit > self.min_limit {
78                self.current_limit.fetch_sub(1, Ordering::Relaxed);
79            }
80        } else if diff < self.alpha && limit < self.max_limit {
81
82            self.current_limit.fetch_add(1, Ordering::Relaxed);
83        }
84    }
85
86    fn on_error(&self) {
87        let limit = self.current_limit.load(Ordering::Relaxed);
88        if limit > self.min_limit {
89            let new_limit = (limit * 3 / 4).max(self.min_limit);
90            self.current_limit.store(new_limit, Ordering::Relaxed);
91        }
92    }
93}