# quota
A high-performance in-memory rate limiter for Rust, using a mix of Leaky Token Bucket & GCRA.
The main `QuotaPool` is concurrent by default. It uses `DashMap` for key routing and a small per-key mutex for exact quota state, so it can be shared as `Arc<QuotaPool<_>>` without an outer lock.
The crate provides three primitives: `Quota`, `QuotaPolicy`, and `QuotaPool`, which combines the two.
`QuotaPool` defaults to `QuotaKey`, an owned heap `String`.
If your quota identity is already a compact ID, interned symbol, or another key shape,
use `QuotaPool<K>` and construct it with `QuotaPool::<K>::with_key_type(...)`.
Use `QuotaPool::with_capacity(...)` and `pool.insert_keys(...)` when the key set is known ahead of traffic; that keeps the hot request path on borrowed-key lookup instead of insertion.
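To make the borrowed-key fast path concrete, here is a minimal std-only sketch of the routing idea. It is not the crate's implementation: `SketchPool` and its methods are hypothetical names, a single `RwLock<HashMap>` stands in for `DashMap`, and the per-key state is reduced to a plain token counter.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical stand-in for the pool layout: a key-routing map plus one
/// small lock per key. A `RwLock<HashMap>` replaces `DashMap` to stay std-only.
struct SketchPool {
    slots: RwLock<HashMap<String, Arc<Mutex<u64>>>>,
    initial_tokens: u64,
}

impl SketchPool {
    fn new(initial_tokens: u64) -> Self {
        Self { slots: RwLock::new(HashMap::new()), initial_tokens }
    }

    /// Slow path: allocates an owned key and takes the write lock.
    fn get_or_insert(&self, key: &str) -> Arc<Mutex<u64>> {
        let mut slots = self.slots.write().unwrap();
        let slot = slots
            .entry(key.to_owned())
            .or_insert_with(|| Arc::new(Mutex::new(self.initial_tokens)));
        Arc::clone(slot)
    }

    /// Pre-register keys so later requests stay on the read-only path.
    fn insert_keys<'a>(&self, keys: impl IntoIterator<Item = &'a str>) {
        for key in keys {
            self.get_or_insert(key);
        }
    }

    /// Returns `true` if `amount` tokens were taken from `key`'s bucket.
    fn consume(&self, key: &str, amount: u64) -> bool {
        // Fast path: borrowed-key lookup under the shared read lock.
        // The read guard is dropped at the end of this statement.
        let existing = self.slots.read().unwrap().get(key).cloned();
        let slot = match existing {
            Some(slot) => slot,
            None => self.get_or_insert(key),
        };
        // Only this key's lock is held while its state is updated.
        let mut tokens = slot.lock().unwrap();
        if *tokens >= amount {
            *tokens -= amount;
            true
        } else {
            false
        }
    }
}

fn main() {
    let pool = Arc::new(SketchPool::new(2));
    pool.insert_keys(["alice", "bob"]);
    assert!(pool.consume("alice", 1));
    assert!(pool.consume("alice", 1));
    assert!(!pool.consume("alice", 1)); // alice is out of tokens
    assert!(pool.consume("bob", 1)); // bob's bucket is independent
    assert!(pool.consume("carol", 1)); // unknown key takes the insert path
}
```

With the keys inserted up front, every request in this sketch resolves through `get(&str)` and never allocates a `String`, which is the property the paragraph above describes.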
Example use of the plain `Quota` (an 8-byte number in memory):
```rust
use quota::Quota;
fn main() {
    let quota = Quota::with_initial_tokens(10);
    let mut results = vec![];
    for _ in 0..100 {
        results.push(quota.consume(1)); // 10..9..8..7..6..5..4..3..2..1..Err
    }
    assert_eq!(results.iter().filter(|r| r.is_ok()).count(), 10); // 10 Ok: 10..=1
    assert_eq!(results.iter().filter(|r| r.is_err()).count(), 90); // 90 Err: Rate-limited
}
```
Example use of applying `QuotaPolicy` with a maximum capacity and `RefillRate`:
```rust
use quota::{Quota, QuotaPolicy, RefillRate};
fn main() {
    let policy = QuotaPolicy::new()
        .set_capacity(10.0) // Maximum capacity to apply to a Quota per tick
        .set_refill_rate(RefillRate::per_micro(100.0)); // Refill rate to apply to a Quota per tick (0.1T/ns)
    // `mut` is required because `tick` takes the quota by `&mut`.
    let mut quota = Quota::with_initial_tokens(10);
    let mut results = vec![];
    for _ in 0..100 {
        policy.tick(1, &mut quota); // dt = 1ns => 1ns*(0.1T/ns) = 0.1 tokens per tick() call
        results.push(quota.consume(1));
    }
    // 11 requests drain the initial burst, then one succeeds every 10 ticks.
    assert_eq!(results.iter().filter(|r| r.is_ok()).count(), 19);
    assert_eq!(results.iter().filter(|r| r.is_err()).count(), 81);
}
```
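The count of 19 in the example above follows directly from the refill arithmetic. The standalone model below reproduces it with fixed-point integers so the result is exact. `ModelQuota` and its constants are hypothetical and only mirror the numbers used in the example; they say nothing about how the crate stores tokens internally.

```rust
/// Hypothetical fixed-point model of one quota: tokens are stored in
/// thousandths so the arithmetic is exact. Not the crate's representation.
struct ModelQuota {
    milli_tokens: u64,
}

const CAPACITY_MILLI: u64 = 10_000; // capacity of 10 tokens
const REFILL_MILLI_PER_NS: u64 = 100; // 0.1 token per nanosecond

impl ModelQuota {
    /// Refill for `dt_ns` elapsed nanoseconds, clamped to capacity.
    fn tick(&mut self, dt_ns: u64) {
        let refilled = self.milli_tokens + dt_ns * REFILL_MILLI_PER_NS;
        self.milli_tokens = refilled.min(CAPACITY_MILLI);
    }

    /// Take `tokens` whole tokens, or fail without changing the balance.
    fn consume(&mut self, tokens: u64) -> Result<(), ()> {
        let cost = tokens * 1_000;
        if self.milli_tokens >= cost {
            self.milli_tokens -= cost;
            Ok(())
        } else {
            Err(())
        }
    }
}

/// Runs the same 100-iteration loop as the example and counts successes.
fn count_ok() -> usize {
    let mut quota = ModelQuota { milli_tokens: 10_000 };
    (0..100)
        .filter(|_| {
            quota.tick(1);
            quota.consume(1).is_ok()
        })
        .count()
}

fn main() {
    // Iterations 1..=11 succeed, then iterations 21, 31, ..., 91 succeed.
    assert_eq!(count_ok(), 19);
}
```

The first tick is wasted because the bucket is already full, so the initial burst yields 11 successes rather than 10, and the remaining 89 iterations add one success every 10 ticks.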
And now the main `QuotaPool`:
```rust
use quota::{RefillRate, QuotaPolicy, QuotaPool};
use std::sync::Arc;
use std::time::Duration;
fn main() {
    let policy = QuotaPolicy::new()
        .set_capacity(10.0)
        .set_refill_rate(RefillRate::per_sec(3))
        .set_refill_interval(Duration::from_secs(1)); // No tick is applied until this much time has passed since the previous tick
    // QuotaPool uses the system clock and ticks each quota with the time elapsed since its previous tick.
    // `QuotaPolicy::set_refill_interval` prevents a tick while the time since the last tick is below `refill_interval`.
    let pool = Arc::new(QuotaPool::with_capacity(policy, 10, 1));
    let mut results = vec![];
    for _ in 0..100 {
        results.push(pool.consume("testing", 1));
    }
}
```
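The effect of a refill interval is easier to see with an injected clock. The sketch below is a hypothetical model, not the crate's internals: `GatedBucket`, its fields, and the rule "skip the refill until a full interval has elapsed, then credit the whole elapsed time at once" are assumptions based on the comments in the example above.

```rust
/// Hypothetical model of interval-gated refill with an injected clock
/// (nanoseconds since an arbitrary start). Not the crate's internals.
struct GatedBucket {
    tokens: u64,
    capacity: u64,
    refill_per_sec: u64,
    interval_ns: u64,
    last_tick_ns: u64,
}

const NS_PER_SEC: u64 = 1_000_000_000;

impl GatedBucket {
    fn consume(&mut self, now_ns: u64, amount: u64) -> bool {
        let elapsed = now_ns.saturating_sub(self.last_tick_ns);
        // Skip the refill entirely until a full interval has elapsed.
        if elapsed >= self.interval_ns {
            let refill = elapsed * self.refill_per_sec / NS_PER_SEC;
            self.tokens = (self.tokens + refill).min(self.capacity);
            self.last_tick_ns = now_ns;
        }
        if self.tokens >= amount {
            self.tokens -= amount;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut bucket = GatedBucket {
        tokens: 10,
        capacity: 10,
        refill_per_sec: 3,
        interval_ns: NS_PER_SEC,
        last_tick_ns: 0,
    };
    // The initial burst of 10 succeeds, the 11th request is rejected.
    let burst = (0..11).filter(|_| bucket.consume(0, 1)).count();
    assert_eq!(burst, 10);
    // Half a second later no refill has been applied yet.
    assert!(!bucket.consume(NS_PER_SEC / 2, 1));
    // After a full second, 3 tokens arrive at once.
    let after = (0..5).filter(|_| bucket.consume(NS_PER_SEC, 1)).count();
    assert_eq!(after, 3);
}
```

Under this model the interval trades refill smoothness for fewer state updates: callers see tokens arrive in steps of `refill_per_sec * interval` rather than continuously.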
## Technical
`QuotaPool`, the most complex primitive in this library, has been benchmarked under many different designs: MPSC channels (across different runtimes and channel crates), a global std mutex, a sharded parking mutex, owner-thread routing, and finally `DashMap` plus a per-quota lock.
This benchmark measures `QuotaPool::consume` on a single key (Hot) and across many keys (Spread) for each attempted implementation:
```
mode                        hot ns/op  hot Mops/s  spread ns/op  spread Mops/s
global_std_mutex               209.14        4.78        267.92           3.73
dash_entry_lock                613.79        1.63          8.39         119.12
sharded_parking_mutex          814.90        1.23         49.92          20.03
mpsc_std_sync_channel         1589.35        0.63       1660.51           0.60
mpsc_flume_unbounded          1615.93        0.62       1888.52           0.53
owner_thread_routing          1641.45        0.61        965.08           1.04
mpsc_crossbeam_unbounded      1673.91        0.60       1663.72           0.60
mpsc_std_channel              1686.16        0.59       1743.96           0.57
mpsc_flume_bounded            1696.92        0.59       1625.05           0.62
mpsc_tokio_unbounded          1764.91        0.57       1682.05           0.59
mpsc_crossbeam_bounded        1767.41        0.57       1622.89           0.62
mpsc_tokio_bounded_blocking   2110.07        0.47       2024.87           0.49
```
The MPSC variants are clearly out of contention. I also attempted an "Atomic" per quota,
but since refilling and consuming together are not a single atomic operation, I had to use a multi-atomic structure.
The benchmark was re-run with the remaining candidates:
```
mode                        hot ns/op  hot Mops/s  spread ns/op  spread Mops/s
atomic_multi_relaxed_refill    156.97        6.37        116.87           8.56
global_std_mutex               218.57        4.58        259.62           3.85
dash_entry_lock                461.07        2.17          9.25         108.10
sharded_parking_mutex          685.26        1.46         43.83          22.81
owner_thread_routing          2238.35        0.45        973.36           1.03
```
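For readers unfamiliar with the multi-atomic idea, here is one way such a bucket could look. This is a sketch under assumptions, not the benchmarked code: `AtomicBucket`, its fields, millisecond timestamps, and the choice of `Ordering::Relaxed` throughout are all hypothetical. It shows why two atomics are needed: the refill timestamp and the token balance are updated in separate steps.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical two-word atomic bucket: token balance and last-refill
/// timestamp live in separate atomics, so refill and consume are two
/// independent steps rather than one atomic transaction.
struct AtomicBucket {
    tokens: AtomicU64,
    last_refill_ms: AtomicU64,
    capacity: u64,
    refill_per_ms: u64,
}

impl AtomicBucket {
    fn refill(&self, now_ms: u64) {
        let last = self.last_refill_ms.load(Ordering::Relaxed);
        if now_ms <= last {
            return;
        }
        // Claim the elapsed window. If another thread wins the race, it
        // credits the tokens and this call does nothing.
        let claimed = self
            .last_refill_ms
            .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();
        if claimed {
            let gained = (now_ms - last).saturating_mul(self.refill_per_ms);
            let cap = self.capacity;
            // The closure always returns `Some`, so this cannot fail.
            let _ = self.tokens.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |t| {
                Some(t.saturating_add(gained).min(cap))
            });
        }
    }

    fn consume(&self, now_ms: u64, amount: u64) -> bool {
        self.refill(now_ms);
        // CAS loop: subtract only if enough tokens remain.
        self.tokens
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |t| t.checked_sub(amount))
            .is_ok()
    }
}

fn main() {
    let bucket = Arc::new(AtomicBucket {
        tokens: AtomicU64::new(50),
        last_refill_ms: AtomicU64::new(0),
        capacity: 50,
        refill_per_ms: 1,
    });
    // 4 threads make 100 attempts each at time 0, so no refill occurs.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let bucket = Arc::clone(&bucket);
            thread::spawn(move || (0..100).filter(|_| bucket.consume(0, 1)).count())
        })
        .collect();
    let granted: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
    assert_eq!(granted, 50); // the CAS loop never over-grants
}
```

In this sketch a consumer can observe the new timestamp before the matching tokens are credited, so a request may be rejected slightly early. That gap between the two atomics is the kind of inexactness a per-key mutex avoids.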
The multi-atomic variant was consistent across both workloads (about 157 ns/op hot and 117 ns/op spread).
However, "Hot" alone is not a realistic workload. Real-world traffic is distributed across many keys, even when it is skewed, so Spread performance is what matters most here.
This is exactly why we chose **Dash + Entry Lock** for `quota`'s `QuotaPool`. On spread traffic it is about 12.6x faster than the multi-atomic variant (9.25 vs 116.87 ns/op), which is our fastest implementation on a hot key. Unless more than 26.1% of your requests hit the exact same key (the break-even point against multi-atomic, assuming cost blends linearly between the hot and spread figures), DashMap + Entry Lock is the better choice for its spread-traffic performance.
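The 26.1% break-even figure can be reproduced from the second table. The helper below is a hypothetical illustration: it assumes the average cost per request is a linear blend of the measured hot and spread latencies, which is a simplification rather than something the benchmark measured directly.

```rust
/// Fraction of traffic on a single hot key at which implementations `a`
/// and `b` cost the same, assuming average cost is a linear blend of the
/// hot and spread latencies (an assumption, not a measured result).
fn break_even_hot_fraction(a_hot: f64, a_spread: f64, b_hot: f64, b_spread: f64) -> f64 {
    // Solve p*a_hot + (1-p)*a_spread = p*b_hot + (1-p)*b_spread for p.
    (b_spread - a_spread) / ((a_hot - b_hot) + (b_spread - a_spread))
}

fn main() {
    // ns/op from the second table: a = dash_entry_lock, b = atomic_multi_relaxed_refill.
    let p = break_even_hot_fraction(461.07, 9.25, 156.97, 116.87);
    println!("break-even hot fraction: {:.1}%", p * 100.0); // prints 26.1%

    // Spread-traffic speedup of dash_entry_lock over the multi-atomic variant.
    let speedup = 116.87 / 9.25;
    println!("spread speedup: {:.1}x", speedup); // prints 12.6x
}
```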
In some cases, that spread-traffic performance (about 12x the multi-atomic throughput) helps absorb certain **attacks**. For example, if your API is being DDoS'd and you rate-limit by IP, the traffic will come from many IPs rather than one, since DDoS attacks commonly use multi-IP botnets. The same applies when rate-limiting by user ID: abuse tends to arrive from many user IDs, whether they are controlled by the same individual or your service simply needs protection from heavy traffic by legitimate users.
Since `quota` sits at the application layer, you can do more nuanced, domain-specific rate-limiting than a firewall or even a request gateway can.
## Axum Example
```rust
use axum::{
    Router,
    extract::{Path, State},
    http::StatusCode,
    routing::get,
};
use quota::{QuotaPolicy, QuotaPool, RefillRate};
use std::{net::SocketAddr, sync::Arc};

type Limiter = Arc<QuotaPool<String>>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let policy = QuotaPolicy::new()
        .set_capacity(10.0)
        .set_refill_rate(RefillRate::per_sec(1));
    let limiter = Arc::new(QuotaPool::new(policy, 10));
    let app = Router::new()
        .route("/{key}", get(limit))
        .with_state(limiter);
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    axum::serve(tokio::net::TcpListener::bind(addr).await?, app).await?;
    Ok(())
}

// Each path segment acts as its own rate-limit key.
async fn limit(State(limiter): State<Limiter>, Path(key): Path<String>) -> StatusCode {
    match limiter.consume(key.as_str(), 1) {
        Ok(_) => StatusCode::OK,
        Err(_) => StatusCode::TOO_MANY_REQUESTS,
    }
}
```