Skip to main content

tur_rs/engine/
scaler.rs

1use super::*;
2
/// Tunable limits handed to the scaler.
///
/// The type is `Copy`, so it is cheap to pass around by value.
#[derive(Clone, Copy, Debug)]
pub struct ScalerConfig {
    /// Lower connection bound.
    /// NOTE(review): presumably the floor the scaler never shrinks below — confirm.
    pub min_connections: usize,
    /// Upper connection bound.
    /// NOTE(review): presumably the ceiling the scaler never grows past — confirm.
    pub max_connections: usize,
    /// Heartbeat period; the `_ms` suffix suggests milliseconds.
    pub heartbeat_ms: u64,
}
9
10impl Default for ScalerConfig {
11    fn default() -> Self {
12        Self {
13            min_connections: 1,
14            max_connections: 16,
15            heartbeat_ms: 2000,
16        }
17    }
18}
19
/// Byte-rate limiter state.
///
/// Every field is a `Cell`, so the bucket can be updated through a shared
/// reference. `Debug` is derived so the bucket can be logged and inspected
/// like the other public types in this module.
#[derive(Debug)]
pub struct TokenBucket {
    /// Configured rate in bytes per second. The methods on this type treat
    /// `0` as "no limit".
    pub quota_bytes_per_sec: Cell<u64>,
    /// Current token balance. Signed because `consume` records the debt when
    /// more bytes are taken than are available.
    pub tokens: Cell<i64>,
    /// Refill period; the `_ms` suffix suggests milliseconds.
    /// NOTE(review): not read by the methods visible in this file — confirm
    /// which caller uses it.
    pub refill_interval_ms: Cell<u64>,
}
25
26impl TokenBucket {
27    pub fn new() -> Self {
28        Self {
29            quota_bytes_per_sec: Cell::new(0),
30            tokens: Cell::new(0),
31            refill_interval_ms: Cell::new(100),
32        }
33    }
34
35    pub fn refill(&self, interval_ms: u64) {
36        let quota = self.quota_bytes_per_sec.get();
37        if quota == 0 {
38            self.tokens.set(i64::MAX / 2);
39            return;
40        }
41        let add = (quota * interval_ms / 1000) as i64;
42        let cap = (quota * 2) as i64;
43        self.tokens.set((self.tokens.get() + add).min(cap));
44    }
45
46    pub fn consume(&self, bytes: usize) -> bool {
47        let quota = self.quota_bytes_per_sec.get();
48        if quota == 0 {
49            return true;
50        }
51        let remaining = self.tokens.get() - bytes as i64;
52        self.tokens.set(remaining);
53        remaining >= 0
54    }
55}
56
/// Decision kinds a scaler can record.
///
/// NOTE(review): variant meanings below are read off the names — confirm
/// against the code that produces them.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ScalerAction {
    /// Presumably: increase capacity.
    Grow,
    /// Presumably: decrease capacity.
    Shrink,
    /// Presumably: leave capacity unchanged.
    Hold,
}
63
64#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)]
65pub enum ProtocolFamily {
66    Http1,
67    Http2,
68    Http3,
69    Other,
70}
71
72impl Default for ProtocolFamily {
73    fn default() -> Self {
74        Self::Other
75    }
76}
77
78impl ProtocolFamily {
79    pub(super) fn as_str(self) -> &'static str {
80        match self {
81            Self::Http1 => "http1",
82            Self::Http2 => "http2",
83            Self::Http3 => "http3",
84            Self::Other => "other",
85        }
86    }
87}
88
89pub(crate) struct Scaler {
90    pub ewma_throughput: Cell<f64>,
91    pub peak_efficiency: Cell<f64>,
92    pub throughput_before_add: Cell<f64>,
93    pub n_active: Cell<usize>,
94    pub last_action: Cell<ScalerAction>,
95    pub slow_start_remaining: Cell<u32>,
96    pub config: Rc<RefCell<ScalerConfig>>,
97    pub sample_ring: RefCell<[f64; 10]>,
98    pub sample_head: Cell<usize>,
99    pub sample_count: Cell<usize>,
100    pub alpha: Cell<f64>,
101    pub cv: Cell<f64>,
102    pub ewma_rtt_ms: Cell<f64>,
103    pub ewma_handshake_ms: Cell<f64>,
104    pub reused_count: Cell<u64>,
105    pub total_request_count: Cell<u64>,
106    pub reuse_rate: Cell<f64>,
107    pub last_reuse_reset: Cell<Instant>,
108    pub effective_add_threshold: Cell<f64>,
109    pub reused_rtt_samples: Cell<u64>,
110    pub skip_growth_sample: Cell<bool>,
111    pub reuse_health_low: Cell<bool>,
112    pub last_add_was_stream: Cell<bool>,
113    pub h2_stream_count: Cell<usize>,
114    pub h2_stream_saturated: Cell<bool>,
115    pub(super) last_protocol: Cell<ProtocolFamily>,
116}
117
118impl Scaler {
119    pub fn from_config_handle(config: Rc<RefCell<ScalerConfig>>) -> Rc<Self> {
120        Rc::new(Self {
121            ewma_throughput: Cell::new(0.0),
122            peak_efficiency: Cell::new(0.0),
123            throughput_before_add: Cell::new(0.0),
124            n_active: Cell::new(1),
125            last_action: Cell::new(ScalerAction::Hold),
126            slow_start_remaining: Cell::new(3),
127            config,
128            sample_ring: RefCell::new([0.0; 10]),
129            sample_head: Cell::new(0),
130            sample_count: Cell::new(0),
131            alpha: Cell::new(0.3),
132            cv: Cell::new(0.0),
133            ewma_rtt_ms: Cell::new(200.0),
134            ewma_handshake_ms: Cell::new(50.0),
135            reused_count: Cell::new(0),
136            total_request_count: Cell::new(0),
137            reuse_rate: Cell::new(1.0),
138            last_reuse_reset: Cell::new(Instant::now()),
139            effective_add_threshold: Cell::new(0.05),
140            reused_rtt_samples: Cell::new(0),
141            skip_growth_sample: Cell::new(false),
142            reuse_health_low: Cell::new(false),
143            last_add_was_stream: Cell::new(false),
144            h2_stream_count: Cell::new(0),
145            h2_stream_saturated: Cell::new(false),
146            last_protocol: Cell::new(ProtocolFamily::Other),
147        })
148    }
149}