surrealcs 0.4.4

The SurrealCS client code for SurrealDB
Documentation
//! Defines the procedure for registering a transaction handle with a connection actor.
//!
//! # Flow
//! The transaction handle sends a message to the writer actor. The writer actor then sends the message to the
//! server via TCP. The transaction handle then awaits the response from the server via the router actor.
//! ```bash
//! [Transaction Handle] -> [Writer Actor] -> [Server] -> [Reader Actor] -> [Router Actor] -> [Transaction Handle]
//! ```
use futures::Future;
use nanoservices_utils::errors::{NanoServiceError, NanoServiceErrorStatus};
use surrealcs_kernel::logging::messages::connections::id::create_id;
use surrealcs_kernel::messages::client::{message::TransactionMessage, router::RouterMessage};
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};

/// Registers a transaction handle to a connection Actor.
///
/// # Arguments
/// * `closure`: A closure that enables us to send a message to the main router
/// * `message` The initial transaction operation that is performed when registering the transaction
///
/// # Returns
/// - a sender to the connection actor,
/// - the ID of the client router,
/// - handle reciever to recieve the update from the operation sent
pub async fn establish_transaction_connection<F, Fut>(
	closure: F,
) -> Result<
	(mpsc::Sender<TransactionMessage>, usize, mpsc::Receiver<TransactionMessage>, String),
	NanoServiceError,
>
where
	F: FnOnce(RouterMessage) -> Fut,
	Fut: Future<Output = Result<RouterMessage, NanoServiceError>>,
{
	// define the channel for the handle
	let (h_tx, mut h_rx) = mpsc::channel::<TransactionMessage>(32);

	// get a connection from the router (dependency injection used to aid testing)
	let message: RouterMessage = RouterMessage::GetConnection;
	let (writer_tx, connection_index) = match closure(message).await? {
		RouterMessage::ReturnConnection(sender) => sender,
		RouterMessage::ConnectionPoolClosed => {
			return Err(NanoServiceError::new(
				"Connection pool closed".to_string(),
				NanoServiceErrorStatus::Unknown,
			))
		}
		_ => {
			return Err(NanoServiceError::new(
				"connection not returned".to_string(),
				NanoServiceErrorStatus::Unknown,
			))
		}
	};

	// register with that connection's router
	// look into attaching a message to the register for the first registration
	let message = TransactionMessage::Register(h_tx);
	match writer_tx.send(message).await {
		Ok(_) => {}
		Err(e) => {
			return Err(NanoServiceError::new(
				format!("error sending Registration message: {}", e),
				NanoServiceErrorStatus::Unknown,
			))
		}
	};

	// wait for the registration message with a 2 second timeout
	let timeout_duration = Duration::from_secs(2);
	let message = match timeout(timeout_duration, h_rx.recv()).await {
		Ok(Some(message)) => message,
		Ok(None) => {
			return Err(NanoServiceError::new(
				"Registration message not returned".to_string(),
				NanoServiceErrorStatus::Unknown,
			))
		}
		Err(e) => {
			return Err(NanoServiceError::new(
				format!("Registration message not returned with error: {}", e),
				NanoServiceErrorStatus::Unknown,
			))
		}
	};
	// get the index of the allocator of the connection router (also known as transaction id)
	let client_id: usize = match message {
		TransactionMessage::Registered(id) => id,
		_ => {
			return Err(NanoServiceError::new(
				"Registration ID not returned".to_string(),
				NanoServiceErrorStatus::Unknown,
			))
		}
	};

	let connection_id = create_id(connection_index);

	Ok((writer_tx, client_id, h_rx, connection_id))
}