tapaculo 1.5.0

Lightweight Rust server for real-time and turn-based multiplayer communication

Tapaculo

A lightweight Rust server that handles real-time and turn-based communication between multiplayer clients.

Features

  • Pluggable PubSub: Swap between in-memory and Redis backends without changing handler code
  • JWT Authentication: Secure token-based auth with session tracking and reconnection support
  • Room Management: Capacity limits, player tracking, and lifecycle events
  • Real-time WebSocket: Bidirectional communication with automatic cleanup
  • Rate Limiting: Built-in spam prevention and abuse protection
  • Message History: Optional replay for new joiners
  • User Metadata: Associate custom data with players
  • Token Endpoint: Optional POST /token route for server-side JWT minting
  • Custom Room State: Store typed game state per room, accessible from any handler
  • Typed Messages: Type-safe message handling with serde
  • Production Ready: Comprehensive error handling, logging, and reconnection logic

Quick Start

[dependencies]
tapaculo = "1.2"

Basic Server

use tapaculo::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let auth = JwtAuth::new("your-secret-key");
    let pubsub = InMemoryPubSub::new();

    Server::new()
        .with_auth(auth)
        .with_pubsub(pubsub)
        .on_message(|ctx, envelope| async move {
            // Echo messages to all other players
            ctx.broadcast_to_others(envelope.data).await.ok();
        })
        .listen("0.0.0.0:8080")
        .await
}

Use Cases

Chess Server (2 players, turn-based)

use tapaculo::*;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
enum ChessMove {
    Move { from: String, to: String },
    Resign,
    OfferDraw,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let room_settings = RoomSettings {
        max_players: Some(2),  // Cap the room at 2 players for chess
        allow_spectators: false,
        store_message_history: true,
        ..Default::default()
    };

    Server::new()
        .with_auth(JwtAuth::new("secret"))
        .with_pubsub(InMemoryPubSub::new())
        .with_room_settings(room_settings)
        .on_message_typed::<ChessMove, _, _>(|ctx, envelope| async move {
            // Relay the move to the opponent only (add move validation here)
            ctx.broadcast_to_others(envelope.data).await.ok();
        })
        .with_event_handler(ChessEventHandler)
        .listen("0.0.0.0:8080")
        .await
}

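// Payload announcing the start of a game. Not part of tapaculo: it is defined
// here for the example and assumes member IDs are `String`s.
#[derive(Serialize)]
struct GameStart {
    white: String,
    black: String,
}

struct ChessEventHandler;

#[async_trait::async_trait]
impl RoomEventHandler for ChessEventHandler {
    async fn on_room_full(&self, ctx: &Context) {
        // Both players connected - start game
        let members = ctx.get_room_members().await;
        ctx.broadcast(GameStart {
            white: members[0].clone(),
            black: members[1].clone(),
        }).await.ok();
    }
}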

Chat Server (group messaging)

// Fragment: assumes `use std::time::Duration;` and an async context such as `main` above.
Server::new()
    .with_auth(JwtAuth::new("secret"))
    .with_pubsub(InMemoryPubSub::new())
    .with_room_settings(RoomSettings {
        max_players: Some(50),
        store_message_history: true,
        max_history_messages: 200,
        ..Default::default()
    })
    .with_limits(MessageLimits {
        max_messages_per_window: 10,
        window_duration: Duration::from_secs(1),
        ..Default::default()
    })
    .on_message(|ctx, envelope| async move {
        // Broadcast to everyone (including sender)
        ctx.broadcast(envelope.data).await.ok();
    })
    .listen("0.0.0.0:8081")
    .await

Core Concepts

1. Room Management

Rooms are isolated multiplayer sessions with configurable limits:

let room_settings = RoomSettings {
    max_players: Some(4),              // Limit to 4 players
    allow_spectators: true,            // Allow additional viewers
    store_message_history: true,       // Enable history replay
    max_history_messages: 100,         // Keep last 100 messages
    empty_room_timeout: Some(Duration::from_secs(300)), // Clean up rooms left empty for 5 minutes
};

Server::new()
    .with_room_settings(room_settings)
    // ...
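
How max_players and allow_spectators interact is not spelled out above. The sketch below shows one plausible admission rule, assuming that a full room admits newcomers as spectators when spectators are allowed and rejects them otherwise. RoomLimits, Admission, and admit are hypothetical names for illustration, not tapaculo types.

```rust
/// Outcome of a join attempt (hypothetical, for illustration only).
#[derive(Debug, PartialEq)]
enum Admission {
    Player,
    Spectator,
    Rejected,
}

/// Mirrors two of the `RoomSettings` fields shown above.
struct RoomLimits {
    max_players: Option<usize>,
    allow_spectators: bool,
}

/// Decide how a newcomer is admitted, given the current player count.
fn admit(limits: &RoomLimits, current_players: usize) -> Admission {
    // `None` means the room has no player cap.
    let full = limits
        .max_players
        .map_or(false, |max| current_players >= max);

    if !full {
        Admission::Player
    } else if limits.allow_spectators {
        // Assumption: spectators do not count against `max_players`.
        Admission::Spectator
    } else {
        Admission::Rejected
    }
}
```

With the chess settings from earlier (max_players of 2, no spectators), a third connection would be rejected under this rule.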

2. Broadcast Filtering

Choose which players in the room receive a message:

// To all players in room
ctx.broadcast(data).await?;

// To all OTHER players (exclude sender)
ctx.broadcast_to_others(data).await?;

// Custom filter
ctx.broadcast_filtered(data, |user_id| {
    user_id != "spectator123"
}).await?;

// Direct message to one player — works across nodes with Redis
ctx.send_to("player456", data).await?;

broadcast and broadcast_to_others use a single publish to the room topic regardless of player count. send_to and broadcast_filtered route via per-user topics and work correctly in multi-node Redis deployments.
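
To make the cost difference concrete, here is a minimal sketch of which topics each call might publish to. The topic naming scheme (room:<id> and room:<id>:user:<id>) and all function names are assumptions for illustration; tapaculo's internal topic names are not documented in this README.

```rust
/// Hypothetical topic for everything sent to a whole room.
fn room_topic(room_id: &str) -> String {
    format!("room:{room_id}")
}

/// Hypothetical topic for messages addressed to one user in a room.
fn user_topic(room_id: &str, user_id: &str) -> String {
    format!("room:{room_id}:user:{user_id}")
}

/// A room-wide broadcast publishes once, however many players are present.
fn broadcast_topics(room_id: &str) -> Vec<String> {
    vec![room_topic(room_id)]
}

/// A filtered broadcast publishes once per member accepted by the filter.
fn filtered_topics<F>(room_id: &str, members: &[&str], keep: F) -> Vec<String>
where
    F: Fn(&str) -> bool,
{
    members
        .iter()
        .copied()
        .filter(|member| keep(*member))
        .map(|member| user_topic(room_id, member))
        .collect()
}
```

Under these assumptions, a broadcast to a 50-player room is one publish, while a filtered broadcast costs one publish per matching player, so prefer broadcast or broadcast_to_others when no filtering is needed.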

3. Room Lifecycle Events

React to room state changes:

struct MyEventHandler;

#[async_trait::async_trait]
impl RoomEventHandler for MyEventHandler {
    async fn on_player_joined(&self, ctx: &Context, user_id: &str) {
        // Send welcome message, current game state, etc.
    }

    async fn on_player_left(&self, ctx: &Context, user_id: &str) {
        // Handle disconnects, pause game, etc.
    }

    async fn on_room_full(&self, ctx: &Context) {
        // Start game when all players connected
    }

    async fn on_room_empty(&self, room_id: &str) {
        // Cleanup, save game state, etc.
    }
}

Server::new()
    .with_event_handler(MyEventHandler)
    // ...
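
The order in which these callbacks fire can be pictured with a small membership model. This is a sketch under assumptions: the Room and RoomEvent types are hypothetical, and it assumes on_room_full fires right after the join that fills the room and on_room_empty right after the last player leaves.

```rust
/// Events corresponding to the four callbacks above (hypothetical type).
#[derive(Debug, PartialEq)]
enum RoomEvent {
    PlayerJoined(String),
    RoomFull,
    PlayerLeft(String),
    RoomEmpty,
}

struct Room {
    max_players: Option<usize>,
    members: Vec<String>,
}

impl Room {
    fn is_full(&self) -> bool {
        self.max_players
            .map_or(false, |max| self.members.len() >= max)
    }

    /// Add a player and return the events to dispatch, in order.
    fn join(&mut self, user_id: &str) -> Vec<RoomEvent> {
        // Ignore joins to a full room and duplicate joins.
        if self.is_full() || self.members.iter().any(|m| m.as_str() == user_id) {
            return Vec::new();
        }
        self.members.push(user_id.to_string());
        let mut events = vec![RoomEvent::PlayerJoined(user_id.to_string())];
        if self.is_full() {
            events.push(RoomEvent::RoomFull);
        }
        events
    }

    /// Remove a player and return the events to dispatch, in order.
    fn leave(&mut self, user_id: &str) -> Vec<RoomEvent> {
        let before = self.members.len();
        self.members.retain(|m| m.as_str() != user_id);
        if self.members.len() == before {
            return Vec::new(); // The user was not in the room.
        }
        let mut events = vec![RoomEvent::PlayerLeft(user_id.to_string())];
        if self.members.is_empty() {
            events.push(RoomEvent::RoomEmpty);
        }
        events
    }
}
```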

4. Room State Queries

Access room information:

// Get all members in room
let members = ctx.get_room_members().await;

// Check if specific user is in room
if ctx.has_member("user123").await {
    // ...
}

// Get room info
if let Some(info) = ctx.get_room_info().await {
    println!("Room has {} / {:?} players",
        info.member_count, info.max_members);
    println!("Is full: {}", info.is_full);
}

// Get message history
let history = ctx.get_message_history(50).await;
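
The combination of max_history_messages and get_message_history(50) suggests a bounded buffer that drops the oldest entries. A minimal sketch of that idea follows, assuming messages are kept as strings and that the query returns the most recent entries in chronological order. MessageHistory is a hypothetical type, not tapaculo's implementation.

```rust
use std::collections::VecDeque;

/// Bounded history that keeps only the newest `max_messages` entries.
struct MessageHistory {
    max_messages: usize,
    messages: VecDeque<String>,
}

impl MessageHistory {
    fn new(max_messages: usize) -> Self {
        Self {
            max_messages,
            messages: VecDeque::with_capacity(max_messages),
        }
    }

    /// Append a message, evicting the oldest one when the buffer is full.
    fn push(&mut self, message: String) {
        if self.max_messages == 0 {
            return; // History is effectively disabled.
        }
        if self.messages.len() == self.max_messages {
            self.messages.pop_front();
        }
        self.messages.push_back(message);
    }

    /// Return up to `limit` of the most recent messages, oldest first.
    fn latest(&self, limit: usize) -> Vec<String> {
        let skip = self.messages.len().saturating_sub(limit);
        self.messages.iter().skip(skip).cloned().collect()
    }
}
```

Asking for more messages than are stored simply returns everything that is still in the buffer.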

5. Rate Limiting

Prevent spam and abuse:

let limits = MessageLimits {
    max_size_bytes: 10 * 1024,      // 10KB per message
    max_messages_per_window: 10,    // 10 messages max
    window_duration: Duration::from_secs(1), // per second
    ban_duration: Duration::from_secs(60),   // 1 minute ban
};

Server::new()
    .with_limits(limits)
    // ...
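
The four fields map naturally onto a fixed-window counter with a temporary ban. The sketch below illustrates that technique only. It assumes a ban starts when a client exceeds the per-window limit, which this README does not confirm, and Limiter and Rejection are hypothetical names. The current time is passed in explicitly so the logic is easy to test.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum Rejection {
    TooLarge,
    RateLimited,
    Banned,
}

/// Per-connection fixed-window limiter (illustrative, not tapaculo's code).
struct Limiter {
    max_size_bytes: usize,
    max_messages_per_window: u32,
    window_duration: Duration,
    ban_duration: Duration,
    window_start: Instant,
    count: u32,
    banned_until: Option<Instant>,
}

impl Limiter {
    fn new(
        max_size_bytes: usize,
        max_messages_per_window: u32,
        window_duration: Duration,
        ban_duration: Duration,
        now: Instant,
    ) -> Self {
        Self {
            max_size_bytes,
            max_messages_per_window,
            window_duration,
            ban_duration,
            window_start: now,
            count: 0,
            banned_until: None,
        }
    }

    /// Check one incoming message of `size` bytes received at `now`.
    fn check(&mut self, size: usize, now: Instant) -> Result<(), Rejection> {
        if let Some(until) = self.banned_until {
            if now < until {
                return Err(Rejection::Banned);
            }
            // Ban expired: start over with a fresh window.
            self.banned_until = None;
            self.window_start = now;
            self.count = 0;
        }
        if size > self.max_size_bytes {
            return Err(Rejection::TooLarge);
        }
        if now.saturating_duration_since(self.window_start) >= self.window_duration {
            self.window_start = now;
            self.count = 0;
        }
        if self.count >= self.max_messages_per_window {
            // Assumption: exceeding the limit triggers the ban.
            self.banned_until = Some(now + self.ban_duration);
            return Err(Rejection::RateLimited);
        }
        self.count += 1;
        Ok(())
    }
}
```

A fixed window is the simplest choice. A sliding window or token bucket smooths bursts at window boundaries, at the cost of a little more state.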

6. Message Validation

Validate messages before processing:

Server::new()
    .on_message_validate(|ctx, envelope| {
        // Example: validate chess moves
        if !is_valid_move(&envelope.data) {
            return Err("Invalid move".to_string());
        }
        Ok(())
    })
    // ...
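
The is_valid_move helper above is left undefined. As a starting point, a shape check on chess squares might look like the sketch below. It only verifies that the squares are well formed, not that the move is legal, and it works on plain strings because the type of envelope.data is not shown here. Both function names are hypothetical.

```rust
/// True for algebraic squares "a1" through "h8".
fn is_valid_square(square: &str) -> bool {
    let bytes = square.as_bytes();
    bytes.len() == 2
        && (b'a'..=b'h').contains(&bytes[0])
        && (b'1'..=b'8').contains(&bytes[1])
}

/// Reject moves whose squares are malformed or identical.
/// Returns `Err(String)` to match the validator's error type above.
fn check_move_shape(from: &str, to: &str) -> Result<(), String> {
    if !is_valid_square(from) {
        return Err(format!("Invalid source square: {from}"));
    }
    if !is_valid_square(to) {
        return Err(format!("Invalid target square: {to}"));
    }
    if from == to {
        return Err("Source and target squares are identical".to_string());
    }
    Ok(())
}
```

Full legality checks (piece movement, turn order, check) belong in game logic that has access to the board, for example the custom room state.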

7. Typed Message Handlers

Type-safe message processing:

#[derive(Serialize, Deserialize)]
enum GameAction {
    Move { x: i32, y: i32 },
    Attack { target: String },
    UseItem { item_id: String },
}

Server::new()
    .on_message_typed::<GameAction, _, _>(|ctx, envelope| async move {
        match envelope.data {
            GameAction::Move { x, y } => {
                // Handle move
            }
            GameAction::Attack { target } => {
                // Handle attack
            }
            _ => {}
        }
    })
    // ...

8. User Metadata

Store custom player data:

// Set metadata
let metadata = PlayerMetadata {
    user_id: "player1".to_string(),
    display_name: "Alice".to_string(),
    avatar_url: Some("https://...".to_string()),
    custom: json!({ "level": 10, "team": "red" }),
    ..Default::default()
};
ctx.set_user_metadata(metadata).await?;

// Get metadata
if let Some(metadata) = ctx.get_user_metadata("player1").await {
    println!("Display name: {}", metadata.display_name);
}

9. Custom Room State

Store typed game state per room, accessible from any handler:

#[derive(Clone)]
struct GameState {
    turn: String,
    score: u32,
}

// Set initial state when first player joins
ctx.set_custom_state(GameState { turn: user_id.to_string(), score: 0 }).await?;

// Update state atomically
ctx.update_custom_state::<GameState, _>(|state| {
    state.score += 10;
    state.turn = next_player.to_string();
}).await?;

// Read state
if let Some(state) = ctx.get_custom_state::<GameState>().await {
    ctx.send_to(&state.turn, YourTurn).await?;
}
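
One common way to offer a typed get/set/update API like this is a map keyed by TypeId that holds boxed values. The sketch below shows that technique with the standard library only. It is an assumption about how such an API could be built, not tapaculo's implementation, and StateSlot is a hypothetical name.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Example state type, extended with derives needed for comparison.
#[derive(Clone, Debug, PartialEq)]
struct GameState {
    turn: String,
    score: u32,
}

/// Holds at most one value per concrete type.
#[derive(Default)]
struct StateSlot {
    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl StateSlot {
    /// Store `value`, replacing any previous value of the same type.
    fn set<T: Any + Send + Sync>(&mut self, value: T) {
        self.values.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Return a clone of the stored value of type `T`, if any.
    fn get<T: Any + Send + Sync + Clone>(&self) -> Option<T> {
        self.values
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
            .cloned()
    }

    /// Mutate the stored value in place; returns false if none is stored.
    fn update<T, F>(&mut self, f: F) -> bool
    where
        T: Any + Send + Sync,
        F: FnOnce(&mut T),
    {
        match self
            .values
            .get_mut(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_mut::<T>())
        {
            Some(state) => {
                f(state);
                true
            }
            None => false,
        }
    }
}
```

In a server, a slot like this would sit behind a per-room lock so that the closure passed to update runs without interleaving with other handlers.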

10. Reconnection Support

Sessions persist across connections. To resume a session, issue the reconnecting client a token that carries the same session ID:

// Generate token with session ID
let auth = JwtAuth::new("secret");
let token = auth.sign_access(
    "user123".to_string(),
    "room456".to_string(),
    "session-abc".to_string(),  // Session ID
    3600
)?;

// Later, reconnect with same session ID
let new_token = auth.sign_access(
    "user123".to_string(),
    "room456".to_string(),
    "session-abc".to_string(),  // Same session
    3600
)?;

// Access session ID in handler
ctx.session_id(); // "session-abc"

PubSub Backends

In-Memory (Development)

let pubsub = InMemoryPubSub::new();
// or with custom buffer size
let pubsub = InMemoryPubSub::with_buffer(128);

Redis (Production)

Enable the redis-backend feature in Cargo.toml:

[dependencies]
tapaculo = { version = "1.2", features = ["redis-backend"] }

Then construct the backend:

let pubsub = RedisPubSub::new("redis://localhost:6379")?;

// With custom retry configuration
let config = BackoffConfig {
    max_retries: 10,
    base_delay_ms: 100,
    max_delay_ms: 5000,
    ..Default::default()
};
let pubsub = RedisPubSub::with_config("redis://localhost", config)?;
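
The field names suggest capped exponential backoff between reconnection attempts. The sketch below shows how such a schedule is typically computed. Whether tapaculo doubles the delay on each attempt or adds jitter is an assumption, and RetryPolicy and retry_delay_ms are hypothetical names.

```rust
/// Mirrors three of the `BackoffConfig` fields shown above.
struct RetryPolicy {
    max_retries: u32,
    base_delay_ms: u64,
    max_delay_ms: u64,
}

/// Delay before retry number `attempt` (0-based), or `None` once the
/// retry budget is exhausted. The delay doubles per attempt and is
/// capped at `max_delay_ms`.
fn retry_delay_ms(policy: &RetryPolicy, attempt: u32) -> Option<u64> {
    if attempt >= policy.max_retries {
        return None;
    }
    // 2^attempt, saturating instead of overflowing for large attempts.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    let delay = policy.base_delay_ms.saturating_mul(factor);
    Some(delay.min(policy.max_delay_ms))
}
```

With the values above (base 100 ms, cap 5000 ms), this schedule would wait 100, 200, 400, 800, 1600, and 3200 ms, then 5000 ms for each remaining attempt.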

Authentication

Creating Tokens

let auth = JwtAuth::new("your-secret-key");

// Access token
let access_token = auth.sign_access(
    "user_id".to_string(),
    "room_id".to_string(),
    "session_id".to_string(),
    3600  // 1 hour
)?;

// Refresh token
let refresh_token = auth.sign_refresh(
    "user_id".to_string(),
    86400  // 24 hours
)?;

// Refresh access token
let new_access = auth.refresh_access(
    &refresh_token,
    "new_room".to_string(),
    "new_session".to_string(),
    3600
)?;

Token Endpoint

Enable a built-in POST /token route so clients can obtain JWTs without a separate auth service:

Server::new()
    .with_auth(JwtAuth::new("secret"))
    .with_pubsub(InMemoryPubSub::new())
    .with_token_endpoint()          // enable POST /token
    .with_token_ttl(1800)           // optional: 30 min TTL (default 3600)
    .listen("0.0.0.0:8080")
    .await

Clients POST { "user_id": "...", "room_id": "..." } and receive { "token": "..." }:

const res = await fetch("http://localhost:8080/token", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ user_id: "player1", room_id: "room42" }),
});
const { token } = await res.json();
const ws = new WebSocket(`ws://localhost:8080/ws?token=${token}`);

Note: The /token endpoint performs no authorization itself — add your own middleware or proxy-level auth before exposing it publicly.

Client Connection

const token = "eyJ0eXAiOiJKV1QiLCJhbGc...";
const ws = new WebSocket(`ws://localhost:8080/ws?token=${token}`);

ws.onmessage = (event) => {
  const envelope = JSON.parse(event.data);
  console.log(`Message from ${envelope.from}:`, envelope.data);
};

// Send message
ws.send(JSON.stringify({
  from: "user123",
  data: { type: "Move", from: "e2", to: "e4" }
}));

Custom HTTP Routes

Use into_router() to merge the WebSocket handler with your own axum routes:

use axum::routing::get;

let app = Server::new()
    .with_auth(auth)
    .with_pubsub(pubsub)
    .on_message(handler)
    .into_router()
    .route("/healthz", get(|| async { "ok" }));

let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
axum::serve(listener, app).await?;

Examples

See the examples/ directory for complete implementations:

  • chess_server.rs: 2-player turn-based game with room capacity enforcement
  • chat_server.rs: Group chat with message history and typing indicators
  • custom_state.rs: Per-room typed game state
  • server_with_redis.rs: Multi-node deployment with Redis pubsub

Run examples:

cargo run --example chess_server
cargo run --example chat_server
cargo run --example custom_state
cargo run --example server_with_redis --features redis-backend

Architecture

┌─────────────────────────────────────────────┐
│           WebSocket Connections             │
│  (JWT Auth + Session Tracking)              │
└──────────┬──────────────────────────────────┘
           │
┌──────────▼──────────────────────────────────┐
│         Server (Room Management)            │
│  • Rate Limiting                            │
│  • Message Validation                       │
│  • Event Handlers                           │
│  • User Metadata                            │
└──────────┬──────────────────────────────────┘
           │
┌──────────▼──────────────────────────────────┐
│         PubSub Backend                      │
│  ┌────────────────┬──────────────────────┐  │
│  │  InMemoryPubSub│    RedisPubSub       │  │
│  │  (Single Node) │  (Distributed)       │  │
│  └────────────────┴──────────────────────┘  │
└─────────────────────────────────────────────┘

Production Checklist

  • Use Redis backend for multi-server deployments
  • Enable rate limiting to prevent spam
  • Set appropriate room size limits
  • Configure empty room timeouts
  • Use strong JWT secrets loaded from environment variables, not hard-coded strings
  • Add message validation for your use case
  • Implement custom event handlers for game logic
  • Enable message history if needed
  • Set up structured logging (tracing)
  • Monitor rate limit bans
  • Handle reconnections gracefully
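
For the JWT secret item, a minimal sketch of loading and sanity-checking the secret at startup is shown below. The variable name TAPACULO_JWT_SECRET and the 32-byte minimum are assumptions chosen for illustration, and SecretError, validate_secret, and load_secret are hypothetical helpers, not part of tapaculo.

```rust
use std::env;

#[derive(Debug, PartialEq)]
enum SecretError {
    Missing,
    TooShort(usize),
}

/// Assumed minimum length in bytes; adjust to your own policy.
const MIN_SECRET_LEN: usize = 32;

/// Accept a secret only if it is present and long enough.
fn validate_secret(raw: Option<String>) -> Result<String, SecretError> {
    let secret = raw.ok_or(SecretError::Missing)?;
    if secret.len() < MIN_SECRET_LEN {
        return Err(SecretError::TooShort(secret.len()));
    }
    Ok(secret)
}

/// Read the secret from the named environment variable.
fn load_secret(var_name: &str) -> Result<String, SecretError> {
    validate_secret(env::var(var_name).ok())
}
```

At startup, the result of load_secret("TAPACULO_JWT_SECRET") can be passed to JwtAuth::new in place of the literal strings used in the examples, so the server refuses to start with a missing or weak secret.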

License

MIT

Contributing

Contributions welcome! Please open an issue or PR.