use super::bridge::SendToTransactionClientActor;
use super::interface::{Any, Transaction};
use nanoservices_utils::errors::NanoServiceError;
use surrealcs_kernel::messages::server::interface::{ServerMessage, ServerTransactionMessage};
impl Transaction<Any> {
#[allow(dead_code)]
pub async fn begin<T: SendToTransactionClientActor>(
&mut self,
operation: ServerTransactionMessage,
) -> Result<ServerTransactionMessage, NanoServiceError> {
let message = ServerMessage::BeginTransaction(operation);
let transaction = T::send_to_transaction_client_actor(self, message).await?;
Ok(transaction)
}
#[allow(dead_code)]
pub async fn send<T: SendToTransactionClientActor>(
&mut self,
message: ServerTransactionMessage,
) -> Result<ServerTransactionMessage, NanoServiceError> {
let message = ServerMessage::SendOperation(message);
T::send_to_transaction_client_actor(self, message).await
}
#[allow(dead_code)]
pub async fn commit<T: SendToTransactionClientActor>(
&mut self,
) -> Result<ServerTransactionMessage, NanoServiceError> {
let message = ServerMessage::CommitTransaction;
let outcome = T::send_to_transaction_client_actor(self, message).await?;
Ok(outcome)
}
#[allow(dead_code)]
pub async fn rollback<T: SendToTransactionClientActor>(
&mut self,
) -> Result<(), NanoServiceError> {
let message = ServerMessage::RollbackTransaction;
let rollback_result = T::send_to_transaction_client_actor(self, message).await;
match rollback_result {
Ok(_) => {}
Err(e) => {
if e.message == *"error sending transaction to key value store: no ID provided" {
} else {
return Err(e);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::generate_mock_handle;
use std::future::Future;
use std::marker::PhantomData;
use surrealcs_kernel::messages::server::interface::{ServerMessage, ServerTransactionMessage};
use surrealcs_kernel::messages::server::kv_operations::{MessageGet, MessagePut, ResponseGet};
use tokio::sync::mpsc;
#[tokio::test]
async fn test_begin() {
generate_mock_handle! {
struct MockHandle;
match ServerMessage::BeginTransaction(message) => {
match message {
ServerTransactionMessage::Get(message) => {
assert_eq!(message.key, b"keys".to_vec());
}
_ => {
panic!("message not as expected: {:?}", message);
}
}
}
return Ok(ServerTransactionMessage::ResponseGet(
ResponseGet { value: Some(b"value".to_vec()) }
));
}
let (tx, _rx) = mpsc::channel(32);
let (_h_tx, h_rx) = mpsc::channel(32);
let mut transaction = Transaction::<Any> {
client_id: 1,
server_id: None,
receiver: h_rx,
sender: tx,
connection_id: "connection_id".to_string(),
transaction_id: "transaction_id".to_string(),
state: PhantomData,
};
let message = ServerTransactionMessage::Get(MessageGet {
key: b"keys".to_vec(),
version: None,
});
let outcome = transaction.begin::<MockHandle>(message).await.unwrap();
let response = match outcome {
ServerTransactionMessage::ResponseGet(response) => response,
_ => panic!("wrong message type"),
};
assert_eq!(response.value, Some(b"value".to_vec()));
}
#[tokio::test]
async fn test_send() {
generate_mock_handle! {
struct MockHandle;
match ServerMessage::SendOperation(message) => {
match message {
ServerTransactionMessage::Put(message) => {
assert_eq!(message.key, b"keys".to_vec());
assert_eq!(message.value, b"value".to_vec());
}
_ => {
panic!("message not as expected: {:?}", message);
}
}
}
return Ok(ServerTransactionMessage::EmptyResponse);
}
let (tx, _rx) = mpsc::channel(32);
let (_h_tx, h_rx) = mpsc::channel(32);
let mut transaction = Transaction::<Any> {
client_id: 1,
server_id: None,
receiver: h_rx,
sender: tx,
connection_id: "connection_id".to_string(),
transaction_id: "transaction_id".to_string(),
state: PhantomData,
};
let message = ServerTransactionMessage::Put(MessagePut {
key: b"keys".to_vec(),
value: b"value".to_vec(),
version: None,
});
let outcome = transaction.send::<MockHandle>(message).await.unwrap();
match outcome {
ServerTransactionMessage::EmptyResponse => (),
_ => panic!("wrong message type"),
};
}
#[tokio::test]
async fn test_commit() {
generate_mock_handle! {
struct MockHandle;
match ServerMessage::CommitTransaction => {
}
return Ok(ServerTransactionMessage::EmptyResponse);
}
let (tx, _rx) = mpsc::channel(32);
let (_h_tx, h_rx) = mpsc::channel(32);
let mut transaction = Transaction::<Any> {
client_id: 1,
server_id: None,
receiver: h_rx,
sender: tx,
connection_id: "connection_id".to_string(),
transaction_id: "transaction_id".to_string(),
state: PhantomData,
};
transaction.commit::<MockHandle>().await.unwrap();
}
#[tokio::test]
async fn test_rollback() {
generate_mock_handle! {
struct MockHandle;
match ServerMessage::RollbackTransaction => {
}
return Ok(ServerTransactionMessage::ResponseRolledBack(
true
));
}
let (tx, _rx) = mpsc::channel(32);
let (_h_tx, h_rx) = mpsc::channel(32);
let mut transaction = Transaction::<Any> {
client_id: 1,
server_id: None,
receiver: h_rx,
sender: tx,
connection_id: "connection_id".to_string(),
transaction_id: "transaction_id".to_string(),
state: PhantomData,
};
transaction.rollback::<MockHandle>().await.unwrap();
}
}