# Rpress
An async HTTP/1.1 and HTTP/2 framework in Rust, built on top of `tokio`. Designed to be lightweight, secure, and production-ready.
## Features
- Trie-based routing (static, dynamic, multi-method)
- Middleware (global and per route group)
- **Native TLS via rustls** (HTTPS with PEM certificates)
- **HTTP/2 via h2** (automatic ALPN negotiation over TLS)
- Request body streaming via `mpsc::channel`
- Automatic gzip/brotli compression
- Native CORS with builder pattern and **fail-fast validation** (RFC compliance)
- **Granular body size limits** (global and per route group)
- **Pluggable rate limiting** via `RateLimiter` trait (in-memory or distributed backends like Redis)
- Static file serving
- Cookies (parsing and Set-Cookie builder)
- Graceful shutdown
- Configurable timeouts (read and idle)
- Concurrent connection limits
- Automatic security headers (`X-Content-Type-Options: nosniff`) with configurable CSP, X-Frame-Options, and more
- Automatic request ID (`X-Request-ID`)
## Quick Start
```rust
use rpress::{Rpress, RpressCors, RpressRoutes, RequestPayload, ResponsePayload};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let cors = RpressCors::new()
.set_origins(vec!["*"])
.set_methods(vec!["GET", "POST", "PUT", "DELETE"])
.set_headers(vec!["Content-Type", "Authorization"]);
let mut app = Rpress::new(Some(cors));
let mut routes = RpressRoutes::new();
routes.add(":get/hello", |_req: RequestPayload| async move {
ResponsePayload::text("Hello, Rpress!")
});
app.add_route_group(routes);
app.listen("0.0.0.0:3000").await?;
Ok(())
}
```
## Routing
Routes use the format `:method/path`. Dynamic segments are prefixed with `:`.
### Static routes
```rust
let mut routes = RpressRoutes::new();
```rust
ResponsePayload::text("POST resource").with_status(StatusCode::Created)
});
ResponsePayload::text(format!("Deleted {}", id))
});
```
### Supported HTTP methods
`GET`, `POST`, `PUT`, `PATCH`, `DELETE`, `HEAD`, `OPTIONS`
## Middleware
### Global middleware
Applied to all routes:
```rust
app.use_middleware(|req, next| async move {
let uri = req.uri().to_string();
let method = req.method().to_string();
tracing::info!("--> {} {}", method, uri);
let start = std::time::Instant::now();
let result = next(req).await;
tracing::info!("<-- {} {} ({:?})", method, uri, start.elapsed());
result
});
```
### Route group middleware
```rust
let mut routes = RpressRoutes::new();
routes.use_middleware(|req, next| async move {
if req.header("authorization").is_none() {
return Err(RpressError {
status: StatusCode::Unauthorized,
message: "Token required".to_string(),
});
}
next(req).await
});
## Observability (Distributed Tracing)
Rpress automatically creates structured [tracing](https://docs.rs/tracing) spans for every request. This makes the framework compatible with distributed tracing backends like **Jaeger**, **Datadog**, **Grafana Tempo**, and **Zipkin** out of the box.
### Automatic spans
Every incoming request is wrapped in an `http.request` span with these fields:
| `http.method` | HTTP method (GET, POST, etc.) |
| `http.route` | Request URI path |
| `http.request_id` | Unique UUID v4 (same as X-Request-ID header) |
| `http.status_code` | Response status code (recorded after handler completes) |
| `http.latency_ms` | Total processing time in milliseconds |
Each connection also gets a parent span:
| `http.connection` | `peer.addr` | Per-connection span (HTTP/1.1 and TLS) |
| `h2.stream` | — | Per-stream span for HTTP/2 multiplexed streams |
The hierarchy looks like this:
```
http.connection (peer.addr=192.168.1.10)
└── http.request (method=GET, route=/users/1, request_id=abc-123, status_code=200, latency_ms=3)
└── app.request (your middleware span)
└── tracing::info!("...") ← inherits full context
```
Any `tracing::info!`, `tracing::warn!`, or `tracing::error!` emitted inside a middleware or handler automatically inherits the parent span context — no manual propagation needed.
### Adding custom fields in middleware
The framework span already exists when your middleware runs. Create a child span to add application-specific fields:
```rust
app.use_middleware(|req, next| async move {
let uri = req.uri().to_string();
let method = req.method().to_string();
let span = tracing::info_span!(
"app.request",
app.route = %uri,
app.method = %method,
app.user_id = tracing::field::Empty,
);
let _guard = span.enter();
tracing::info!("processing request");
let result = next(req).await;
// After authentication, record the user:
// tracing::Span::current().record("app.user_id", &"user-123");
result
});
```
### Exporting to Jaeger / Datadog / Tempo
Rpress uses the standard `tracing` crate. To export spans to a distributed tracing backend, configure `tracing-subscriber` with an OpenTelemetry layer in your `main()`:
```rust
// Cargo.toml:
// tracing-subscriber = { version = "0.3", features = ["env-filter"] }
// opentelemetry = "0.27"
// opentelemetry-otlp = "0.27"
// tracing-opentelemetry = "0.28"
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
fn init_tracing() {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint("http://localhost:4317")
.build()
.unwrap();
let provider = opentelemetry::sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter)
.build();
let tracer = provider.tracer("rpress-app");
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(telemetry)
.init();
}
```
With this setup, every `http.request` span (and its children) is automatically exported as a trace to your backend. The `http.request_id` field matches the `X-Request-ID` response header, making it easy to correlate logs with traces.
## Request
### Accessing request data
```rust
let uri = req.uri();
let method = req.method();
// Headers (keys are lowercase)
let content_type = req.header("content-type").unwrap_or("unknown");
let auth = req.header("authorization");
// Route parameters
let id = req.get_param("id");
// Query string — GET /search?q=rust&page=1
let query = req.get_query("q").unwrap_or("");
let page = req.get_query("page").unwrap_or("1");
// Cookies
let cookies = req.cookies();
let session = cookies.get("session_id");
// Extensions (set by middleware, e.g. auth claims)
let user_id = req.get_extension("user_id");
let role = req.get_extension("role");
// Body as string
let body_text = req.body_str().unwrap_or("invalid utf8");
// Body as JSON
let data: serde_json::Value = req.body_json().unwrap();
ResponsePayload::text("ok")
});
```
### Body Streaming
For large uploads, Rpress can stream the body in chunks via a channel instead of accumulating everything in memory. The threshold is configurable:
```rust
app.set_stream_threshold(64 * 1024); // stream bodies > 64KB
```
#### `collect_body()` — Simple usage (recommended)
Collects the entire body into a `Vec<u8>`. Works for both small bodies (already buffered) and streamed ones:
```rust
ResponsePayload::text(format!("Received {} bytes", body.len()))
});
```
#### `body_stream()` — Chunk-by-chunk processing
For processing data on demand without accumulating everything in memory:
```rust
if let Some(mut rx) = req.body_stream() {
while let Some(chunk) = rx.recv().await {
total += chunk.len();
}
}
ResponsePayload::text(format!("Processed {} bytes in chunks", total))
});
```
## Response
### Available builders
```rust
// Plain text
ResponsePayload::text("Hello world")
// HTML
ResponsePayload::html("<h1>Welcome</h1>")
// JSON
ResponsePayload::json(&serde_json::json!({"status": "ok"})).unwrap()
// Bytes with custom content-type
ResponsePayload::bytes(vec![0x89, 0x50, 0x4E, 0x47], "image/png")
// Empty (204 No Content)
ResponsePayload::empty()
// Redirect
ResponsePayload::redirect("/new-location", StatusCode::Found)
```
### Chaining modifiers
```rust
ResponsePayload::text("data")
.with_status(StatusCode::Created)
.with_content_type("application/xml")
.with_header("X-Custom", "value")
```
### Cookies
```rust
use rpress::CookieBuilder;
let cookie = CookieBuilder::new("token", "abc123")
.path("/")
.max_age(3600)
.same_site("Strict")
.http_only(true)
.secure(true)
.domain("example.com");
ResponsePayload::text("logged in")
.set_cookie(&cookie)
```
Multiple `Set-Cookie` headers are supported — each `.set_cookie()` call adds a separate header.
## CORS
Native configuration via builder pattern:
```rust
let cors = RpressCors::new()
.set_origins(vec!["https://app.example.com", "https://admin.example.com"])
.set_methods(vec!["GET", "POST", "PUT", "DELETE"])
.set_headers(vec!["Content-Type", "Authorization", "X-Custom-Header"])
.set_expose_headers(vec!["X-Request-ID"])
.set_max_age(3600)
.set_credentials(true);
let mut app = Rpress::new(Some(cors));
```
Without CORS:
```rust
let mut app = Rpress::new(None);
```
Automatic headers: `Access-Control-Allow-Origin`, `Access-Control-Allow-Methods`, `Access-Control-Allow-Headers`, `Vary: Origin`. Preflight `OPTIONS` requests are handled automatically.
### CORS validation (fail-fast)
Rpress enforces RFC-compliant CORS at startup. Using wildcard origin `"*"` with `set_credentials(true)` will **panic** immediately, preventing the application from starting with an insecure configuration that browsers would silently reject:
```rust
// This will panic at startup:
let cors = RpressCors::new()
.set_origins(vec!["*"])
.set_credentials(true);
let app = Rpress::new(Some(cors)); // panics!
// Use explicit origins instead:
let cors = RpressCors::new()
.set_origins(vec!["https://app.example.com"])
.set_credentials(true);
let app = Rpress::new(Some(cors)); // ok
```
## Compression
Gzip and Brotli with automatic negotiation via `Accept-Encoding`:
```rust
app.enable_compression(true);
```
Behavior:
- Brotli is preferred when `Accept-Encoding: br` is present
- Gzip is used when `Accept-Encoding: gzip` is present
- Bodies smaller than 256 bytes are not compressed
- Already compressed types (image/*, video/*, audio/*, zip, gzip) are skipped
- SVG is compressed normally
- `Content-Encoding` and `Vary: Accept-Encoding` are added automatically
- **Compression runs inside `tokio::task::spawn_blocking`** — CPU-bound work (Brotli/Gzip encoding) never blocks the async event loop, even under high concurrency
## Rate Limiting
Limit requests per IP with a sliding window counter:
```rust
app.set_rate_limit(100, 60); // 100 requests per 60 seconds
```
When the limit is exceeded, returns `429 Too Many Requests`.
By default, `set_rate_limit` uses an in-memory backend (`InMemoryRateLimiter`) suitable for single-instance deployments. Expired entries are automatically cleaned up when the store exceeds 10,000 records.
### Distributed rate limiting
For multi-instance environments (e.g. Kubernetes), inject a custom backend that implements the `RateLimiter` trait:
```rust
use rpress::RateLimiter;
use std::pin::Pin;
struct RedisRateLimiter { /* redis client */ }
impl RateLimiter for RedisRateLimiter {
fn check(
&self,
key: &str,
max_requests: u32,
window_secs: u64,
) -> Pin<Box<dyn Future<Output = bool> + Send + '_>> {
let key = key.to_string();
Box::pin(async move {
// Query Redis INCR + EXPIRE and return whether under limit
true
})
}
}
let mut app = Rpress::new(None);
app.set_rate_limiter(RedisRateLimiter { /* ... */ });
app.set_rate_limit(100, 60);
```
The `set_rate_limiter` call must come **before** `set_rate_limit`, or after it to replace the default in-memory limiter. The framework does **not** ship a Redis implementation -- it only provides the trait and the in-memory default.
## Body Size Limits
By default, Rpress rejects request bodies larger than 10 MB with `413 Payload Too Large`.
### Global limit
```rust
app.set_max_body_size(5 * 1024 * 1024); // 5 MB for all routes
```
### Per route group limit
Individual route groups can override the global limit. This allows a file upload group to accept large bodies while keeping the rest of the API tightly restricted:
```rust
let mut api_routes = RpressRoutes::new();
api_routes.set_max_body_size(8 * 1024); // 8 KB for API routes
api_routes.add(":post/login", |req: RequestPayload| async move {
ResponsePayload::text("ok")
});
let mut upload_routes = RpressRoutes::new();
upload_routes.set_max_body_size(50 * 1024 * 1024); // 50 MB for uploads
upload_routes.add(":post/upload", |mut req: RequestPayload| async move {
let body = req.collect_body().await;
ResponsePayload::text(format!("Received {} bytes", body.len()))
});
app.set_max_body_size(1024 * 1024); // 1 MB global default
app.add_route_group(api_routes);
app.add_route_group(upload_routes);
```
When a route group has its own limit, that limit takes precedence over the global one -- even if the group limit is larger. The global limit acts as the baseline for routes without a specific override.
## Static Files
```rust
app.serve_static("/assets", "./public");
app.serve_static("/uploads", "/var/data/uploads");
```
- Content-Type is detected by file extension
- Path traversal is prevented with `canonicalize()` — both the base directory and the requested path are resolved and compared before any read is performed
- File reads use `tokio::fs::read` and path resolution uses `tokio::fs::canonicalize` — **no blocking syscalls on the event loop**
- Supports: HTML, CSS, JS, JSON, images (PNG, JPG, GIF, SVG, WebP, ICO), fonts (WOFF, WOFF2, TTF), PDF, XML, videos (MP4, WebM)
## TLS (HTTPS)
Rpress supports native TLS via `rustls`. Use `listen_tls` instead of `listen` to serve over HTTPS:
```rust
use rpress::{Rpress, RpressTlsConfig, RpressRoutes, RequestPayload, ResponsePayload};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut app = Rpress::new(None);
let mut routes = RpressRoutes::new();
routes.add(":get/hello", |_req: RequestPayload| async {
ResponsePayload::text("Hello, HTTPS!")
});
app.add_route_group(routes);
let tls = RpressTlsConfig::from_pem("cert.pem", "key.pem")?;
app.listen_tls("0.0.0.0:443", tls).await
}
```
### `RpressTlsConfig`
| `from_pem(cert_path, key_path)` | Loads a PEM certificate chain and private key from files |
| `from_config(rustls::ServerConfig)` | Uses an existing `rustls::ServerConfig` for full control |
Both methods automatically configure ALPN to support HTTP/2 (`h2`) and HTTP/1.1.
### Plaintext and TLS side by side
The `listen()` method continues to work for plaintext HTTP. You can use either one depending on your environment:
```rust
// Development — plaintext
app.listen("0.0.0.0:3000").await?;
// Production — TLS
let tls = RpressTlsConfig::from_pem("cert.pem", "key.pem")?;
app.listen_tls("0.0.0.0:443", tls).await?;
```
## HTTP/2
HTTP/2 is supported automatically over TLS connections. When a client negotiates the `h2` protocol via ALPN during the TLS handshake, Rpress routes the connection through its HTTP/2 handler.
- All routes, middleware, CORS, and response features work identically over HTTP/2
- No code changes required — the same `RpressRoutes` and handlers serve both protocols
- HTTP/2 multiplexing is fully supported (concurrent streams on a single connection)
- Plaintext connections (`listen()`) always use HTTP/1.1
```rust
// This handler serves both HTTP/1.1 and HTTP/2 clients transparently
Organize handlers in structs with `Arc`:
```rust
use rpress::handler;
pub struct UserController;
impl UserController {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
async fn get_user(&self, req: RequestPayload) -> Result<ResponsePayload, RpressError> {
let id = req.get_param("id").ok_or_else(|| RpressError {
status: StatusCode::BadRequest,
message: "Missing id".to_string(),
})?;
Ok(ResponsePayload::json(&serde_json::json!({
"id": id,
"name": "Guilherme"
}))?)
}
async fn create_user(&self, mut req: RequestPayload) -> Result<ResponsePayload, RpressError> {
let body = req.collect_body().await;
let data: serde_json::Value = serde_json::from_slice(&body)?;
Ok(ResponsePayload::json(&serde_json::json!({
"created": true,
"name": data["name"]
}))?.with_status(StatusCode::Created))
}
}
pub fn get_user_routes() -> RpressRoutes {
let controller = UserController::new();
let mut routes = RpressRoutes::new();
routes.add(":get/users/:id", handler!(controller, get_user));
routes.add(":post/users", handler!(controller, create_user));
routes
}
```
## State Management
Shared state — database pools, config, caches, service clients — is passed into route groups as function parameters and stored inside controllers wrapped in `Arc`.
### The pattern
```
main()
└── Arc::new(MyPool::new()) — created once
├── .clone() → get_user_routes(db)
│ └── UserController { db }
│ └── self.db.query(…).await
└── .clone() → get_order_routes(db)
└── OrderController { db }
```
### Example — database pool
```rust
// db.rs — your database pool (e.g. sqlx::PgPool or a mock)
pub struct DbPool { /* connection pool */ }
impl DbPool {
pub async fn find_user(&self, id: u32) -> Option<User> { /* … */ }
pub async fn create_user(&self, name: String, email: String) -> User { /* … */ }
}
```
```rust
// routes/user.rs
use std::sync::Arc;
use rpress::{handler, RpressRoutes, RequestPayload, ResponsePayload, RpressError, StatusCode};
use crate::db::DbPool;
pub struct UserController {
db: Arc<DbPool>, // shared, cloning Arc is O(1)
}
impl UserController {
pub fn new(db: Arc<DbPool>) -> Arc<Self> {
Arc::new(Self { db })
}
async fn get_user(&self, req: RequestPayload) -> Result<ResponsePayload, RpressError> {
let id: u32 = req.get_param("id")
.and_then(|v| v.parse().ok())
.ok_or(RpressError { status: StatusCode::BadRequest, message: "bad id".into() })?;
let user = self.db.find_user(id).await
.ok_or(RpressError { status: StatusCode::NotFound, message: "not found".into() })?;
Ok(ResponsePayload::json(&user)?)
}
async fn create_user(&self, mut req: RequestPayload) -> Result<ResponsePayload, RpressError> {
let body = req.collect_body().await;
let data: serde_json::Value = serde_json::from_slice(&body)?;
let user = self.db.create_user(
data["name"].as_str().unwrap_or("").to_string(),
data["email"].as_str().unwrap_or("").to_string(),
).await;
Ok(ResponsePayload::json(&user)?.with_status(StatusCode::Created))
}
}
// The pool is injected here — route groups are plain functions.
pub fn get_user_routes(db: Arc<DbPool>) -> RpressRoutes {
let controller = UserController::new(db);
let mut routes = RpressRoutes::new();
routes.add(":get/users/:id", handler!(controller, get_user));
routes.add(":post/users", handler!(controller, create_user));
routes
}
```
```rust
// main.rs — create the pool once, share it via Arc::clone
use std::sync::Arc;
use rpress::Rpress;
use crate::db::DbPool;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// With sqlx: let db = Arc::new(PgPool::connect(&database_url).await?);
let db = Arc::new(DbPool::new());
let mut app = Rpress::new(None);
// Each route group gets a cheap Arc clone — no data is copied.
app.add_route_group(get_user_routes(db.clone()));
app.add_route_group(get_order_routes(db.clone()));
app.listen("0.0.0.0:3000").await?;
Ok(())
}
```
### Multiple state types
Pass additional state the same way — just add more parameters:
```rust
pub fn get_auth_routes(
db: Arc<DbPool>,
cache: Arc<RedisClient>,
cfg: Arc<AppConfig>,
) -> RpressRoutes {
let controller = AuthController::new(db, cache, cfg);
// …
}
```
```rust
// main.rs
let db = Arc::new(DbPool::new());
let cache = Arc::new(RedisClient::connect("redis://localhost")?);
let cfg = Arc::new(AppConfig::from_env());
app.add_route_group(get_auth_routes(db.clone(), cache.clone(), cfg.clone()));
```
Any type that is `Send + Sync + 'static` can be wrapped in `Arc` and shared this way, including `tokio::sync::RwLock` and `tokio::sync::Mutex` for mutable shared state.
## Custom Errors
Implement `RpressErrorExt` to return errors with custom status codes:
```rust
use rpress::{RpressErrorExt, StatusCode};
struct NotFoundError {
resource: String,
}
impl RpressErrorExt for NotFoundError {
fn into_rpress_error(self) -> (StatusCode, String) {
(StatusCode::NotFound, format!("{} not found", self.resource))
}
}
if id == "0" {
return Err(NotFoundError { resource: "Item".into() });
}
Ok(ResponsePayload::text(format!("Item {}", id)))
});
```
Handlers can return:
- `ResponsePayload` (implicit 200)
- `Result<ResponsePayload, RpressError>`
- `Result<ResponsePayload, E>` where `E: RpressErrorExt`
- Any `E: RpressErrorExt` directly (error without Result)
- `()` (202 Accepted with no body)
## Security Headers
### Always Applied
These headers are sent automatically on every response:
| `X-Content-Type-Options` | `nosniff` |
| `X-Request-ID` | Unique UUID v4 per request |
| `Server` | `Rpress/1.0` |
| `Connection` | `keep-alive` |
### Configurable Security Headers
Use `RpressSecurityHeaders` to opt-in to additional security headers such as
`Content-Security-Policy`, `X-Frame-Options`, `X-XSS-Protection`, and any custom
header. These are injected into every response **unless** the handler already set
the same header via `with_header()`.
```rust
use rpress::{Rpress, RpressSecurityHeaders};
let mut app = Rpress::new(None);
app.set_security_headers(
RpressSecurityHeaders::new()
.content_security_policy("default-src 'self'; script-src 'self'")
.x_frame_options("DENY")
.x_xss_protection("1; mode=block")
.custom("Permissions-Policy", "camera=(), microphone=()")
.custom("Referrer-Policy", "strict-origin-when-cross-origin"),
);
```
If a handler needs a different policy for a specific route, it can override by
setting the header directly:
```rust
ResponsePayload::html(page)
.with_header("Content-Security-Policy", "default-src 'self'; script-src 'self' 'unsafe-inline'")
```
The handler-set value takes priority and the global default is skipped for that header.
## Graceful Shutdown
The server responds to `SIGINT` (Ctrl+C):
1. Stops accepting new connections
2. Waits for active connections to finish
3. Shuts down cleanly
## Security Limits
| Request line | 8 KB |
| Headers (size) | 8 KB |
| Headers (count) | 100 |
| Body (Content-Length) | Configurable per route group (default 10 MB) |
| Individual chunk | 1 MB |
| Connection buffer | Configurable (default 40 KB) |
## Socket.IO (Real-time Communication)
Rpress includes a built-in Socket.IO server compatible with `socket.io-client` v4+
(Engine.IO v4, Socket.IO protocol v5). It supports HTTP long-polling and WebSocket
transports, namespaces, rooms, event-based messaging, acknowledgements, and broadcasting.
### Basic Setup
```rust
use rpress::{Rpress, RpressIo};
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let io = RpressIo::new();
io.on_connection(|socket| async move {
println!("Connected: {}", socket.id());
socket.on("message", |socket, data| async move {
// Broadcast to all other sockets
socket.broadcast().emit("message", &data[0]).await;
None
}).await;
socket.on_disconnect(|socket| async move {
println!("Disconnected: {}", socket.id());
}).await;
});
let mut app = Rpress::new(None);
app.attach_socketio(io);
app.listen("0.0.0.0:3000").await
}
```
### Namespaces
```rust
let io = RpressIo::new();
// Default namespace "/"
io.on_connection(|socket| async move { /* ... */ });
// Custom namespace "/admin"
io.of("/admin").on_connection(|socket| async move {
println!("Admin connected: {}", socket.id());
});
```
### Rooms
```rust
socket.on("join_room", |socket, data| async move {
if let Some(room) = data.first().and_then(|v| v.as_str()) {
socket.join(room).await;
socket.to(room).emit("user_joined", &socket.id()).await;
}
None
}).await;
```
### Acknowledgements
```rust
```javascript
import { io } from "socket.io-client";
const socket = io("http://localhost:3000");
socket.on("connect", () => {
console.log("Connected:", socket.id);
});
socket.emit("message", "Hello from client");
socket.on("message", (data) => {
console.log("Received:", data);
});
```
### Client Connection (Rust — `rpress-client`)
For server-to-server communication, use the `rpress-client` crate to connect from Rust:
```toml
[dependencies]
rpress-client = "0.1"
tokio = { version = "1", features = ["full"] }
serde_json = "1"
```
```rust
use rpress_client::SocketIoClient;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Connect to default namespace "/"
let client = SocketIoClient::connect("http://localhost:3000").await?;
// Listen for events
client.on("chat message", |data| async move {
println!("Received: {:?}", data);
}).await;
// Emit events
client.emit("chat message", &serde_json::json!("Hello from Rust!")).await?;
// Emit with acknowledgement
let ack = client.emit_with_ack("greet", &serde_json::json!("Rpress")).await?;
println!("Ack: {:?}", ack);
// Connect to a custom namespace
let admin = SocketIoClient::connect_to("http://localhost:3000", "/admin").await?;
admin.emit("status", &serde_json::json!("online")).await?;
// Disconnect
client.disconnect().await?;
admin.disconnect().await?;
Ok(())
}
```
### Authentication
Protect Socket.IO connections by registering an authentication handler. The handler
receives the `auth` payload from the client's CONNECT packet and must return
`Ok(claims)` to allow the connection or `Err(message)` to reject it with a
`CONNECT_ERROR`. The returned claims are accessible on the socket via `socket.auth()`.
**Server (Rust):**
```rust
use rpress::{Rpress, RpressIo};
use std::sync::Arc;
let io = RpressIo::new();
// Register auth handler for the default namespace
io.use_auth(|auth| async move {
let token = auth.get("token").and_then(|v| v.as_str())
.ok_or_else(|| "Missing token".to_string())?;
// Validate the token (e.g. JWT verification)
if token == "valid-secret" {
Ok(serde_json::json!({"user_id": "123", "role": "admin"}))
} else {
Err("Unauthorized".to_string())
}
});
io.on_connection(|socket| async move {
let user_id = socket.auth().get("user_id").and_then(|v| v.as_str());
println!("Authenticated user: {}", user_id.unwrap_or("unknown"));
socket.on("admin_action", |socket, data| async move {
if socket.auth().get("role").and_then(|v| v.as_str()) == Some("admin") {
socket.broadcast().emit("notification", &data[0]).await;
}
None
}).await;
});
// Per-namespace auth is also supported:
io.of("/admin").use_auth(|auth| async move {
let token = auth.get("token").and_then(|v| v.as_str())
.ok_or_else(|| "Missing token".to_string())?;
// Stricter validation for /admin namespace...
Ok(serde_json::json!({"admin": true}))
});
```
**Client (JavaScript):**
```javascript
import { io } from "socket.io-client";
const socket = io("http://localhost:3000", {
auth: { token: "valid-secret" }
});
socket.on("connect", () => {
console.log("Authenticated and connected:", socket.id);
});
socket.on("connect_error", (err) => {
console.error("Auth failed:", err.message);
});
```
**Client (Rust — `rpress-client`):**
```rust
use rpress_client::SocketIoClient;
// Connect with authentication
let client = SocketIoClient::connect_with_auth(
"http://localhost:3000",
serde_json::json!({"token": "valid-secret"}),
).await?;
// Connect to a specific namespace with auth
let admin = SocketIoClient::connect_to_with_auth(
"http://localhost:3000",
"/admin",
serde_json::json!({"token": "admin-secret"}),
).await?;
```
Without an auth handler configured, connections are accepted without validation
(backward compatible).
### Scaling with Redis
By default, Rpress uses an in-memory adapter for room management and broadcasting.
This works perfectly for a single server instance, but when running multiple replicas
behind a load balancer (e.g. Kubernetes), broadcasts on one Pod won't reach sockets
connected to another Pod.
Enable the `redis` feature to use Redis Pub/Sub for cross-instance broadcasting:
```toml
[dependencies]
rpress = { version = "0.5", features = ["redis"] }
```
**Server setup:**
```rust
use rpress::{Rpress, RpressIo};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let io = RpressIo::with_redis("redis://127.0.0.1:6379").await?;
io.on_connection(|socket| async move {
println!("Connected: {}", socket.id());
socket.on("message", |socket, data| async move {
// This broadcast reaches ALL connected clients,
// even those on different server instances
socket.broadcast().emit("message", &data[0]).await;
None
}).await;
});
let mut app = Rpress::new(None);
app.attach_socketio(io);
app.listen("0.0.0.0:3000").await
}
```
You can also use a custom adapter with `set_adapter`:
```rust
use rpress::{RpressIo, RedisAdapter};
let adapter = RedisAdapter::new("redis://my-redis-cluster:6379").await?;
let mut io = RpressIo::new();
io.set_adapter(adapter);
```
**Deploying with Multiple Replicas (Kubernetes / Load Balancers)**
Engine.IO starts connections via HTTP long-polling before upgrading to WebSocket.
In a multi-replica deployment, successive polling requests from the same client may
be routed to different Pods by the load balancer, causing `"Session ID unknown"` errors.
There are two solutions:
**Option A: WebSocket-only mode (recommended)**
Force all clients to connect directly via WebSocket, bypassing long-polling entirely.
This eliminates the sticky session requirement because WebSocket is a single persistent
connection that stays on the same Pod.
Server:
```rust
use rpress::{Rpress, RpressIo, EioConfig};
let config = EioConfig {
websocket_only: true,
..EioConfig::default()
};
let io = RpressIo::with_config(config);
```
Client (JavaScript):
```javascript
const socket = io("https://api.example.com", {
transports: ["websocket"], // skip long-polling
});
```
Client (Rust — rpress-client):
```rust
// rpress-client connects via WebSocket by default — no changes needed
let client = SocketIoClient::connect("http://localhost:3000").await?;
```
When `websocket_only` is enabled, the server rejects any long-polling request with
a clear error message instructing the client to use WebSocket transport.
**Option B: Sticky sessions**
If you need long-polling support (e.g. for clients behind restrictive proxies that
block WebSocket), configure your load balancer with session affinity so that all
requests from the same client reach the same Pod.
Nginx example:
```nginx
upstream rpress_backend {
ip_hash; # sticky sessions by client IP
server pod1:3000;
server pod2:3000;
}
server {
location /socket.io/ {
proxy_pass http://rpress_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
```
Kubernetes Ingress (nginx-ingress) example:
```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
nginx.ingress.kubernetes.io/affinity: "cookie"
nginx.ingress.kubernetes.io/session-cookie-name: "RPRESS_AFFINITY"
nginx.ingress.kubernetes.io/session-cookie-expires: "172800"
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
spec:
rules:
- host: api.example.com
http:
paths:
- path: /socket.io/
pathType: Prefix
backend:
service:
name: rpress-service
port:
number: 3000
```
The Redis adapter handles only cross-instance broadcast synchronization. Room
membership, socket state, and Engine.IO sessions remain local to each instance,
which is why sticky sessions (or WebSocket-only mode) are required.
## Benchmarks
Load tested on a single machine with [oha](https://github.com/hatoo/oha) (HTTP) and [Artillery](https://www.artillery.io/) (Socket.IO). All tests run against a release build of the benchmark server included in `bench/`.
### HTTP Performance
| Warmup | 1,000 | 50 | **85,422** | 0.37ms | 1.79ms | 100% |
| Max Throughput | 50,000 | 500 | **144,126** | 2.91ms | 10.91ms | 100% |
| JSON Serialization | 20,000 | 200 | **30,528** | 6.13ms | 15.03ms | 100% |
| POST Echo (1KB body) | 20,000 | 200 | **28,886** | 6.51ms | 16.48ms | 100% |
| Large Body + Compression | 10,000 | 100 | **1,840** | 53.30ms | 108.01ms | 100% |
| Static File (32KB CSS) | 10,000 | 100 | **10,295** | 9.01ms | 22.52ms | 100% |
| Extreme Concurrency | 10,000 | 1,000 | **94,558** | 6.19ms | 35.60ms | 100% |
| Sustained Load (60s) | 1,854,463 | 200 | **30,906** | 6.10ms | 15.69ms | 100% |
### Socket.IO Performance
| Virtual Users | 520 created, 520 completed, **0 failed** |
| Scenarios | Ping-Pong (60%), Room Join (30%), Broadcast Storm (10%) |
| Total Emits | 3,567 |
| Peak Emit Rate | 135/sec |
| Session Length (median) | 2.0s |
| Total Test Time | 1 min 10s |
### Stress Tests
| Connection Limit (5,000 vs 4,096 max) | **PASS** | 4,883 successful, excess gracefully rejected |
| Slowloris (20 slow connections) | **PASS** | Server remained responsive |
| Oversized Body (15MB vs 10MB limit) | **PASS** | Returned 413 Payload Too Large |
| Post-Stress Health Check | **PASS** | 10/10 checks passed |
All scenarios are configurable via environment variables. See [`bench/README.md`](bench/README.md) for details on running your own benchmarks.
## License
MIT