Skip to main content

dig_rpc/middleware/
rate_limit.rs

1//! Per-(peer, bucket) token-bucket rate limiting.
2//!
3//! The server maintains a `HashMap<(PeerKey, RateBucket), Bucket>` where
4//! `Bucket = { tokens: f64, last_refill: Instant }`. Every request
5//! debits one token; refills are lazy (computed at check time from
6//! `fill_per_sec * elapsed`).
7//!
8//! A request that cannot debit returns [`RateLimitOutcome::Deny`]; the
9//! caller turns that into an `ErrorCode::RateLimited` JSON-RPC response
10//! with a `Retry-After` HTTP header.
11
12use std::collections::HashMap;
13use std::time::Instant;
14
15use parking_lot::Mutex;
16
17use crate::method::RateBucket;
18
/// Opaque per-peer identifier the middleware uses as a hash key.
///
/// For internal servers, binaries typically use `SHA256(cert_spki)`; for
/// public servers, they use `IP:port` or a hashed form thereof. The
/// middleware doesn't care — it just hashes the bytes.
///
/// The key is cloned into the bucket map on every `check` call, so short,
/// fixed-size keys keep the hot path cheap.
pub type PeerKey = Vec<u8>;
25
/// Per-bucket configuration.
///
/// Describes one token bucket: how quickly it refills and how large a burst
/// it can absorb. Each request checked against the bucket costs one token.
#[derive(Debug, Clone, Copy)]
pub struct BucketSpec {
    /// Tokens added per second. Refill is computed lazily from the time
    /// elapsed since the bucket was last checked.
    pub fill_per_sec: f64,
    /// Max tokens the bucket can hold. A newly created bucket starts with
    /// this many tokens, so this is also the allowed burst size.
    pub capacity: f64,
}
34
/// Full rate-limit configuration: one [`BucketSpec`] per [`RateBucket`].
#[derive(Debug, Clone)]
pub struct RateLimitConfig {
    /// Fill rate and capacity for each rate bucket.
    ///
    /// A [`RateBucket`] with no entry here is not limited: the check allows
    /// the request and logs a warning.
    pub buckets: HashMap<RateBucket, BucketSpec>,
}
41
42impl RateLimitConfig {
43    /// Sane defaults suitable for a validator / fullnode.
44    pub fn defaults() -> Self {
45        let mut buckets = HashMap::new();
46        buckets.insert(
47            RateBucket::ReadLight,
48            BucketSpec {
49                fill_per_sec: 50.0,
50                capacity: 100.0,
51            },
52        );
53        buckets.insert(
54            RateBucket::ReadHeavy,
55            BucketSpec {
56                fill_per_sec: 5.0,
57                capacity: 10.0,
58            },
59        );
60        buckets.insert(
61            RateBucket::WriteLight,
62            BucketSpec {
63                fill_per_sec: 10.0,
64                capacity: 20.0,
65            },
66        );
67        buckets.insert(
68            RateBucket::WriteHeavy,
69            BucketSpec {
70                fill_per_sec: 1.0,
71                capacity: 5.0,
72            },
73        );
74        buckets.insert(
75            RateBucket::AdminOnly,
76            BucketSpec {
77                fill_per_sec: 1.0,
78                capacity: 3.0,
79            },
80        );
81        Self { buckets }
82    }
83}
84
85impl Default for RateLimitConfig {
86    fn default() -> Self {
87        Self::defaults()
88    }
89}
90
/// Outcome of a rate-limit check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateLimitOutcome {
    /// Request is within budget; one token was debited.
    ///
    /// Also returned, without debiting anything, when the requested bucket
    /// has no configured spec (the limiter fails open).
    Allow,
    /// Request was denied. The bucket should hold a full token again after
    /// approximately `retry_after_secs` seconds.
    Deny {
        /// Suggested retry delay in whole seconds (minimum 1).
        retry_after_secs: u64,
    },
}
103
/// Mutable state for per-peer rate-limit tracking.
///
/// Cheap clone (`Arc` internally): both fields are reference-counted, so
/// every clone shares the same bucket map and the same configuration.
#[derive(Debug, Clone)]
pub struct RateLimitState {
    /// Bucket map shared by all clones, guarded by a mutex.
    inner: std::sync::Arc<Mutex<InnerState>>,
    /// Configuration supplied at construction; only read afterwards.
    config: std::sync::Arc<RateLimitConfig>,
}
112
113#[derive(Debug)]
114struct InnerState {
115    buckets: HashMap<(PeerKey, RateBucket), Bucket>,
116}
117
118#[derive(Debug)]
119struct Bucket {
120    tokens: f64,
121    last_refill: Instant,
122}
123
124impl RateLimitState {
125    /// Construct fresh state with the given config.
126    pub fn new(config: RateLimitConfig) -> Self {
127        Self {
128            inner: std::sync::Arc::new(Mutex::new(InnerState {
129                buckets: HashMap::new(),
130            })),
131            config: std::sync::Arc::new(config),
132        }
133    }
134
135    /// Attempt to debit one token from the `(peer, bucket)` bucket.
136    pub fn check(&self, peer: &PeerKey, bucket: RateBucket) -> RateLimitOutcome {
137        let spec = match self.config.buckets.get(&bucket) {
138            Some(s) => *s,
139            None => {
140                // Bucket not configured → allow (fail-open). Log a warning
141                // so the misconfiguration is visible.
142                tracing::warn!(?bucket, "rate bucket not configured; allowing");
143                return RateLimitOutcome::Allow;
144            }
145        };
146
147        let mut g = self.inner.lock();
148        let now = Instant::now();
149        let key = (peer.clone(), bucket);
150        let b = g.buckets.entry(key).or_insert(Bucket {
151            tokens: spec.capacity,
152            last_refill: now,
153        });
154        // Refill.
155        let elapsed = now.duration_since(b.last_refill).as_secs_f64();
156        b.tokens = (b.tokens + spec.fill_per_sec * elapsed).min(spec.capacity);
157        b.last_refill = now;
158
159        if b.tokens >= 1.0 {
160            b.tokens -= 1.0;
161            RateLimitOutcome::Allow
162        } else {
163            // Deficit is (1 - b.tokens) tokens; wait time in seconds.
164            let deficit = 1.0 - b.tokens;
165            let wait_s = (deficit / spec.fill_per_sec).ceil() as u64;
166            RateLimitOutcome::Deny {
167                retry_after_secs: wait_s.max(1),
168            }
169        }
170    }
171}
172
/// Tower layer wrapper. v0.1 exposes the [`RateLimitState`] directly;
/// servers call `.check()` in the request-handling path. A full `Tower`
/// integration is a v0.2 enhancement.
///
/// For now this type is only a cloneable holder for the shared state.
#[derive(Debug, Clone)]
pub struct RateLimitLayer {
    /// Rate limit state shared across request handlers. Cloning the layer
    /// clones this handle, not the underlying buckets.
    pub state: RateLimitState,
}
181
182impl RateLimitLayer {
183    /// Construct a layer from [`RateLimitState`].
184    pub fn new(state: RateLimitState) -> Self {
185        Self { state }
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Peer identifier used by the single-peer tests: 32 zero bytes.
    fn zero_peer() -> PeerKey {
        vec![0; 32]
    }

    /// Limiter whose only configured bucket is `ReadLight`, with the given
    /// refill rate and capacity.
    fn read_light_only(fill_per_sec: f64, capacity: f64) -> RateLimitState {
        let spec = BucketSpec {
            fill_per_sec,
            capacity,
        };
        let buckets = std::iter::once((RateBucket::ReadLight, spec)).collect();
        RateLimitState::new(RateLimitConfig { buckets })
    }

    /// **Proves:** a brand-new bucket admits its first request, because
    /// buckets start out holding their full capacity.
    ///
    /// **Why it matters:** a client's very first call must not bounce. A
    /// bucket that started empty and only filled over time would reject
    /// every cold-start request.
    ///
    /// **Catches:** a regression that initialises `tokens: 0.0`.
    #[test]
    fn first_request_allowed() {
        let limiter = RateLimitState::new(RateLimitConfig::defaults());
        assert_eq!(
            limiter.check(&zero_peer(), RateBucket::ReadLight),
            RateLimitOutcome::Allow
        );
    }

    /// **Proves:** draining a bucket faster than it refills ends in
    /// [`RateLimitOutcome::Deny`] carrying a non-zero retry delay.
    ///
    /// **Why it matters:** this is the limiter's core job. A limiter that
    /// never says no lets an abusive client flood the server.
    ///
    /// **Catches:** a regression where `tokens` is never decremented, or
    /// where `capacity` behaves like an unlimited pool.
    #[test]
    fn exhaust_bucket_denies() {
        let limiter = read_light_only(1.0, 3.0);
        let peer = zero_peer();

        // The bucket holds three tokens, so three back-to-back calls pass.
        for _ in 0..3 {
            assert_eq!(
                limiter.check(&peer, RateBucket::ReadLight),
                RateLimitOutcome::Allow
            );
        }

        // Fourth request in immediate succession should be denied.
        if let RateLimitOutcome::Deny { retry_after_secs } =
            limiter.check(&peer, RateBucket::ReadLight)
        {
            assert!(retry_after_secs >= 1);
        } else {
            panic!("expected Deny");
        }
    }

    /// **Proves:** budgets are tracked per peer — one peer running dry
    /// leaves every other peer's budget untouched.
    ///
    /// **Why it matters:** a single misbehaving peer must not be able to
    /// lock everyone else out of the shared server.
    ///
    /// **Catches:** a regression where the map key loses its `peer` part
    /// (for example, a single global counter).
    #[test]
    fn buckets_are_per_peer() {
        let limiter = read_light_only(1.0, 2.0);
        let peer_a = vec![0xAA; 32];
        let peer_b = vec![0xBB; 32];

        // Exhaust peer_a.
        for _ in 0..2 {
            assert_eq!(
                limiter.check(&peer_a, RateBucket::ReadLight),
                RateLimitOutcome::Allow
            );
        }
        let third = limiter.check(&peer_a, RateBucket::ReadLight);
        assert!(matches!(third, RateLimitOutcome::Deny { .. }));

        // peer_b is unaffected.
        assert_eq!(
            limiter.check(&peer_b, RateBucket::ReadLight),
            RateLimitOutcome::Allow
        );
    }

    /// **Proves:** a bucket with no configured spec fails open — the
    /// request is allowed rather than rejected.
    ///
    /// **Why it matters:** an operator who forgets one bucket should still
    /// end up with a working server; failing closed would turn a typo-level
    /// config slip into an outage. The warning log keeps the mistake
    /// visible.
    ///
    /// **Catches:** a regression that routes unknown buckets to Deny.
    #[test]
    fn unconfigured_bucket_allows() {
        let limiter = RateLimitState::new(RateLimitConfig {
            buckets: HashMap::new(),
        });
        assert_eq!(
            limiter.check(&zero_peer(), RateBucket::ReadLight),
            RateLimitOutcome::Allow
        );
    }
}