reddb_server/server/
http_connection_limiter.rs1use std::sync::atomic::{AtomicUsize, Ordering};
15use std::sync::Arc;
16
17#[derive(Debug)]
18struct Inner {
19 cap: usize,
20 in_use: AtomicUsize,
21}
22
23#[derive(Debug)]
27pub struct HttpConnectionPermit {
28 inner: Arc<Inner>,
29}
30
31impl Drop for HttpConnectionPermit {
32 fn drop(&mut self) {
33 self.inner.in_use.fetch_sub(1, Ordering::Release);
39 }
40}
41
42#[derive(Debug, Clone)]
43pub struct HttpConnectionLimiter {
44 inner: Arc<Inner>,
45}
46
47impl HttpConnectionLimiter {
48 pub fn new(cap: usize) -> Self {
49 assert!(cap > 0, "HttpConnectionLimiter cap must be positive");
50 Self {
51 inner: Arc::new(Inner {
52 cap,
53 in_use: AtomicUsize::new(0),
54 }),
55 }
56 }
57
58 pub fn with_default_cap() -> Self {
60 let cores = std::thread::available_parallelism()
61 .map(|n| n.get())
62 .unwrap_or(1);
63 let cap = (2 * cores).clamp(8, 256);
64 Self::new(cap)
65 }
66
67 pub fn cap(&self) -> usize {
68 self.inner.cap
69 }
70
71 pub fn current(&self) -> usize {
72 self.inner.in_use.load(Ordering::Relaxed)
73 }
74
75 pub fn try_acquire(&self) -> Option<HttpConnectionPermit> {
78 let mut observed = self.inner.in_use.load(Ordering::Relaxed);
79 loop {
80 if observed >= self.inner.cap {
81 return None;
82 }
83 match self.inner.in_use.compare_exchange_weak(
84 observed,
85 observed + 1,
86 Ordering::Acquire,
87 Ordering::Relaxed,
88 ) {
89 Ok(_) => {
90 return Some(HttpConnectionPermit {
91 inner: Arc::clone(&self.inner),
92 });
93 }
94 Err(actual) => observed = actual,
95 }
96 }
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103 use std::sync::atomic::AtomicUsize;
104 use std::sync::Arc;
105 use std::thread;
106
107 #[test]
108 fn cap_and_current_track_observed_state() {
109 let limiter = HttpConnectionLimiter::new(3);
110 assert_eq!(limiter.cap(), 3);
111 assert_eq!(limiter.current(), 0);
112
113 let p1 = limiter.try_acquire().expect("slot 1");
114 assert_eq!(limiter.current(), 1);
115 let p2 = limiter.try_acquire().expect("slot 2");
116 assert_eq!(limiter.current(), 2);
117 let p3 = limiter.try_acquire().expect("slot 3");
118 assert_eq!(limiter.current(), 3);
119
120 assert!(limiter.try_acquire().is_none());
121 assert_eq!(limiter.current(), 3);
122
123 drop(p2);
124 assert_eq!(limiter.current(), 2);
125 let p4 = limiter.try_acquire().expect("slot reused");
126 assert_eq!(limiter.current(), 3);
127 drop((p1, p3, p4));
128 assert_eq!(limiter.current(), 0);
129 }
130
131 #[test]
132 fn permit_drop_restores_capacity() {
133 let limiter = HttpConnectionLimiter::new(1);
134 {
135 let _p = limiter.try_acquire().expect("acquired");
136 assert!(limiter.try_acquire().is_none());
137 }
138 assert_eq!(limiter.current(), 0);
139 let _p = limiter.try_acquire().expect("reacquired after drop");
140 assert_eq!(limiter.current(), 1);
141 }
142
143 #[test]
144 fn cap_enforced_under_thread_storm_no_over_issue() {
145 let cap = 8;
149 let limiter = HttpConnectionLimiter::new(cap);
150 let success = Arc::new(AtomicUsize::new(0));
151 let denied = Arc::new(AtomicUsize::new(0));
152 let max_seen = Arc::new(AtomicUsize::new(0));
153 let permits: Arc<std::sync::Mutex<Vec<HttpConnectionPermit>>> =
154 Arc::new(std::sync::Mutex::new(Vec::new()));
155
156 let mut handles = Vec::new();
157 for _ in 0..64 {
158 let l = limiter.clone();
159 let s = Arc::clone(&success);
160 let d = Arc::clone(&denied);
161 let m = Arc::clone(&max_seen);
162 let permits = Arc::clone(&permits);
163 handles.push(thread::spawn(move || match l.try_acquire() {
164 Some(p) => {
165 s.fetch_add(1, Ordering::Relaxed);
166 let now = l.current();
167 m.fetch_max(now, Ordering::Relaxed);
168 permits.lock().unwrap().push(p);
169 }
170 None => {
171 d.fetch_add(1, Ordering::Relaxed);
172 }
173 }));
174 }
175 for h in handles {
176 h.join().unwrap();
177 }
178
179 assert_eq!(success.load(Ordering::Relaxed), cap);
180 assert_eq!(denied.load(Ordering::Relaxed), 64 - cap);
181 assert!(max_seen.load(Ordering::Relaxed) <= cap);
182 assert_eq!(limiter.current(), cap);
183
184 permits.lock().unwrap().clear();
185 assert_eq!(limiter.current(), 0);
186 }
187
188 #[test]
189 fn clone_shares_state() {
190 let a = HttpConnectionLimiter::new(2);
191 let b = a.clone();
192 let _p = a.try_acquire().unwrap();
193 assert_eq!(b.current(), 1);
194 assert_eq!(b.cap(), 2);
195 }
196
197 #[test]
198 fn default_cap_in_bounds() {
199 let limiter = HttpConnectionLimiter::with_default_cap();
200 assert!(limiter.cap() >= 8);
201 assert!(limiter.cap() <= 256);
202 }
203}