surrealcs 0.4.4

The SurrealCS client code for SurrealDB
Documentation
//! Defines the actor responsible for reading messages from a TCP connection from the server.
//!
//! # Flow
//! The reader listens to the TCP stream for messages from the server. When a message is received
//! from the server, it sends the message to the router actor which will then send the message to
//! the actor that is awaiting a response. Messages should have an allocator id for the router
//! actor to know which actor to send the message to.
//! ```bash
//! [SERVER] -> (TCP Stream) -> [Reader] -> [Router] -> [Awaiting Actor]
//! ```
//!
//! # Areas of discussion
//! Right now if there is an error reading from the TCP stream, the error is logged and the
//! reader stops; nothing else happens. It would be good to agree on a plan of action for how
//! this is handled. Right now I am thinking that the connection is probably gone so the
//! connection error should be logged and another connection should be attempted?
use tokio::{net::tcp::OwnedReadHalf, sync::mpsc};
use tracing::instrument;

use surrealcs_kernel::logging::messages::actors_client::reader::log_client_reader_message;
use surrealcs_kernel::messages::{
	client::message::TransactionMessage,
	serialization::bincode_processes::message::deserialize_from_stream,
	server::wrapper::WrappedServerMessage,
};

/// The actor responsible for reading messages from the server.
///
/// The actor loops until either a message cannot be deserialized from the stream or the
/// router channel is closed. In both cases the failure is logged together with the
/// connection id and the actor returns; no reconnection is attempted here.
///
/// # Arguments
/// * `reader`: The reader half of the TCP stream
/// * `router_tx`: The sender for the router actor
/// * `connection_id`: The id of the connection, included in the error logs
#[instrument(level = "trace", target = "surrealcs::client::reader", skip_all)]
pub async fn reader_actor(
	mut reader: OwnedReadHalf,
	router_tx: mpsc::Sender<TransactionMessage>,
	connection_id: String,
) {
	loop {
		let wrapped_message: WrappedServerMessage = match deserialize_from_stream(&mut reader).await
		{
			Ok(message) => message,
			Err(e) => {
				tracing::error!(
					"error when deserializing message from stream: {} -> [{}]",
					e,
					connection_id
				);
				break;
			}
		};

		// check just for logging purposes
		log_client_reader_message(&wrapped_message);
		let transaction_message = TransactionMessage::TransactionOperation(wrapped_message);

		// send back to the router actor to be sent to whatever actor is awaiting a response.
		// If the router actor is gone the whole chain is broken, so there is nothing left to
		// deliver to: log it and stop reading instead of panicking inside the task, mirroring
		// how a stream error is handled above.
		if router_tx.send(transaction_message).await.is_err() {
			tracing::error!(
				"router actor channel closed, stopping reader -> [{}]",
				connection_id
			);
			break;
		}
	}
}

#[cfg(test)]
mod tests {

	use super::*;
	use surrealcs_kernel::messages::serialization::bincode_processes::message::serialize_server_message;
	use surrealcs_kernel::messages::server::interface::ServerMessage;
	use tokio::io::AsyncWriteExt;
	use tokio::net::{TcpListener, TcpStream};

	/// Connection id handed to the actor under test and stamped on the message sent to it.
	static CONNECTION_ID: &str = "1-1234567890";

	/// Writes a single serialized ping onto a loopback socket and checks that the reader
	/// actor forwards it over the router channel as a `TransactionOperation`.
	#[tokio::test]
	async fn test_reader_actor() {
		// Port 0 is requested and the resulting local address is read back from the listener.
		let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
		let local_address = listener.local_addr().unwrap();

		// One end of the connection is written to by the test, the other is read by the actor.
		let mut sending_stream = TcpStream::connect(local_address).await.unwrap();
		let (accepted_stream, _) = listener.accept().await.unwrap();
		let (read_half, _) = accepted_stream.into_split();

		let (router_tx, mut router_rx) = mpsc::channel::<TransactionMessage>(32);
		tokio::spawn(reader_actor(read_half, router_tx, CONNECTION_ID.into()));

		let outgoing = WrappedServerMessage::new(20, ServerMessage::Ping(1), CONNECTION_ID.into());
		let payload = serialize_server_message(&outgoing).unwrap();
		sending_stream.write_all(&payload).await.unwrap();

		// The actor should have wrapped the decoded message before handing it to the router.
		match router_rx.recv().await.unwrap() {
			TransactionMessage::TransactionOperation(wrapped_message) => {
				assert_eq!(wrapped_message.client_id, 20);
				assert_eq!(wrapped_message.message, ServerMessage::Ping(1));
			}
			other => {
				panic!("Wrong message type received should have been TransactionOperation! but was {:?}", other);
			}
		}
	}
}