Crate brakes

§brakes

brakes is a distributed rate limiting library. It offers a number of rate limiting algorithms, supports multiple caching backends (local memory, Redis, Memcached), and includes a set of middlewares for popular Rust web frameworks like Actix Web and Axum.

§Features

  • Support for multiple rate limiting algorithms:
    • Fixed window
    • Sliding window counter
    • Token bucket
    • Leaky bucket
  • Configurable caching backends:
    • Local memory
    • Memcache
    • Redis
  • Middleware for popular web frameworks (Actix Web and Axum; see the examples below)
  • Retry strategies

§Usage

§You can use RateLimiter directly

use std::time::Duration;

use brakes::{
    backend::local::Memory,
    types::{leaky_bucket::LeakyBucket, RateLimiterError},
    RateLimiter,
};

fn main() {
    let limiter = RateLimiter::builder()
        .with_backend(Memory::new())
        .with_limiter(LeakyBucket::new(100, Duration::from_secs(10)))
        .build();

    let result = limiter.is_ratelimited("key");
    match &result {
        Ok(()) => println!("allowed"),
        Err(RateLimiterError::RateExceeded) => println!("rate exceeded"),
        Err(e) => println!("error {:?}", e),
    }
    
    assert!(result.is_ok());
}

§Built-in middlewares

§Actix Web

Available on crate feature actixweb only

use std::time::Duration;

use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder};
use brakes::{
    backend::memcache::MemCache, middleware::actixweb::ActixwebRateLimiter,
    types::token_bucket::TokenBucket, RateLimiter,
};

#[get("/")]
async fn hello() -> impl Responder {
    HttpResponse::Ok().body("Hello world!")
}

#[post("/")]
async fn echo(req_body: String) -> impl Responder {
    HttpResponse::Ok().body(req_body)
}

#[actix_web::main]
async fn main() -> Result<(), std::io::Error> {
    let cache = memcache::connect("memcache://127.0.0.1:11211").unwrap();

    let hello_limiter = RateLimiter::builder()
        .with_backend(MemCache::new(cache.clone()))
        .with_limiter(TokenBucket::new(2, Duration::from_secs(2)))
        .build();

    let hello_middleware = ActixwebRateLimiter::new(hello_limiter);

    let echo_limiter = RateLimiter::builder()
        .with_backend(MemCache::new(cache))
        .with_limiter(TokenBucket::new(5, Duration::from_secs(1)))
        .build();

    let echo_middleware = ActixwebRateLimiter::new(echo_limiter)
        .with_callback(|_| HttpResponse::TooManyRequests().body("too many requests"))
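        .with_key_extractor(|req| {
            // use the client IP from x-forwarded-for, falling back to the peer
            // address instead of panicking when the header is missing
            req.headers()
                .get("x-forwarded-for")
                .and_then(|v| v.to_str().ok())
                .map(str::to_owned)
                .or_else(|| req.peer_addr().map(|addr| addr.ip().to_string()))
                .unwrap_or_default()
        });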

    HttpServer::new(move || {
        let hello_middleware = hello_middleware.clone();
        let echo_middleware = echo_middleware.clone();

        App::new()
            .service(web::scope("hello").wrap(hello_middleware).service(hello))
            .service(web::scope("echo").wrap(echo_middleware).service(echo))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

§Axum

Available on crate feature tower only

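Axum doesn’t have a middleware system of its own; instead, it relies on tower middleware. brakes therefore provides a tower layer, TowerRateLimiterLayer, that can be applied to Axum routes.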

use std::{net::SocketAddr, time::Duration};

use axum::{body::Body, extract::ConnectInfo, routing::get, Router};
use brakes::{
    backend::redis::RedisBackend, middleware::tower::TowerRateLimiterLayer,
    types::fixed_window::FixedWindow, RateLimiter,
};

async fn hello() -> &'static str {
    "Hello, World!"
}

async fn hi() -> &'static str {
    "hi"
}

#[tokio::main]
async fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").unwrap();
    let pool = r2d2::Pool::builder()
        .connection_timeout(Duration::from_secs(1))
        .build(client)
        .unwrap();

    let hello_limiter = RateLimiter::builder()
        .with_backend(RedisBackend::new(pool.clone()))
        .with_limiter(FixedWindow::new(5, Duration::from_secs(10)))
        .build();

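    // ::default() uses the default callback for RateExceeded
    let hello_layer =
        TowerRateLimiterLayer::default(hello_limiter, |r: &axum::http::Request<Body>| {
            // key extractor: prefer x-forwarded-for, fall back to the peer address
            // instead of panicking when the header is missing
            r.headers()
                .get("x-forwarded-for")
                .and_then(|v| v.to_str().ok())
                .map(str::to_owned)
                .or_else(|| {
                    r.extensions()
                        .get::<ConnectInfo<SocketAddr>>()
                        .map(|ci| ci.0.ip().to_string())
                })
                .unwrap_or_default()
        });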

    let hi_limiter = RateLimiter::builder()
        .with_backend(RedisBackend::new(pool))
        .with_limiter(FixedWindow::new(5, Duration::from_secs(10)))
        .build();

    let hi_layer = TowerRateLimiterLayer::new(
        hi_limiter,
        // callback for RateExceeded
        |_| {
            axum::response::Response::builder()
                .status(429)
                .body(Body::from("too many requests"))
                .unwrap()
        },
        // key extractor
        |r: &axum::http::Request<Body>| {
            r.extensions()
                .get::<ConnectInfo<SocketAddr>>()
                .unwrap()
                .ip()
                .to_string()
        },
    );

    let app = Router::new()
        .route("/hello", get(hello).layer(hello_layer))
        .route("/hi", get(hi).layer(hi_layer));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
        .await
        .unwrap();
    axum::serve(
        listener,
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .await
    .unwrap();
}

§Cache Backends

Cache backends store LimiterInstances. A LimiterInstance holds the usage data of a single rate-limited key (for example, a user or an IP address).

§Memory

Uses an in-memory HashMap to store keys and their values (LimiterInstances).

It is safe to share across threads because access is guarded by a Mutex, but it can’t be shared across processes or used in a distributed setup.

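use std::time::Duration;

use brakes::{backend::local::Memory, types::fixed_window::FixedWindow, RateLimiter};

let memory_cache = Memory::new();
let limiter = RateLimiter::builder()
    .with_backend(memory_cache)
    .with_limiter(FixedWindow::new(10, Duration::from_secs(1)))
    .build();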
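To illustrate why a single Mutex is enough within one process, here is a minimal std-only sketch of a Mutex-guarded HashMap store. `MemoryStore`, `update`, and `get` are hypothetical names chosen for this illustration, not part of brakes' API. Because `update` performs the whole read-modify-write while holding the lock, updates to the same key from different threads are serialized and can't conflict.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical in-memory store: one value per key, shareable across threads.
pub struct MemoryStore<V> {
    inner: Arc<Mutex<HashMap<String, V>>>,
}

// Cloning shares the same underlying map (it only bumps the Arc count).
impl<V> Clone for MemoryStore<V> {
    fn clone(&self) -> Self {
        Self { inner: Arc::clone(&self.inner) }
    }
}

impl<V: Clone> MemoryStore<V> {
    pub fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(HashMap::new())) }
    }

    /// Read-modify-write under a single lock acquisition, so concurrent
    /// callers updating the same key are serialized.
    pub fn update<F>(&self, key: &str, f: F) -> V
    where
        F: FnOnce(Option<&V>) -> V,
    {
        let mut map = self.inner.lock().unwrap();
        let next = f(map.get(key));
        map.insert(key.to_string(), next.clone());
        next
    }

    pub fn get(&self, key: &str) -> Option<V> {
        self.inner.lock().unwrap().get(key).cloned()
    }
}
```

Because the map lives in the process's own memory, two separate processes each get an independent map, which is why this kind of backend can't coordinate limits across instances.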

§Memcache

Available on crate feature memcache only

Uses a memcache client to store LimiterInstance data.

Writes to memcache use the CAS (check-and-set) command so that concurrent writes to the same key can’t silently overwrite each other.

If there’s a conflict (the data for a LimiterInstance was changed by another process between this process’s read and its write), the write is either retried (if RetryAndAllow or RetryAndDeny is used) or a RateLimiterError::BackendConflict is returned. In either case, whether the request is rate-limited is determined by the RetryStrategy in use.

use std::time::Duration;

use brakes::{backend::memcache::MemCache, types::fixed_window::FixedWindow, RateLimiter};

let cache = memcache::connect("memcache://127.0.0.1:11211").unwrap();
let memcache_backend = MemCache::new(cache);
let limiter = RateLimiter::builder()
    .with_backend(memcache_backend)
    .with_limiter(FixedWindow::new(10000, Duration::from_millis(1000)))
    .with_conflict_strategy(brakes::RetryStrategy::RetryAndDeny(2))
    .build();
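The CAS-and-retry behavior described above can be sketched without a running memcached server. The sketch assumes a hypothetical `VersionedStore` that imitates memcached's `gets`/`cas` pair (every successful write bumps a CAS token, and a `cas` only succeeds if the token hasn't changed since it was read), plus a test hook that simulates another process writing between the read and the write. It mirrors the technique, not brakes' internal code; the hypothetical `Outcome::Conflict` plays the role of RateLimiterError::BackendConflict.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a memcached-like store with CAS tokens.
#[derive(Default)]
pub struct VersionedStore {
    data: HashMap<String, (u64, u32)>, // key -> (cas token, value)
}

impl VersionedStore {
    /// Like memcached `gets`: returns the CAS token together with the value.
    pub fn gets(&self, key: &str) -> Option<(u64, u32)> {
        self.data.get(key).copied()
    }

    /// Like memcached `cas`: writes only if the token still matches.
    /// `expected == None` means "only if the key doesn't exist yet"
    /// (real memcached would use `add` for that case).
    pub fn cas(&mut self, key: &str, value: u32, expected: Option<u64>) -> bool {
        let current = self.data.get(key).map(|(token, _)| *token);
        if current != expected {
            return false; // another writer got in between: conflict
        }
        let next_token = current.map_or(1, |t| t + 1);
        self.data.insert(key.to_string(), (next_token, value));
        true
    }
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Written(u32),
    Conflict,
}

/// Increments the counter for `key`, making `retries + 1` attempts in total.
/// `between_read_and_write` is a test hook that simulates a concurrent writer.
pub fn increment_with_retries(
    store: &mut VersionedStore,
    key: &str,
    retries: u32,
    mut between_read_and_write: impl FnMut(&mut VersionedStore),
) -> Outcome {
    for _ in 0..=retries {
        let (token, count) = match store.gets(key) {
            Some((t, v)) => (Some(t), v),
            None => (None, 0),
        };
        between_read_and_write(store);
        if store.cas(key, count + 1, token) {
            return Outcome::Written(count + 1);
        }
    }
    Outcome::Conflict
}
```

With `retries = n`, the loop makes n + 1 attempts, matching RetryAndAllow(n) and RetryAndDeny(n); whether the request is then allowed or denied after a final `Conflict` is decided by the strategy.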

§Redis

Available on crate feature redis only

Uses a Redis connection pool. RedisBackend::new expects an r2d2::Pool.

Writes use transactions (WATCH, MULTI, and EXEC) so that concurrent writes to the same key can’t silently overwrite each other.

If there’s a conflict (the data for a LimiterInstance was changed by another process between this process’s read and its write), the write is either retried (if RetryAndAllow or RetryAndDeny is used) or a RateLimiterError::BackendConflict is returned. In either case, whether the request is rate-limited is determined by the RetryStrategy in use.

use std::time::Duration;

use brakes::{backend::redis::RedisBackend, types::fixed_window::FixedWindow, RateLimiter};

let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let pool = r2d2::Pool::builder().build(client).unwrap();

let limiter = RateLimiter::builder()
    .with_backend(RedisBackend::new(pool))
    .with_limiter(FixedWindow::new(100, Duration::from_millis(1000)))
    .with_conflict_strategy(brakes::RetryStrategy::RetryAndDeny(1))
    .build();

§Rate Limiter Types

LimiterType dictates the rate limiting algorithm to be used.

The LimiterType (e.g. the FixedWindow limiter type) stores the algorithm’s configuration (for FixedWindow, the threshold and the window_size), while its associated LimiterInstance stores a single key’s (for example, a user’s) usage of the limiter (for FixedWindow, the window_start timestamp and the count).

LimiterInstances are stored in the configured Backend.

§FixedWindow

Defined by a threshold and a window_length.

The FixedWindowInstance keeps track of window_start and count for each key (user, for example).

use std::time::Duration;

use brakes::{backend::local::Memory, types::fixed_window::FixedWindow, RateLimiter};

// allow up to 10 requests in each fixed 1000 ms window
let limiter = RateLimiter::builder()
    .with_backend(Memory::new())
    .with_limiter(FixedWindow::new(10, Duration::from_millis(1000)))
    .with_conflict_strategy(brakes::RetryStrategy::RetryAndDeny(1))
    .build();
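The FixedWindow algorithm described above can be sketched in plain Rust. This illustrates the technique under stated assumptions rather than reproducing brakes' implementation: timestamps are passed in explicitly as milliseconds, a window is assumed to start at the first request seen for a key, and `FixedWindow`/`FixedWindowInstance` here are hypothetical local structs that mirror the split between the limiter type (configuration) and the limiter instance (per-key state).

```rust
/// Hypothetical per-key state (the "limiter instance").
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct FixedWindowInstance {
    pub window_start: u64,
    pub count: u32,
}

/// Hypothetical configuration (the "limiter type").
pub struct FixedWindow {
    pub threshold: u32,
    pub window_ms: u64,
}

impl FixedWindow {
    /// Returns whether the request is allowed plus the updated per-key state.
    pub fn check(&self, state: Option<FixedWindowInstance>, now_ms: u64) -> (bool, FixedWindowInstance) {
        let mut inst = match state {
            // Still inside the stored window: keep counting.
            Some(s) if now_ms < s.window_start + self.window_ms => s,
            // No state yet, or the window has expired: start a new one now.
            _ => FixedWindowInstance { window_start: now_ms, count: 0 },
        };
        if inst.count < self.threshold {
            inst.count += 1;
            (true, inst)
        } else {
            (false, inst)
        }
    }
}
```

The returned instance is what a backend would store for the key; the configuration struct itself never changes between requests.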

§SlidingWindowCounter

Defined by a threshold and a window_length.

The SlidingWindowInstance keeps track of the current and previous window_start and count.

// allow up to 5 requests in any sliding 1000 ms window
let limiter = RateLimiter::builder()
    .with_backend(...)
    .with_limiter(SlidingWindowCounter::new(5, Duration::from_millis(1000)))
    .with_conflict_strategy(brakes::RetryStrategy::RetryAndDeny(1))
    .build();
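A common way to implement a sliding window counter is to weight the previous window's count by how much of it still overlaps the sliding window, then add the current window's count. The sketch below uses that formulation with integer arithmetic (everything is scaled by the window length to avoid floats). It assumes windows aligned to multiples of the window length and a hypothetical instance layout (`curr_start`, `curr_count`, `prev_count`); brakes' exact weighting may differ.

```rust
/// Hypothetical per-key state: the previous window starts at curr_start - window.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct SlidingWindowInstance {
    pub curr_start: u64,
    pub curr_count: u64,
    pub prev_count: u64,
}

pub struct SlidingWindowCounter {
    pub threshold: u64,
    pub window_ms: u64,
}

impl SlidingWindowCounter {
    pub fn check(&self, state: Option<SlidingWindowInstance>, now_ms: u64) -> (bool, SlidingWindowInstance) {
        let w = self.window_ms;
        let start = now_ms - now_ms % w; // start of the aligned current window
        let mut inst = state.unwrap_or(SlidingWindowInstance { curr_start: start, ..Default::default() });
        if start != inst.curr_start {
            // Roll forward: the old current window becomes the previous one only
            // if it is directly adjacent; otherwise both counts reset.
            inst.prev_count = if start == inst.curr_start + w { inst.curr_count } else { 0 };
            inst.curr_start = start;
            inst.curr_count = 0;
        }
        let elapsed = now_ms - start;
        // estimate = prev * (w - elapsed) / w + curr, multiplied through by w
        let estimate_scaled = inst.prev_count * (w - elapsed) + inst.curr_count * w;
        if estimate_scaled < self.threshold * w {
            inst.curr_count += 1;
            (true, inst)
        } else {
            (false, inst)
        }
    }
}
```

For example, with a threshold of 4 and a 1000 ms window, 4 requests in the previous window and a new request 500 ms into the current window give an estimate of 4 × 0.5 = 2, so the request is allowed.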

§TokenBucket

Defined by a capacity and a fill_frequency.

A bucket with a capacity of 10 and a fill_frequency of 1 second allows up to 10 requests at once. Each request consumes one token from the bucket, and the bucket is refilled with 1 token every second. If the bucket is empty, requests are denied until it is refilled.

The TokenBucketInstance keeps track of how many tokens are available and the last_access timestamp for the user.

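use std::time::Duration;

use brakes::{backend::local::Memory, types::token_bucket::TokenBucket, RateLimiter};

// 10 tokens at most, with a fill rate of 1 token every 2 seconds
let limiter = RateLimiter::builder()
    .with_backend(Memory::new())
    .with_limiter(TokenBucket::new(10, Duration::from_secs(2)))
    .build();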
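A token bucket can be refilled lazily: instead of running a timer, each check adds one token per whole fill interval elapsed since the last refill. The sketch below assumes millisecond timestamps, a new key starting with a full bucket, and hypothetical field names (`tokens`, `last_refill_ms`); it illustrates the algorithm rather than reproducing brakes' code.

```rust
/// Hypothetical per-key state for a token bucket.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TokenBucketInstance {
    pub tokens: u64,
    pub last_refill_ms: u64,
}

pub struct TokenBucket {
    pub capacity: u64,
    pub fill_every_ms: u64,
}

impl TokenBucket {
    pub fn check(&self, state: Option<TokenBucketInstance>, now_ms: u64) -> (bool, TokenBucketInstance) {
        // A new key starts with a full bucket.
        let mut inst = state.unwrap_or(TokenBucketInstance { tokens: self.capacity, last_refill_ms: now_ms });
        // Add one token per whole fill interval elapsed, never above capacity.
        let intervals = now_ms.saturating_sub(inst.last_refill_ms) / self.fill_every_ms;
        if intervals > 0 {
            inst.tokens = inst.tokens.saturating_add(intervals).min(self.capacity);
            // Advance by whole intervals only, keeping the partial remainder.
            inst.last_refill_ms += intervals * self.fill_every_ms;
        }
        if inst.tokens > 0 {
            inst.tokens -= 1; // the request consumes a token
            (true, inst)
        } else {
            (false, inst)
        }
    }
}
```

Advancing `last_refill_ms` by whole intervals (rather than setting it to `now_ms`) avoids losing partial progress toward the next token when requests arrive mid-interval.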

§LeakyBucket

Defined by a capacity and a leak_frequency.

A bucket with a capacity of 10 and a leak_frequency of 1 second allows up to 10 requests at once. Each allowed request is added to the bucket; once the bucket is full, further requests are denied until requests leak out. With a leak_frequency of 1 second, one request leaks out per second.

The LeakyBucketInstance keeps track of how many allowed requests there are in the bucket and the last_leaked timestamp for the user.

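use std::time::Duration;

use brakes::{backend::local::Memory, types::leaky_bucket::LeakyBucket, RateLimiter};

// up to 100 requests can be allowed, with a leak rate of 1 request every 2 seconds
let limiter = RateLimiter::builder()
    .with_backend(Memory::new())
    .with_limiter(LeakyBucket::new(100, Duration::from_secs(2)))
    .build();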
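The leaky bucket is the mirror image of a token bucket: the instance counts requests currently in the bucket, and each check first leaks one request per whole leak interval elapsed. This sketch assumes millisecond timestamps, a new key starting with an empty bucket, and hypothetical field names (`level`, `last_leaked_ms`); it is an illustration of the algorithm, not brakes' implementation.

```rust
/// Hypothetical per-key state for a leaky bucket.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct LeakyBucketInstance {
    /// Number of allowed requests currently held in the bucket.
    pub level: u64,
    pub last_leaked_ms: u64,
}

pub struct LeakyBucket {
    pub capacity: u64,
    pub leak_every_ms: u64,
}

impl LeakyBucket {
    /// Returns whether the request is allowed plus the updated per-key state.
    pub fn check(&self, state: Option<LeakyBucketInstance>, now_ms: u64) -> (bool, LeakyBucketInstance) {
        // A new key starts with an empty bucket.
        let mut inst = state.unwrap_or(LeakyBucketInstance { level: 0, last_leaked_ms: now_ms });
        // Leak one request per whole interval elapsed since the last leak.
        let intervals = now_ms.saturating_sub(inst.last_leaked_ms) / self.leak_every_ms;
        if intervals > 0 {
            inst.level = inst.level.saturating_sub(intervals);
            // Advance by whole intervals only, keeping the partial remainder.
            inst.last_leaked_ms += intervals * self.leak_every_ms;
        }
        if inst.level < self.capacity {
            inst.level += 1; // the request joins the bucket
            (true, inst)
        } else {
            (false, inst)
        }
    }
}
```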

§Retry Strategies

Retry strategies can be useful in two cases:

  • When reads or writes to the Backend fail (for example, due to a network timeout). Set it using RateLimiterBuilder::with_failure_strategy.
  • When writes to the Backend fail due to a conflict (for example, caused by concurrent requests from the same IP). Set it using RateLimiterBuilder::with_conflict_strategy.

A RetryStrategy is one of four variants:

  • RetryAndAllow(n) tries the operation a total of n+1 times. If all fail, it allows the request.
  • RetryAndDeny(n) tries the operation a total of n+1 times. If all fail, it denies the request.
  • Allow allows the request without retries.
  • Deny denies the request without retries.

Here, n is the number of retries.

If with_failure_strategy or with_conflict_strategy is not called, the following defaults are used:

  • Failure strategy of RetryStrategy::RetryAndAllow(2)
  • Conflict strategy of RetryStrategy::RetryAndDeny(2)

Both can be set as follows:

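use std::time::Duration;

use brakes::{backend::local::Memory, types::fixed_window::FixedWindow, RateLimiter, RetryStrategy};

let limiter = RateLimiter::builder()
    .with_backend(Memory::new())
    .with_limiter(FixedWindow::new(10, Duration::from_secs(1)))
    .with_failure_strategy(RetryStrategy::RetryAndAllow(1))
    .with_conflict_strategy(RetryStrategy::Deny)
    .build();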
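The attempt-counting rules for the four RetryStrategy variants can be sketched as a small generic helper. `Strategy` and `run_with_strategy` are hypothetical names for this illustration, and the sketch assumes Allow and Deny make a single attempt (zero retries) before falling back; it is not brakes' internal code.

```rust
/// Hypothetical mirror of the four strategies described above.
#[derive(Clone, Copy, Debug)]
pub enum Strategy {
    RetryAndAllow(u32),
    RetryAndDeny(u32),
    Allow,
    Deny,
}

/// Runs `op` according to `strategy`. Returns Ok(value) if any attempt
/// succeeds; otherwise Err(allow), where `allow` tells the caller whether
/// the request should be let through.
pub fn run_with_strategy<T, E>(strategy: Strategy, mut op: impl FnMut() -> Result<T, E>) -> Result<T, bool> {
    let (retries, allow_on_failure) = match strategy {
        Strategy::RetryAndAllow(n) => (n, true),
        Strategy::RetryAndDeny(n) => (n, false),
        Strategy::Allow => (0, true),
        Strategy::Deny => (0, false),
    };
    // n retries means n + 1 attempts in total
    for _ in 0..=retries {
        if let Ok(v) = op() {
            return Ok(v);
        }
    }
    Err(allow_on_failure)
}
```

Returning `Err(bool)` keeps the allow/deny decision separate from the operation's own error type, so the same helper could wrap both backend I/O and conflicting writes.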

§Discard Invalid Cache

In some cases, the data stored in the configured rate limiter Backend might be invalid. This might happen:

  • When the limiter type changes (for example, you switch from the SlidingWindowCounter limiter to LeakyBucket). Entries already stored in the Backend then represent the wrong LimiterInstance type and are no longer valid. Calls to is_ratelimited(key) return Err(RateLimiterError::WrongLimiterInstanceType).
  • When the cached data in the configured Backend is overwritten by an external process. Calls to is_ratelimited(key) return Err(RateLimiterError::MalformedValue(..)).

In either case, fetching the cached LimiterInstance results in a deserialization error.

If this happens, by default, the RateLimiter discards invalid data by deleting the provided key and allowing the request. Subsequent requests will create a new value in the cache.

You can change this behavior by calling with_discard_invalid_cache_entries(false). Note that this might cause all requests to be rate-limited (for example, if the limiter type was changed).
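The discard-or-fail decision can be sketched with a plain HashMap standing in for the Backend. The string encoding "window_start:count", `decode`, `load_entry`, and `CheckError` are all hypothetical; brakes' actual serialization format isn't described here. When decoding fails and discarding is enabled, the entry is deleted and the key is treated as new, so the request goes through; otherwise the error is returned to the caller.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum CheckError {
    MalformedValue(String),
}

/// Hypothetical decoder for an entry stored as "window_start:count".
fn decode(raw: &str) -> Option<(u64, u32)> {
    let (start, count) = raw.split_once(':')?;
    Some((start.parse().ok()?, count.parse().ok()?))
}

/// Loads the entry for `key`. On a decode failure it either discards the
/// entry (delete it, then behave as if the key were new) or returns an error.
pub fn load_entry(
    store: &mut HashMap<String, String>,
    key: &str,
    discard_invalid: bool,
) -> Result<Option<(u64, u32)>, CheckError> {
    let raw = match store.get(key) {
        None => return Ok(None),
        Some(raw) => raw.clone(),
    };
    match decode(&raw) {
        Some(entry) => Ok(Some(entry)),
        None if discard_invalid => {
            store.remove(key);
            Ok(None) // treated as a fresh key, so the request is allowed
        }
        None => Err(CheckError::MalformedValue(raw)),
    }
}
```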

Modules§

backend
middleware
types

Structs§

RateLimiter
RateLimiterBuilder

Enums§

RetryStrategy