use std::{sync::Arc, time::Duration};
use async_tungstenite::{accept_async, tokio::{TokioAdapter, connect_async}, tungstenite::{Bytes, Message}};
use futures::{StreamExt, pin_mut};
use highly_sendable::{ConnectionMessage, ConnectionStateId};
use tokio::{join, net::TcpListener, sync::Notify, time::timeout};
use crate::{WebSocketActor, WebSocketActorInputMessage, WebSocketActorState, WebSocketActorOutputMessage};
const SERVER_URL: &str = "127.0.0.1:8080";
const SERVER_URL_CLIENT_SIDE: &str = "ws://localhost:8080";
const CHANNEL_CLOSED: &str = "Channel closed unexpectedly";
const ONE_TWO_THREE: &[u8] = &[1, 2, 3];
const MESSAGE_EXPECTED: &str = "Messsage expected";
const MESSAGE_SEND_ERROR: &str = "Messsage send error";
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn not_connected()
{
let connection_state_id = ConnectionStateId::new();
let io_client = WebSocketActorState::spawn();
io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::SendMessage(Message::Binary(Bytes::new())))).await.expect(CHANNEL_CLOSED);
let message = io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
assert_eq!(id, connection_state_id);
assert!(message.is_not_connected());
io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::Close)).await.expect(CHANNEL_CLOSED);
let message = io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
assert_eq!(id, connection_state_id);
assert!(message.is_not_connected());
io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::CloseNow)).await.expect(CHANNEL_CLOSED);
let message = io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
assert_eq!(id, connection_state_id);
assert!(message.is_not_connected());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn basics_client_side_actor_only()
{
let server_notify = Arc::new(Notify::new());
let client_notify = Arc::new(Notify::new());
let server_notify_clone = server_notify.clone();
let client_notify_clone = client_notify.clone();
let server = tokio::spawn(async move
{
let try_socket = TcpListener::bind(SERVER_URL).await;
let listener = try_socket.expect("Failed to bind");
let (stream, addr) = listener.accept().await.expect("Failed to accepet incoming connection.");
println!("{} has connected to the test server.\n", addr);
let adapted_steam = TokioAdapter::new(stream);
let ws_stream = accept_async(adapted_steam).await.expect("WebSocket handshake failed - Server");
pin_mut!(ws_stream);
println!("Server pinging\n");
let new_message = Message::Ping(Bytes::new());
ws_stream.send(new_message).await.expect(MESSAGE_SEND_ERROR);
let message_future = ws_stream.next();
let res = timeout(Duration::new(4, 0), message_future).await;
println!("Client message details:");
println!("{:#?}\n", res);
println!("Server - Closing\n");
ws_stream.close(None).await.expect(MESSAGE_SEND_ERROR);
println!("Server - Closed\n");
let next = ws_stream.next().await;
println!("Server - close frame from the client:\n");
println!("{:#?}\n", next);
client_notify_clone.notify_one();
server_notify_clone.notified().await;
});
let server_notify_clone = server_notify.clone();
let client_notify_clone = client_notify.clone();
let client = tokio::spawn(async move
{
let io_client = WebSocketActorState::spawn();
let (ws_stream, response) = connect_async(SERVER_URL_CLIENT_SIDE).await.expect("WebSocket handshake failed - Client");
println!("The client has connected with the following response: {:#?}\n", response);
let mut connection_state_id = ConnectionStateId::new();
io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::WebSocket(ws_stream))).await.expect(CHANNEL_CLOSED);
let message = io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
assert_ne!(id, connection_state_id);
assert!(message.is_web_socket_engaged());
println!("Client: Server message 1\n\n");
let message_future = io_client.output_receiver_ref().recv();
let res = timeout(Duration::new(4, 0), message_future).await;
println!("Client: Server message details - timeout:\n");
println!("{:#?}\n", res);
assert!(res.is_ok());
println!("Client: Server message 2\n\n");
let res = io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
println!("Client: Server message 2 - post-not-timeout\n\n");
println!("Client: Server message details:\n");
println!("{:#?}\n", res);
println!("Client: Server message 3\n\n");
let message_future = io_client.output_receiver_ref().recv();
let res = timeout(Duration::new(4, 0), message_future).await;
println!("Client: Server message details:\n");
println!("{:#?}\n\n", res);
println!("Client - Done\n\n");
server_notify_clone.notify_one();
client_notify_clone.notified().await;
});
let (server_result, client_result) = join!(server, client);
assert!(server_result.is_ok());
assert!(client_result.is_ok());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn basics_server_side_actor_only()
{
}
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn basics()
{
let server = tokio::spawn(async
{
let server_io_client = WebSocketActorState::spawn();
let try_socket = TcpListener::bind(SERVER_URL).await;
let listener = try_socket.expect("Failed to bind");
let (stream, addr) = listener.accept().await.expect("Failed to accepet incoming connection.");
println!("{} has connected to the test server.\n", addr);
let adapted_steam = TokioAdapter::new(stream);
let ws_stream = accept_async(adapted_steam).await.expect("WebSocket handshake failed - Server");
let mut connection_state_id = ConnectionStateId::new();
server_io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::WebSocket(ws_stream))).await.expect(CHANNEL_CLOSED);
let message = server_io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
assert_ne!(id, connection_state_id);
connection_state_id = id;
assert!(message.is_web_socket_engaged());
println!("Server sending ping\n");
let new_message = Message::Ping(Bytes::from_static(ONE_TWO_THREE));
server_io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::SendMessage(new_message))).await.expect(CHANNEL_CLOSED);
println!("Server sent ping\n");
let message = server_io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
println!("{:#?}\n", message);
assert_eq!(id, connection_state_id);
assert!(message.is_message());
if let WebSocketActorOutputMessage::Message(the_message) = message
{
assert!(the_message.is_pong());
if let Message::Pong(message_data) = the_message
{
assert_eq!(message_data, ONE_TWO_THREE);
}
}
else
{
panic!("{}", MESSAGE_EXPECTED);
}
});
let client = tokio::spawn(async
{
let io_client = WebSocketActorState::spawn();
let (ws_stream, response) = connect_async(SERVER_URL_CLIENT_SIDE).await.expect("WebSocket handshake failed - Client");
println!("The client has connected with the following response: {:#?}\n", response);
let mut connection_state_id = ConnectionStateId::new();
io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::WebSocket(ws_stream))).await.expect(CHANNEL_CLOSED);
let message = io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
assert_ne!(id, connection_state_id);
connection_state_id = id;
assert!(message.is_web_socket_engaged());
let message_future = io_client.output_receiver_ref().recv();
let res = timeout(Duration::new(4, 0), message_future).await;
println!("Client message details:");
println!("{:#?}\n", res);
assert!(res.is_ok());
io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::Close)).await.expect(CHANNEL_CLOSED);
let message = io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
println!("Client message details:");
println!("{}\n", id);
println!("{:#?}\n", message);
io_client.input_sender_ref().send(connection_state_id.connection_message(WebSocketActorInputMessage::Close)).await.expect(CHANNEL_CLOSED);
let message = io_client.output_receiver_ref().recv().await.expect(CHANNEL_CLOSED);
let (id, message) = message.take_id_and_message();
println!("Last client message details:");
println!("{}\n", id);
println!("{:#?}\n", message);
});
let (server_result, client_result) = join!(server, client);
assert!(server_result.is_ok());
assert!(client_result.is_ok());
}