tako/plugins/
rate_limiter.rs

1#![cfg_attr(docsrs, doc(cfg(feature = "plugins")))]
//! Rate limiting plugin using a token bucket algorithm to control request frequency.
3//!
4//! This module provides rate limiting functionality to protect Tako applications from abuse
5//! and ensure fair resource usage. It implements a token bucket algorithm with per-IP tracking,
6//! configurable burst sizes, and automatic token replenishment. The plugin maintains state
7//! using a concurrent hash map and spawns a background task for token replenishment and
8//! cleanup of inactive buckets.
9//!
10//! The rate limiter plugin can be applied at both router-level (all routes) and route-level
11//! (specific routes), allowing different rate limits for different endpoints.
12//!
13//! # Examples
14//!
15//! ```rust
16//! use tako::plugins::rate_limiter::{RateLimiterPlugin, RateLimiterBuilder};
17//! use tako::plugins::TakoPlugin;
18//! use tako::router::Router;
19//! use tako::Method;
20//! use http::StatusCode;
21//!
22//! async fn handler(_req: tako::types::Request) -> &'static str {
23//!     "Response"
24//! }
25//!
26//! async fn api_handler(_req: tako::types::Request) -> &'static str {
27//!     "API response"
28//! }
29//!
30//! let mut router = Router::new();
31//!
32//! // Router-level: Basic rate limiting (50 req/sec, burst 100)
33//! let global_limiter = RateLimiterBuilder::new()
34//!     .max_requests(100)
35//!     .refill_rate(50)
36//!     .refill_interval_ms(1000)
37//!     .build();
38//! router.plugin(global_limiter);
39//!
40//! // Route-level: Stricter rate limiting for API (5 req/sec, burst 10)
41//! let api_route = router.route(Method::POST, "/api/sensitive", api_handler);
42//! let api_limiter = RateLimiterBuilder::new()
43//!     .max_requests(10)
44//!     .refill_rate(5)
45//!     .refill_interval_ms(1000)
46//!     .status(StatusCode::TOO_MANY_REQUESTS)
47//!     .build();
48//! api_route.plugin(api_limiter);
49//! ```
50
51use std::{
52  net::{IpAddr, SocketAddr},
53  sync::{
54    Arc,
55    atomic::{AtomicBool, Ordering},
56  },
57  time::{Duration, Instant},
58};
59
60use anyhow::Result;
61use dashmap::DashMap;
62use http::StatusCode;
63use tokio::time;
64
65use crate::{
66  body::TakoBody, middleware::Next, plugins::TakoPlugin, responder::Responder, router::Router,
67  types::Request,
68};
69
70/// Rate limiter configuration parameters.
71///
72/// `Config` defines the behavior of the rate limiter including the maximum
73/// number of requests allowed (capacity), request quota replenishment rate,
74/// update frequency, and HTTP status code for rate limit violations. The rate limiter
75/// allows for burst traffic up to the max capacity while maintaining an average rate over time.
76///
77/// # Examples
78///
79/// ```rust
80/// use tako::plugins::rate_limiter::Config;
81/// use http::StatusCode;
82///
83/// // Allow 100 requests per second with burst up to 200
84/// let config = Config {
85///     max_requests: 200,
86///     refill_rate: 100,
87///     refill_interval_ms: 1000,
88///     status_on_limit: StatusCode::TOO_MANY_REQUESTS,
89/// };
90/// ```
91#[derive(Clone)]
92pub struct Config {
93  /// Maximum number of requests that can be made (capacity/burst limit).
94  pub max_requests: u32,
95  /// Number of requests to allow per refill interval.
96  pub refill_rate: u32,
97  /// Interval in milliseconds at which request quota is refilled.
98  pub refill_interval_ms: u64,
99  /// HTTP status code returned when the rate limit is exceeded.
100  pub status_on_limit: StatusCode,
101}
102
103impl Default for Config {
104  /// Provides sensible default rate limiting configuration: 60 requests per second.
105  fn default() -> Self {
106    Self {
107      max_requests: 60,
108      refill_rate: 60,
109      refill_interval_ms: 1000,
110      status_on_limit: StatusCode::TOO_MANY_REQUESTS,
111    }
112  }
113}
114
115/// Builder for configuring rate limiter settings with a fluent API.
116///
117/// `RateLimiterBuilder` provides a convenient way to construct rate limiter configurations
118/// using method chaining. The rate limiter works by maintaining a quota of available requests where:
119/// - `max_requests`: Maximum burst capacity (how many requests can be made at once)
120/// - `refill_rate`: Number of requests allowed per refill interval
121/// - `refill_interval_ms`: How often to refill the request quota (in milliseconds)
122///
123/// # Examples
124///
125/// ```rust
126/// use tako::plugins::rate_limiter::RateLimiterBuilder;
127/// use http::StatusCode;
128///
129/// // Allow 100 requests per second with burst up to 1000
130/// let high_traffic = RateLimiterBuilder::new()
131///     .max_requests(1000)
132///     .refill_rate(100)
133///     .refill_interval_ms(1000)
134///     .build();
135///
136/// // Allow 5 requests per second, max 10 burst
137/// let conservative = RateLimiterBuilder::new()
138///     .max_requests(10)
139///     .refill_rate(5)
140///     .refill_interval_ms(1000)
141///     .build();
142///
143/// // Allow 1 request per 500ms (2 per second)
144/// let strict = RateLimiterBuilder::new()
145///     .max_requests(1)
146///     .refill_rate(1)
147///     .refill_interval_ms(500)
148///     .build();
149/// ```
150pub struct RateLimiterBuilder(Config);
151
152impl RateLimiterBuilder {
153  /// Creates a new rate limiter configuration builder with default settings.
154  pub fn new() -> Self {
155    Self(Config::default())
156  }
157
158  /// Sets the maximum number of requests allowed (capacity/burst limit).
159  ///
160  /// This is the maximum burst size - how many requests can be made at once.
161  pub fn max_requests(mut self, n: u32) -> Self {
162    self.0.max_requests = n;
163    self
164  }
165
166  /// Sets how many requests to allow per refill interval.
167  ///
168  /// For example, `refill_rate(100)` with `refill_interval_ms(1000)` = 100 requests/second.
169  pub fn refill_rate(mut self, n: u32) -> Self {
170    self.0.refill_rate = n;
171    self
172  }
173
174  /// Sets the refill interval in milliseconds.
175  ///
176  /// Request quota is replenished at this interval. For example:
177  /// - `1000` = refill every second
178  /// - `500` = refill every 500ms (twice per second)
179  /// - `100` = refill every 100ms (10 times per second)
180  pub fn refill_interval_ms(mut self, ms: u64) -> Self {
181    self.0.refill_interval_ms = ms.max(1);
182    self
183  }
184
185  /// Sets the HTTP status code returned when rate limits are exceeded.
186  pub fn status(mut self, st: StatusCode) -> Self {
187    self.0.status_on_limit = st;
188    self
189  }
190
191  /// Builds the rate limiter plugin with the configured settings.
192  pub fn build(self) -> RateLimiterPlugin {
193    RateLimiterPlugin {
194      cfg: self.0,
195      store: Arc::new(DashMap::new()),
196      task_started: Arc::new(AtomicBool::new(false)),
197    }
198  }
199
200  /// Convenience method: Allow N requests per second with the same burst limit.
201  ///
202  /// This is a shorthand for setting max_requests, refill_rate, and refill_interval_ms
203  /// to create a simple "N requests per second" limit.
204  ///
205  /// # Examples
206  ///
207  /// ```rust
208  /// use tako::plugins::rate_limiter::RateLimiterBuilder;
209  ///
210  /// // Allow 100 requests per second
211  /// let limiter = RateLimiterBuilder::new()
212  ///     .requests_per_second(100)
213  ///     .build();
214  /// ```
215  pub fn requests_per_second(mut self, n: u32) -> Self {
216    self.0.max_requests = n;
217    self.0.refill_rate = n;
218    self.0.refill_interval_ms = 1000;
219    self
220  }
221
222  /// Convenience method: Allow N requests per minute with the same burst limit.
223  ///
224  /// # Examples
225  ///
226  /// ```rust
227  /// use tako::plugins::rate_limiter::RateLimiterBuilder;
228  ///
229  /// // Allow 600 requests per minute (10 per second)
230  /// let limiter = RateLimiterBuilder::new()
231  ///     .requests_per_minute(600)
232  ///     .build();
233  /// ```
234  pub fn requests_per_minute(mut self, n: u32) -> Self {
235    self.0.max_requests = n;
236    self.0.refill_rate = n;
237    self.0.refill_interval_ms = 60000;
238    self
239  }
240}
241
/// Per-client quota state kept by the rate limiter.
///
/// One `Bucket` exists for each IP address in the store. It records how many
/// requests that address may still make and when the bucket was last touched,
/// so that stale buckets can be cleaned up.
///
/// # Examples
///
/// ```rust
/// use std::time::Instant;
///
/// # struct Bucket {
/// #     available: f64,
/// #     last_seen: Instant,
/// # }
/// let bucket = Bucket {
///     available: 60.0,
///     last_seen: Instant::now(),
/// };
/// ```
#[derive(Clone)]
struct Bucket {
  /// Requests this client can still make before being throttled.
  available: f64,
  /// Timestamp of the most recent access; consulted when purging idle buckets.
  last_seen: Instant,
}
269
270/// Rate limiting plugin with per-IP request quota tracking.
271///
272/// `RateLimiterPlugin` provides comprehensive rate limiting functionality by tracking
273/// request quotas per IP address. It maintains per-IP state in a concurrent hash map,
274/// spawns a background task for quota replenishment and cleanup, and integrates with
275/// Tako's middleware system to enforce rate limits on incoming requests.
276///
277/// # Examples
278///
279/// ```rust
280/// use tako::plugins::rate_limiter::{RateLimiterPlugin, RateLimiterBuilder};
281/// use tako::plugins::TakoPlugin;
282/// use tako::router::Router;
283///
284/// // Create and configure rate limiter: 50 requests/sec, max 100 burst
285/// let limiter = RateLimiterBuilder::new()
286///     .max_requests(100)
287///     .refill_rate(50)
288///     .refill_interval_ms(1000)
289///     .build();
290///
291/// // Apply to router
292/// let mut router = Router::new();
293/// router.plugin(limiter);
294/// ```
295#[derive(Clone)]
296#[doc(alias = "rate_limiter")]
297#[doc(alias = "ratelimit")]
298pub struct RateLimiterPlugin {
299  /// Rate limiting configuration parameters.
300  cfg: Config,
301  /// Concurrent map storing token buckets for each IP address.
302  store: Arc<DashMap<IpAddr, Bucket>>,
303  /// Flag to ensure background task is spawned only once.
304  task_started: Arc<AtomicBool>,
305}
306
307impl TakoPlugin for RateLimiterPlugin {
308  /// Returns the plugin name for identification and debugging.
309  fn name(&self) -> &'static str {
310    "RateLimiterPlugin"
311  }
312
313  /// Sets up the rate limiter by registering middleware and starting background tasks.
314  fn setup(&self, router: &Router) -> Result<()> {
315    let cfg = self.cfg.clone();
316    let store = self.store.clone();
317
318    router.middleware(move |req, next| {
319      let cfg = cfg.clone();
320      let store = store.clone();
321      async move { retain(req, next, cfg, store).await }
322    });
323
324    // Only spawn the background task once per plugin instance
325    if !self.task_started.swap(true, Ordering::SeqCst) {
326      let cfg = self.cfg.clone();
327      let store = self.store.clone();
328
329      tokio::spawn(async move {
330        let mut tick = time::interval(Duration::from_millis(cfg.refill_interval_ms));
331        let requests_to_add = cfg.refill_rate as f64;
332        let purge_after = Duration::from_secs(300);
333        loop {
334          tick.tick().await;
335          let now = Instant::now();
336          store.retain(|_, b| {
337            b.available = (b.available + requests_to_add).min(cfg.max_requests as f64);
338            now.duration_since(b.last_seen) < purge_after
339          });
340        }
341      });
342    }
343
344    Ok(())
345  }
346}
347
348/// Middleware function that enforces rate limiting per IP address.
349///
350/// This function extracts the client IP address from the request, checks if they have
351/// available request quota remaining, and either allows the request to proceed or
352/// returns a rate limit error response. It updates quota state atomically and handles
353/// new clients by creating buckets with full quota.
354///
355/// # Examples
356///
357/// ```rust,no_run
358/// use tako::plugins::rate_limiter::{retain, Config};
359/// use tako::middleware::Next;
360/// use tako::types::Request;
361/// use std::sync::Arc;
362/// use dashmap::DashMap;
363///
364/// # async fn example() {
365/// # let req = Request::builder().body(tako::body::TakoBody::empty()).unwrap();
366/// # let next = Next { middlewares: Arc::new(vec![]), endpoint: Arc::new(|_| Box::pin(async { tako::types::Response::new(tako::body::TakoBody::empty()) })) };
367/// let config = Config::default();
368/// let store = Arc::new(DashMap::new());
369/// let response = retain(req, next, config, store).await;
370/// # }
371/// ```
372async fn retain(
373  req: Request,
374  next: Next,
375  cfg: Config,
376  store: Arc<DashMap<IpAddr, Bucket>>,
377) -> impl Responder {
378  let ip = req
379    .extensions()
380    .get::<SocketAddr>()
381    .map(|sa| sa.ip())
382    .unwrap_or(IpAddr::from([0, 0, 0, 0]));
383
384  let mut entry = store.entry(ip).or_insert_with(|| Bucket {
385    available: cfg.max_requests as f64,
386    last_seen: Instant::now(),
387  });
388
389  if entry.available < 1.0 {
390    return http::Response::builder()
391      .status(cfg.status_on_limit)
392      .body(TakoBody::empty())
393      .unwrap();
394  }
395  entry.available -= 1.0;
396  entry.last_seen = Instant::now();
397  drop(entry);
398
399  next.run(req).await
400}