1use super::*;
2
3#[derive(Debug, Clone, Copy)]
4pub struct ScalerConfig {
5 pub min_connections: usize,
6 pub max_connections: usize,
7 pub heartbeat_ms: u64,
8}
9
10impl Default for ScalerConfig {
11 fn default() -> Self {
12 Self {
13 min_connections: 1,
14 max_connections: 16,
15 heartbeat_ms: 2000,
16 }
17 }
18}
19
20pub struct TokenBucket {
21 pub quota_bytes_per_sec: Cell<u64>,
22 pub tokens: Cell<i64>,
23 pub refill_interval_ms: Cell<u64>,
24}
25
26impl TokenBucket {
27 pub fn new() -> Self {
28 Self {
29 quota_bytes_per_sec: Cell::new(0),
30 tokens: Cell::new(0),
31 refill_interval_ms: Cell::new(100),
32 }
33 }
34
35 pub fn refill(&self, interval_ms: u64) {
36 let quota = self.quota_bytes_per_sec.get();
37 if quota == 0 {
38 self.tokens.set(i64::MAX / 2);
39 return;
40 }
41 let add = (quota * interval_ms / 1000) as i64;
42 let cap = (quota * 2) as i64;
43 self.tokens.set((self.tokens.get() + add).min(cap));
44 }
45
46 pub fn consume(&self, bytes: usize) -> bool {
47 let quota = self.quota_bytes_per_sec.get();
48 if quota == 0 {
49 return true;
50 }
51 let remaining = self.tokens.get() - bytes as i64;
52 self.tokens.set(remaining);
53 remaining >= 0
54 }
55}
56
57#[derive(Clone, Copy, PartialEq, Debug)]
58pub enum ScalerAction {
59 Grow,
60 Shrink,
61 Hold,
62}
63
64#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)]
65pub enum ProtocolFamily {
66 Http1,
67 Http2,
68 Http3,
69 Other,
70}
71
72impl Default for ProtocolFamily {
73 fn default() -> Self {
74 Self::Other
75 }
76}
77
78impl ProtocolFamily {
79 pub(super) fn as_str(self) -> &'static str {
80 match self {
81 Self::Http1 => "http1",
82 Self::Http2 => "http2",
83 Self::Http3 => "http3",
84 Self::Other => "other",
85 }
86 }
87}
88
89pub(crate) struct Scaler {
90 pub ewma_throughput: Cell<f64>,
91 pub peak_efficiency: Cell<f64>,
92 pub throughput_before_add: Cell<f64>,
93 pub n_active: Cell<usize>,
94 pub last_action: Cell<ScalerAction>,
95 pub slow_start_remaining: Cell<u32>,
96 pub config: Rc<RefCell<ScalerConfig>>,
97 pub sample_ring: RefCell<[f64; 10]>,
98 pub sample_head: Cell<usize>,
99 pub sample_count: Cell<usize>,
100 pub alpha: Cell<f64>,
101 pub cv: Cell<f64>,
102 pub ewma_rtt_ms: Cell<f64>,
103 pub ewma_handshake_ms: Cell<f64>,
104 pub reused_count: Cell<u64>,
105 pub total_request_count: Cell<u64>,
106 pub reuse_rate: Cell<f64>,
107 pub last_reuse_reset: Cell<Instant>,
108 pub effective_add_threshold: Cell<f64>,
109 pub reused_rtt_samples: Cell<u64>,
110 pub skip_growth_sample: Cell<bool>,
111 pub reuse_health_low: Cell<bool>,
112 pub last_add_was_stream: Cell<bool>,
113 pub h2_stream_count: Cell<usize>,
114 pub h2_stream_saturated: Cell<bool>,
115 pub(super) last_protocol: Cell<ProtocolFamily>,
116}
117
118impl Scaler {
119 pub fn from_config_handle(config: Rc<RefCell<ScalerConfig>>) -> Rc<Self> {
120 Rc::new(Self {
121 ewma_throughput: Cell::new(0.0),
122 peak_efficiency: Cell::new(0.0),
123 throughput_before_add: Cell::new(0.0),
124 n_active: Cell::new(1),
125 last_action: Cell::new(ScalerAction::Hold),
126 slow_start_remaining: Cell::new(3),
127 config,
128 sample_ring: RefCell::new([0.0; 10]),
129 sample_head: Cell::new(0),
130 sample_count: Cell::new(0),
131 alpha: Cell::new(0.3),
132 cv: Cell::new(0.0),
133 ewma_rtt_ms: Cell::new(200.0),
134 ewma_handshake_ms: Cell::new(50.0),
135 reused_count: Cell::new(0),
136 total_request_count: Cell::new(0),
137 reuse_rate: Cell::new(1.0),
138 last_reuse_reset: Cell::new(Instant::now()),
139 effective_add_threshold: Cell::new(0.05),
140 reused_rtt_samples: Cell::new(0),
141 skip_growth_sample: Cell::new(false),
142 reuse_health_low: Cell::new(false),
143 last_add_was_stream: Cell::new(false),
144 h2_stream_count: Cell::new(0),
145 h2_stream_saturated: Cell::new(false),
146 last_protocol: Cell::new(ProtocolFamily::Other),
147 })
148 }
149}