1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26pub trait MonotonicClock: Send + Sync + std::fmt::Debug {
30 fn now_nanos(&self) -> u64;
33}
34
35#[derive(Debug)]
37pub struct SystemMonotonicClock {
38 base: Instant,
39}
40
41impl SystemMonotonicClock {
42 pub fn new() -> Self {
43 Self {
44 base: Instant::now(),
45 }
46 }
47}
48
49impl Default for SystemMonotonicClock {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl MonotonicClock for SystemMonotonicClock {
56 fn now_nanos(&self) -> u64 {
57 self.base.elapsed().as_nanos() as u64
59 }
60}
61
62#[derive(Debug, Clone, Copy, PartialEq)]
68pub struct LimiterObservation {
69 pub rejected: u64,
70 pub elapsed: Duration,
71 pub rejections_per_sec: f64,
72}
73
74#[derive(Debug)]
75struct Inner {
76 cap: usize,
77 in_use: AtomicUsize,
78 rejected: AtomicU64,
81 last_observe_nanos: AtomicU64,
84 clock: Arc<dyn MonotonicClock>,
85}
86
87#[derive(Debug)]
91pub struct HttpConnectionPermit {
92 inner: Arc<Inner>,
93}
94
95impl Drop for HttpConnectionPermit {
96 fn drop(&mut self) {
97 self.inner.in_use.fetch_sub(1, Ordering::Release);
103 }
104}
105
106#[derive(Debug, Clone)]
107pub struct HttpConnectionLimiter {
108 inner: Arc<Inner>,
109}
110
111impl HttpConnectionLimiter {
112 pub fn new(cap: usize) -> Self {
113 Self::with_clock(cap, Arc::new(SystemMonotonicClock::new()))
114 }
115
116 pub fn with_clock(cap: usize, clock: Arc<dyn MonotonicClock>) -> Self {
120 assert!(cap > 0, "HttpConnectionLimiter cap must be positive");
121 let base = clock.now_nanos();
122 Self {
123 inner: Arc::new(Inner {
124 cap,
125 in_use: AtomicUsize::new(0),
126 rejected: AtomicU64::new(0),
127 last_observe_nanos: AtomicU64::new(base),
128 clock,
129 }),
130 }
131 }
132
133 pub fn with_default_cap() -> Self {
135 let cores = std::thread::available_parallelism()
136 .map(|n| n.get())
137 .unwrap_or(1);
138 let cap = (2 * cores).clamp(8, 256);
139 Self::new(cap)
140 }
141
142 pub fn cap(&self) -> usize {
143 self.inner.cap
144 }
145
146 pub fn current(&self) -> usize {
147 self.inner.in_use.load(Ordering::Relaxed)
148 }
149
150 pub fn try_acquire(&self) -> Option<HttpConnectionPermit> {
153 let mut observed = self.inner.in_use.load(Ordering::Relaxed);
154 loop {
155 if observed >= self.inner.cap {
156 self.inner.rejected.fetch_add(1, Ordering::Relaxed);
160 return None;
161 }
162 match self.inner.in_use.compare_exchange_weak(
163 observed,
164 observed + 1,
165 Ordering::Acquire,
166 Ordering::Relaxed,
167 ) {
168 Ok(_) => {
169 return Some(HttpConnectionPermit {
170 inner: Arc::clone(&self.inner),
171 });
172 }
173 Err(actual) => observed = actual,
174 }
175 }
176 }
177
178 pub fn rejected_since_last_observe(&self) -> u64 {
182 self.inner.rejected.load(Ordering::Relaxed)
183 }
184
185 pub fn observe(&self) -> LimiterObservation {
192 let now = self.inner.clock.now_nanos();
193 let last = self.inner.last_observe_nanos.swap(now, Ordering::Relaxed);
194 let rejected = self.inner.rejected.swap(0, Ordering::Relaxed);
195 let elapsed_nanos = now.saturating_sub(last);
196 let rejections_per_sec = if elapsed_nanos == 0 {
197 0.0
198 } else {
199 rejected as f64 * 1_000_000_000.0 / elapsed_nanos as f64
200 };
201 LimiterObservation {
202 rejected,
203 elapsed: Duration::from_nanos(elapsed_nanos),
204 rejections_per_sec,
205 }
206 }
207}
208
209#[derive(Debug, Clone)]
222pub struct HandlerDeadline {
223 clock: Arc<dyn MonotonicClock>,
224 deadline_nanos: u64,
228}
229
230impl HandlerDeadline {
231 pub fn arm(clock: Arc<dyn MonotonicClock>, timeout: Duration) -> Self {
234 let now = clock.now_nanos();
235 let deadline_nanos = now.saturating_add(timeout.as_nanos() as u64);
236 Self {
237 clock,
238 deadline_nanos,
239 }
240 }
241
242 pub fn expired(&self) -> bool {
245 self.clock.now_nanos() >= self.deadline_nanos
246 }
247}
248
249#[cfg(test)]
250mod tests {
251 use super::*;
252 use std::sync::atomic::AtomicUsize;
253 use std::sync::Arc;
254 use std::thread;
255
256 #[derive(Debug, Default)]
258 struct FakeClock {
259 nanos: AtomicU64,
260 }
261
262 impl FakeClock {
263 fn advance(&self, d: Duration) {
264 self.nanos.fetch_add(d.as_nanos() as u64, Ordering::Relaxed);
265 }
266 }
267
268 impl MonotonicClock for FakeClock {
269 fn now_nanos(&self) -> u64 {
270 self.nanos.load(Ordering::Relaxed)
271 }
272 }
273
274 #[test]
275 fn cap_and_current_track_observed_state() {
276 let limiter = HttpConnectionLimiter::new(3);
277 assert_eq!(limiter.cap(), 3);
278 assert_eq!(limiter.current(), 0);
279
280 let p1 = limiter.try_acquire().expect("slot 1");
281 assert_eq!(limiter.current(), 1);
282 let p2 = limiter.try_acquire().expect("slot 2");
283 assert_eq!(limiter.current(), 2);
284 let p3 = limiter.try_acquire().expect("slot 3");
285 assert_eq!(limiter.current(), 3);
286
287 assert!(limiter.try_acquire().is_none());
288 assert_eq!(limiter.current(), 3);
289
290 drop(p2);
291 assert_eq!(limiter.current(), 2);
292 let p4 = limiter.try_acquire().expect("slot reused");
293 assert_eq!(limiter.current(), 3);
294 drop((p1, p3, p4));
295 assert_eq!(limiter.current(), 0);
296 }
297
298 #[test]
299 fn permit_drop_restores_capacity() {
300 let limiter = HttpConnectionLimiter::new(1);
301 {
302 let _p = limiter.try_acquire().expect("acquired");
303 assert!(limiter.try_acquire().is_none());
304 }
305 assert_eq!(limiter.current(), 0);
306 let _p = limiter.try_acquire().expect("reacquired after drop");
307 assert_eq!(limiter.current(), 1);
308 }
309
310 #[test]
311 fn cap_enforced_under_thread_storm_no_over_issue() {
312 let cap = 8;
316 let limiter = HttpConnectionLimiter::new(cap);
317 let success = Arc::new(AtomicUsize::new(0));
318 let denied = Arc::new(AtomicUsize::new(0));
319 let max_seen = Arc::new(AtomicUsize::new(0));
320 let permits: Arc<std::sync::Mutex<Vec<HttpConnectionPermit>>> =
321 Arc::new(std::sync::Mutex::new(Vec::new()));
322
323 let mut handles = Vec::new();
324 for _ in 0..64 {
325 let l = limiter.clone();
326 let s = Arc::clone(&success);
327 let d = Arc::clone(&denied);
328 let m = Arc::clone(&max_seen);
329 let permits = Arc::clone(&permits);
330 handles.push(thread::spawn(move || match l.try_acquire() {
331 Some(p) => {
332 s.fetch_add(1, Ordering::Relaxed);
333 let now = l.current();
334 m.fetch_max(now, Ordering::Relaxed);
335 permits.lock().unwrap().push(p);
336 }
337 None => {
338 d.fetch_add(1, Ordering::Relaxed);
339 }
340 }));
341 }
342 for h in handles {
343 h.join().unwrap();
344 }
345
346 assert_eq!(success.load(Ordering::Relaxed), cap);
347 assert_eq!(denied.load(Ordering::Relaxed), 64 - cap);
348 assert!(max_seen.load(Ordering::Relaxed) <= cap);
349 assert_eq!(limiter.current(), cap);
350
351 permits.lock().unwrap().clear();
352 assert_eq!(limiter.current(), 0);
353 }
354
355 #[test]
356 fn clone_shares_state() {
357 let a = HttpConnectionLimiter::new(2);
358 let b = a.clone();
359 let _p = a.try_acquire().unwrap();
360 assert_eq!(b.current(), 1);
361 assert_eq!(b.cap(), 2);
362 }
363
364 #[test]
365 fn default_cap_in_bounds() {
366 let limiter = HttpConnectionLimiter::with_default_cap();
367 assert!(limiter.cap() >= 8);
368 assert!(limiter.cap() <= 256);
369 }
370
371 #[test]
372 fn rejected_accumulates_within_window_and_resets_on_observe() {
373 let limiter = HttpConnectionLimiter::new(1);
374 let _held = limiter.try_acquire().expect("first slot");
375
376 assert_eq!(limiter.rejected_since_last_observe(), 0);
377 for expected in 1..=4 {
379 assert!(limiter.try_acquire().is_none());
380 assert_eq!(limiter.rejected_since_last_observe(), expected);
381 }
382
383 let obs = limiter.observe();
385 assert_eq!(obs.rejected, 4);
386 assert_eq!(limiter.rejected_since_last_observe(), 0);
387
388 assert!(limiter.try_acquire().is_none());
390 assert_eq!(limiter.observe().rejected, 1);
391 assert_eq!(limiter.observe().rejected, 0);
392 }
393
394 #[test]
395 fn fake_clock_rejection_rate_derivation() {
396 let clock = Arc::new(FakeClock::default());
397 let limiter = HttpConnectionLimiter::with_clock(1, clock.clone());
398 let _held = limiter.try_acquire().expect("first slot");
399
400 for _ in 0..10 {
402 assert!(limiter.try_acquire().is_none());
403 }
404 clock.advance(Duration::from_secs(2));
405 let obs = limiter.observe();
406 assert_eq!(obs.rejected, 10);
407 assert_eq!(obs.elapsed, Duration::from_secs(2));
408 assert!((obs.rejections_per_sec - 5.0).abs() < 1e-9);
409
410 for _ in 0..3 {
412 assert!(limiter.try_acquire().is_none());
413 }
414 clock.advance(Duration::from_millis(500));
415 let obs = limiter.observe();
416 assert_eq!(obs.rejected, 3);
417 assert!((obs.rejections_per_sec - 6.0).abs() < 1e-9);
418 }
419
420 #[test]
421 fn observe_with_zero_elapsed_reports_zero_rate_not_nan() {
422 let clock = Arc::new(FakeClock::default());
423 let limiter = HttpConnectionLimiter::with_clock(1, clock.clone());
424 let _held = limiter.try_acquire().expect("first slot");
425 assert!(limiter.try_acquire().is_none());
426 let obs = limiter.observe();
428 assert_eq!(obs.elapsed, Duration::ZERO);
429 assert_eq!(obs.rejected, 1);
430 assert_eq!(obs.rejections_per_sec, 0.0);
431 }
432
433 #[test]
434 fn handler_deadline_not_expired_before_timeout() {
435 let clock = Arc::new(FakeClock::default());
436 let deadline = HandlerDeadline::arm(clock.clone(), Duration::from_millis(200));
437 assert!(!deadline.expired());
439 clock.advance(Duration::from_millis(199));
441 assert!(!deadline.expired());
442 }
443
444 #[test]
445 fn handler_deadline_expires_at_and_after_timeout() {
446 let clock = Arc::new(FakeClock::default());
447 let deadline = HandlerDeadline::arm(clock.clone(), Duration::from_millis(200));
448 clock.advance(Duration::from_millis(200));
450 assert!(deadline.expired());
451 clock.advance(Duration::from_secs(5));
453 assert!(deadline.expired());
454 }
455
456 #[test]
457 fn handler_deadline_arm_saturates_without_wrapping() {
458 #[derive(Debug)]
460 struct MaxClock;
461 impl MonotonicClock for MaxClock {
462 fn now_nanos(&self) -> u64 {
463 u64::MAX - 10
464 }
465 }
466 let deadline = HandlerDeadline::arm(Arc::new(MaxClock), Duration::from_secs(30));
467 assert!(!deadline.expired());
469 }
470}