Module websocket

Source
Expand description

§WebSocket Support for Real-Time Applications

This module provides comprehensive WebSocket support for building real-time applications with Torch. It includes connection management, message broadcasting, room-based messaging, and automatic connection handling.

§Features

  • Real-time Communication: Bidirectional communication between client and server
  • Connection Management: Automatic connection tracking and cleanup
  • Message Broadcasting: Send messages to all connected clients
  • Room Support: Group clients into rooms for targeted messaging
  • JSON Messaging: Automatic JSON serialization/deserialization
  • Ping/Pong: Built-in connection health monitoring
  • Error Handling: Robust error handling and reconnection support
  • Scalable: Designed for high-concurrency applications

Note: This module is only available when the `websocket` Cargo feature is enabled.

§Quick Start

§Basic WebSocket Server

use serde::Deserialize;
use torch_web::{App, extractors::*, websocket::*};

/// JSON body accepted by the `/broadcast` endpoint.
#[derive(Deserialize)]
struct BroadcastMessage {
    text: String,
}

// Registered as application state below so the `/broadcast` handler can
// extract it with `State<WebSocketManager>`.
let ws_manager = WebSocketManager::new();

let app = App::new()
    .with_state(ws_manager.clone())

    // WebSocket endpoint: echoes every text or binary message back to its sender.
    .websocket("/ws", |mut connection| async move {
        println!("New WebSocket connection: {}", connection.id());

        // Loop until `receive()` yields `None`; a receive error is propagated by `?`.
        while let Some(message) = connection.receive().await? {
            match message {
                WebSocketMessage::Text(text) => {
                    println!("Received: {}", text);
                    // Echo the message back
                    connection.send_text(&format!("Echo: {}", text)).await?;
                }
                WebSocketMessage::Binary(data) => {
                    println!("Received {} bytes", data.len());
                    connection.send_binary(data).await?;
                }
                WebSocketMessage::Close => {
                    println!("Connection closed");
                    break;
                }
            }
        }

        Ok(())
    })

    // HTTP endpoint to broadcast messages
    .post(
        "/broadcast",
        |State(ws): State<WebSocketManager>, Json(msg): Json<BroadcastMessage>| async move {
            // The value returned by `broadcast` is reported to the caller as `sent_to`.
            let count = ws.broadcast(&msg.text).await?;
            Response::ok().json(&serde_json::json!({
                "sent_to": count,
                "message": msg.text
            }))
        },
    );

§Chat Application Example

use torch_web::{App, websocket::*, extractors::*};
use serde::{Deserialize, Serialize};

/// Shape of a chat message, used both to validate incoming WebSocket text
/// and as the JSON body of the `/chat/send` endpoint.
#[derive(Deserialize, Serialize)]
struct ChatMessage {
    user: String,
    message: String,
    timestamp: String,
}

let ws_manager = WebSocketManager::new();

let app = App::new()
    .with_state(ws_manager.clone())

    // Chat WebSocket endpoint
    .websocket("/chat", |mut connection| async move {
        // Join the general chat room
        connection.join_room("general").await?;

        while let Some(message) = connection.receive().await? {
            if let WebSocketMessage::Text(text) = message {
                // Only relay payloads that parse as a `ChatMessage`. The parsed
                // value itself is not needed: the original text is forwarded
                // unchanged, and anything that fails to parse is dropped.
                if serde_json::from_str::<ChatMessage>(&text).is_ok() {
                    // Broadcast to all users in the room
                    connection.broadcast_to_room("general", &text).await?;
                }
            }
        }

        Ok(())
    })

    // REST endpoint to send messages
    .post(
        "/chat/send",
        |State(ws): State<WebSocketManager>, Json(msg): Json<ChatMessage>| async move {
            // Re-serialize the validated body and push it to the same room
            // the WebSocket clients join.
            let message_json = serde_json::to_string(&msg)?;
            let count = ws.broadcast_to_room("general", &message_json).await?;
            Response::ok().json(&serde_json::json!({"sent_to": count}))
        },
    );

§Real-Time Dashboard

use torch_web::{App, websocket::*, extractors::*};
use tokio::time::{interval, Duration};

let ws_manager = WebSocketManager::new();

// Background task to send periodic updates
let ws_clone = ws_manager.clone();
tokio::spawn(async move {
    let mut interval = interval(Duration::from_secs(5));

    loop {
        interval.tick().await;

        let stats = get_system_stats().await;

        // A serialization failure must not panic here: a panic would end this
        // spawned task and stop all further updates. Log it and wait for the
        // next tick instead, mirroring how broadcast errors are handled below.
        let message = match serde_json::to_string(&stats) {
            Ok(message) => message,
            Err(e) => {
                eprintln!("Failed to serialize stats: {}", e);
                continue;
            }
        };

        if let Err(e) = ws_clone.broadcast_to_room("dashboard", &message).await {
            eprintln!("Failed to broadcast stats: {}", e);
        }
    }
});

let app = App::new()
    .with_state(ws_manager)

    // Dashboard WebSocket
    .websocket("/dashboard", |mut connection| async move {
        // Joining the room is what subscribes this client to the periodic
        // broadcasts sent by the background task above.
        connection.join_room("dashboard").await?;

        // Send initial data
        let initial_stats = get_system_stats().await;
        connection.send_json(&initial_stats).await?;

        // Keep connection alive and handle incoming messages
        while let Some(_message) = connection.receive().await? {
            // Handle client requests for specific data
        }

        Ok(())
    });

§Gaming/Multiplayer Example

use torch_web::{App, websocket::*, extractors::*};
use serde::{Deserialize, Serialize};

/// Action sent by a player; incoming text is relayed only if it parses as this type.
#[derive(Deserialize, Serialize)]
struct GameAction {
    action_type: String,
    player_id: String,
    data: serde_json::Value,
}

let ws_manager = WebSocketManager::new();

let app = App::new()
    // Share the connection manager with the application.
    .with_state(ws_manager)

    .websocket("/game/:room_id", |mut connection, Path(room_id): Path<String>| async move {
        // Join the specific game room
        connection.join_room(&room_id).await?;

        // Notify other players
        let join_message = serde_json::json!({
            "type": "player_joined",
            "player_id": connection.id()
        });
        connection.broadcast_to_room(&room_id, &join_message.to_string()).await?;

        while let Some(message) = connection.receive().await? {
            if let WebSocketMessage::Text(text) = message {
                if let Ok(action) = serde_json::from_str::<GameAction>(&text) {
                    // Process game action and broadcast to other players
                    process_game_action(&action).await;
                    connection.broadcast_to_room(&room_id, &text).await?;
                }
            }
        }

        // Notify other players when leaving.
        // NOTE: this runs only when the receive loop ends normally; any `?`
        // above returns early on error and skips this notification.
        let leave_message = serde_json::json!({
            "type": "player_left",
            "player_id": connection.id()
        });
        connection.broadcast_to_room(&room_id, &leave_message.to_string()).await?;

        Ok(())
    });

Structs§

ChatRoom
Real-time chat room example
SSEStream
Server-Sent Events (SSE) support for real-time updates
WebSocketManager
WebSocket connection manager
WebSocketMiddleware
WebSocket middleware for automatic connection management

Functions§

websocket_upgrade
WebSocket upgrade handler