flow_guard/strategy/
vegas.rs

1/* * Created by: Cleiton Augusto Correa Bezerra
2 * Project: FlowGuard - Adaptive Backpressure for Rust
3 * Algorithm: Optimized TCP Vegas for Concurrency Control
4 */
5
6use crate::LimitStrategy;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::time::Duration;
9use parking_lot::RwLock;
10use tracing; // Para o usuário ver a mágica acontecer
11
12pub struct VegasStrategy {
13    current_limit: AtomicUsize,
14    base_rtt: RwLock<Duration>,
15    alpha: usize,
16    beta: usize,
17    min_limit: usize,
18    max_limit: usize,
19}
20
21impl VegasStrategy {
22    pub fn new(initial_limit: usize, min_limit: usize, max_limit: usize) -> Self {
23        Self {
24            current_limit: AtomicUsize::new(initial_limit),
25            base_rtt: RwLock::new(Duration::from_secs(3600)),
26            alpha: 3,
27            beta: 6,
28            min_limit,
29            max_limit,
30        }
31    }
32}
33
34impl LimitStrategy for VegasStrategy {
35    fn current_limit(&self) -> usize {
36        self.current_limit.load(Ordering::Relaxed)
37    }
38
39    fn on_success(&self, latency: Duration) {
40        // 1. OTIMIZAÇÃO: Só atualiza o base_rtt se necessário (Read-first pattern)
41        {
42            let current_base = self.base_rtt.read();
43            if latency < *current_base {
44                drop(current_base); // Libera o lock de leitura antes de pedir o de escrita
45                let mut write_base = self.base_rtt.write();
46                if latency < *write_base {
47                    *write_base = latency;
48                    tracing::debug!(target: "flow_guard", "New Base RTT discovered: {:?}", latency);
49                }
50            }
51        }
52
53        let base_rtt = *self.base_rtt.read();
54        let limit = self.current_limit.load(Ordering::Relaxed);
55
56        // 2. CÁLCULO: diff = L * (1 - RTT_base / RTT_actual)
57        let expected = (limit as f64 * base_rtt.as_secs_f64()) / latency.as_secs_f64();
58        let diff = limit as f64 - expected;
59
60        if diff < self.alpha as f64 {
61            if limit < self.max_limit {
62                let new_limit = self.current_limit.fetch_add(1, Ordering::Relaxed) + 1;
63                tracing::trace!(target: "flow_guard", "Limit increased: {}", new_limit);
64            }
65        } else if diff > self.beta as f64 {
66            if limit > self.min_limit {
67                let new_limit = self.current_limit.fetch_sub(1, Ordering::Relaxed) - 1;
68                tracing::warn!(target: "flow_guard", "Congestion detected! Limit decreased: {}", new_limit);
69            }
70        }
71    }
72
73    fn on_error(&self) {
74        let limit = self.current_limit.load(Ordering::Relaxed);
75        if limit > self.min_limit {
76            let new_limit = (limit / 2).max(self.min_limit);
77            self.current_limit.store(new_limit, Ordering::Relaxed);
78            tracing::error!(target: "flow_guard", "Application error! Backing off to limit: {}", new_limit);
79        }
80    }
81}