use super::interface::Transaction;
use super::utils::await_message;
use nanoservices_utils::errors::NanoServiceError;
use std::future::Future;
use surrealcs_kernel::messages::client::message::TransactionMessage;
use surrealcs_kernel::messages::server::{
interface::{ServerMessage, ServerTransactionMessage},
wrapper::WrappedServerMessage,
};
use tracing::instrument;
/// Outcome of a bridged request: the [`ServerTransactionMessage`] extracted from the
/// reply on success, or a [`NanoServiceError`] on failure.
pub type BridgeResponse = Result<ServerTransactionMessage, NanoServiceError>;
/// Stateless handle whose [`SendToTransactionClientActor`] implementation forwards
/// each request to `internal_send`.
pub struct BridgeHandle;
/// Abstraction over sending a [`ServerMessage`] on behalf of a [`Transaction`] and
/// waiting for the matching reply.
pub trait SendToTransactionClientActor {
    /// Sends `message` using the channels held by `self_ref` and resolves to the reply.
    ///
    /// # Arguments
    /// * `self_ref` - the transaction the message is sent for; borrowed mutably for the
    ///   whole lifetime of the returned future.
    /// * `message` - the server message to deliver.
    ///
    /// # Returns
    /// A `Send` future resolving to a [`BridgeResponse`].
    fn send_to_transaction_client_actor<T>(
        self_ref: &mut Transaction<T>,
        message: ServerMessage,
    ) -> impl Future<Output = BridgeResponse> + Send
    where
        T: Send;
}
impl SendToTransactionClientActor for BridgeHandle {
    /// Forwards the request to `internal_send`, which performs the actual send over the
    /// transaction's channel and waits for the reply.
    ///
    /// # Arguments
    /// * `self_ref` - the transaction whose sender/receiver pair is used.
    /// * `message` - the server message to deliver.
    ///
    /// # Returns
    /// The future produced by `internal_send`, resolving to a [`BridgeResponse`].
    fn send_to_transaction_client_actor<T>(
        self_ref: &mut Transaction<T>,
        message: ServerMessage,
    ) -> impl Future<Output = BridgeResponse> + Send
    where
        T: Send,
    {
        internal_send(self_ref, message)
    }
}
#[instrument(name = "internal_send", skip(self_ref))]
async fn internal_send<T>(self_ref: &mut Transaction<T>, message: ServerMessage) -> BridgeResponse {
let mut message =
WrappedServerMessage::new(self_ref.client_id, message, self_ref.connection_id.clone())
.transaction_id(self_ref.transaction_id.clone());
message.server_id = self_ref.server_id;
let message = TransactionMessage::TransactionOperation(message);
self_ref.sender.send(message).await.unwrap();
let output = await_message(self_ref).await?;
let (server_id, transaction) = output.extract_transaction_operation()?;
self_ref.server_id = Some(server_id);
Ok(transaction)
}
#[cfg(test)]
mod tests {
    use super::super::interface::Any;
    use super::*;
    use std::marker::PhantomData;
    use surrealcs_kernel::messages::server::kv_operations::MessagePut;
    use tokio::sync::mpsc;

    /// Sends a `Put` operation through the bridge while a spawned task echoes the first
    /// message it receives back to the transaction, then checks the returned payload.
    #[tokio::test]
    async fn test_internal_send() {
        // Channel feeding the transaction's receiver (replies).
        let (reply_tx, reply_rx) = mpsc::channel::<TransactionMessage>(32);
        // Channel the transaction sends its requests on.
        let (request_tx, mut request_rx) = mpsc::channel::<TransactionMessage>(32);

        // Stand-in actor: take one request and hand it straight back as the reply.
        tokio::spawn(async move {
            let echoed = request_rx.recv().await.unwrap();
            reply_tx.send(echoed).await.unwrap();
        });

        let mut transaction = Transaction::<Any> {
            client_id: 0,
            connection_id: "1-1234567890".to_string(),
            server_id: Some(1),
            transaction_id: "1-1234567890".to_string(),
            sender: request_tx,
            receiver: reply_rx,
            state: PhantomData,
        };

        let put = MessagePut {
            key: b"key".to_vec(),
            value: b"value".to_vec(),
            version: None,
        };
        let outgoing = ServerMessage::SendOperation(ServerTransactionMessage::Put(put));

        let reply = BridgeHandle::send_to_transaction_client_actor(&mut transaction, outgoing)
            .await
            .unwrap();

        let ServerTransactionMessage::Put(echoed_put) = reply else {
            panic!("wrong message type");
        };
        assert_eq!(echoed_put.key, b"key".to_vec());
        assert_eq!(echoed_put.value, b"value".to_vec());
    }
}