use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, RwLock};
use tokio_tungstenite::{accept_async, tungstenite::Message};
use uuid::Uuid;
use crate::progress::{ProgressManager, ProgressUpdate};
/// JSON messages exchanged over a WebSocket connection.
///
/// Uses serde's internally tagged representation: the variant name is carried
/// in a `"type"` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum WebSocketMessage {
/// Request to follow an operation. The connection handler in this file
/// records `Some(id)` once per client; `None` adds nothing.
Subscribe { operation_id: Option<Uuid> },
/// Request to stop following an operation. The connection handler removes
/// `Some(id)` from the client's subscriptions and clears all of them on `None`.
Unsubscribe { operation_id: Option<Uuid> },
/// Carries a progress update payload.
ProgressUpdate(ProgressUpdate),
/// Error report with a free-form text message.
Error { message: String },
/// Application-level ping (a JSON text message, distinct from a WebSocket
/// ping frame); the connection handler answers it with `Pong`.
Ping,
/// Reply to `Ping`.
Pong,
}
/// State kept for one WebSocket client.
#[derive(Debug)]
pub struct WebSocketClient {
/// Random (v4) identifier generated when the client is constructed; used as
/// the key in the server's client map and in log messages.
id: Uuid,
/// Progress channel handle passed in at construction. Nothing in this file
/// reads it, hence the `dead_code` allowance.
#[allow(dead_code)]
sender: crossbeam_channel::Sender<ProgressUpdate>,
/// Operation ids this client has subscribed to. Behind `Arc<RwLock<_>>` so
/// the task reading the socket can mutate it.
subscriptions: Arc<RwLock<Vec<Uuid>>>,
}
impl WebSocketClient {
#[must_use]
pub fn new(sender: crossbeam_channel::Sender<ProgressUpdate>) -> Self {
Self {
id: Uuid::new_v4(),
sender,
subscriptions: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn handle_connection(&self, stream: TcpStream, addr: SocketAddr) -> Result<()> {
let ws_stream = accept_async(stream).await?;
let (ws_sender, mut ws_receiver) = ws_stream.split();
let subscriptions = self.subscriptions.clone();
let client_id = self.id;
log::info!("New WebSocket connection from {addr}");
let subscriptions_clone = subscriptions.clone();
let ws_sender = Arc::new(tokio::sync::Mutex::new(ws_sender));
tokio::spawn(async move {
while let Some(msg) = ws_receiver.next().await {
match msg {
Ok(Message::Text(text)) => {
if let Ok(ws_msg) = serde_json::from_str::<WebSocketMessage>(&text) {
match ws_msg {
WebSocketMessage::Subscribe { operation_id } => {
let mut subs = subscriptions_clone.write().await;
if let Some(op_id) = operation_id {
if !subs.contains(&op_id) {
subs.push(op_id);
}
}
log::debug!("Client {client_id} subscribed to operation {operation_id:?}");
}
WebSocketMessage::Unsubscribe { operation_id } => {
let mut subs = subscriptions_clone.write().await;
if let Some(op_id) = operation_id {
subs.retain(|&id| id != op_id);
} else {
subs.clear();
}
log::debug!("Client {client_id} unsubscribed from operation {operation_id:?}");
}
WebSocketMessage::Ping => {
let pong = WebSocketMessage::Pong;
if let Ok(pong_text) = serde_json::to_string(&pong) {
let mut sender = ws_sender.lock().await;
let _ = sender.send(Message::Text(pong_text)).await;
}
}
_ => {
log::warn!(
"Client {client_id} sent unexpected message: {ws_msg:?}"
);
}
}
} else {
log::warn!("Client {client_id} sent invalid JSON: {text}");
}
}
Ok(Message::Close(_)) => {
log::info!("Client {client_id} disconnected");
break;
}
Ok(Message::Ping(data)) => {
let mut sender = ws_sender.lock().await;
if let Err(e) = sender.send(Message::Pong(data)).await {
log::error!("Failed to send pong to client {client_id}: {e}");
break;
}
}
Err(e) => {
log::error!("WebSocket error for client {client_id}: {e}");
break;
}
_ => {}
}
}
});
Ok(())
}
}
#[derive(Debug)]
pub struct WebSocketServer {
progress_manager: Arc<ProgressManager>,
clients: Arc<RwLock<HashMap<Uuid, WebSocketClient>>>,
port: u16,
}
impl WebSocketServer {
#[must_use]
pub fn new(port: u16) -> Self {
Self {
progress_manager: Arc::new(ProgressManager::new()),
clients: Arc::new(RwLock::new(HashMap::new())),
port,
}
}
#[must_use]
pub fn progress_manager(&self) -> Arc<ProgressManager> {
self.progress_manager.clone()
}
pub async fn start(&self) -> Result<()> {
let addr = format!("127.0.0.1:{}", self.port);
let listener = TcpListener::bind(&addr).await?;
log::info!("WebSocket server listening on {addr}");
let progress_manager = self.progress_manager.clone();
tokio::spawn(async move {
let _ = progress_manager.run();
});
let clients = self.clients.clone();
let progress_sender = self.progress_manager.sender();
while let Ok((stream, addr)) = listener.accept().await {
let client = WebSocketClient::new(progress_sender.clone());
let client_id = client.id;
{
let mut clients = clients.write().await;
clients.insert(client_id, client);
}
let clients_clone = clients.clone();
tokio::spawn(async move {
if let Some(client) = clients_clone.read().await.get(&client_id) {
if let Err(e) = client.handle_connection(stream, addr).await {
log::error!("Error handling WebSocket connection from {addr}: {e}");
}
}
clients_clone.write().await.remove(&client_id);
});
}
Ok(())
}
pub async fn client_count(&self) -> usize {
self.clients.read().await.len()
}
pub async fn broadcast(&self, message: WebSocketMessage) -> Result<()> {
let clients = self.clients.read().await;
let _message_text = serde_json::to_string(&message)?;
for client in clients.values() {
log::debug!("Broadcasting message to client {}", client.id);
}
Ok(())
}
}
/// Wrapper around a tokio broadcast channel of [`ProgressUpdate`] values.
#[derive(Debug)]
pub struct WebSocketClientConnection {
/// Sending half; also used to create new receivers in `subscribe`.
sender: broadcast::Sender<ProgressUpdate>,
/// Receiving half created together with `sender`. It is never read in this
/// file; presumably it is kept so the channel always has a live receiver —
/// TODO confirm.
#[allow(dead_code)]
receiver: broadcast::Receiver<ProgressUpdate>,
}
/// `Default` delegates to [`WebSocketClientConnection::new`].
impl Default for WebSocketClientConnection {
// Builds the same connection as `new()`.
fn default() -> Self {
Self::new()
}
}
impl WebSocketClientConnection {
    /// Capacity passed to the broadcast channel created by `new`.
    const CHANNEL_CAPACITY: usize = 1000;

    /// Creates a connection backed by a fresh broadcast channel and keeps both
    /// halves of that channel.
    #[must_use]
    pub fn new() -> Self {
        let (tx, rx) = broadcast::channel(Self::CHANNEL_CAPACITY);
        Self {
            sender: tx,
            receiver: rx,
        }
    }

    /// Returns a new receiver attached to this connection's channel.
    #[must_use]
    pub fn subscribe(&self) -> broadcast::Receiver<ProgressUpdate> {
        self.sender.subscribe()
    }

    /// Publishes `update` on the broadcast channel.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the channel's `send`.
    pub fn send_update(&self, update: ProgressUpdate) -> Result<()> {
        match self.sender.send(update) {
            Ok(_receiver_count) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration as StdDuration;
#[test]
fn test_websocket_message_serialization() {
let msg = WebSocketMessage::Subscribe {
operation_id: Some(Uuid::new_v4()),
};
let json = serde_json::to_string(&msg).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
match deserialized {
WebSocketMessage::Subscribe { operation_id } => {
assert!(operation_id.is_some());
}
_ => panic!("Expected Subscribe message"),
}
}
#[test]
fn test_websocket_client_creation() {
let (sender, _) = crossbeam_channel::unbounded();
let client = WebSocketClient::new(sender);
assert!(!client.id.is_nil());
}
#[test]
fn test_websocket_server_creation() {
let server = WebSocketServer::new(8080);
assert_eq!(server.port, 8080);
}
#[tokio::test]
async fn test_websocket_client_connection() {
let connection = WebSocketClientConnection::new();
let mut receiver = connection.subscribe();
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test".to_string(),
current: 10,
total: Some(100),
message: Some("test message".to_string()),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
connection.send_update(update.clone()).unwrap();
let received_msg = tokio::time::timeout(StdDuration::from_millis(100), receiver.recv())
.await
.unwrap()
.unwrap();
assert_eq!(received_msg.operation_name, update.operation_name);
}
#[tokio::test]
async fn test_websocket_server_creation_with_port() {
let server = WebSocketServer::new(8080);
assert_eq!(server.port, 8080);
}
#[tokio::test]
async fn test_websocket_server_progress_manager() {
let server = WebSocketServer::new(8080);
let _progress_manager = server.progress_manager();
}
#[tokio::test]
async fn test_websocket_client_creation_async() {
let (sender, _receiver) = crossbeam_channel::unbounded();
let client = WebSocketClient::new(sender);
assert!(!client.id.is_nil());
}
#[tokio::test]
async fn test_websocket_client_connection_default() {
let _connection = WebSocketClientConnection::default();
}
#[tokio::test]
async fn test_websocket_client_connection_subscribe() {
let connection = WebSocketClientConnection::new();
let _receiver = connection.subscribe();
}
#[tokio::test]
async fn test_websocket_client_connection_send_update() {
let connection = WebSocketClientConnection::new();
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test".to_string(),
current: 50,
total: Some(100),
message: Some("test message".to_string()),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
let result = connection.send_update(update);
assert!(result.is_ok());
}
#[tokio::test]
async fn test_websocket_message_serialization_async() {
let message = WebSocketMessage::Subscribe {
operation_id: Some(Uuid::new_v4()),
};
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
match (message, deserialized) {
(
WebSocketMessage::Subscribe { operation_id: id1 },
WebSocketMessage::Subscribe { operation_id: id2 },
) => {
assert_eq!(id1, id2);
}
_ => panic!("Message types don't match"),
}
}
#[tokio::test]
#[allow(clippy::similar_names)]
async fn test_websocket_message_ping_pong() {
let ping_message = WebSocketMessage::Ping;
let pong_message = WebSocketMessage::Pong;
let ping_json = serde_json::to_string(&ping_message).unwrap();
let pong_json = serde_json::to_string(&pong_message).unwrap();
let ping_deserialized: WebSocketMessage = serde_json::from_str(&ping_json).unwrap();
let pong_deserialized: WebSocketMessage = serde_json::from_str(&pong_json).unwrap();
assert!(matches!(ping_deserialized, WebSocketMessage::Ping));
assert!(matches!(pong_deserialized, WebSocketMessage::Pong));
}
#[tokio::test]
async fn test_websocket_message_unsubscribe() {
let message = WebSocketMessage::Unsubscribe {
operation_id: Some(Uuid::new_v4()),
};
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
match (message, deserialized) {
(
WebSocketMessage::Unsubscribe { operation_id: id1 },
WebSocketMessage::Unsubscribe { operation_id: id2 },
) => {
assert_eq!(id1, id2);
}
_ => panic!("Message types don't match"),
}
}
#[tokio::test]
async fn test_websocket_message_progress_update() {
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test_operation".to_string(),
current: 75,
total: Some(100),
message: Some("Almost done".to_string()),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
let message = WebSocketMessage::ProgressUpdate(update.clone());
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
match deserialized {
WebSocketMessage::ProgressUpdate(deserialized_update) => {
assert_eq!(update.operation_id, deserialized_update.operation_id);
assert_eq!(update.operation_name, deserialized_update.operation_name);
assert_eq!(update.current, deserialized_update.current);
}
_ => panic!("Expected ProgressUpdate message"),
}
}
#[tokio::test]
async fn test_websocket_message_error() {
let message = WebSocketMessage::Error {
message: "Test error".to_string(),
};
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
match deserialized {
WebSocketMessage::Error { message: msg } => {
assert_eq!(msg, "Test error");
}
_ => panic!("Expected Error message"),
}
}
#[tokio::test]
async fn test_websocket_client_connection_multiple_updates() {
let connection = WebSocketClientConnection::new();
let mut receiver = connection.subscribe();
for i in 0..5 {
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: format!("test_{i}"),
current: i * 20,
total: Some(100),
message: Some(format!("Update {i}")),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
connection.send_update(update).unwrap();
}
for i in 0..5 {
let received_msg = tokio::time::timeout(StdDuration::from_millis(100), receiver.recv())
.await
.unwrap()
.unwrap();
assert_eq!(received_msg.operation_name, format!("test_{i}"));
}
}
#[tokio::test]
async fn test_websocket_client_connection_timeout() {
let connection = WebSocketClientConnection::new();
let mut receiver = connection.subscribe();
let result = tokio::time::timeout(StdDuration::from_millis(50), receiver.recv()).await;
assert!(result.is_err()); }
#[tokio::test]
async fn test_websocket_server_start() {
let server = WebSocketServer::new(8080);
assert_eq!(server.port, 8080);
let _server_ref = &server;
}
#[tokio::test]
async fn test_websocket_server_broadcast() {
let server = WebSocketServer::new(8080);
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test_operation".to_string(),
current: 50,
total: Some(100),
message: Some("Test message".to_string()),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
let result = server
.broadcast(WebSocketMessage::ProgressUpdate(update))
.await;
assert!(result.is_ok());
}
#[test]
fn test_websocket_message_debug() {
let message = WebSocketMessage::Ping;
let debug_str = format!("{message:?}");
assert!(debug_str.contains("Ping"));
}
#[test]
fn test_websocket_message_clone() {
let message = WebSocketMessage::Ping;
let cloned = message.clone();
assert_eq!(message, cloned);
}
#[test]
fn test_websocket_message_partial_eq() {
let message1 = WebSocketMessage::Ping;
let message2 = WebSocketMessage::Ping;
let message3 = WebSocketMessage::Pong;
assert_eq!(message1, message2);
assert_ne!(message1, message3);
}
#[test]
fn test_websocket_client_debug() {
let (sender, _receiver) = crossbeam_channel::unbounded();
let client = WebSocketClient::new(sender);
let debug_str = format!("{client:?}");
assert!(debug_str.contains("WebSocketClient"));
}
#[test]
fn test_websocket_client_connection_debug() {
let connection = WebSocketClientConnection::new();
let debug_str = format!("{connection:?}");
assert!(debug_str.contains("WebSocketClientConnection"));
}
#[test]
fn test_websocket_server_debug() {
let server = WebSocketServer::new(8080);
let debug_str = format!("{server:?}");
assert!(debug_str.contains("WebSocketServer"));
}
#[test]
fn test_websocket_message_subscribe_serialization() {
let message = WebSocketMessage::Subscribe {
operation_id: Some(Uuid::new_v4()),
};
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(message, deserialized);
}
#[test]
fn test_websocket_message_unsubscribe_serialization() {
let message = WebSocketMessage::Unsubscribe {
operation_id: Some(Uuid::new_v4()),
};
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(message, deserialized);
}
#[test]
fn test_websocket_message_progress_update_serialization() {
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test_operation".to_string(),
current: 50,
total: Some(100),
message: Some("Test message".to_string()),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
let message = WebSocketMessage::ProgressUpdate(update);
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(message, deserialized);
}
#[test]
fn test_websocket_message_error_serialization() {
let message = WebSocketMessage::Error {
message: "Test error".to_string(),
};
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(message, deserialized);
}
#[tokio::test]
async fn test_websocket_server_multiple_broadcasts() {
let server = WebSocketServer::new(8080);
let update1 = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "operation1".to_string(),
current: 25,
total: Some(100),
message: Some("First update".to_string()),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
let update2 = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "operation2".to_string(),
current: 50,
total: Some(100),
message: Some("Second update".to_string()),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
};
let result1 = server
.broadcast(WebSocketMessage::ProgressUpdate(update1))
.await;
let result2 = server
.broadcast(WebSocketMessage::ProgressUpdate(update2))
.await;
assert!(result1.is_ok());
assert!(result2.is_ok());
}
#[test]
fn test_websocket_server_port_access() {
let server = WebSocketServer::new(8080);
assert_eq!(server.port, 8080);
}
#[test]
fn test_websocket_client_id_generation() {
let (sender1, _receiver1) = crossbeam_channel::unbounded();
let (sender2, _receiver2) = crossbeam_channel::unbounded();
let client1 = WebSocketClient::new(sender1);
let client2 = WebSocketClient::new(sender2);
assert_ne!(client1.id, client2.id);
assert!(!client1.id.is_nil());
assert!(!client2.id.is_nil());
}
#[tokio::test]
async fn test_websocket_message_roundtrip_all_types() {
let messages = vec![
WebSocketMessage::Subscribe {
operation_id: Some(Uuid::new_v4()),
},
WebSocketMessage::Unsubscribe {
operation_id: Some(Uuid::new_v4()),
},
WebSocketMessage::Ping,
WebSocketMessage::Pong,
WebSocketMessage::ProgressUpdate(ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test".to_string(),
current: 0,
total: Some(100),
message: None,
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
}),
WebSocketMessage::Error {
message: "test error".to_string(),
},
];
for message in messages {
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(message, deserialized);
}
}
#[tokio::test]
async fn test_websocket_server_client_count() {
let server = WebSocketServer::new(8080);
let _count = server.client_count().await;
}
#[tokio::test]
async fn test_websocket_server_broadcast_error_handling() {
let server = WebSocketServer::new(8080);
let message = WebSocketMessage::Ping;
let result = server.broadcast(message).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_websocket_server_creation_with_different_ports() {
let server1 = WebSocketServer::new(8080);
let server2 = WebSocketServer::new(8081);
assert_eq!(server1.port, 8080);
assert_eq!(server2.port, 8081);
}
#[tokio::test]
async fn test_websocket_server_progress_manager_access() {
let server = WebSocketServer::new(8080);
let _progress_manager = server.progress_manager();
}
#[tokio::test]
async fn test_websocket_client_creation_with_sender() {
let (sender, _receiver) = crossbeam_channel::unbounded();
let client = WebSocketClient::new(sender);
assert!(!client.id.is_nil());
}
#[tokio::test]
async fn test_websocket_client_connection_creation() {
let (_sender, _receiver) = broadcast::channel::<ProgressUpdate>(100);
let _connection = WebSocketClientConnection::new();
}
#[tokio::test]
async fn test_websocket_message_error_creation() {
let error_msg = WebSocketMessage::Error {
message: "Test error".to_string(),
};
match error_msg {
WebSocketMessage::Error { message: msg } => assert_eq!(msg, "Test error"),
_ => panic!("Expected Error variant"),
}
}
#[tokio::test]
async fn test_websocket_message_progress_update_creation() {
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test".to_string(),
current: 5,
total: Some(10),
status: crate::progress::ProgressStatus::InProgress,
message: Some("Test".to_string()),
timestamp: chrono::Utc::now(),
};
let message = WebSocketMessage::ProgressUpdate(update);
match message {
WebSocketMessage::ProgressUpdate(update) => {
assert_eq!(update.operation_name, "test");
assert_eq!(update.current, 5);
assert_eq!(update.total, Some(10));
}
_ => panic!("Expected ProgressUpdate variant"),
}
}
#[tokio::test]
async fn test_websocket_message_subscribe_creation() {
let operation_id = Some(Uuid::new_v4());
let message = WebSocketMessage::Subscribe { operation_id };
match message {
WebSocketMessage::Subscribe { operation_id: id } => {
assert_eq!(id, operation_id);
}
_ => panic!("Expected Subscribe variant"),
}
}
#[tokio::test]
async fn test_websocket_message_unsubscribe_creation() {
let operation_id = Some(Uuid::new_v4());
let message = WebSocketMessage::Unsubscribe { operation_id };
match message {
WebSocketMessage::Unsubscribe { operation_id: id } => {
assert_eq!(id, operation_id);
}
_ => panic!("Expected Unsubscribe variant"),
}
}
#[tokio::test]
async fn test_websocket_message_serialization_all_variants() {
let operation_id = Some(Uuid::new_v4());
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test".to_string(),
current: 5,
total: Some(10),
status: crate::progress::ProgressStatus::InProgress,
message: Some("Test".to_string()),
timestamp: chrono::Utc::now(),
};
let messages = vec![
WebSocketMessage::Subscribe { operation_id },
WebSocketMessage::Unsubscribe { operation_id },
WebSocketMessage::Ping,
WebSocketMessage::Pong,
WebSocketMessage::ProgressUpdate(update),
WebSocketMessage::Error {
message: "Test error".to_string(),
},
];
for message in messages {
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(message, deserialized);
}
}
#[tokio::test]
async fn test_websocket_server_client_count_multiple_clients() {
let server = WebSocketServer::new(8080);
assert_eq!(server.client_count().await, 0);
let _count = server.client_count().await;
}
#[tokio::test]
async fn test_websocket_server_broadcast_different_message_types() {
let server = WebSocketServer::new(8080);
let messages = vec![
WebSocketMessage::Ping,
WebSocketMessage::Pong,
WebSocketMessage::Error {
message: "Test error".to_string(),
},
WebSocketMessage::Subscribe {
operation_id: Some(Uuid::new_v4()),
},
WebSocketMessage::Unsubscribe {
operation_id: Some(Uuid::new_v4()),
},
];
for message in messages {
let result = server.broadcast(message).await;
assert!(result.is_ok());
}
}
#[tokio::test]
async fn test_websocket_client_connection_receive_update() {
let (_sender, _receiver) = broadcast::channel::<ProgressUpdate>(100);
let connection = WebSocketClientConnection::new();
let update = ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test".to_string(),
current: 5,
total: Some(10),
status: crate::progress::ProgressStatus::InProgress,
message: Some("Test".to_string()),
timestamp: chrono::Utc::now(),
};
connection.send_update(update.clone()).unwrap();
let received_msg = tokio::time::timeout(
std::time::Duration::from_millis(100),
connection.subscribe().recv(),
)
.await;
if let Ok(Ok(received_update)) = received_msg {
assert_eq!(received_update.operation_name, update.operation_name);
assert_eq!(received_update.current, update.current);
assert_eq!(received_update.total, update.total);
} else {
}
}
#[tokio::test]
async fn test_websocket_server_handle_connection_error_handling() {
let server = WebSocketServer::new(8080);
let _server_ref = &server;
}
#[tokio::test]
async fn test_websocket_server_start_error_handling() {
let server = WebSocketServer::new(8080);
let _server_ref = &server;
}
#[tokio::test]
async fn test_websocket_message_debug_formatting() {
let message = WebSocketMessage::Ping;
let debug_str = format!("{message:?}");
assert!(debug_str.contains("Ping"));
}
#[tokio::test]
async fn test_websocket_server_debug_formatting() {
let server = WebSocketServer::new(8080);
let debug_str = format!("{server:?}");
assert!(debug_str.contains("8080"));
}
#[tokio::test]
async fn test_websocket_client_debug_formatting() {
let (sender, _receiver) = crossbeam_channel::unbounded();
let client = WebSocketClient::new(sender);
let debug_str = format!("{client:?}");
assert!(debug_str.contains("WebSocketClient"));
}
#[tokio::test]
async fn test_websocket_client_connection_debug_formatting() {
let (_sender, _receiver) = broadcast::channel::<ProgressUpdate>(100);
let connection = WebSocketClientConnection::new();
let debug_str = format!("{connection:?}");
assert!(debug_str.contains("WebSocketClientConnection"));
}
#[tokio::test]
async fn test_websocket_server_multiple_ports() {
let server1 = WebSocketServer::new(8080);
let server2 = WebSocketServer::new(8081);
let server3 = WebSocketServer::new(8082);
assert_eq!(server1.port, 8080);
assert_eq!(server2.port, 8081);
assert_eq!(server3.port, 8082);
}
#[tokio::test]
async fn test_websocket_server_port_edge_cases() {
let server_min = WebSocketServer::new(1);
let server_max = WebSocketServer::new(65535);
assert_eq!(server_min.port, 1);
assert_eq!(server_max.port, 65535);
}
#[tokio::test]
async fn test_websocket_message_all_variants() {
let _task_id = Uuid::new_v4();
let operation_id = Uuid::new_v4();
let messages = vec![
WebSocketMessage::Subscribe {
operation_id: Some(operation_id),
},
WebSocketMessage::Unsubscribe {
operation_id: Some(operation_id),
},
WebSocketMessage::Ping,
WebSocketMessage::Pong,
WebSocketMessage::ProgressUpdate(ProgressUpdate {
operation_id,
operation_name: "test".to_string(),
current: 50,
total: Some(100),
message: Some("Testing".to_string()),
timestamp: chrono::Utc::now(),
status: crate::progress::ProgressStatus::InProgress,
}),
WebSocketMessage::Error {
message: "Test error".to_string(),
},
];
for message in messages {
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(message, deserialized);
}
}
#[tokio::test]
async fn test_websocket_message_serialization_edge_cases() {
let subscribe_none = WebSocketMessage::Subscribe { operation_id: None };
let json = serde_json::to_string(&subscribe_none).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(subscribe_none, deserialized);
let error_empty = WebSocketMessage::Error {
message: String::new(),
};
let json = serde_json::to_string(&error_empty).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(error_empty, deserialized);
}
#[tokio::test]
async fn test_websocket_client_id_uniqueness() {
let (sender1, _receiver1) = crossbeam_channel::unbounded();
let (sender2, _receiver2) = crossbeam_channel::unbounded();
let client1 = WebSocketClient::new(sender1);
let client2 = WebSocketClient::new(sender2);
assert_ne!(client1.id, client2.id);
}
#[tokio::test]
async fn test_websocket_client_connection_subscription() {
let connection = WebSocketClientConnection::new();
let mut subscriber = connection.subscribe();
let result = subscriber.try_recv();
assert!(result.is_err());
}
#[tokio::test]
async fn test_websocket_server_error_handling() {
let _server = WebSocketServer::new(8080);
let error_msg = WebSocketMessage::Error {
message: "Test error".to_string(),
};
match error_msg {
WebSocketMessage::Error { message } => {
assert_eq!(message, "Test error");
}
_ => panic!("Expected Error variant"),
}
}
#[tokio::test]
async fn test_websocket_server_connection_handling() {
let _server = WebSocketServer::new(8080);
let connection = WebSocketClientConnection::new();
let _subscriber = connection.subscribe();
}
#[tokio::test]
async fn test_websocket_server_message_serialization_edge_cases() {
let subscribe_msg = WebSocketMessage::Subscribe { operation_id: None };
let json = serde_json::to_string(&subscribe_msg).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(subscribe_msg, deserialized);
let ping_msg = WebSocketMessage::Ping;
let json = serde_json::to_string(&ping_msg).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(ping_msg, deserialized);
}
#[tokio::test]
async fn test_websocket_server_large_messages() {
let _server = WebSocketServer::new(8080);
let ping_msg = WebSocketMessage::Ping;
let json = serde_json::to_string(&ping_msg).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(ping_msg, deserialized);
}
#[tokio::test]
async fn test_websocket_server_concurrent_operations() {
let _server = Arc::new(WebSocketServer::new(8080));
let mut handles = vec![];
for _i in 0..10 {
let handle = tokio::spawn(async move {
let message = WebSocketMessage::Ping;
let json = serde_json::to_string(&message).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(message, deserialized);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_websocket_server_message_roundtrip_all_variants() {
let variants = vec![
WebSocketMessage::Subscribe {
operation_id: Some(Uuid::new_v4()),
},
WebSocketMessage::Unsubscribe { operation_id: None },
WebSocketMessage::Ping,
WebSocketMessage::Pong,
WebSocketMessage::Error {
message: "error".to_string(),
},
WebSocketMessage::ProgressUpdate(ProgressUpdate {
operation_id: Uuid::new_v4(),
operation_name: "test".to_string(),
current: 1,
total: Some(10),
status: crate::progress::ProgressStatus::InProgress,
message: Some("test".to_string()),
timestamp: chrono::Utc::now(),
}),
];
for variant in variants {
let json = serde_json::to_string(&variant).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(variant, deserialized);
}
}
#[tokio::test]
async fn test_websocket_server_edge_cases() {
let minimal_ping = WebSocketMessage::Ping;
let json = serde_json::to_string(&minimal_ping).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(minimal_ping, deserialized);
let special_ping = WebSocketMessage::Ping;
let json = serde_json::to_string(&special_ping).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(special_ping, deserialized);
}
#[tokio::test]
async fn test_websocket_server_performance() {
let _server = WebSocketServer::new(8080);
let start = std::time::Instant::now();
for _i in 0..1000 {
let message = WebSocketMessage::Ping;
let _json = serde_json::to_string(&message).unwrap();
}
let elapsed = start.elapsed();
assert!(elapsed.as_millis() < 1000); }
#[tokio::test]
async fn test_websocket_server_memory_usage() {
let _server = WebSocketServer::new(8080);
let mut messages = Vec::new();
for _i in 0..100 {
let message = WebSocketMessage::Ping;
messages.push(message);
}
assert_eq!(messages.len(), 100);
for message in messages {
let _json = serde_json::to_string(&message).unwrap();
}
}
#[tokio::test]
async fn test_websocket_server_error_recovery() {
let _server = WebSocketServer::new(8080);
let malformed_json = r#"{"invalid": "json"}"#;
let result: Result<WebSocketMessage, _> = serde_json::from_str(malformed_json);
assert!(result.is_err());
let empty_json = r"{}";
let result: Result<WebSocketMessage, _> = serde_json::from_str(empty_json);
assert!(result.is_err());
}
#[tokio::test]
async fn test_websocket_server_unicode_handling() {
let _server = WebSocketServer::new(8080);
let unicode_ping = WebSocketMessage::Ping;
let json = serde_json::to_string(&unicode_ping).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(unicode_ping, deserialized);
}
#[tokio::test]
async fn test_websocket_server_nested_data() {
let _server = WebSocketServer::new(8080);
let ping_msg = WebSocketMessage::Ping;
let json = serde_json::to_string(&ping_msg).unwrap();
let deserialized: WebSocketMessage = serde_json::from_str(&json).unwrap();
assert_eq!(ping_msg, deserialized);
}
}