1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
//! Latency-based auto-throttle for polite, adaptive crawling.
//!
//! Feature-gated behind `auto_throttle`. Tracks per-domain response latency
//! via exponential moving average (EMA) and dynamically computes a crawl delay
//! so that the target number of concurrent requests remains within a latency
//! window.
//!
//! Inspired by Scrapy's AUTOTHROTTLE — increases delay when servers respond
//! slowly, decreases when they are fast. All operations are lock-free
//! (DashMap + atomics).
#[cfg(feature = "auto_throttle")]
mod inner {
use crate::compact_str::CompactString;
use dashmap::DashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Maximum number of tracked domains before the LRU eviction in
/// `AutoThrottle::maybe_evict` kicks in (bounds memory for long crawls).
const MAX_ENTRIES: usize = 10_000;
/// Default smoothing factor for the EMA (0..1). Higher = more weight on the
/// newest sample, i.e. more responsive to latency changes.
const DEFAULT_ALPHA: f64 = 0.15;
/// Per-domain latency state stored as atomic u64 bit-patterns.
///
/// All fields are read/written with `Ordering::Relaxed`: each is an
/// independent statistic and no cross-field ordering is required.
struct DomainLatency {
/// EMA of response time in microseconds, stored as the raw f64 bit
/// pattern (`f64::to_bits`/`from_bits`) so it fits in an `AtomicU64`.
ema_us: AtomicU64,
/// Number of samples recorded. NOTE(review): `fetch_add` wraps rather
/// than saturates on overflow — unreachable in practice for a u64.
samples: AtomicU64,
/// Monotonic access counter for LRU eviction (higher = more recent).
last_access: AtomicU64,
}
impl DomainLatency {
/// Create a fresh, unseeded state stamped with the current LRU counter.
/// `ema_us` starts at 0 bits (0.0); it is seeded by the first `record`.
fn new(access_counter: u64) -> Self {
Self {
ema_us: AtomicU64::new(0),
samples: AtomicU64::new(0),
last_access: AtomicU64::new(access_counter),
}
}
/// Load the current EMA in microseconds.
fn ema_micros(&self) -> f64 {
f64::from_bits(self.ema_us.load(Ordering::Relaxed))
}
/// Record a new latency sample and update the EMA.
///
/// NOTE(review): there is a benign race at seeding time — a thread that
/// observes `prev_count == 1` may run its CAS against the unseeded 0.0
/// before the first thread's seed store lands. The EMA self-corrects
/// within a few samples, so this is tolerated for lock-freedom.
fn record(&self, latency_us: f64, alpha: f64) {
let prev_count = self.samples.fetch_add(1, Ordering::Relaxed);
if prev_count == 0 {
// First sample — seed the EMA.
self.ema_us.store(latency_us.to_bits(), Ordering::Relaxed);
} else {
// CAS loop to atomically update f64 EMA.
// `fetch_update` retries the closure until the compare-exchange
// succeeds; result ignored because the closure always returns Some.
let _ = self
.ema_us
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
let old = f64::from_bits(bits);
let new = old + alpha * (latency_us - old);
// Guard: never store NaN/Inf.
if new.is_finite() && new >= 0.0 {
Some(new.to_bits())
} else {
Some(old.to_bits())
}
});
}
}
}
/// Configuration for the auto-throttle.
///
/// Callers are expected to keep `min_delay_ms <= max_delay_ms`; see
/// `AutoThrottle::delay_for` for how the bounds are applied.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AutoThrottleConfig {
/// Target concurrency per domain. The throttle aims to keep
/// `delay ≈ latency / target_concurrency` so that approximately this
/// many requests are in-flight per domain at steady state.
/// Default: 2.0.
pub target_concurrency: f64,
/// Minimum delay between requests in milliseconds. Default: 0.
pub min_delay_ms: u64,
/// Maximum delay between requests in milliseconds. Default: 60_000.
pub max_delay_ms: u64,
/// EMA smoothing factor (0..1). Higher = more responsive to latency
/// changes. Values outside 0.01..=1.0 are clamped at record time.
/// Default: 0.15.
pub alpha: f64,
/// Whether auto-throttle is enabled. Default: true.
pub enabled: bool,
}
impl Default for AutoThrottleConfig {
fn default() -> Self {
Self {
target_concurrency: 2.0,
min_delay_ms: 0,
max_delay_ms: 60_000,
alpha: DEFAULT_ALPHA,
enabled: true,
}
}
}
/// Per-domain auto-throttle that dynamically computes crawl delay.
///
/// Thread-safe: `DashMap` for concurrent access, atomics for per-domain state.
/// The configuration is fixed at construction; only latency state mutates.
pub struct AutoThrottle {
/// Per-domain latency EMAs, keyed by domain string.
domains: DashMap<CompactString, DomainLatency>,
/// Immutable throttle parameters supplied at construction.
config: AutoThrottleConfig,
/// Monotonically increasing counter for LRU.
access_counter: AtomicU64,
}
impl AutoThrottle {
/// Create a new auto-throttle with the given configuration.
pub fn new(config: AutoThrottleConfig) -> Self {
    Self {
        domains: DashMap::with_capacity(64),
        config,
        access_counter: AtomicU64::new(0),
    }
}
/// Create with default configuration.
pub fn with_defaults() -> Self {
    Self::new(AutoThrottleConfig::default())
}
/// Record a response latency for a domain.
///
/// Call this after each successful (or failed) fetch with the elapsed
/// wall-clock time. The EMA is updated atomically.
pub fn record_latency(&self, domain: &str, latency: Duration) {
    let us = latency.as_micros() as f64;
    let counter = self.access_counter.fetch_add(1, Ordering::Relaxed);
    let key = CompactString::new(domain);
    let alpha = self.config.alpha.clamp(0.01, 1.0);
    // Fast path: domain already tracked — only a shared read guard needed.
    if let Some(entry) = self.domains.get(&key) {
        entry.last_access.store(counter, Ordering::Relaxed);
        entry.record(us, alpha);
        return;
    }
    // Evict BEFORE taking an entry guard: `maybe_evict` iterates every
    // shard and would deadlock against a write guard held on this key's
    // shard.
    self.maybe_evict();
    // `entry().or_insert_with()` is atomic per key. The previous
    // get-then-insert pattern raced: two threads could both miss the
    // `get`, each build a fresh `DomainLatency`, and the later `insert`
    // would overwrite the earlier one, silently dropping its sample.
    let entry = self
        .domains
        .entry(key)
        .or_insert_with(|| DomainLatency::new(counter));
    entry.last_access.store(counter, Ordering::Relaxed);
    entry.record(us, alpha);
}
/// Compute the adaptive delay for a domain.
///
/// Formula: `delay = ema_latency / target_concurrency`, bounded to
/// `[min_delay_ms, max_delay_ms]`.
///
/// Returns `Duration::ZERO` if throttling is disabled or no samples have
/// been recorded yet (cold-start: don't delay until we have data).
pub fn delay_for(&self, domain: &str) -> Duration {
    if !self.config.enabled {
        return Duration::ZERO;
    }
    let key = CompactString::new(domain);
    let ema_us = match self.domains.get(&key) {
        Some(entry) => {
            if entry.samples.load(Ordering::Relaxed) == 0 {
                return Duration::ZERO;
            }
            entry.ema_micros()
        }
        None => return Duration::ZERO,
    };
    // Guard against zero/negative target to avoid division blow-up.
    let target = self.config.target_concurrency.max(0.1);
    let delay_us = ema_us / target;
    // f64 -> u64 `as` casts saturate (NaN -> 0), so this cannot panic.
    let delay_ms = (delay_us / 1000.0) as u64;
    // Apply the bounds as max-then-min instead of `u64::clamp`:
    // `clamp` panics when a misconfigured `min_delay_ms > max_delay_ms`,
    // whereas this form never panics and lets the max bound win.
    let bounded = delay_ms
        .max(self.config.min_delay_ms)
        .min(self.config.max_delay_ms);
    Duration::from_millis(bounded)
}
/// Get the current EMA latency for a domain in milliseconds.
/// Returns `None` if no samples recorded.
pub fn latency_ms(&self, domain: &str) -> Option<f64> {
    let key = CompactString::new(domain);
    self.domains.get(&key).and_then(|entry| {
        if entry.samples.load(Ordering::Relaxed) == 0 {
            None
        } else {
            Some(entry.ema_micros() / 1000.0)
        }
    })
}
/// Number of tracked domains.
pub fn len(&self) -> usize {
    self.domains.len()
}
/// Whether the throttle is tracking any domains.
pub fn is_empty(&self) -> bool {
    self.domains.is_empty()
}
/// Evict the least-recently-used entry if over capacity.
///
/// O(n) scan; runs at most once per newly-inserted domain, and only when
/// at capacity. Must never be called while holding a guard into
/// `self.domains` (it iterates all shards).
fn maybe_evict(&self) {
    if self.domains.len() < MAX_ENTRIES {
        return;
    }
    let mut oldest_key: Option<CompactString> = None;
    let mut oldest_access = u64::MAX;
    for entry in self.domains.iter() {
        let access = entry.value().last_access.load(Ordering::Relaxed);
        if access < oldest_access {
            oldest_access = access;
            oldest_key = Some(entry.key().clone());
        }
    }
    // Best-effort: a concurrent eviction may have removed it already.
    if let Some(key) = oldest_key {
        self.domains.remove(&key);
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
// No samples recorded yet -> the throttle must not impose any delay.
#[test]
fn cold_start_returns_zero_delay() {
let at = AutoThrottle::with_defaults();
assert_eq!(at.delay_for("example.com"), Duration::ZERO);
}
// The very first sample seeds the EMA directly (no smoothing applied).
#[test]
fn first_sample_seeds_ema() {
let at = AutoThrottle::with_defaults();
at.record_latency("example.com", Duration::from_millis(200));
let lat = at.latency_ms("example.com").unwrap();
assert!((lat - 200.0).abs() < 1.0, "expected ~200ms, got {lat}");
}
// With alpha = 0.5 the second sample moves the EMA halfway to it.
#[test]
fn ema_converges_toward_new_value() {
let config = AutoThrottleConfig {
alpha: 0.5,
..Default::default()
};
let at = AutoThrottle::new(config);
// Seed with 100ms
at.record_latency("a.com", Duration::from_millis(100));
// Record 300ms → EMA should move toward 300
at.record_latency("a.com", Duration::from_millis(300));
let lat = at.latency_ms("a.com").unwrap();
// EMA = 100 + 0.5*(300-100) = 200
assert!((lat - 200.0).abs() < 5.0, "expected ~200ms, got {lat}");
}
// delay = latency / target_concurrency.
#[test]
fn delay_respects_target_concurrency() {
let config = AutoThrottleConfig {
target_concurrency: 4.0,
min_delay_ms: 0,
max_delay_ms: 60_000,
..Default::default()
};
let at = AutoThrottle::new(config);
// 400ms latency, target concurrency 4 → delay = 400/4 = 100ms
at.record_latency("fast.com", Duration::from_millis(400));
let delay = at.delay_for("fast.com");
assert!(
delay.as_millis() >= 90 && delay.as_millis() <= 110,
"expected ~100ms delay, got {:?}",
delay
);
}
// Both bounds of [min_delay_ms, max_delay_ms] are enforced.
#[test]
fn delay_clamped_to_min_max() {
let config = AutoThrottleConfig {
target_concurrency: 1.0,
min_delay_ms: 50,
max_delay_ms: 500,
..Default::default()
};
let at = AutoThrottle::new(config);
// Very fast server: 5ms → delay = 5/1 = 5ms, but min is 50
at.record_latency("fast.com", Duration::from_millis(5));
assert_eq!(at.delay_for("fast.com").as_millis(), 50);
// Very slow server: 2000ms → delay = 2000/1 = 2000ms, but max is 500
at.record_latency("slow.com", Duration::from_millis(2000));
assert_eq!(at.delay_for("slow.com").as_millis(), 500);
}
// `enabled: false` bypasses throttling even with recorded samples.
#[test]
fn disabled_returns_zero() {
let config = AutoThrottleConfig {
enabled: false,
..Default::default()
};
let at = AutoThrottle::new(config);
at.record_latency("example.com", Duration::from_millis(500));
assert_eq!(at.delay_for("example.com"), Duration::ZERO);
}
// Latency state is tracked per domain, not globally.
#[test]
fn different_domains_independent() {
let at = AutoThrottle::with_defaults();
at.record_latency("a.com", Duration::from_millis(100));
at.record_latency("b.com", Duration::from_millis(1000));
let a = at.latency_ms("a.com").unwrap();
let b = at.latency_ms("b.com").unwrap();
assert!(a < 200.0);
assert!(b > 800.0);
}
// Inserting MAX_ENTRIES + 1 domains must trigger LRU eviction so the
// map never exceeds its capacity bound.
#[test]
fn eviction_at_capacity() {
let at = AutoThrottle::with_defaults();
for i in 0..=MAX_ENTRIES {
at.record_latency(&format!("domain-{i}.com"), Duration::from_millis(50));
}
assert!(at.len() <= MAX_ENTRIES);
}
// Edge case: a 0-duration sample must not panic or produce a delay.
#[test]
fn no_panic_on_zero_latency() {
let at = AutoThrottle::with_defaults();
at.record_latency("zero.com", Duration::ZERO);
let delay = at.delay_for("zero.com");
assert_eq!(delay, Duration::ZERO);
}
// Edge case: an hour-long latency is clamped by max_delay_ms.
#[test]
fn no_panic_on_extreme_latency() {
let at = AutoThrottle::with_defaults();
at.record_latency("extreme.com", Duration::from_secs(3600));
// Should be clamped to max_delay_ms (60s)
let delay = at.delay_for("extreme.com");
assert!(delay <= Duration::from_millis(60_000));
}
// Smoke-test the lock-free paths: 8 threads hammer the same domain;
// asserts only absence of panics/deadlocks plus that state was recorded.
#[test]
fn concurrent_recording_no_panic() {
use std::sync::Arc;
let at = Arc::new(AutoThrottle::with_defaults());
let handles: Vec<_> = (0..8)
.map(|t| {
let at = at.clone();
std::thread::spawn(move || {
for i in 0..100 {
at.record_latency(
"shared.com",
Duration::from_millis(50 + (t * 10) + i),
);
let _ = at.delay_for("shared.com");
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert!(at.latency_ms("shared.com").is_some());
}
}
}
#[cfg(feature = "auto_throttle")]
pub use inner::{AutoThrottle, AutoThrottleConfig};