# Tapaculo
A lightweight Rust server that handles real-time and turn-based communication for any multiplayer client interactions.
<h1 align="center">
<img width="300px" src="https://raw.githubusercontent.com/AlexTrebs/tapaculo/refs/heads/main/docs/images/icon.png" />
</h1>
## Features
- **Pluggable PubSub**: Swap between in-memory and Redis backends without code changes
- **JWT Authentication**: Secure token-based auth with session tracking and reconnection support
- **Room Management**: Capacity limits, player tracking, and lifecycle events
- **Real-time WebSocket**: Bidirectional communication with automatic cleanup
- **Rate Limiting**: Built-in spam prevention and abuse protection
- **Message History**: Optional replay for new joiners
- **User Metadata**: Associate custom data with players
- **Token Endpoint**: Optional `POST /token` route for server-side JWT minting
- **Custom Room State**: Store typed game state per room, accessible from any handler
- **Typed Messages**: Type-safe message handling with serde
- **Production Ready**: Comprehensive error handling, logging, and reconnection logic
## Quick Start
```toml
[dependencies]
tapaculo = "1.2"
```
### Basic Server
```rust
use tapaculo::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let auth = JwtAuth::new("your-secret-key");
let pubsub = InMemoryPubSub::new();
Server::new()
.with_auth(auth)
.with_pubsub(pubsub)
.on_message(|ctx, envelope| async move {
// Echo messages to all other players
ctx.broadcast_to_others(envelope.data).await.ok();
})
.listen("0.0.0.0:8080")
.await
}
```
## Use Cases
### Chess Server (2 players, turn-based)
```rust
use tapaculo::*;
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
enum ChessMove {
Move { from: String, to: String },
Resign,
OfferDraw,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let room_settings = RoomSettings {
max_players: Some(2), // Exactly 2 for chess
allow_spectators: false,
store_message_history: true,
..Default::default()
};
Server::new()
.with_auth(JwtAuth::new("secret"))
.with_pubsub(InMemoryPubSub::new())
.with_room_settings(room_settings)
.on_message_typed::<ChessMove, _, _>(|ctx, envelope| async move {
// Validate and broadcast move to opponent only
ctx.broadcast_to_others(envelope.data).await.ok();
})
.with_event_handler(ChessEventHandler)
.listen("0.0.0.0:8080")
.await
}
struct ChessEventHandler;
#[async_trait::async_trait]
impl RoomEventHandler for ChessEventHandler {
async fn on_room_full(&self, ctx: &Context) {
// Both players connected - start game
let members = ctx.get_room_members().await;
ctx.broadcast(GameStart {
white: members[0].clone(),
black: members[1].clone(),
}).await.ok();
}
}
```
### Chat Server (group messaging)
```rust
Server::new()
.with_auth(JwtAuth::new("secret"))
.with_pubsub(InMemoryPubSub::new())
.with_room_settings(RoomSettings {
max_players: Some(50),
store_message_history: true,
max_history_messages: 200,
..Default::default()
})
.with_limits(MessageLimits {
max_messages_per_window: 10,
window_duration: Duration::from_secs(1),
..Default::default()
})
.on_message(|ctx, envelope| async move {
// Broadcast to everyone (including sender)
ctx.broadcast(envelope.data).await.ok();
})
.listen("0.0.0.0:8081")
.await
```
## Core Concepts
### 1. Room Management
Rooms are isolated multiplayer sessions with configurable limits:
```rust
let room_settings = RoomSettings {
max_players: Some(4), // Limit to 4 players
allow_spectators: true, // Allow additional viewers
store_message_history: true, // Enable history replay
max_history_messages: 100, // Keep last 100 messages
empty_room_timeout: Some(Duration::from_secs(300)), // Auto-cleanup
};
Server::new()
.with_room_settings(room_settings)
// ...
```
### 2. Broadcast Filtering
Send messages to specific players:
```rust
// To all players in room
ctx.broadcast(data).await?;
// To all OTHER players (exclude sender)
ctx.broadcast_to_others(data).await?;
// Custom filter
}).await?;
// Direct message to one player — works across nodes with Redis
ctx.send_to("player456", data).await?;
```
`broadcast` and `broadcast_to_others` use a single publish to the room topic regardless of player count. `send_to` and `broadcast_filtered` route via per-user topics and work correctly in multi-node Redis deployments.
### 3. Room Lifecycle Events
React to room state changes:
```rust
struct MyEventHandler;
#[async_trait::async_trait]
impl RoomEventHandler for MyEventHandler {
async fn on_player_joined(&self, ctx: &Context, user_id: &str) {
// Send welcome message, current game state, etc.
}
async fn on_player_left(&self, ctx: &Context, user_id: &str) {
// Handle disconnects, pause game, etc.
}
async fn on_room_full(&self, ctx: &Context) {
// Start game when all players connected
}
async fn on_room_empty(&self, room_id: &str) {
// Cleanup, save game state, etc.
}
}
Server::new()
.with_event_handler(MyEventHandler)
// ...
```
### 4. Room State Queries
Access room information:
```rust
// Get all members in room
let members = ctx.get_room_members().await;
// Check if specific user is in room
if ctx.has_member("user123").await {
// ...
}
// Get room info
if let Some(info) = ctx.get_room_info().await {
println!("Room has {} / {:?} players",
info.member_count, info.max_members);
println!("Is full: {}", info.is_full);
}
// Get message history
let history = ctx.get_message_history(50).await;
```
### 5. Rate Limiting
Prevent spam and abuse:
```rust
let limits = MessageLimits {
max_size_bytes: 10 * 1024, // 10KB per message
max_messages_per_window: 10, // 10 messages max
window_duration: Duration::from_secs(1), // per second
ban_duration: Duration::from_secs(60), // 1 minute ban
};
Server::new()
.with_limits(limits)
// ...
```
### 6. Message Validation
Validate messages before processing:
```rust
Server::new()
.on_message_validate(|ctx, envelope| {
// Example: validate chess moves
if !is_valid_move(&envelope.data) {
return Err("Invalid move".to_string());
}
Ok(())
})
// ...
```
### 7. Typed Message Handlers
Type-safe message processing:
```rust
#[derive(Serialize, Deserialize)]
enum GameAction {
Move { x: i32, y: i32 },
Attack { target: String },
UseItem { item_id: String },
}
Server::new()
.on_message_typed::<GameAction, _, _>(|ctx, envelope| async move {
match envelope.data {
GameAction::Move { x, y } => {
// Handle move
}
GameAction::Attack { target } => {
// Handle attack
}
_ => {}
}
})
// ...
```
### 8. User Metadata
Store custom player data:
```rust
// Set metadata
let metadata = PlayerMetadata {
user_id: "player1".to_string(),
display_name: "Alice".to_string(),
avatar_url: Some("https://...".to_string()),
custom: json!({ "level": 10, "team": "red" }),
..Default::default()
};
ctx.set_user_metadata(metadata).await?;
// Get metadata
if let Some(metadata) = ctx.get_user_metadata("player1").await {
println!("Display name: {}", metadata.display_name);
}
```
### 9. Custom Room State
Store typed game state per room, accessible from any handler:
```rust
#[derive(Clone)]
struct GameState {
turn: String,
score: u32,
}
// Set initial state when first player joins
ctx.set_custom_state(GameState { turn: user_id.to_string(), score: 0 }).await?;
// Update state atomically
ctx.update_custom_state::<GameState, _>(|state| {
state.score += 10;
state.turn = next_player.to_string();
}).await?;
// Read state
if let Some(state) = ctx.get_custom_state::<GameState>().await {
ctx.send_to(&state.turn, YourTurn).await?;
}
```
### 10. Reconnection Support
Sessions persist across connections:
```rust
// Generate token with session ID
let auth = JwtAuth::new("secret");
let token = auth.sign_access(
"user123".to_string(),
"room456".to_string(),
"session-abc".to_string(), // Session ID
3600
)?;
// Later, reconnect with same session ID
let new_token = auth.sign_access(
"user123".to_string(),
"room456".to_string(),
"session-abc".to_string(), // Same session
3600
)?;
// Access session ID in handler
ctx.session_id(); // "session-abc"
```
## PubSub Backends
### In-Memory (Development)
```rust
let pubsub = InMemoryPubSub::new();
// or with custom buffer size
let pubsub = InMemoryPubSub::with_buffer(128);
```
### Redis (Production)
```toml
[dependencies]
tapaculo = { version = "1.2", features = ["redis-backend"] }
```
```rust
let pubsub = RedisPubSub::new("redis://localhost:6379")?;
// With custom retry configuration
let config = BackoffConfig {
max_retries: 10,
base_delay_ms: 100,
max_delay_ms: 5000,
..Default::default()
};
let pubsub = RedisPubSub::with_config("redis://localhost", config)?;
```
## Authentication
### Creating Tokens
```rust
let auth = JwtAuth::new("your-secret-key");
// Access token
let access_token = auth.sign_access(
"user_id".to_string(),
"room_id".to_string(),
"session_id".to_string(),
3600 // 1 hour
)?;
// Refresh token
let refresh_token = auth.sign_refresh(
"user_id".to_string(),
86400 // 24 hours
)?;
// Refresh access token
let new_access = auth.refresh_access(
&refresh_token,
"new_room".to_string(),
"new_session".to_string(),
3600
)?;
```
### Token Endpoint
Enable a built-in `POST /token` route so clients can obtain JWTs without a separate auth service:
```rust
Server::new()
.with_auth(JwtAuth::new("secret"))
.with_pubsub(InMemoryPubSub::new())
.with_token_endpoint() // enable POST /token
.with_token_ttl(1800) // optional: 30 min TTL (default 3600)
.listen("0.0.0.0:8080")
.await
```
Clients POST `{ "user_id": "...", "room_id": "..." }` and receive `{ "token": "..." }`:
```javascript
const res = await fetch("http://localhost:8080/token", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ user_id: "player1", room_id: "room42" }),
});
const { token } = await res.json();
const ws = new WebSocket(`ws://localhost:8080/ws?token=${token}`);
```
> **Note:** The `/token` endpoint performs no authorization itself — add your own middleware or proxy-level auth before exposing it publicly.
### Client Connection
```javascript
const token = "eyJ0eXAiOiJKV1QiLCJhbGc...";
const ws = new WebSocket(`ws://localhost:8080/ws?token=${token}`);
ws.onmessage = (event) => {
const envelope = JSON.parse(event.data);
console.log(`Message from ${envelope.from}:`, envelope.data);
};
// Send message
ws.send(JSON.stringify({
from: "user123",
data: { type: "Move", from: "e2", to: "e4" }
}));
```
## Custom HTTP Routes
Use `into_router()` to merge the WebSocket handler with your own axum routes:
```rust
let app = Server::new()
.with_auth(auth)
.with_pubsub(pubsub)
.on_message(handler)
.into_router()
.route("/healthz", get(|| async { "ok" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
axum::serve(listener, app).await?;
```
## Examples
See the `examples/` directory for complete implementations:
- **`chess_server.rs`**: 2-player turn-based game with room capacity enforcement
- **`chat_server.rs`**: Group chat with message history and typing indicators
- **`custom_state.rs`**: Per-room typed game state
- **`server_with_redis.rs`**: Multi-node deployment with Redis pubsub
Run examples:
```bash
cargo run --example chess_server
cargo run --example chat_server
cargo run --example custom_state
cargo run --example server_with_redis --features redis-backend
```
## Architecture
```
┌──────────────────────────────────────────────┐
│ WebSocket Connections │
│ (JWT Auth + Session Tracking) │
└──────────┬───────────────────────────────────┘
│
┌──────────▼──────────────────────────────────┐
│ Server (Room Management) │
│ • Rate Limiting │
│ • Message Validation │
│ • Event Handlers │
│ • User Metadata │
└──────────┬──────────────────────────────────┘
│
┌──────────▼──────────────────────────────────┐
│ PubSub Backend │
│ ┌────────────────┬──────────────────────┐ │
│ │ InMemoryPubSub│ RedisPubSub │ │
│ │ (Single Node) │ (Distributed) │ │
│ └────────────────┴──────────────────────┘ │
└─────────────────────────────────────────────┘
```
## Production Checklist
- [x] Use Redis backend for multi-server deployments
- [x] Enable rate limiting to prevent spam
- [x] Set appropriate room size limits
- [x] Configure empty room timeouts
- [x] Use strong JWT secrets (env variables)
- [x] Add message validation for your use case
- [x] Implement custom event handlers for game logic
- [x] Enable message history if needed
- [x] Set up structured logging (tracing)
- [x] Monitor rate limit bans
- [x] Handle reconnections gracefully
## License
MIT
## Contributing
Contributions welcome! Please open an issue or PR.