use tokio::net::TcpListener;
use ultimo::prelude::*;
use ultimo::websocket::{Message, WebSocket, WebSocketHandler};
#[derive(Clone)]
struct TestHandler;
#[async_trait::async_trait]
impl WebSocketHandler for TestHandler {
type Data = ();
async fn on_open(&self, ws: &WebSocket<Self::Data>) {
ws.send("connected").await.ok();
}
async fn on_message(&self, ws: &WebSocket<Self::Data>, msg: Message) {
if let Message::Text(text) = msg {
ws.send(format!("echo: {}", text)).await.ok();
}
}
}
#[derive(Clone)]
#[allow(dead_code)]
struct TypedHandler;
#[async_trait::async_trait]
impl WebSocketHandler for TypedHandler {
type Data = String;
async fn on_open(&self, ws: &WebSocket<Self::Data>) {
let user = ws.data();
ws.send(format!("Welcome, {}!", user)).await.ok();
}
async fn on_message(&self, ws: &WebSocket<Self::Data>, msg: Message) {
if let Message::Text(text) = msg {
let user = ws.data();
ws.send(format!("{}: {}", user, text)).await.ok();
}
}
}
async fn find_available_port() -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
port
}
async fn start_test_server(port: u16) {
let mut app = Ultimo::new();
app.websocket("/ws", TestHandler);
app.get("/health", |ctx: Context| async move {
ctx.json(json!({"status": "ok"})).await
});
tokio::spawn(async move {
let addr = format!("127.0.0.1:{}", port);
app.listen(&addr).await.ok();
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_websocket_route_registration() {
let port = find_available_port().await;
start_test_server(port).await;
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{}/ws", port))
.await
.expect("Failed to connect");
use futures_util::StreamExt;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
if let Some(Ok(TungsteniteMessage::Text(text))) = ws.next().await {
assert_eq!(text, "connected");
} else {
panic!("Expected connection message");
}
}
#[tokio::test]
async fn test_websocket_echo() {
let port = find_available_port().await;
start_test_server(port).await;
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{}/ws", port))
.await
.expect("Failed to connect");
use futures_util::SinkExt;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
let timeout = tokio::time::Duration::from_millis(500);
next_text_frame(&mut ws, timeout).await;
ws.send(TungsteniteMessage::Text("hello".to_string()))
.await
.unwrap();
let text = next_text_frame(&mut ws, timeout).await;
assert_eq!(text, "echo: hello");
}
async fn next_text_frame<S>(ws: &mut S, timeout: tokio::time::Duration) -> String
where
S: futures_util::Stream<
Item = std::result::Result<
tokio_tungstenite::tungstenite::Message,
tokio_tungstenite::tungstenite::Error,
>,
> + Unpin,
{
use futures_util::StreamExt;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
loop {
match tokio::time::timeout(timeout, ws.next()).await {
Ok(Some(Ok(TungsteniteMessage::Text(text)))) => return text,
Ok(Some(Ok(TungsteniteMessage::Ping(_) | TungsteniteMessage::Pong(_)))) => continue,
Ok(None) => panic!("Connection closed unexpectedly"),
Ok(Some(Err(e))) => panic!("WebSocket error: {}", e),
Ok(Some(Ok(msg))) => panic!("Unexpected message type: {:?}", msg),
Err(_) => panic!("Timeout waiting for text frame"),
}
}
}
#[tokio::test]
async fn test_multiple_websocket_connections() {
let port = find_available_port().await;
start_test_server(port).await;
let (mut ws1, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{}/ws", port))
.await
.expect("Failed to connect client 1");
let (mut ws2, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{}/ws", port))
.await
.expect("Failed to connect client 2");
use futures_util::StreamExt;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
if let Some(Ok(TungsteniteMessage::Text(text))) = ws1.next().await {
assert_eq!(text, "connected");
}
if let Some(Ok(TungsteniteMessage::Text(text))) = ws2.next().await {
assert_eq!(text, "connected");
}
}
#[tokio::test]
async fn test_http_and_websocket_coexist() {
let port = find_available_port().await;
start_test_server(port).await;
let client = reqwest::Client::new();
let response = client
.get(format!("http://127.0.0.1:{}/health", port))
.send()
.await
.expect("Failed to make HTTP request");
assert_eq!(response.status(), 200);
let body: serde_json::Value = response.json().await.unwrap();
assert_eq!(body["status"], "ok");
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{}/ws", port))
.await
.expect("Failed to connect to WebSocket");
use futures_util::StreamExt;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
if let Some(Ok(TungsteniteMessage::Text(text))) = ws.next().await {
assert_eq!(text, "connected");
}
}
#[tokio::test]
async fn test_websocket_binary_messages() {
let port = find_available_port().await;
start_test_server(port).await;
let (mut ws, _) = tokio_tungstenite::connect_async(format!("ws://127.0.0.1:{}/ws", port))
.await
.expect("Failed to connect");
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
ws.next().await;
let data = vec![1, 2, 3, 4, 5];
ws.send(TungsteniteMessage::Binary(data.clone()))
.await
.unwrap();
ws.send(TungsteniteMessage::Text("ping".to_string()))
.await
.unwrap();
if let Some(Ok(TungsteniteMessage::Text(text))) = ws.next().await {
assert_eq!(text, "echo: ping");
}
}