Expand description
§WebSocket Support for Real-Time Applications
This module provides comprehensive WebSocket support for building real-time applications with Torch. It includes connection management, message broadcasting, room-based messaging, and automatic connection handling.
§Features
- Real-time Communication: Bidirectional communication between client and server
- Connection Management: Automatic connection tracking and cleanup
- Message Broadcasting: Send messages to all connected clients
- Room Support: Group clients into rooms for targeted messaging
- JSON Messaging: Automatic JSON serialization/deserialization
- Ping/Pong: Built-in connection health monitoring
- Error Handling: Robust error handling and reconnection support
- Scalable: Designed for high-concurrency applications
Note: This module requires the `websocket` feature to be enabled.
§Quick Start
§Basic WebSocket Server
use torch_web::{App, websocket::*};
let ws_manager = WebSocketManager::new();
let app = App::new()
.with_state(ws_manager.clone())
// WebSocket endpoint
.websocket("/ws", |mut connection| async move {
println!("New WebSocket connection: {}", connection.id());
while let Some(message) = connection.receive().await? {
match message {
WebSocketMessage::Text(text) => {
println!("Received: {}", text);
// Echo the message back
connection.send_text(&format!("Echo: {}", text)).await?;
}
WebSocketMessage::Binary(data) => {
println!("Received {} bytes", data.len());
connection.send_binary(data).await?;
}
WebSocketMessage::Close => {
println!("Connection closed");
break;
}
}
}
Ok(())
})
// HTTP endpoint to broadcast messages
.post("/broadcast", |State(ws): State<WebSocketManager>, Json(msg): Json<BroadcastMessage>| async move {
let count = ws.broadcast(&msg.text).await?;
Response::ok().json(&serde_json::json!({
"sent_to": count,
"message": msg.text
}))
});
§Chat Application Example
use torch_web::{App, websocket::*, extractors::*};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct ChatMessage {
user: String,
message: String,
timestamp: String,
}
let ws_manager = WebSocketManager::new();
let app = App::new()
.with_state(ws_manager.clone())
// Chat WebSocket endpoint
.websocket("/chat", |mut connection| async move {
// Join the general chat room
connection.join_room("general").await?;
while let Some(message) = connection.receive().await? {
if let WebSocketMessage::Text(text) = message {
// Parse incoming chat message
if let Ok(chat_msg) = serde_json::from_str::<ChatMessage>(&text) {
// Broadcast to all users in the room
connection.broadcast_to_room("general", &text).await?;
}
}
}
Ok(())
})
// REST endpoint to send messages
.post("/chat/send", |State(ws): State<WebSocketManager>, Json(msg): Json<ChatMessage>| async move {
let message_json = serde_json::to_string(&msg)?;
let count = ws.broadcast_to_room("general", &message_json).await?;
Response::ok().json(&serde_json::json!({"sent_to": count}))
});
§Real-Time Dashboard
use torch_web::{App, websocket::*, extractors::*};
use tokio::time::{interval, Duration};
let ws_manager = WebSocketManager::new();
// Background task to send periodic updates
let ws_clone = ws_manager.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(5));
loop {
interval.tick().await;
let stats = get_system_stats().await;
let message = serde_json::to_string(&stats).unwrap();
if let Err(e) = ws_clone.broadcast_to_room("dashboard", &message).await {
eprintln!("Failed to broadcast stats: {}", e);
}
}
});
let app = App::new()
.with_state(ws_manager)
// Dashboard WebSocket
.websocket("/dashboard", |mut connection| async move {
connection.join_room("dashboard").await?;
// Send initial data
let initial_stats = get_system_stats().await;
connection.send_json(&initial_stats).await?;
// Keep connection alive and handle incoming messages
while let Some(_message) = connection.receive().await? {
// Handle client requests for specific data
}
Ok(())
});
§Gaming/Multiplayer Example
use torch_web::{App, websocket::*};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct GameAction {
action_type: String,
player_id: String,
data: serde_json::Value,
}
let ws_manager = WebSocketManager::new();
let app = App::new()
.websocket("/game/:room_id", |mut connection, Path(room_id): Path<String>| async move {
// Join the specific game room
connection.join_room(&room_id).await?;
// Notify other players
let join_message = serde_json::json!({
"type": "player_joined",
"player_id": connection.id()
});
connection.broadcast_to_room(&room_id, &join_message.to_string()).await?;
while let Some(message) = connection.receive().await? {
if let WebSocketMessage::Text(text) = message {
if let Ok(action) = serde_json::from_str::<GameAction>(&text) {
// Process game action and broadcast to other players
process_game_action(&action).await;
connection.broadcast_to_room(&room_id, &text).await?;
}
}
}
// Notify other players when leaving
let leave_message = serde_json::json!({
"type": "player_left",
"player_id": connection.id()
});
connection.broadcast_to_room(&room_id, &leave_message.to_string()).await?;
Ok(())
});
Structs§
- ChatRoom - Real-time chat room example
- SSEStream - Server-Sent Events (SSE) support for real-time updates
- WebSocketManager - WebSocket connection manager
- WebSocketMiddleware - WebSocket middleware for automatic connection management
Functions§
- websocket_upgrade - WebSocket upgrade handler