use std::future::Future;
use nanoservices_utils::errors::{NanoServiceError, NanoServiceErrorStatus};
use once_cell::sync::Lazy;
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;
use crate::connection::constructor as connection_constructor;
use crate::utils::env::{get_url, EnviroVarConfig, GetEnv};
use surrealcs_kernel::allocator::Allocator;
use surrealcs_kernel::messages::client::message::TransactionMessage;
use surrealcs_kernel::messages::client::router::{RouterMessage, WrappedRouterMessage};
pub type ConAlloc = Allocator<mpsc::Sender<TransactionMessage>>;
pub async fn create_connection_pool(
address: &str,
number: Option<usize>,
) -> Result<Vec<usize>, NanoServiceError> {
let mut connections = Vec::new();
let num = number.unwrap_or(num_cpus::get());
for _ in 0..num {
let connection = create_connection(address, send_to_router).await?;
connections.push(connection);
}
Ok(connections)
}
#[allow(unused)]
pub async fn close_connection_pool() -> Result<(), NanoServiceError> {
let message = RouterMessage::CloseConnectionPool;
send_to_router(message).await?;
Ok(())
}
pub async fn create_connection<F, Fut>(address: &str, closure: F) -> Result<usize, NanoServiceError>
where
F: FnOnce(RouterMessage) -> Fut,
Fut: Future<Output = Result<RouterMessage, NanoServiceError>>,
{
let message = RouterMessage::MakeConnection(address.into());
let response = closure(message).await?;
match response {
RouterMessage::ConnectionCreated(index) => Ok(index),
_ => Err(NanoServiceError::new(
"response was not ConnectionCreated when creating a connection".to_string(),
NanoServiceErrorStatus::Unknown,
)),
}
}
async fn inner_create_connection(address: String, allocator: &mut ConAlloc) -> usize {
let (tx, rx) = mpsc::channel::<TransactionMessage>(32);
let connection_id = allocator.allocate(tx.clone());
connection_constructor(address.clone(), rx, tx, connection_id).await;
connection_id
}
fn get_and_return_allocated_connection(
index: usize,
allocator: &mut ConAlloc,
message: WrappedRouterMessage,
) {
match allocator.extract_connection(index) {
Ok(sender) => {
let response = RouterMessage::ReturnConnection(sender);
match message.tx.send(response) {
Ok(_) => {}
Err(_) => {
tracing::error!(
"error when sending message back to client for getting a connection"
);
}
}
}
Err(e) => {
tracing::error!("error when extracting a connection: {}", e.message);
}
}
}
#[instrument(name = "actor", level = "trace", target = "surrealcs::client::router", skip_all)]
pub async fn router_actor<T: GetEnv>(
mut rx: mpsc::Receiver<WrappedRouterMessage>,
tx: mpsc::Sender<WrappedRouterMessage>,
) {
let mut allocator: Allocator<mpsc::Sender<TransactionMessage>> = Allocator::new();
let _tx = tx.clone();
let mut shutdown = false;
while let Some(message) = rx.recv().await {
match message.message {
RouterMessage::MakeConnection(url) => {
let index = inner_create_connection(url, &mut allocator).await;
let respone = RouterMessage::ConnectionCreated(index);
match message.tx.send(respone) {
Ok(_) => {}
Err(_) => {
tracing::error!(
"error sending message back to client when creating the connection"
);
}
};
}
RouterMessage::CloseConnection(index) => {
let return_message: RouterMessage = RouterMessage::ConnectionClosed;
match message.tx.send(return_message) {
Ok(_) => {}
Err(_) => {
tracing::error!("error is sending message when closing connection");
}
}
let _ = allocator.deallocate(index);
}
RouterMessage::GetConnection => {
if shutdown {
let _ = message.tx.send(RouterMessage::ConnectionPoolClosed);
continue;
}
if let Some(index) = allocator.yield_next_allocated_index() {
get_and_return_allocated_connection(index, &mut allocator, message);
} else {
inner_create_connection(get_url::<T>(), &mut allocator).await;
let index = allocator.yield_next_allocated_index().unwrap();
get_and_return_allocated_connection(index, &mut allocator, message);
}
}
RouterMessage::CloseConnectionPool => {
tracing::trace!("Closing the connection pool");
shutdown = true;
for connection in allocator.yield_connections() {
match connection.send(TransactionMessage::CloseConnection).await {
Ok(_) => {}
Err(_) => {
tracing::error!("error when closing connection");
}
}
}
let _ = message.tx.send(RouterMessage::ConnectionPoolClosed);
}
_ => tracing::error!("Unknown message!"),
}
}
}
pub async fn send_to_router(message: RouterMessage) -> Result<RouterMessage, NanoServiceError> {
static ROUTER_CHANNEL: Lazy<mpsc::Sender<WrappedRouterMessage>> = Lazy::new(|| {
let (tx, rx) = mpsc::channel::<WrappedRouterMessage>(32);
let tx_ref = tx.clone();
tokio::spawn(async move {
router_actor::<EnviroVarConfig>(rx, tx_ref).await;
});
tx
});
let (tx, rx) = oneshot::channel();
let wrapped_message = WrappedRouterMessage {
message,
tx,
};
ROUTER_CHANNEL
.send(wrapped_message)
.await
.map_err(|e| NanoServiceError::new(e.to_string(), NanoServiceErrorStatus::Unknown))?;
let response = rx
.await
.map_err(|e| NanoServiceError::new(e.to_string(), NanoServiceErrorStatus::Unknown))?;
Ok(response)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_close_connection_pool() {
let (tx, rx) = mpsc::channel::<WrappedRouterMessage>(32);
let tx_two = tx.clone();
let _router_handle = tokio::spawn(async {
tokio::spawn(async move {
router_actor::<EnviroVarConfig>(rx, tx_two).await;
});
});
let (tx_one, rx_one) = tokio::sync::oneshot::channel::<RouterMessage>();
let message = RouterMessage::CloseConnectionPool;
let wrapped_message = WrappedRouterMessage {
message,
tx: tx_one,
};
let _ = tx.send(wrapped_message).await;
let response = rx_one.await.unwrap();
match response {
RouterMessage::ConnectionPoolClosed => {}
_ => panic!("expected ConnectionPoolClosed"),
}
let message = RouterMessage::GetConnection;
let (tx_one, rx_one) = tokio::sync::oneshot::channel::<RouterMessage>();
let wrapped_message = WrappedRouterMessage {
message,
tx: tx_one,
};
let _ = tx.send(wrapped_message).await;
let response = rx_one.await.unwrap();
match response {
RouterMessage::ConnectionPoolClosed => {}
_ => panic!("expected ConnectionPoolClosed"),
}
}
}