Skip to main content

reddb_server/server/
http_connection_limiter.rs

//! Bounded handler-thread admission for the clear-text HTTP accept loop.
//!
//! Slice 1 of issue #570 / parent #569. The synchronous HTTP transport
//! spawns one OS thread per accepted connection. Without an admission
//! cap the server can degrade into a thread storm and lock starvation
//! under load. `HttpConnectionLimiter` is a single `AtomicUsize`-backed
//! semaphore consulted in the accept loop *before* parsing or handler
//! work. A rejected connection gets a static `503 + Retry-After` written
//! and the socket closed without ever entering the runtime.
//!
//! Hard cap for this slice is `(2 * available_parallelism).clamp(8, 256)`.
//! Config knobs (env / CLI) land in slice 5 per the parent brief.
13
14use std::sync::atomic::{AtomicUsize, Ordering};
15use std::sync::Arc;
16
/// Counter state shared between the limiter handle(s) and every
/// outstanding permit.
#[derive(Debug)]
struct Inner {
    /// Maximum number of permits that may be outstanding at once. Stored
    /// as a plain `usize`, so it cannot change after construction.
    cap: usize,
    /// Number of permits currently outstanding.
    in_use: AtomicUsize,
}

/// Permit handle — owns one slot of the limiter. Dropping the permit
/// returns the slot. The permit is intentionally `!Clone` so the slot
/// accounting can't drift.
///
/// Marked `#[must_use]` because a permit that is discarded on the spot
/// gives its slot back immediately and therefore guards nothing.
#[must_use = "dropping the permit immediately returns its slot to the limiter"]
#[derive(Debug)]
pub struct HttpConnectionPermit {
    inner: Arc<Inner>,
}

impl Drop for HttpConnectionPermit {
    /// Returns this permit's slot by decrementing the shared counter.
    fn drop(&mut self) {
        // `Release` publishes whatever the permit holder wrote before
        // letting go, so that a thread which later takes a slot with an
        // `Acquire` operation on the same counter observes those writes.
        // Plain observers of the counter may read it `Relaxed`; the cap
        // itself never changes, so it needs no ordering at all.
        self.inner.in_use.fetch_sub(1, Ordering::Release);
    }
}
41
42#[derive(Debug, Clone)]
43pub struct HttpConnectionLimiter {
44    inner: Arc<Inner>,
45}
46
47impl HttpConnectionLimiter {
48    pub fn new(cap: usize) -> Self {
49        assert!(cap > 0, "HttpConnectionLimiter cap must be positive");
50        Self {
51            inner: Arc::new(Inner {
52                cap,
53                in_use: AtomicUsize::new(0),
54            }),
55        }
56    }
57
58    /// Default cap: `(2 * available_parallelism).clamp(8, 256)`.
59    pub fn with_default_cap() -> Self {
60        let cores = std::thread::available_parallelism()
61            .map(|n| n.get())
62            .unwrap_or(1);
63        let cap = (2 * cores).clamp(8, 256);
64        Self::new(cap)
65    }
66
67    pub fn cap(&self) -> usize {
68        self.inner.cap
69    }
70
71    pub fn current(&self) -> usize {
72        self.inner.in_use.load(Ordering::Relaxed)
73    }
74
75    /// Returns `Some(permit)` on success, `None` if the cap is full.
76    /// No blocking, no allocation on the hot path.
77    pub fn try_acquire(&self) -> Option<HttpConnectionPermit> {
78        let mut observed = self.inner.in_use.load(Ordering::Relaxed);
79        loop {
80            if observed >= self.inner.cap {
81                return None;
82            }
83            match self.inner.in_use.compare_exchange_weak(
84                observed,
85                observed + 1,
86                Ordering::Acquire,
87                Ordering::Relaxed,
88            ) {
89                Ok(_) => {
90                    return Some(HttpConnectionPermit {
91                        inner: Arc::clone(&self.inner),
92                    });
93                }
94                Err(actual) => observed = actual,
95            }
96        }
97    }
98}
99
#[cfg(test)]
mod tests {
    //! Unit tests for the limiter: sequential accounting, release on
    //! drop, behavior under contention, clone sharing, default bounds,
    //! and the zero-cap precondition.

    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::{Arc, Mutex};
    use std::thread;

    /// `cap()` stays fixed while `current()` follows every acquire/drop.
    #[test]
    fn cap_and_current_track_observed_state() {
        let limiter = HttpConnectionLimiter::new(3);
        assert_eq!(limiter.cap(), 3);
        assert_eq!(limiter.current(), 0);

        let p1 = limiter.try_acquire().expect("slot 1");
        assert_eq!(limiter.current(), 1);
        let p2 = limiter.try_acquire().expect("slot 2");
        assert_eq!(limiter.current(), 2);
        let p3 = limiter.try_acquire().expect("slot 3");
        assert_eq!(limiter.current(), 3);

        // Full: the attempt is refused and the counter is unchanged.
        assert!(limiter.try_acquire().is_none());
        assert_eq!(limiter.current(), 3);

        drop(p2);
        assert_eq!(limiter.current(), 2);
        let p4 = limiter.try_acquire().expect("slot reused");
        assert_eq!(limiter.current(), 3);
        drop((p1, p3, p4));
        assert_eq!(limiter.current(), 0);
    }

    /// A permit leaving scope frees its slot for the next caller.
    #[test]
    fn permit_drop_restores_capacity() {
        let limiter = HttpConnectionLimiter::new(1);
        {
            let _p = limiter.try_acquire().expect("acquired");
            assert!(limiter.try_acquire().is_none());
        }
        assert_eq!(limiter.current(), 0);
        let _p = limiter.try_acquire().expect("reacquired after drop");
        assert_eq!(limiter.current(), 1);
    }

    /// Repeated refusals on a full limiter never disturb the counter.
    #[test]
    fn rejected_acquire_leaves_counter_untouched() {
        let limiter = HttpConnectionLimiter::new(1);
        let _held = limiter.try_acquire().expect("acquired");
        for _ in 0..16 {
            assert!(limiter.try_acquire().is_none());
        }
        assert_eq!(limiter.current(), 1);
    }

    /// Many threads race `try_acquire`; the high-water mark must never
    /// exceed the cap, and because winners keep their permits until the
    /// end, exactly `cap` attempts succeed.
    #[test]
    fn cap_enforced_under_thread_storm_no_over_issue() {
        const THREADS: usize = 64;
        let cap = 8;
        let limiter = HttpConnectionLimiter::new(cap);
        let success = Arc::new(AtomicUsize::new(0));
        let denied = Arc::new(AtomicUsize::new(0));
        let max_seen = Arc::new(AtomicUsize::new(0));
        // Winners park their permits here so no slot is released while
        // other threads are still racing.
        let permits: Arc<Mutex<Vec<HttpConnectionPermit>>> = Arc::new(Mutex::new(Vec::new()));

        let mut handles = Vec::with_capacity(THREADS);
        for _ in 0..THREADS {
            let l = limiter.clone();
            let s = Arc::clone(&success);
            let d = Arc::clone(&denied);
            let m = Arc::clone(&max_seen);
            let permits = Arc::clone(&permits);
            handles.push(thread::spawn(move || match l.try_acquire() {
                Some(p) => {
                    s.fetch_add(1, Ordering::Relaxed);
                    let now = l.current();
                    m.fetch_max(now, Ordering::Relaxed);
                    permits.lock().unwrap().push(p);
                }
                None => {
                    d.fetch_add(1, Ordering::Relaxed);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }

        let won = success.load(Ordering::Relaxed);
        let lost = denied.load(Ordering::Relaxed);
        assert_eq!(won, cap);
        assert_eq!(lost, THREADS - cap);
        assert_eq!(won + lost, THREADS);
        assert!(max_seen.load(Ordering::Relaxed) <= cap);
        assert_eq!(limiter.current(), cap);

        permits.lock().unwrap().clear();
        assert_eq!(limiter.current(), 0);
    }

    /// Clones are handles onto the same counter, not independent copies.
    #[test]
    fn clone_shares_state() {
        let a = HttpConnectionLimiter::new(2);
        let b = a.clone();
        let _p = a.try_acquire().unwrap();
        assert_eq!(b.current(), 1);
        assert_eq!(b.cap(), 2);
    }

    /// The default cap always lands inside the documented `[8, 256]` range.
    #[test]
    fn default_cap_in_bounds() {
        let limiter = HttpConnectionLimiter::with_default_cap();
        assert!(limiter.cap() >= 8);
        assert!(limiter.cap() <= 256);
    }

    /// A zero cap violates the constructor's precondition and panics.
    #[test]
    #[should_panic(expected = "HttpConnectionLimiter cap must be positive")]
    fn zero_cap_is_rejected() {
        let _ = HttpConnectionLimiter::new(0);
    }
}