use tokio::{net::tcp::OwnedReadHalf, sync::mpsc};
use tracing::instrument;
use surrealcs_kernel::logging::messages::actors_client::reader::log_client_reader_message;
use surrealcs_kernel::messages::{
client::message::TransactionMessage,
serialization::bincode_processes::message::deserialize_from_stream,
server::wrapper::WrappedServerMessage,
};
#[instrument(level = "trace", target = "surrealcs::client::reader", skip_all)]
pub async fn reader_actor(
mut reader: OwnedReadHalf,
router_tx: mpsc::Sender<TransactionMessage>,
connection_id: String,
) {
loop {
let wrapped_message: WrappedServerMessage = match deserialize_from_stream(&mut reader).await
{
Ok(message) => message,
Err(e) => {
tracing::error!(
"error when deserializing message from stream: {} -> [{}]",
e,
connection_id
);
break;
}
};
log_client_reader_message(&wrapped_message);
let transaction_message = TransactionMessage::TransactionOperation(wrapped_message);
router_tx.send(transaction_message).await.unwrap();
}
}
#[cfg(test)]
mod tests {
use super::*;
use surrealcs_kernel::messages::serialization::bincode_processes::message::serialize_server_message;
use surrealcs_kernel::messages::server::interface::ServerMessage;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
static CONNECTION_ID: &str = "1-1234567890";
#[tokio::test]
async fn test_reader_actor() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let mut client_stream = TcpStream::connect(addr).await.unwrap();
let server_stream = listener.accept().await.unwrap().0;
let (server_reader, _) = server_stream.into_split();
let (tx, mut rx) = mpsc::channel::<TransactionMessage>(32);
tokio::spawn(async move {
reader_actor(server_reader, tx, CONNECTION_ID.into()).await;
});
let message = WrappedServerMessage::new(20, ServerMessage::Ping(1), CONNECTION_ID.into());
let serialized = serialize_server_message(&message).unwrap();
client_stream.write_all(&serialized).await.unwrap();
let transaction_message = rx.recv().await.unwrap();
match transaction_message {
TransactionMessage::TransactionOperation(wrapped_message) => {
assert_eq!(wrapped_message.client_id, 20);
assert_eq!(wrapped_message.message, ServerMessage::Ping(1));
}
_ => {
panic!("Wrong message type received should have been TransactionOperation! but was {:?}", transaction_message);
}
}
}
}