Expand description
This crate provides a Tower service with rate-limiting functionality
backed by Valkey or Redis deployed with the Redis Cell
module.
The service and layer this crate provides are transport agnostic (just like
Tower itself), but here is a basic
example using axum.
First, let’s define Rule provider: the rule is defined per request, and
contains [Key] (e.g. IP, API key, user ID), redis_cell::Policy,
and - optionally - a resource name (useful for tracing, debugging, audit).
use axum::http::{Request, Method};
use tower_redis_cell::redis_cell::{Key, Policy};
use tower_redis_cell::{ProvideRule, ProvideRuleResult, Rule};
const BASIC_POLICY: Policy = Policy::from_tokens_per_second(1);
const STRICT_POLICY: Policy = Policy::from_tokens_per_day(5).max_burst(5);
#[derive(Clone)]
struct RuleProvider;
impl<T> ProvideRule<Request<T>> for RuleProvider {
fn provide<'a>(&self, req: &'a Request<T>) -> ProvideRuleResult<'a> {
let api_key = req
.headers()
.get("x-api-key")
.and_then(|val| val.to_str().ok())
.ok_or("cannot define key, since 'x-api-key' header is missing")?;
let (path, method) = (req.uri().path(), req.method());
let rule = if path.contains("/embed") &&
(method == Method::POST || method == Method::PUT) {
let key = Key::triple(api_key, path, method.as_str());
let rule = Rule::new(key, STRICT_POLICY).resource("embeddings::create");
return Ok(Some(rule));
} else {
Rule::new(api_key, BASIC_POLICY)
};
Ok(Some(rule))
}
}We now need to instantiate RateLimitConfig (which expects a rule provider
and an error handler), procure a Valkey/Redis client and use those to create
RateLimitLayer. Note that we are using ConnectionManager
in this example, but dy default anything ConnectionLike
will do. There is also an option to use a pool, but you will need to enable
a corresponding feature for that (currently, deadpool is supported).
use axum::http::{StatusCode, header};
use axum::response::{AppendHeaders, IntoResponse, Response};
use axum::{Router, body::Body, routing::get, routing::post};
use tower_redis_cell::{Error, RateLimitLayer, RateLimitConfig};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let config = redis::aio::ConnectionManagerConfig::new();
let connection = redis::aio::ConnectionManager::new_with_config(client, config)
.await
.unwrap();
let config = RateLimitConfig::new(RuleProvider, |err, _req| {
match err {
Error::ProvideRule(err) => {
tracing::warn!(
key = ?err.key,
detail = err.detail.as_deref(),
"failed to define rule for request"
);
(StatusCode::UNAUTHORIZED, err.to_string()).into_response()
}
Error::RateLimit(err) => {
tracing::warn!(
key = %err.rule.key,
policy = err.rule.policy.name,
"request throttled"
);
(
StatusCode::TOO_MANY_REQUESTS,
AppendHeaders([(header::RETRY_AFTER, err.details.retry_after)]),
Body::from("too many requests"),
)
.into_response()
}
err => {
tracing::error!(err = %err, "unexpected error");
(StatusCode::INTERNAL_SERVER_ERROR).into_response()
}
}
});
let app = Router::new()
.route("/", get(|| async { "Hello, World!" }))
.route("/embded", post(|| async { "Create embeddings" }))
.layer(RateLimitLayer::new(config, connection));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}Note that we are in-lining the error handler above, but this might as well be
a free standing function. Also, you can optionally provide RateLimitConfig::on_success
and RateLimitConfig::on_unruled handlers, which both provide a mutable access
to the response, and so - if needed - you can set any additional headers.
Re-exports§
pub use redis_cell_rs as redis_cell;
Structs§
- Provide
Rule Error - Rate
Limit - Rate
Limit Config - Rate
Limit Layer - Request
Allowed Details - Request
Blocked Details - Rule