tako/plugins/rate_limiter.rs
1#![cfg_attr(docsrs, doc(cfg(feature = "plugins")))]
2//! Rate limiting plugin using token bucket algorithm for controlling request frequency.
3//!
4//! This module provides rate limiting functionality to protect Tako applications from abuse
5//! and ensure fair resource usage. It implements a token bucket algorithm with per-IP tracking,
6//! configurable burst sizes, and automatic token replenishment. The plugin maintains state
7//! using a concurrent hash map and spawns a background task for token replenishment and
8//! cleanup of inactive buckets.
9//!
10//! The rate limiter plugin can be applied at both router-level (all routes) and route-level
11//! (specific routes), allowing different rate limits for different endpoints.
12//!
13//! # Examples
14//!
15//! ```rust
16//! use tako::plugins::rate_limiter::{RateLimiterPlugin, RateLimiterBuilder};
17//! use tako::plugins::TakoPlugin;
18//! use tako::router::Router;
19//! use tako::Method;
20//! use http::StatusCode;
21//!
22//! async fn handler(_req: tako::types::Request) -> &'static str {
23//! "Response"
24//! }
25//!
26//! async fn api_handler(_req: tako::types::Request) -> &'static str {
27//! "API response"
28//! }
29//!
30//! let mut router = Router::new();
31//!
32//! // Router-level: Basic rate limiting (50 req/sec, burst 100)
33//! let global_limiter = RateLimiterBuilder::new()
34//! .max_requests(100)
35//! .refill_rate(50)
36//! .refill_interval_ms(1000)
37//! .build();
38//! router.plugin(global_limiter);
39//!
40//! // Route-level: Stricter rate limiting for API (5 req/sec, burst 10)
41//! let api_route = router.route(Method::POST, "/api/sensitive", api_handler);
42//! let api_limiter = RateLimiterBuilder::new()
43//! .max_requests(10)
44//! .refill_rate(5)
45//! .refill_interval_ms(1000)
46//! .status(StatusCode::TOO_MANY_REQUESTS)
47//! .build();
48//! api_route.plugin(api_limiter);
49//! ```
50
51use std::{
52 net::{IpAddr, SocketAddr},
53 sync::{
54 Arc,
55 atomic::{AtomicBool, Ordering},
56 },
57 time::{Duration, Instant},
58};
59
60use anyhow::Result;
61use dashmap::DashMap;
62use http::StatusCode;
63use tokio::time;
64
65use crate::{
66 body::TakoBody, middleware::Next, plugins::TakoPlugin, responder::Responder, router::Router,
67 types::Request,
68};
69
70/// Rate limiter configuration parameters.
71///
72/// `Config` defines the behavior of the rate limiter including the maximum
73/// number of requests allowed (capacity), request quota replenishment rate,
74/// update frequency, and HTTP status code for rate limit violations. The rate limiter
75/// allows for burst traffic up to the max capacity while maintaining an average rate over time.
76///
77/// # Examples
78///
79/// ```rust
80/// use tako::plugins::rate_limiter::Config;
81/// use http::StatusCode;
82///
83/// // Allow 100 requests per second with burst up to 200
84/// let config = Config {
85/// max_requests: 200,
86/// refill_rate: 100,
87/// refill_interval_ms: 1000,
88/// status_on_limit: StatusCode::TOO_MANY_REQUESTS,
89/// };
90/// ```
91#[derive(Clone)]
92pub struct Config {
93 /// Maximum number of requests that can be made (capacity/burst limit).
94 pub max_requests: u32,
95 /// Number of requests to allow per refill interval.
96 pub refill_rate: u32,
97 /// Interval in milliseconds at which request quota is refilled.
98 pub refill_interval_ms: u64,
99 /// HTTP status code returned when the rate limit is exceeded.
100 pub status_on_limit: StatusCode,
101}
102
103impl Default for Config {
104 /// Provides sensible default rate limiting configuration: 60 requests per second.
105 fn default() -> Self {
106 Self {
107 max_requests: 60,
108 refill_rate: 60,
109 refill_interval_ms: 1000,
110 status_on_limit: StatusCode::TOO_MANY_REQUESTS,
111 }
112 }
113}
114
115/// Builder for configuring rate limiter settings with a fluent API.
116///
117/// `RateLimiterBuilder` provides a convenient way to construct rate limiter configurations
118/// using method chaining. The rate limiter works by maintaining a quota of available requests where:
119/// - `max_requests`: Maximum burst capacity (how many requests can be made at once)
120/// - `refill_rate`: Number of requests allowed per refill interval
121/// - `refill_interval_ms`: How often to refill the request quota (in milliseconds)
122///
123/// # Examples
124///
125/// ```rust
126/// use tako::plugins::rate_limiter::RateLimiterBuilder;
127/// use http::StatusCode;
128///
129/// // Allow 100 requests per second with burst up to 1000
130/// let high_traffic = RateLimiterBuilder::new()
131/// .max_requests(1000)
132/// .refill_rate(100)
133/// .refill_interval_ms(1000)
134/// .build();
135///
136/// // Allow 5 requests per second, max 10 burst
137/// let conservative = RateLimiterBuilder::new()
138/// .max_requests(10)
139/// .refill_rate(5)
140/// .refill_interval_ms(1000)
141/// .build();
142///
143/// // Allow 1 request per 500ms (2 per second)
144/// let strict = RateLimiterBuilder::new()
145/// .max_requests(1)
146/// .refill_rate(1)
147/// .refill_interval_ms(500)
148/// .build();
149/// ```
150pub struct RateLimiterBuilder(Config);
151
152impl RateLimiterBuilder {
153 /// Creates a new rate limiter configuration builder with default settings.
154 pub fn new() -> Self {
155 Self(Config::default())
156 }
157
158 /// Sets the maximum number of requests allowed (capacity/burst limit).
159 ///
160 /// This is the maximum burst size - how many requests can be made at once.
161 pub fn max_requests(mut self, n: u32) -> Self {
162 self.0.max_requests = n;
163 self
164 }
165
166 /// Sets how many requests to allow per refill interval.
167 ///
168 /// For example, `refill_rate(100)` with `refill_interval_ms(1000)` = 100 requests/second.
169 pub fn refill_rate(mut self, n: u32) -> Self {
170 self.0.refill_rate = n;
171 self
172 }
173
174 /// Sets the refill interval in milliseconds.
175 ///
176 /// Request quota is replenished at this interval. For example:
177 /// - `1000` = refill every second
178 /// - `500` = refill every 500ms (twice per second)
179 /// - `100` = refill every 100ms (10 times per second)
180 pub fn refill_interval_ms(mut self, ms: u64) -> Self {
181 self.0.refill_interval_ms = ms.max(1);
182 self
183 }
184
185 /// Sets the HTTP status code returned when rate limits are exceeded.
186 pub fn status(mut self, st: StatusCode) -> Self {
187 self.0.status_on_limit = st;
188 self
189 }
190
191 /// Builds the rate limiter plugin with the configured settings.
192 pub fn build(self) -> RateLimiterPlugin {
193 RateLimiterPlugin {
194 cfg: self.0,
195 store: Arc::new(DashMap::new()),
196 task_started: Arc::new(AtomicBool::new(false)),
197 }
198 }
199
200 /// Convenience method: Allow N requests per second with the same burst limit.
201 ///
202 /// This is a shorthand for setting max_requests, refill_rate, and refill_interval_ms
203 /// to create a simple "N requests per second" limit.
204 ///
205 /// # Examples
206 ///
207 /// ```rust
208 /// use tako::plugins::rate_limiter::RateLimiterBuilder;
209 ///
210 /// // Allow 100 requests per second
211 /// let limiter = RateLimiterBuilder::new()
212 /// .requests_per_second(100)
213 /// .build();
214 /// ```
215 pub fn requests_per_second(mut self, n: u32) -> Self {
216 self.0.max_requests = n;
217 self.0.refill_rate = n;
218 self.0.refill_interval_ms = 1000;
219 self
220 }
221
222 /// Convenience method: Allow N requests per minute with the same burst limit.
223 ///
224 /// # Examples
225 ///
226 /// ```rust
227 /// use tako::plugins::rate_limiter::RateLimiterBuilder;
228 ///
229 /// // Allow 600 requests per minute (10 per second)
230 /// let limiter = RateLimiterBuilder::new()
231 /// .requests_per_minute(600)
232 /// .build();
233 /// ```
234 pub fn requests_per_minute(mut self, n: u32) -> Self {
235 self.0.max_requests = n;
236 self.0.refill_rate = n;
237 self.0.refill_interval_ms = 60000;
238 self
239 }
240}
241
242/// Request quota tracker for rate limiting per IP address.
243///
244/// `Bucket` represents the state of request quota for a single IP address including
245/// the current number of available requests and last access time. Each IP address
246/// gets its own bucket for tracking rate limits independently.
247///
248/// # Examples
249///
250/// ```rust
251/// use std::time::Instant;
252///
253/// # struct Bucket {
254/// # available: f64,
255/// # last_seen: Instant,
256/// # }
257/// let bucket = Bucket {
258/// available: 60.0,
259/// last_seen: Instant::now(),
260/// };
261/// ```
262#[derive(Clone)]
263struct Bucket {
264 /// Current number of available requests remaining.
265 available: f64,
266 /// Last time this bucket was accessed for cleanup purposes.
267 last_seen: Instant,
268}
269
270/// Rate limiting plugin with per-IP request quota tracking.
271///
272/// `RateLimiterPlugin` provides comprehensive rate limiting functionality by tracking
273/// request quotas per IP address. It maintains per-IP state in a concurrent hash map,
274/// spawns a background task for quota replenishment and cleanup, and integrates with
275/// Tako's middleware system to enforce rate limits on incoming requests.
276///
277/// # Examples
278///
279/// ```rust
280/// use tako::plugins::rate_limiter::{RateLimiterPlugin, RateLimiterBuilder};
281/// use tako::plugins::TakoPlugin;
282/// use tako::router::Router;
283///
284/// // Create and configure rate limiter: 50 requests/sec, max 100 burst
285/// let limiter = RateLimiterBuilder::new()
286/// .max_requests(100)
287/// .refill_rate(50)
288/// .refill_interval_ms(1000)
289/// .build();
290///
291/// // Apply to router
292/// let mut router = Router::new();
293/// router.plugin(limiter);
294/// ```
295#[derive(Clone)]
296#[doc(alias = "rate_limiter")]
297#[doc(alias = "ratelimit")]
298pub struct RateLimiterPlugin {
299 /// Rate limiting configuration parameters.
300 cfg: Config,
301 /// Concurrent map storing token buckets for each IP address.
302 store: Arc<DashMap<IpAddr, Bucket>>,
303 /// Flag to ensure background task is spawned only once.
304 task_started: Arc<AtomicBool>,
305}
306
307impl TakoPlugin for RateLimiterPlugin {
308 /// Returns the plugin name for identification and debugging.
309 fn name(&self) -> &'static str {
310 "RateLimiterPlugin"
311 }
312
313 /// Sets up the rate limiter by registering middleware and starting background tasks.
314 fn setup(&self, router: &Router) -> Result<()> {
315 let cfg = self.cfg.clone();
316 let store = self.store.clone();
317
318 router.middleware(move |req, next| {
319 let cfg = cfg.clone();
320 let store = store.clone();
321 async move { retain(req, next, cfg, store).await }
322 });
323
324 // Only spawn the background task once per plugin instance
325 if !self.task_started.swap(true, Ordering::SeqCst) {
326 let cfg = self.cfg.clone();
327 let store = self.store.clone();
328
329 tokio::spawn(async move {
330 let mut tick = time::interval(Duration::from_millis(cfg.refill_interval_ms));
331 let requests_to_add = cfg.refill_rate as f64;
332 let purge_after = Duration::from_secs(300);
333 loop {
334 tick.tick().await;
335 let now = Instant::now();
336 store.retain(|_, b| {
337 b.available = (b.available + requests_to_add).min(cfg.max_requests as f64);
338 now.duration_since(b.last_seen) < purge_after
339 });
340 }
341 });
342 }
343
344 Ok(())
345 }
346}
347
348/// Middleware function that enforces rate limiting per IP address.
349///
350/// This function extracts the client IP address from the request, checks if they have
351/// available request quota remaining, and either allows the request to proceed or
352/// returns a rate limit error response. It updates quota state atomically and handles
353/// new clients by creating buckets with full quota.
354///
355/// # Examples
356///
357/// ```rust,no_run
358/// use tako::plugins::rate_limiter::{retain, Config};
359/// use tako::middleware::Next;
360/// use tako::types::Request;
361/// use std::sync::Arc;
362/// use dashmap::DashMap;
363///
364/// # async fn example() {
365/// # let req = Request::builder().body(tako::body::TakoBody::empty()).unwrap();
366/// # let next = Next { middlewares: Arc::new(vec![]), endpoint: Arc::new(|_| Box::pin(async { tako::types::Response::new(tako::body::TakoBody::empty()) })) };
367/// let config = Config::default();
368/// let store = Arc::new(DashMap::new());
369/// let response = retain(req, next, config, store).await;
370/// # }
371/// ```
372async fn retain(
373 req: Request,
374 next: Next,
375 cfg: Config,
376 store: Arc<DashMap<IpAddr, Bucket>>,
377) -> impl Responder {
378 let ip = req
379 .extensions()
380 .get::<SocketAddr>()
381 .map(|sa| sa.ip())
382 .unwrap_or(IpAddr::from([0, 0, 0, 0]));
383
384 let mut entry = store.entry(ip).or_insert_with(|| Bucket {
385 available: cfg.max_requests as f64,
386 last_seen: Instant::now(),
387 });
388
389 if entry.available < 1.0 {
390 return http::Response::builder()
391 .status(cfg.status_on_limit)
392 .body(TakoBody::empty())
393 .unwrap();
394 }
395 entry.available -= 1.0;
396 entry.last_seen = Instant::now();
397 drop(entry);
398
399 next.run(req).await
400}