dig_rpc/middleware/rate_limit.rs
1//! Per-(peer, bucket) token-bucket rate limiting.
2//!
3//! The server maintains a `HashMap<(PeerKey, RateBucket), Bucket>` where
//! `Bucket = { tokens: f64, last_refill: Instant }`. Every allowed request
//! debits one token (a denied request debits nothing); refills are lazy,
//! computed at check time as `fill_per_sec * elapsed` and capped at the
//! bucket's `capacity`.
7//!
8//! A request that cannot debit returns [`RateLimitOutcome::Deny`]; the
9//! caller turns that into an `ErrorCode::RateLimited` JSON-RPC response
10//! with a `Retry-After` HTTP header.
11
12use std::collections::HashMap;
13use std::time::Instant;
14
15use parking_lot::Mutex;
16
17use crate::method::RateBucket;
18
/// Byte-string identity of a peer, used verbatim as part of the rate-limit
/// hash key.
///
/// The middleware never interprets these bytes, so any stable encoding works.
/// The choice belongs to the embedding binary — e.g. `SHA256(cert_spki)` for
/// internal servers, or `IP:port` (or a hash of it) for public ones.
pub type PeerKey = Vec<u8>;
25
/// Token-bucket parameters for one [`RateBucket`].
///
/// A fresh bucket starts full (`capacity` tokens). Each allowed request spends
/// one token, and tokens accrue continuously at `fill_per_sec` without ever
/// exceeding `capacity`. Because a request needs one whole token, a
/// `capacity` below `1.0` denies every request.
#[derive(Debug, Clone, Copy)]
pub struct BucketSpec {
    /// Refill rate, in tokens per second.
    pub fill_per_sec: f64,
    /// Upper bound on stored tokens — roughly the largest burst a peer can
    /// send back-to-back.
    pub capacity: f64,
}
34
35/// Full rate-limit configuration: one [`BucketSpec`] per [`RateBucket`].
36#[derive(Debug, Clone)]
37pub struct RateLimitConfig {
38 /// Per-bucket fill rates.
39 pub buckets: HashMap<RateBucket, BucketSpec>,
40}
41
42impl RateLimitConfig {
43 /// Sane defaults suitable for a validator / fullnode.
44 pub fn defaults() -> Self {
45 let mut buckets = HashMap::new();
46 buckets.insert(
47 RateBucket::ReadLight,
48 BucketSpec {
49 fill_per_sec: 50.0,
50 capacity: 100.0,
51 },
52 );
53 buckets.insert(
54 RateBucket::ReadHeavy,
55 BucketSpec {
56 fill_per_sec: 5.0,
57 capacity: 10.0,
58 },
59 );
60 buckets.insert(
61 RateBucket::WriteLight,
62 BucketSpec {
63 fill_per_sec: 10.0,
64 capacity: 20.0,
65 },
66 );
67 buckets.insert(
68 RateBucket::WriteHeavy,
69 BucketSpec {
70 fill_per_sec: 1.0,
71 capacity: 5.0,
72 },
73 );
74 buckets.insert(
75 RateBucket::AdminOnly,
76 BucketSpec {
77 fill_per_sec: 1.0,
78 capacity: 3.0,
79 },
80 );
81 Self { buckets }
82 }
83}
84
85impl Default for RateLimitConfig {
86 fn default() -> Self {
87 Self::defaults()
88 }
89}
90
/// Verdict returned by a rate-limit check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateLimitOutcome {
    /// Within budget: the request may proceed and one token has been spent.
    Allow,
    /// Over budget: reject the request; no token was spent. The next token
    /// should have accrued after roughly `retry_after_secs` seconds.
    Deny {
        /// Suggested back-off before retrying, in whole seconds (never below 1).
        retry_after_secs: u64,
    },
}
103
104/// Mutable state for per-peer rate-limit tracking.
105///
106/// Cheap clone (`Arc` internally).
107#[derive(Debug, Clone)]
108pub struct RateLimitState {
109 inner: std::sync::Arc<Mutex<InnerState>>,
110 config: std::sync::Arc<RateLimitConfig>,
111}
112
113#[derive(Debug)]
114struct InnerState {
115 buckets: HashMap<(PeerKey, RateBucket), Bucket>,
116}
117
118#[derive(Debug)]
119struct Bucket {
120 tokens: f64,
121 last_refill: Instant,
122}
123
124impl RateLimitState {
125 /// Construct fresh state with the given config.
126 pub fn new(config: RateLimitConfig) -> Self {
127 Self {
128 inner: std::sync::Arc::new(Mutex::new(InnerState {
129 buckets: HashMap::new(),
130 })),
131 config: std::sync::Arc::new(config),
132 }
133 }
134
135 /// Attempt to debit one token from the `(peer, bucket)` bucket.
136 pub fn check(&self, peer: &PeerKey, bucket: RateBucket) -> RateLimitOutcome {
137 let spec = match self.config.buckets.get(&bucket) {
138 Some(s) => *s,
139 None => {
140 // Bucket not configured → allow (fail-open). Log a warning
141 // so the misconfiguration is visible.
142 tracing::warn!(?bucket, "rate bucket not configured; allowing");
143 return RateLimitOutcome::Allow;
144 }
145 };
146
147 let mut g = self.inner.lock();
148 let now = Instant::now();
149 let key = (peer.clone(), bucket);
150 let b = g.buckets.entry(key).or_insert(Bucket {
151 tokens: spec.capacity,
152 last_refill: now,
153 });
154 // Refill.
155 let elapsed = now.duration_since(b.last_refill).as_secs_f64();
156 b.tokens = (b.tokens + spec.fill_per_sec * elapsed).min(spec.capacity);
157 b.last_refill = now;
158
159 if b.tokens >= 1.0 {
160 b.tokens -= 1.0;
161 RateLimitOutcome::Allow
162 } else {
163 // Deficit is (1 - b.tokens) tokens; wait time in seconds.
164 let deficit = 1.0 - b.tokens;
165 let wait_s = (deficit / spec.fill_per_sec).ceil() as u64;
166 RateLimitOutcome::Deny {
167 retry_after_secs: wait_s.max(1),
168 }
169 }
170 }
171}
172
173/// Tower layer wrapper. v0.1 exposes the [`RateLimitState`] directly;
174/// servers call `.check()` in the request-handling path. A full `Tower`
175/// integration is a v0.2 enhancement.
176#[derive(Debug, Clone)]
177pub struct RateLimitLayer {
178 /// Rate limit state shared across request handlers.
179 pub state: RateLimitState,
180}
181
182impl RateLimitLayer {
183 /// Construct a layer from [`RateLimitState`].
184 pub fn new(state: RateLimitState) -> Self {
185 Self { state }
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a state whose only configured bucket is `ReadLight` with the
    /// given parameters.
    fn read_light_only(fill_per_sec: f64, capacity: f64) -> RateLimitState {
        let mut buckets = HashMap::new();
        buckets.insert(
            RateBucket::ReadLight,
            BucketSpec {
                fill_per_sec,
                capacity,
            },
        );
        RateLimitState::new(RateLimitConfig { buckets })
    }

    /// **Proves:** the first request against a fresh bucket is allowed
    /// (because the bucket is initialised to full capacity).
    ///
    /// **Why it matters:** Cold-start requests should not be rejected. If
    /// the bucket started at zero and refilled over time, every client's
    /// first call would be denied.
    ///
    /// **Catches:** a regression that initialises `tokens: 0.0`.
    #[test]
    fn first_request_allowed() {
        let s = RateLimitState::new(RateLimitConfig::defaults());
        let outcome = s.check(&vec![0; 32], RateBucket::ReadLight);
        assert_eq!(outcome, RateLimitOutcome::Allow);
    }

    /// **Proves:** exhausting a bucket by calling faster than its fill rate
    /// produces [`RateLimitOutcome::Deny`] with a non-zero `retry_after`.
    ///
    /// **Why it matters:** This is the core bucket behaviour. A broken
    /// rate limiter (no-op) would let abusive clients flood the server.
    ///
    /// **Catches:** a regression where `tokens` never decrements, or where
    /// `capacity` is treated as an infinite pool.
    #[test]
    fn exhaust_bucket_denies() {
        let s = read_light_only(1.0, 3.0);
        let peer = vec![0; 32];

        for _ in 0..3 {
            assert_eq!(
                s.check(&peer, RateBucket::ReadLight),
                RateLimitOutcome::Allow
            );
        }
        // Fourth request in immediate succession should be denied.
        match s.check(&peer, RateBucket::ReadLight) {
            RateLimitOutcome::Deny { retry_after_secs } => {
                assert!(retry_after_secs >= 1);
            }
            other => panic!("expected Deny, got {:?}", other),
        }
    }

    /// **Proves:** a denied request reports a `retry_after_secs` derived from
    /// the token deficit and the fill rate (≈1 missing token at 0.5 tokens/s
    /// ⇒ 2 s).
    ///
    /// **Why it matters:** Clients use `Retry-After` to back off; a wrong
    /// value either lets them hammer the server or stalls them needlessly.
    ///
    /// **Catches:** a regression that inverts the division
    /// (`fill / deficit`) or always reports the 1-second minimum.
    #[test]
    fn retry_after_reflects_fill_rate() {
        let s = read_light_only(0.5, 1.0);
        let peer = vec![0; 32];

        assert_eq!(
            s.check(&peer, RateBucket::ReadLight),
            RateLimitOutcome::Allow
        );
        assert_eq!(
            s.check(&peer, RateBucket::ReadLight),
            RateLimitOutcome::Deny {
                retry_after_secs: 2
            }
        );
    }

    /// **Proves:** buckets are accounted per-peer — one peer exhausting
    /// its budget does not affect another peer.
    ///
    /// **Why it matters:** A single malicious peer must not be able to
    /// starve everyone else out via the shared server. Per-peer keying is
    /// the required behavior.
    ///
    /// **Catches:** a regression where the HashMap key drops `peer` (e.g.,
    /// a global counter).
    #[test]
    fn buckets_are_per_peer() {
        let s = read_light_only(1.0, 2.0);

        let peer_a = vec![0xAA; 32];
        let peer_b = vec![0xBB; 32];

        // Exhaust peer_a.
        for _ in 0..2 {
            assert_eq!(
                s.check(&peer_a, RateBucket::ReadLight),
                RateLimitOutcome::Allow
            );
        }
        assert!(matches!(
            s.check(&peer_a, RateBucket::ReadLight),
            RateLimitOutcome::Deny { .. }
        ));
        // peer_b is unaffected.
        assert_eq!(
            s.check(&peer_b, RateBucket::ReadLight),
            RateLimitOutcome::Allow
        );
    }

    /// **Proves:** buckets are also accounted per bucket kind — exhausting
    /// one kind for a peer leaves that peer's other kinds untouched.
    ///
    /// **Why it matters:** Heavy/admin calls must not eat a peer's budget
    /// for cheap reads (and vice versa).
    ///
    /// **Catches:** a regression where the HashMap key drops the
    /// `RateBucket` component.
    #[test]
    fn buckets_are_per_kind() {
        let mut buckets = HashMap::new();
        for kind in vec![RateBucket::ReadLight, RateBucket::WriteLight] {
            buckets.insert(
                kind,
                BucketSpec {
                    fill_per_sec: 1.0,
                    capacity: 1.0,
                },
            );
        }
        let s = RateLimitState::new(RateLimitConfig { buckets });
        let peer = vec![0x11; 32];

        assert_eq!(
            s.check(&peer, RateBucket::ReadLight),
            RateLimitOutcome::Allow
        );
        assert!(matches!(
            s.check(&peer, RateBucket::ReadLight),
            RateLimitOutcome::Deny { .. }
        ));
        // Same peer, different kind: still has its full budget.
        assert_eq!(
            s.check(&peer, RateBucket::WriteLight),
            RateLimitOutcome::Allow
        );
    }

    /// **Proves:** an unconfigured bucket fails open (allows the request)
    /// rather than rejecting it.
    ///
    /// **Why it matters:** If an operator forgets to configure a bucket,
    /// the server should still function. Fail-closed would brick the
    /// server on startup with typo-level config errors. A warning is
    /// logged so the misconfiguration is visible.
    ///
    /// **Catches:** a regression that routes unknown buckets to Deny.
    #[test]
    fn unconfigured_bucket_allows() {
        let s = RateLimitState::new(RateLimitConfig {
            buckets: HashMap::new(),
        });
        let outcome = s.check(&vec![0; 32], RateBucket::ReadLight);
        assert_eq!(outcome, RateLimitOutcome::Allow);
    }
}