surrealcs 0.4.4

The SurrealCS client code for SurrealDB
Documentation
//! Defines the method in which a transaction client handle can send a message to the writer actor that is
//! associated with the transaction.
//!
//! # Notes
//! - This approach is used to allow us to unit test transaction handles without needing to have an entire system
//!   running.
//! - This process is not involved in the shutdown as we directly go for the connections via the router.
use super::interface::Transaction;
use super::utils::await_message;
use nanoservices_utils::errors::NanoServiceError;
use std::future::Future;
use surrealcs_kernel::messages::client::message::TransactionMessage;
use surrealcs_kernel::messages::server::{
	interface::{ServerMessage, ServerTransactionMessage},
	wrapper::WrappedServerMessage,
};
use tracing::instrument;

/// The outcome of sending a message over the bridge: the `ServerTransactionMessage` on success, or a
/// `NanoServiceError` on failure.
pub type BridgeResponse = Result<ServerTransactionMessage, NanoServiceError>;

/// A handle to enable a client to send a message to the transaction client actor.
///
/// This is a field-less unit struct; it exists to carry the `SendToTransactionClientActor` implementation
/// in this module.
pub struct BridgeHandle;

/// The trait that enables a client to send a message to the transaction client actor.
///
/// The single function is an associated function (it takes no `self` receiver), so implementors are
/// selected purely by type.
pub trait SendToTransactionClientActor {
	/// Sends a message to the transaction client actor.
	///
	/// # Arguments
	/// * `self_ref`: Reference to the transaction that is sending the message
	/// * `message`: The message to send to the transaction client actor
	///
	/// # Returns
	/// A `Send` future that resolves to the `BridgeResponse` for the message
	fn send_to_transaction_client_actor<T: Send>(
		self_ref: &mut Transaction<T>,
		message: ServerMessage,
	) -> impl Future<Output = BridgeResponse> + Send;
}

impl SendToTransactionClientActor for BridgeHandle {
	/// Sends a message to the transaction client actor by delegating to `internal_send`.
	///
	/// # Arguments
	/// * `self_ref`: Reference to the transaction that is sending the message
	/// * `message`: The message to send to the transaction client actor
	///
	/// # Returns
	/// The future returned by `internal_send`, which resolves to the `BridgeResponse` for the message
	fn send_to_transaction_client_actor<T: Send>(
		self_ref: &mut Transaction<T>,
		message: ServerMessage,
	) -> impl Future<Output = BridgeResponse> + Send {
		internal_send(self_ref, message)
	}
}

/// Sends a message to the actor that will send the message to the router, and then awaits a response from the server.
///
/// # Arguments
/// * `self_ref`: the reference to the transaction that is sending the message
/// * `message`: the message to send to the server
///
/// # Returns
/// The response message from the server
#[instrument(name = "internal_send", skip(self_ref))]
async fn internal_send<T>(self_ref: &mut Transaction<T>, message: ServerMessage) -> BridgeResponse {
	let mut message =
		WrappedServerMessage::new(self_ref.client_id, message, self_ref.connection_id.clone())
			.transaction_id(self_ref.transaction_id.clone());
	message.server_id = self_ref.server_id;
	let message = TransactionMessage::TransactionOperation(message);
	self_ref.sender.send(message).await.unwrap();

	// await the response from the server
	let output = await_message(self_ref).await?;
	let (server_id, transaction) = output.extract_transaction_operation()?;
	self_ref.server_id = Some(server_id);
	Ok(transaction)
}

#[cfg(test)]
mod tests {
	use super::super::interface::Any;
	use super::*;
	use std::marker::PhantomData;
	use surrealcs_kernel::messages::server::kv_operations::MessagePut;
	use tokio::sync::mpsc;

	/// Round-trips a `Put` operation through a stand-in actor that echoes the first message it receives
	/// straight back to the transaction, and checks that the same key and value come back.
	#[tokio::test]
	async fn test_internal_send() {
		// transaction -> actor
		let (to_actor, mut actor_inbox) = mpsc::channel::<TransactionMessage>(32);
		// actor -> transaction
		let (to_transaction, transaction_inbox) = mpsc::channel::<TransactionMessage>(32);

		// stand-in actor: receive exactly one message and send it back unchanged
		tokio::spawn(async move {
			let echoed = actor_inbox.recv().await.unwrap();
			to_transaction.send(echoed).await.unwrap();
		});

		let mut transaction = Transaction::<Any> {
			client_id: 0,
			connection_id: "1-1234567890".to_string(),
			server_id: Some(1),
			transaction_id: "1-1234567890".to_string(),
			sender: to_actor,
			receiver: transaction_inbox,
			state: PhantomData,
		};

		let put = MessagePut {
			key: b"key".to_vec(),
			value: b"value".to_vec(),
			version: None,
		};
		let request = ServerMessage::SendOperation(ServerTransactionMessage::Put(put));

		let outcome =
			BridgeHandle::send_to_transaction_client_actor(&mut transaction, request).await;
		let reply = outcome.unwrap();

		let ServerTransactionMessage::Put(echoed_put) = reply else {
			panic!("wrong message type");
		};

		assert_eq!(echoed_put.key, b"key".to_vec());
		assert_eq!(echoed_put.value, b"value".to_vec());
	}
}