Skip to main content

reddb_server/server/
http_connection_limiter.rs

1//! Bounded handler-thread admission for the clear-text HTTP accept loop.
2//!
3//! Slice 1 of issue #570 / parent #569. The synchronous HTTP transport
4//! spawns one OS thread per accepted connection. Without an admission
5//! cap the server can degrade into thread-storm and lock starvation
6//! under load. `HttpConnectionLimiter` is a single `AtomicUsize`-backed
7//! semaphore consulted in the accept loop *before* parsing or handler
8//! work. A rejected connection gets a static `503 + Retry-After` written
9//! and the socket closed without ever entering the runtime.
10//!
11//! Hard cap for this slice is `(2 * available_parallelism).clamp(8, 256)`.
12//! Config knobs (env / CLI) land in slice 5 per the parent brief.
13//!
//! Beyond admission, the limiter keeps a single rejection counter and an
//! injectable monotonic clock (issue #620). Every `try_acquire` that hits
//! the cap bumps the counter; `observe()` snapshots and resets it, dividing
//! by the time elapsed since the previous `observe()` to derive a rejection
//! rate (rejections per second). v1 ships a constant `Retry-After`; the rate
//! signal is what a future v2 will use to make `Retry-After` adaptive. The
//! clock is a trait so tests can drive the rate deterministically without
//! sleeping.
21
22use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
/// Source of monotonic time for the limiter's rejection-rate windows and
/// for [`HandlerDeadline`].
///
/// Production uses [`SystemMonotonicClock`], whose zero point is the moment
/// that clock value was constructed (not process start). Tests inject a fake
/// that is advanced by hand, so the rejection-rate derivation and deadline
/// expiry are deterministic. Implementations must be `Send + Sync` because
/// they are shared behind an `Arc` by every clone of the limiter.
pub trait MonotonicClock: Send + Sync + std::fmt::Debug {
    /// Nanoseconds elapsed since an arbitrary, fixed epoch. Only
    /// differences between readings of the same clock are meaningful, and
    /// successive readings must never decrease.
    fn now_nanos(&self) -> u64;
}
34
/// Production [`MonotonicClock`] backed by [`Instant`].
///
/// Readings are nanoseconds elapsed since *this clock value* was
/// constructed (the `base` captured in [`new`](Self::new)), not since
/// process start. [`HttpConnectionLimiter::new`] builds a fresh clock per
/// limiter, so readings from two different clocks are not comparable; only
/// differences between readings of the same clock are meaningful.
#[derive(Debug)]
pub struct SystemMonotonicClock {
    /// Zero point; every reading is measured relative to this instant.
    base: Instant,
}

impl SystemMonotonicClock {
    /// Create a clock whose zero point is "now".
    pub fn new() -> Self {
        Self {
            base: Instant::now(),
        }
    }
}

impl Default for SystemMonotonicClock {
    /// Equivalent to [`SystemMonotonicClock::new`].
    fn default() -> Self {
        Self::new()
    }
}
54
55impl MonotonicClock for SystemMonotonicClock {
56    fn now_nanos(&self) -> u64 {
57        // u64 nanos overflows ~584 years after construction; safe.
58        self.base.elapsed().as_nanos() as u64
59    }
60}
61
/// Result of one [`HttpConnectionLimiter::observe`] call, describing the
/// rejection window that ended at that call.
///
/// A window of zero length (two observes with no clock movement between
/// them) reports `rejections_per_sec == 0.0`, so callers never see a
/// divide-by-zero NaN or infinity.
#[derive(PartialEq, Copy, Clone, Debug)]
pub struct LimiterObservation {
    /// Rejections counted since the previous `observe`.
    pub rejected: u64,
    /// Clock time covered by the window.
    pub elapsed: Duration,
    /// `rejected` scaled to a per-second rate (`0.0` for an empty window).
    pub rejections_per_sec: f64,
}
73
74#[derive(Debug)]
75struct Inner {
76    cap: usize,
77    in_use: AtomicUsize,
78    /// Rejections accumulated since the last `observe()`. Bumped on every
79    /// `try_acquire` that finds the cap full; reset to 0 by `observe()`.
80    rejected: AtomicU64,
81    /// Clock reading (nanos) captured at the last `observe()`, used as the
82    /// lower bound of the next window. Seeded at construction.
83    last_observe_nanos: AtomicU64,
84    clock: Arc<dyn MonotonicClock>,
85}
86
87/// Permit handle — owns one slot of the limiter. Dropping the permit
88/// returns the slot. The permit is intentionally `!Clone` so the slot
89/// accounting can't drift.
90#[derive(Debug)]
91pub struct HttpConnectionPermit {
92    inner: Arc<Inner>,
93}
94
95impl Drop for HttpConnectionPermit {
96    fn drop(&mut self) {
97        // Release is correct here: we want any writes the handler made
98        // to be visible to a thread that subsequently re-acquires this
99        // logical slot. Cap is fixed at construction so no need to
100        // gate readers behind Acquire — readers of `current()` use
101        // Relaxed below for observability only.
102        self.inner.in_use.fetch_sub(1, Ordering::Release);
103    }
104}
105
106#[derive(Debug, Clone)]
107pub struct HttpConnectionLimiter {
108    inner: Arc<Inner>,
109}
110
111impl HttpConnectionLimiter {
112    pub fn new(cap: usize) -> Self {
113        Self::with_clock(cap, Arc::new(SystemMonotonicClock::new()))
114    }
115
116    /// Construct with an explicit clock. Production uses [`new`], which
117    /// wires the real monotonic clock; tests inject a fake to drive the
118    /// rejection-rate derivation deterministically.
119    pub fn with_clock(cap: usize, clock: Arc<dyn MonotonicClock>) -> Self {
120        assert!(cap > 0, "HttpConnectionLimiter cap must be positive");
121        let base = clock.now_nanos();
122        Self {
123            inner: Arc::new(Inner {
124                cap,
125                in_use: AtomicUsize::new(0),
126                rejected: AtomicU64::new(0),
127                last_observe_nanos: AtomicU64::new(base),
128                clock,
129            }),
130        }
131    }
132
133    /// Default cap: `(2 * available_parallelism).clamp(8, 256)`.
134    pub fn with_default_cap() -> Self {
135        let cores = std::thread::available_parallelism()
136            .map(|n| n.get())
137            .unwrap_or(1);
138        let cap = (2 * cores).clamp(8, 256);
139        Self::new(cap)
140    }
141
142    pub fn cap(&self) -> usize {
143        self.inner.cap
144    }
145
146    pub fn current(&self) -> usize {
147        self.inner.in_use.load(Ordering::Relaxed)
148    }
149
150    /// Returns `Some(permit)` on success, `None` if the cap is full.
151    /// No blocking, no allocation on the hot path.
152    pub fn try_acquire(&self) -> Option<HttpConnectionPermit> {
153        let mut observed = self.inner.in_use.load(Ordering::Relaxed);
154        loop {
155            if observed >= self.inner.cap {
156                // Cap full: count the rejection for the rate signal. This
157                // is the only mutation on the reject path — no alloc, no
158                // lock, no parsing, as the accept loop requires.
159                self.inner.rejected.fetch_add(1, Ordering::Relaxed);
160                return None;
161            }
162            match self.inner.in_use.compare_exchange_weak(
163                observed,
164                observed + 1,
165                Ordering::Acquire,
166                Ordering::Relaxed,
167            ) {
168                Ok(_) => {
169                    return Some(HttpConnectionPermit {
170                        inner: Arc::clone(&self.inner),
171                    });
172                }
173                Err(actual) => observed = actual,
174            }
175        }
176    }
177
178    /// Rejections accumulated since the last [`observe`](Self::observe).
179    /// Read-only: it accumulates monotonically within a window and is
180    /// reset only by `observe`.
181    pub fn rejected_since_last_observe(&self) -> u64 {
182        self.inner.rejected.load(Ordering::Relaxed)
183    }
184
185    /// Snapshot-and-reset the rejection window: returns the rejections
186    /// since the previous `observe` together with the elapsed wall and the
187    /// derived per-second rate, then resets the counter and arms the next
188    /// window at the current clock reading. `rejections_per_sec` is `0.0`
189    /// when no time has elapsed (avoids a divide-by-zero on back-to-back
190    /// observes).
191    pub fn observe(&self) -> LimiterObservation {
192        let now = self.inner.clock.now_nanos();
193        let last = self.inner.last_observe_nanos.swap(now, Ordering::Relaxed);
194        let rejected = self.inner.rejected.swap(0, Ordering::Relaxed);
195        let elapsed_nanos = now.saturating_sub(last);
196        let rejections_per_sec = if elapsed_nanos == 0 {
197            0.0
198        } else {
199            rejected as f64 * 1_000_000_000.0 / elapsed_nanos as f64
200        };
201        LimiterObservation {
202            rejected,
203            elapsed: Duration::from_nanos(elapsed_nanos),
204            rejections_per_sec,
205        }
206    }
207}
208
209/// Per-handler total wall-clock deadline (issue #621), armed against the
210/// same [`MonotonicClock`] abstraction the limiter uses. The clear-text
211/// (and TLS) HTTP handler arms one of these at spawn and polls
212/// [`expired`](Self::expired) at coarse boundaries (between parse, route
213/// dispatch, and write). Production wires [`SystemMonotonicClock`], so the
214/// deadline tracks real wall time; tests inject a fake clock to drive
215/// expiry deterministically without `sleep()`.
216///
217/// This bounds — but does not pre-empt — handler lifetime: a thread blocked
218/// inside a true syscall is still released only by the per-socket
219/// read/write timeouts. The deadline reclaims a limiter slot for the
220/// internal-lock-contention case the PRD (#569) targets.
221#[derive(Debug, Clone)]
222pub struct HandlerDeadline {
223    clock: Arc<dyn MonotonicClock>,
224    /// Absolute clock reading (nanos) at or after which the handler is
225    /// over budget. Saturating-added at arm time so a near-`u64::MAX`
226    /// base can never wrap into the past.
227    deadline_nanos: u64,
228}
229
230impl HandlerDeadline {
231    /// Arm a deadline `timeout` from now, read off `clock`. The clock is
232    /// shared (`Arc`) so the same instance can be reused across handlers.
233    pub fn arm(clock: Arc<dyn MonotonicClock>, timeout: Duration) -> Self {
234        let now = clock.now_nanos();
235        let deadline_nanos = now.saturating_add(timeout.as_nanos() as u64);
236        Self {
237            clock,
238            deadline_nanos,
239        }
240    }
241
242    /// `true` once the clock has reached the armed deadline. Checked at
243    /// coarse boundaries — never inside a blocking call.
244    pub fn expired(&self) -> bool {
245        self.clock.now_nanos() >= self.deadline_nanos
246    }
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;
    use std::thread;

    /// Hand-advanced clock for deterministic rejection-rate and deadline tests.
    #[derive(Debug, Default)]
    struct FakeClock {
        nanos: AtomicU64,
    }

    impl FakeClock {
        /// Move the clock forward by `d`. Test durations are tiny, so the
        /// u128 -> u64 narrowing cannot truncate here.
        fn advance(&self, d: Duration) {
            self.nanos.fetch_add(d.as_nanos() as u64, Ordering::Relaxed);
        }
    }

    impl MonotonicClock for FakeClock {
        fn now_nanos(&self) -> u64 {
            self.nanos.load(Ordering::Relaxed)
        }
    }

    #[test]
    fn cap_and_current_track_observed_state() {
        let limiter = HttpConnectionLimiter::new(3);
        assert_eq!(limiter.cap(), 3);
        assert_eq!(limiter.current(), 0);

        let p1 = limiter.try_acquire().expect("slot 1");
        assert_eq!(limiter.current(), 1);
        let p2 = limiter.try_acquire().expect("slot 2");
        assert_eq!(limiter.current(), 2);
        let p3 = limiter.try_acquire().expect("slot 3");
        assert_eq!(limiter.current(), 3);

        assert!(limiter.try_acquire().is_none());
        assert_eq!(limiter.current(), 3);

        drop(p2);
        assert_eq!(limiter.current(), 2);
        let p4 = limiter.try_acquire().expect("slot reused");
        assert_eq!(limiter.current(), 3);
        drop((p1, p3, p4));
        assert_eq!(limiter.current(), 0);
    }

    #[test]
    fn permit_drop_restores_capacity() {
        let limiter = HttpConnectionLimiter::new(1);
        {
            let _p = limiter.try_acquire().expect("acquired");
            assert!(limiter.try_acquire().is_none());
        }
        assert_eq!(limiter.current(), 0);
        let _p = limiter.try_acquire().expect("reacquired after drop");
        assert_eq!(limiter.current(), 1);
    }

    #[test]
    fn cap_enforced_under_thread_storm_no_over_issue() {
        // Many threads race try_acquire. Verify that the high-water mark
        // never exceeds the cap, and that the number of successful acquires
        // equals the cap while the permits are held.
        let cap = 8;
        let limiter = HttpConnectionLimiter::new(cap);
        let success = Arc::new(AtomicUsize::new(0));
        let denied = Arc::new(AtomicUsize::new(0));
        let max_seen = Arc::new(AtomicUsize::new(0));
        let permits: Arc<std::sync::Mutex<Vec<HttpConnectionPermit>>> =
            Arc::new(std::sync::Mutex::new(Vec::new()));

        let mut handles = Vec::new();
        for _ in 0..64 {
            let l = limiter.clone();
            let s = Arc::clone(&success);
            let d = Arc::clone(&denied);
            let m = Arc::clone(&max_seen);
            let permits = Arc::clone(&permits);
            handles.push(thread::spawn(move || match l.try_acquire() {
                Some(p) => {
                    s.fetch_add(1, Ordering::Relaxed);
                    let now = l.current();
                    m.fetch_max(now, Ordering::Relaxed);
                    permits.lock().unwrap().push(p);
                }
                None => {
                    d.fetch_add(1, Ordering::Relaxed);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(success.load(Ordering::Relaxed), cap);
        assert_eq!(denied.load(Ordering::Relaxed), 64 - cap);
        assert!(max_seen.load(Ordering::Relaxed) <= cap);
        assert_eq!(limiter.current(), cap);

        permits.lock().unwrap().clear();
        assert_eq!(limiter.current(), 0);
    }

    #[test]
    fn clone_shares_state() {
        let a = HttpConnectionLimiter::new(2);
        let b = a.clone();
        let _p = a.try_acquire().unwrap();
        assert_eq!(b.current(), 1);
        assert_eq!(b.cap(), 2);
    }

    #[test]
    fn default_cap_in_bounds() {
        let limiter = HttpConnectionLimiter::with_default_cap();
        assert!(limiter.cap() >= 8);
        assert!(limiter.cap() <= 256);
    }

    #[test]
    fn rejected_accumulates_within_window_and_resets_on_observe() {
        let limiter = HttpConnectionLimiter::new(1);
        let _held = limiter.try_acquire().expect("first slot");

        assert_eq!(limiter.rejected_since_last_observe(), 0);
        // Each over-cap acquire bumps the counter monotonically.
        for expected in 1..=4 {
            assert!(limiter.try_acquire().is_none());
            assert_eq!(limiter.rejected_since_last_observe(), expected);
        }

        // observe() drains the window; the counter resets.
        let obs = limiter.observe();
        assert_eq!(obs.rejected, 4);
        assert_eq!(limiter.rejected_since_last_observe(), 0);

        // The next window sees exactly the one new rejection; an immediate
        // follow-up observe, with no rejections in between, reports zero.
        assert!(limiter.try_acquire().is_none());
        assert_eq!(limiter.observe().rejected, 1);
        assert_eq!(limiter.observe().rejected, 0);
    }

    #[test]
    fn fake_clock_rejection_rate_derivation() {
        let clock = Arc::new(FakeClock::default());
        let limiter = HttpConnectionLimiter::with_clock(1, clock.clone());
        let _held = limiter.try_acquire().expect("first slot");

        // 10 rejections across a 2s window -> 5 rejections/sec.
        for _ in 0..10 {
            assert!(limiter.try_acquire().is_none());
        }
        clock.advance(Duration::from_secs(2));
        let obs = limiter.observe();
        assert_eq!(obs.rejected, 10);
        assert_eq!(obs.elapsed, Duration::from_secs(2));
        assert!((obs.rejections_per_sec - 5.0).abs() < 1e-9);

        // Next window: 3 rejections across 500ms -> 6 rejections/sec.
        for _ in 0..3 {
            assert!(limiter.try_acquire().is_none());
        }
        clock.advance(Duration::from_millis(500));
        let obs = limiter.observe();
        assert_eq!(obs.rejected, 3);
        assert!((obs.rejections_per_sec - 6.0).abs() < 1e-9);
    }

    #[test]
    fn observe_with_zero_elapsed_reports_zero_rate_not_nan() {
        let clock = Arc::new(FakeClock::default());
        let limiter = HttpConnectionLimiter::with_clock(1, clock.clone());
        let _held = limiter.try_acquire().expect("first slot");
        assert!(limiter.try_acquire().is_none());
        // No clock advance: back-to-back observe must not divide by zero.
        let obs = limiter.observe();
        assert_eq!(obs.elapsed, Duration::ZERO);
        assert_eq!(obs.rejected, 1);
        assert_eq!(obs.rejections_per_sec, 0.0);
    }

    #[test]
    fn observe_measures_elapsed_from_previous_observe() {
        let clock = Arc::new(FakeClock::default());
        let limiter = HttpConnectionLimiter::with_clock(1, clock.clone());
        // The first window starts at construction.
        clock.advance(Duration::from_secs(3));
        let first = limiter.observe();
        assert_eq!(first.elapsed, Duration::from_secs(3));
        assert_eq!(first.rejected, 0);
        assert_eq!(first.rejections_per_sec, 0.0);
        // The next window starts where the previous observe left off.
        clock.advance(Duration::from_secs(1));
        assert_eq!(limiter.observe().elapsed, Duration::from_secs(1));
    }

    #[test]
    fn system_clock_never_goes_backwards() {
        let clock = SystemMonotonicClock::new();
        let first = clock.now_nanos();
        let second = clock.now_nanos();
        assert!(second >= first);
    }

    #[test]
    fn handler_deadline_not_expired_before_timeout() {
        let clock = Arc::new(FakeClock::default());
        let deadline = HandlerDeadline::arm(clock.clone(), Duration::from_millis(200));
        // Right after arming: not expired.
        assert!(!deadline.expired());
        // Advance to just under the budget: still not expired.
        clock.advance(Duration::from_millis(199));
        assert!(!deadline.expired());
    }

    #[test]
    fn handler_deadline_expires_at_and_after_timeout() {
        let clock = Arc::new(FakeClock::default());
        let deadline = HandlerDeadline::arm(clock.clone(), Duration::from_millis(200));
        // Exactly at the deadline: expired (`>=`).
        clock.advance(Duration::from_millis(200));
        assert!(deadline.expired());
        // And it stays expired as time marches on — no real sleeps used.
        clock.advance(Duration::from_secs(5));
        assert!(deadline.expired());
    }

    #[test]
    fn handler_deadline_clone_shares_clock_and_deadline() {
        let clock = Arc::new(FakeClock::default());
        let deadline = HandlerDeadline::arm(clock.clone(), Duration::from_millis(100));
        let copy = deadline.clone();
        assert!(!copy.expired());
        clock.advance(Duration::from_millis(100));
        assert!(deadline.expired());
        assert!(copy.expired());
    }

    #[test]
    fn handler_deadline_arm_saturates_without_wrapping() {
        // A near-u64::MAX base must not wrap the deadline into the past.
        #[derive(Debug)]
        struct MaxClock;
        impl MonotonicClock for MaxClock {
            fn now_nanos(&self) -> u64 {
                u64::MAX - 10
            }
        }
        let deadline = HandlerDeadline::arm(Arc::new(MaxClock), Duration::from_secs(30));
        // now (u64::MAX - 10) < saturated deadline (u64::MAX) -> not expired.
        assert!(!deadline.expired());
    }
}