use futures::Future;
use nanoservices_utils::errors::{NanoServiceError, NanoServiceErrorStatus};
use surrealcs_kernel::logging::messages::connections::id::create_id;
use surrealcs_kernel::messages::client::{message::TransactionMessage, router::RouterMessage};
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
pub async fn establish_transaction_connection<F, Fut>(
closure: F,
) -> Result<
(mpsc::Sender<TransactionMessage>, usize, mpsc::Receiver<TransactionMessage>, String),
NanoServiceError,
>
where
F: FnOnce(RouterMessage) -> Fut,
Fut: Future<Output = Result<RouterMessage, NanoServiceError>>,
{
let (h_tx, mut h_rx) = mpsc::channel::<TransactionMessage>(32);
let message: RouterMessage = RouterMessage::GetConnection;
let (writer_tx, connection_index) = match closure(message).await? {
RouterMessage::ReturnConnection(sender) => sender,
RouterMessage::ConnectionPoolClosed => {
return Err(NanoServiceError::new(
"Connection pool closed".to_string(),
NanoServiceErrorStatus::Unknown,
))
}
_ => {
return Err(NanoServiceError::new(
"connection not returned".to_string(),
NanoServiceErrorStatus::Unknown,
))
}
};
let message = TransactionMessage::Register(h_tx);
match writer_tx.send(message).await {
Ok(_) => {}
Err(e) => {
return Err(NanoServiceError::new(
format!("error sending Registration message: {}", e),
NanoServiceErrorStatus::Unknown,
))
}
};
let timeout_duration = Duration::from_secs(2);
let message = match timeout(timeout_duration, h_rx.recv()).await {
Ok(Some(message)) => message,
Ok(None) => {
return Err(NanoServiceError::new(
"Registration message not returned".to_string(),
NanoServiceErrorStatus::Unknown,
))
}
Err(e) => {
return Err(NanoServiceError::new(
format!("Registration message not returned with error: {}", e),
NanoServiceErrorStatus::Unknown,
))
}
};
let client_id: usize = match message {
TransactionMessage::Registered(id) => id,
_ => {
return Err(NanoServiceError::new(
"Registration ID not returned".to_string(),
NanoServiceErrorStatus::Unknown,
))
}
};
let connection_id = create_id(connection_index);
Ok((writer_tx, client_id, h_rx, connection_id))
}