use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::{Message, Utf8Bytes};
use whispeer::broker::broker::Broker;
use whispeer::plugins::websocket::WebSocketPlugin;
/// JSON payload exchanged in the test; serialises to `{"content": "..."}`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct TestMessage {
    /// Free-form text carried by the message.
    content: String,
}
/// End-to-end check of the WebSocket plugin in both directions.
///
/// 1. Broker -> WebSocket: a client sends `SUBSCRIBE test-topic`, the broker
///    publishes a JSON message on that topic, and the client must receive it.
/// 2. WebSocket -> Broker: the client sends `PUBLISH test-topic <json>` and a
///    subscriber registered on the broker must observe that payload.
///
/// Every wait for an incoming message is bounded by a timeout so that a lost
/// message fails the test instead of hanging the test run.
#[tokio::test]
async fn test_websocket_plugin() {
    // Upper bound for each wait on an incoming message.
    const RECV_TIMEOUT: Duration = Duration::from_secs(1);

    let broker = Broker::new();
    let ws_plugin = WebSocketPlugin::new("127.0.0.1:9090");
    broker.add_plugin(ws_plugin).await;

    // Pause before connecting — presumably the plugin starts listening
    // asynchronously after `add_plugin` returns; TODO confirm.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let (mut ws_stream, _) = connect_async("ws://127.0.0.1:9090")
        .await
        .expect("Failed to connect");

    // --- Direction 1: broker -> WebSocket client ---------------------------
    ws_stream
        .send(Message::Text(Utf8Bytes::from(
            "SUBSCRIBE test-topic".to_string(),
        )))
        .await
        .expect("Failed to subscribe");

    // Pause so the SUBSCRIBE frame can be processed before publishing.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let msg = TestMessage {
        content: "Hello from Rust".to_string(),
    };
    let json_msg = serde_json::to_value(&msg).unwrap();
    broker
        .publish("test-topic", json_msg)
        .await
        .expect("Failed to publish");

    // Bounded wait: without the timeout a missing frame would block forever.
    let frame = tokio::time::timeout(RECV_TIMEOUT, ws_stream.next())
        .await
        .expect("Timed out waiting for message from broker");
    match frame {
        Some(Ok(Message::Text(text))) => {
            println!("Received: {}", text);
            let received: serde_json::Value = serde_json::from_str(&text).unwrap();
            assert_eq!(received["content"], "Hello from Rust");
        }
        // Stream closed, transport error, or a non-text frame.
        _ => panic!("Did not receive message"),
    }

    // --- Direction 2: WebSocket client -> broker ---------------------------
    // The subscriber MUST be registered before the PUBLISH frame is sent;
    // otherwise the message can be routed while nobody is listening and the
    // test fails intermittently.
    // NOTE(review): `subscribe` is called without `.await`, as in the original;
    // if it actually returns a future it must be awaited — confirm against
    // the `Broker` API.
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
    broker.subscribe::<serde_json::Value>("test-topic", move |msg| {
        let tx = tx.clone();
        Box::pin(async move {
            // Only signal for the payload sent over the WebSocket; ignore
            // anything else that may arrive on the same topic.
            if msg["content"] == "Hello from WS" {
                tx.send(()).await.unwrap();
            }
        })
    });

    let ws_msg = serde_json::json!({ "content": "Hello from WS" }).to_string();
    ws_stream
        .send(Message::Text(Utf8Bytes::from(format!(
            "PUBLISH test-topic {}",
            ws_msg
        ))))
        .await
        .expect("Failed to publish from WS");

    let delivered = tokio::time::timeout(RECV_TIMEOUT, rx.recv())
        .await
        .expect("Timed out waiting for message from WS");
    // `recv` yields `None` when every sender is gone; that is not a delivery
    // and must not be mistaken for success.
    assert!(
        delivered.is_some(),
        "Subscriber channel closed before the WS message arrived"
    );
}