surrealcs 0.4.4

The SurrealCS client code for SurrealDB
Documentation
//! Defines the router for the connection so messages received from the TCP connection
//! from the server can find their way back to the sender.
//!
//! # Notes
//! As of 25th Sept 2024 the operations have been moved into their own modules and are dispatched from
//! this actor. This reduces the complexity of the code making it easier to understand the flow of the
//! messages. It also enables the individual operations to be unit tested much more easily. The tests
//! in this module still run and pass but if they start to break due to feature changes, it is best to drop
//! them and maintain easier tests as the individual operations now have their own tests
//! [contact Maxwell Flitton for comment].
//!
//! # Routes
//! The router supports the following routes:
//!
//! ## Register
//! The register route is used to register a client actor with the router so the router
//! can send messages back to the client actor with the following loop:
//! ```bash
//! [writer] -> (message containing actor sender) -> [router] -> (message containing id) -> [transaction_handle]
//! ```
//!
//! ## Deregister
//! This follows the same path as the register route but is used to deregister the client actor meaning the ID is
//! sent instead of an actor sender:
//! ```bash
//! [writer] -> (message containing actor ID) -> [router] -> (confirmation) -> [transaction_handle]
//! ```
//!
//! ## TransactionOperation
//! This is where an operation result is sent from the client reader to the router and then back to the actor that
//! initially sent the operation to the server:
//! ```bash
//! [reader] -> (message containing operation result and actor ID) -> [router] -> (message containing operation result) -> [transaction_handle]
//! ```
//!
//! # Areas of discussion
//! The router has an allocator that is used to allocate and deallocate client actors. The allocator is unit tested
//! by itself and is essentially a vector of `Sender<TransactionMessage>` that can be used to send messages back to
//! the client actors. We must only have one actor that is sending allocate and deallocate messages to the router
//! otherwise we could get data races and undefined behaviour. Maybe a UUID could also be assigned with the allocation
//! and only one actor has this UUID, as the index for the allocator is assigned to every message in order to get
//! routed.
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::instrument;

use surrealcs_kernel::{
	allocator::Allocator, logging::messages::actors_client::router::log_client_router_message,
	messages::client::message::TransactionMessage,
};

mod deregister;
mod register;
mod transaction_operation;

/// Accepts messages from the writer and reader actors to return messages
/// back to the actor that initially sent the message.
///
/// # Arguments
/// * `rx`: The receiver to accept transaction messages on
/// * `address`: The address of the server
/// * `connection_id`: The ID of the connection
/// * `reader_handle`: The handle to the reader actor (for shutdown purposes)
#[instrument(name = "client_router_actor", skip(rx))]
pub async fn router_actor(
	mut rx: mpsc::Receiver<TransactionMessage>,
	address: String,
	connection_id: String,
	reader_handle: JoinHandle<()>,
) {
	let mut allocator = Allocator::new();

	while let Some(message) = rx.recv().await {
		log_client_router_message(&message);
		match message {
			// allocating client ID
			TransactionMessage::Register(sender) => {
				register::register(sender, &mut allocator).await;
				continue;
			}
			TransactionMessage::Deregister(index) => {
				deregister::deregister(index, &mut allocator).await;
				continue;
			}
			// send back to actor waiting
			TransactionMessage::TransactionOperation(unwrapped_message) => {
				transaction_operation::send_transaction_operation(
					unwrapped_message,
					&mut allocator,
				)
				.await;
				continue;
			}
			TransactionMessage::CloseConnection => {
				reader_handle.abort();
				tracing::trace!("shutting down router actor for connection: {}", connection_id);
				break;
			}
			_ => {}
		}
		tracing::error!("the message that slipped through for the router: {:?}", message);
	}
}

#[cfg(test)]
mod tests {

	use super::*;
	use surrealcs_kernel::messages::server::interface::ServerMessage;
	use surrealcs_kernel::messages::server::wrapper::WrappedServerMessage;
	use tokio::sync::oneshot;
	use tokio::time::{timeout, Duration};

	static CONNECTION_ID: &str = "1-1234567890";
	/// Server address handed to the router; only used as a tracing field.
	static ADDRESS: &str = "127.0.0.1:8080";
	/// Capacity used for every channel created in these tests.
	const CHANNEL_CAPACITY: usize = 32;
	/// How long a test waits before concluding that nothing will arrive or happen.
	const WAIT_WINDOW: Duration = Duration::from_secs(1);

	/// Spawns a stand-in for the reader actor that finishes immediately.
	fn idle_reader() -> JoinHandle<()> {
		tokio::spawn(async {})
	}

	/// Spawns a router actor and returns the sender feeding it together with its task handle.
	fn spawn_router(
		reader_handle: JoinHandle<()>,
	) -> (mpsc::Sender<TransactionMessage>, JoinHandle<()>) {
		let (tx, rx) = mpsc::channel::<TransactionMessage>(CHANNEL_CAPACITY);
		let router_handle = tokio::spawn(async move {
			router_actor(rx, ADDRESS.to_string(), CONNECTION_ID.into(), reader_handle).await;
		});
		(tx, router_handle)
	}

	/// Receives the next message for a client, panicking if the channel has been closed.
	async fn next_message(rx: &mut mpsc::Receiver<TransactionMessage>) -> TransactionMessage {
		rx.recv().await.expect("client channel closed before a response arrived")
	}

	/// Asserts that nothing is delivered to the client within [`WAIT_WINDOW`].
	///
	/// A sender for `rx` has to be kept alive by the caller, otherwise `recv` resolves
	/// straight away with `None` instead of pending until the timeout fires.
	async fn assert_no_response(rx: &mut mpsc::Receiver<TransactionMessage>) {
		let outcome = timeout(WAIT_WINDOW, rx.recv()).await;
		assert!(
			outcome.is_err(),
			"an unregister that is out of bounds should not return a response"
		);
	}

	/// Registers a fresh client with the router and asserts that it was given index 0.
	///
	/// The client's sender is returned as well so the caller can keep the channel open.
	async fn register_first_client(
		router: &mpsc::Sender<TransactionMessage>,
	) -> (mpsc::Sender<TransactionMessage>, mpsc::Receiver<TransactionMessage>) {
		let (client_tx, mut client_rx) = mpsc::channel::<TransactionMessage>(CHANNEL_CAPACITY);
		router.send(TransactionMessage::Register(client_tx.clone())).await.unwrap();
		assert!(
			matches!(next_message(&mut client_rx).await, TransactionMessage::Registered(0)),
			"unexpected response from the register"
		);
		(client_tx, client_rx)
	}

	/// Deregisters index 0 and asserts that the client is told it has been unregistered.
	async fn deregister_first_client(
		router: &mpsc::Sender<TransactionMessage>,
		client_rx: &mut mpsc::Receiver<TransactionMessage>,
	) {
		router.send(TransactionMessage::Deregister(0)).await.unwrap();
		assert!(
			matches!(next_message(client_rx).await, TransactionMessage::Unregistered),
			"unexpected response from the deregister"
		);
	}

	#[tokio::test]
	async fn test_ok() {
		let (tx, _router) = spawn_router(idle_reader());

		let (tx_1, mut rx_1) = mpsc::channel::<TransactionMessage>(CHANNEL_CAPACITY);
		let (tx_2, mut rx_2) = mpsc::channel::<TransactionMessage>(CHANNEL_CAPACITY);
		let (tx_3, mut rx_3) = mpsc::channel::<TransactionMessage>(CHANNEL_CAPACITY);
		for client in [&tx_1, &tx_2, &tx_3] {
			tx.send(TransactionMessage::Register(client.clone())).await.unwrap();
		}

		// indexes are handed out in registration order
		assert!(
			matches!(next_message(&mut rx_1).await, TransactionMessage::Registered(0)),
			"unexpected response"
		);
		assert!(
			matches!(next_message(&mut rx_2).await, TransactionMessage::Registered(1)),
			"unexpected response"
		);
		assert!(
			matches!(next_message(&mut rx_3).await, TransactionMessage::Registered(2)),
			"unexpected response"
		);

		tx.send(TransactionMessage::Deregister(1)).await.unwrap();
		assert!(
			matches!(next_message(&mut rx_2).await, TransactionMessage::Unregistered),
			"unexpected response"
		);

		// an operation addressed to client 0 must come out on the first client's channel
		tx.send(TransactionMessage::TransactionOperation(WrappedServerMessage::new(
			0,
			ServerMessage::Ping(0),
			CONNECTION_ID.into(),
		)))
		.await
		.unwrap();
		match next_message(&mut rx_1).await {
			TransactionMessage::TransactionOperation(wrapped_message) => {
				assert_eq!(wrapped_message.client_id, 0);
				assert_eq!(wrapped_message.message, ServerMessage::Ping(0));
			}
			_ => {
				panic!("unexpected response");
			}
		}
	}

	#[tokio::test]
	async fn test_out_of_bounds_unregister() {
		let (tx, _router) = spawn_router(idle_reader());

		// never registered; the sender half only keeps the channel open for the timeout check
		let (_tx_1, mut rx_1) = mpsc::channel::<TransactionMessage>(CHANNEL_CAPACITY);
		tx.send(TransactionMessage::Deregister(0)).await.unwrap();

		assert_no_response(&mut rx_1).await;
	}

	#[tokio::test]
	async fn test_already_deallocated_unregister() {
		let (tx, _router) = spawn_router(idle_reader());

		let (_tx_1, mut rx_1) = register_first_client(&tx).await;
		deregister_first_client(&tx, &mut rx_1).await;

		// deregistering the same index a second time must be ignored
		tx.send(TransactionMessage::Deregister(0)).await.unwrap();
		assert_no_response(&mut rx_1).await;
	}

	#[tokio::test]
	async fn test_register_deregister_ok() {
		let (tx, _router) = spawn_router(idle_reader());

		let (_tx_1, mut rx_1) = register_first_client(&tx).await;
		deregister_first_client(&tx, &mut rx_1).await;

		// assert that the client ID is in fact deallocated and cannot be accessed
		tx.send(TransactionMessage::Deregister(0)).await.unwrap();
		assert_no_response(&mut rx_1).await;
	}

	#[tokio::test]
	async fn test_shutdown_not_called() {
		let (wr_tx, mut wr_rx) =
			mpsc::channel::<Option<oneshot::Sender<bool>>>(CHANNEL_CAPACITY);

		// simulated reader actor: answers every probe with `true` for as long as it is alive
		let reader_handle = tokio::spawn(async move {
			while let Some(tx) = wr_rx.recv().await {
				tx.unwrap().send(true).unwrap();
			}
		});
		let (tx, router_handle) = spawn_router(reader_handle);

		// assert that the reader task is still running before the close is requested
		let (one_tx, one_rx) = oneshot::channel();
		wr_tx.send(Some(one_tx)).await.unwrap();
		let response = one_rx.await.unwrap();
		assert!(response);

		tx.send(TransactionMessage::CloseConnection).await.unwrap();

		// assert that the router has shutdown
		timeout(WAIT_WINDOW, router_handle).await.unwrap().unwrap();

		// `abort` only requests cancellation, so wait for the reader's receiver to be dropped
		// rather than assuming it already happened by the time the router task finished
		timeout(WAIT_WINDOW, wr_tx.closed())
			.await
			.expect("the reader task should have shutdown");

		// assert that the reader task has shutdown and can no longer be reached
		let (two_tx, _two_rx) = oneshot::channel();
		assert!(
			wr_tx.send(Some(two_tx)).await.is_err(),
			"the reader task should have shutdown"
		);
	}
}