// surrealcs 0.4.4
//
// The SurrealCS client code for SurrealDB
// Documentation
//! The router for creating and getting connections.
//!
//! # Notes
//! The router is a central actor that manages the connections for the client. It is not
//! tested, as these methods use static lazy loading to create the router channel.
// Standard library imports
use std::future::Future;

// External crate imports
use nanoservices_utils::errors::{NanoServiceError, NanoServiceErrorStatus};
use once_cell::sync::Lazy;
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;

// Project-specific imports
use crate::connection::constructor as connection_constructor;
use crate::utils::env::{get_url, EnviroVarConfig, GetEnv};
use surrealcs_kernel::allocator::Allocator;
use surrealcs_kernel::messages::client::message::TransactionMessage;
use surrealcs_kernel::messages::client::router::{RouterMessage, WrappedRouterMessage};

/// The allocator type used by the router: it stores the `mpsc` sender of each connection
/// so that a sender can be looked up by its index (the connection ID).
pub type ConAlloc = Allocator<mpsc::Sender<TransactionMessage>>;

/// Creates a connection pool.
///
/// # Notes
/// Connections are created one after another by sending `MakeConnection` requests through
/// [`send_to_router`]. If creating any connection fails, the error is returned immediately
/// and the IDs collected so far are dropped; this function does not close the connections
/// that were already created.
///
/// # Arguments
/// * `address`: the server URL to make connections to
/// * `number`: the number of connections to create (if none is provided, it will use the number of CPUs)
///
/// # Returns
/// A vector of connection IDs
///
/// # Errors
/// Returns the first error produced while creating a connection.
pub async fn create_connection_pool(
	address: &str,
	number: Option<usize>,
) -> Result<Vec<usize>, NanoServiceError> {
	// only query the CPU count when the caller did not specify a pool size
	let num = number.unwrap_or_else(num_cpus::get);
	// the final length is known up front, so allocate once
	let mut connections = Vec::with_capacity(num);
	for _ in 0..num {
		connections.push(create_connection(address, send_to_router).await?);
	}
	Ok(connections)
}

// NOTE(review): the reason for allowing `unused` here is not visible in this file — confirm
// whether it is still needed.
#[allow(unused)]
/// Asks the router to close the connection pool.
///
/// # Notes
/// A `CloseConnectionPool` message is sent to the router through [`send_to_router`]. The
/// router's reply is discarded; only a failure to talk to the router is reported.
///
/// # Returns
/// `Ok(())` once the router has answered, otherwise the error from [`send_to_router`]
pub async fn close_connection_pool() -> Result<(), NanoServiceError> {
	send_to_router(RouterMessage::CloseConnectionPool).await.map(|_| ())
}

/// Requests a new connection and returns its ID.
///
/// # Notes
/// This is the client-facing way of creating a connection. A `MakeConnection` message
/// carrying the address is handed to `closure`, and the reply produced by `closure` is
/// expected to be `ConnectionCreated`. Taking the transport as a closure means the function
/// does not depend on the router channel directly.
///
/// # Arguments
/// * `address`: the server URL to make connections to
/// * `closure`: the function that delivers the message and resolves to the reply
///              (for example `send_to_router`)
///
/// # Returns
/// The index of the connection in the allocator (also known as the connection ID)
///
/// # Errors
/// Propagates any error returned by `closure`, and returns an `Unknown` error if the reply
/// is anything other than `ConnectionCreated`.
pub async fn create_connection<F, Fut>(address: &str, closure: F) -> Result<usize, NanoServiceError>
where
	F: FnOnce(RouterMessage) -> Fut,
	Fut: Future<Output = Result<RouterMessage, NanoServiceError>>,
{
	let request = RouterMessage::MakeConnection(address.into());
	if let RouterMessage::ConnectionCreated(connection_id) = closure(request).await? {
		return Ok(connection_id);
	}
	// any other reply means the transport answered with something unexpected
	Err(NanoServiceError::new(
		"response was not ConnectionCreated when creating a connection".to_string(),
		NanoServiceErrorStatus::Unknown,
	))
}

/// Creates a connection.
///
/// # Notes
/// This function is inner for directly interacting with the allocator to get a connection
/// ID. It should not be used outside of the router by the client.
///
/// A bounded channel (capacity 32) is created for the connection. One sender is stored in
/// the allocator, and the receiver plus the other sender are handed to the connection
/// constructor together with the newly allocated ID.
///
/// # Arguments
/// * `address`: the server URL to make connections to
/// * `allocator`: the allocator to assign the new connection to
///
/// # Returns
/// The index of the connection in the allocator (also known as the connection ID)
async fn inner_create_connection(address: String, allocator: &mut ConAlloc) -> usize {
	let (tx, rx) = mpsc::channel::<TransactionMessage>(32);
	// the allocator keeps its own sender so it can be handed out to clients later
	let connection_id = allocator.allocate(tx.clone());

	// `address` is owned and not used afterwards, so it is moved instead of cloned
	connection_constructor(address, rx, tx, connection_id).await;
	connection_id
}

/// Looks up a connection sender in the allocator and replies to the client with it.
///
/// # Notes
/// Failures are only logged: if the connection cannot be extracted, or the reply cannot be
/// delivered, an error is traced and the function returns. When extraction fails no reply
/// is sent, and `message` (including its reply sender) is dropped.
///
/// # Arguments
/// * `index`: the index of the connection to extract from the allocator
/// * `allocator`: the allocator to extract the connection from
/// * `message`: the wrapped request; its `tx` is used to send the `ReturnConnection`
///              reply back to the client
fn get_and_return_allocated_connection(
	index: usize,
	allocator: &mut ConAlloc,
	message: WrappedRouterMessage,
) {
	let sender = match allocator.extract_connection(index) {
		Ok(sender) => sender,
		Err(e) => {
			tracing::error!("error when extracting a connection: {}", e.message);
			return;
		}
	};
	if message.tx.send(RouterMessage::ReturnConnection(sender)).is_err() {
		tracing::error!(
			"error when sending message back to client for getting a connection"
		);
	}
}

/// Accepts messages to get or create connections.
///
/// # Notes
/// - should only have one actor for the entire client to access all of the connections.
/// The `GetEnv` trait is used to get the URL for the server if there are no connections
/// available.
///
/// - Proper unit testing is not fully supported for this yet, we need to have a make connection
/// dependency injection.
///
/// # Arguments
/// * `rx`: the reciever for messages
/// * `tx`: the transmitter for messages that is the same channel as the `rx` but is not
///         used
#[instrument(name = "actor", level = "trace", target = "surrealcs::client::router", skip_all)]
pub async fn router_actor<T: GetEnv>(
	mut rx: mpsc::Receiver<WrappedRouterMessage>,
	tx: mpsc::Sender<WrappedRouterMessage>,
) {
	let mut allocator: Allocator<mpsc::Sender<TransactionMessage>> = Allocator::new();
	let _tx = tx.clone();
	let mut shutdown = false;

	while let Some(message) = rx.recv().await {
		match message.message {
			RouterMessage::MakeConnection(url) => {
				let index = inner_create_connection(url, &mut allocator).await;
				let respone = RouterMessage::ConnectionCreated(index);
				match message.tx.send(respone) {
					Ok(_) => {}
					Err(_) => {
						tracing::error!(
							"error sending message back to client when creating the connection"
						);
					}
				};
			}
			RouterMessage::CloseConnection(index) => {
				let return_message: RouterMessage = RouterMessage::ConnectionClosed;
				match message.tx.send(return_message) {
					Ok(_) => {}
					Err(_) => {
						tracing::error!("error is sending message when closing connection");
					}
				}
				let _ = allocator.deallocate(index);
			}
			RouterMessage::GetConnection => {
				if shutdown {
					let _ = message.tx.send(RouterMessage::ConnectionPoolClosed);
					continue;
				}
				if let Some(index) = allocator.yield_next_allocated_index() {
					get_and_return_allocated_connection(index, &mut allocator, message);
				} else {
					inner_create_connection(get_url::<T>(), &mut allocator).await;
					// below can be unwrapped because we have just created a connection
					let index = allocator.yield_next_allocated_index().unwrap();
					get_and_return_allocated_connection(index, &mut allocator, message);
				}
			}
			RouterMessage::CloseConnectionPool => {
				tracing::trace!("Closing the connection pool");
				shutdown = true;
				for connection in allocator.yield_connections() {
					match connection.send(TransactionMessage::CloseConnection).await {
						Ok(_) => {}
						Err(_) => {
							tracing::error!("error when closing connection");
						}
					}
				}
				let _ = message.tx.send(RouterMessage::ConnectionPoolClosed);
			}
			_ => tracing::error!("Unknown message!"),
		}
	}
}

/// A handle function to send messages to the router for creating and getting connections.
///
/// # Notes
/// The reason why this is is seperate from the router actor but basically just sends
/// messages to the router actor is to act as an interface so we do not have to keep
/// track of channels and pass them around due to the lazy loading of the router channel
/// in this function. This function also spawns the router actor and the shutdown monitor,
/// decoupling the client support systems from the router actor.
///
/// # Arguments
/// * `message`: message to be sent to the router
///
/// # Returns
/// The router message that has details such as connection details
pub async fn send_to_router(message: RouterMessage) -> Result<RouterMessage, NanoServiceError> {
	static ROUTER_CHANNEL: Lazy<mpsc::Sender<WrappedRouterMessage>> = Lazy::new(|| {
		// TODO => look into expodential backoff for the channel
		// TODO => load shedding for the channel
		// TODO => will have to run experiments to see how many messages can be sent through the channel
		//         and make the total amount configurable
		let (tx, rx) = mpsc::channel::<WrappedRouterMessage>(32);
		let tx_ref = tx.clone();
		tokio::spawn(async move {
			router_actor::<EnviroVarConfig>(rx, tx_ref).await;
		});
		tx
	});

	let (tx, rx) = oneshot::channel();
	let wrapped_message = WrappedRouterMessage {
		message,
		tx,
	};
	ROUTER_CHANNEL
		.send(wrapped_message)
		.await
		.map_err(|e| NanoServiceError::new(e.to_string(), NanoServiceErrorStatus::Unknown))?;
	let response = rx
		.await
		.map_err(|e| NanoServiceError::new(e.to_string(), NanoServiceErrorStatus::Unknown))?;
	Ok(response)
}

#[cfg(test)]
mod tests {

	use super::*;

	/// Sends `message` to the router behind `router_tx` and waits for its reply.
	///
	/// Fails the test if the router channel is closed or the router drops the reply sender
	/// without answering.
	async fn request(
		router_tx: &mpsc::Sender<WrappedRouterMessage>,
		message: RouterMessage,
	) -> RouterMessage {
		let (tx_one, rx_one) = tokio::sync::oneshot::channel::<RouterMessage>();
		let wrapped_message = WrappedRouterMessage {
			message,
			tx: tx_one,
		};
		assert!(
			router_tx.send(wrapped_message).await.is_ok(),
			"router channel closed before the message could be sent"
		);
		rx_one.await.expect("router dropped the reply sender without answering")
	}

	/// Closing the pool must be acknowledged, and every later `GetConnection` request must be
	/// refused with `ConnectionPoolClosed`.
	#[tokio::test]
	async fn test_close_connection_pool() {
		let (tx, rx) = mpsc::channel::<WrappedRouterMessage>(32);

		// run a private router actor so the test does not touch the lazily created static one
		let _router_handle = tokio::spawn(router_actor::<EnviroVarConfig>(rx, tx.clone()));

		let response = request(&tx, RouterMessage::CloseConnectionPool).await;
		assert!(
			matches!(response, RouterMessage::ConnectionPoolClosed),
			"expected ConnectionPoolClosed"
		);

		// once shut down, the router must not hand out (or create) connections
		let response = request(&tx, RouterMessage::GetConnection).await;
		assert!(
			matches!(response, RouterMessage::ConnectionPoolClosed),
			"expected ConnectionPoolClosed"
		);
	}
}