use std::fmt::{Debug, Formatter};
use async_trait::async_trait;
use crate::client::Client;
use crate::error::ClientError;
use crate::model::common::SendReceipt;
use crate::model::message::MessageView;
use crate::pb::{EndTransactionRequest, Resource, TransactionSource};
use crate::session::RPCClient;
/// A transaction handle that is resolved by either committing or rolling back.
///
/// `commit` and `rollback` take `self` by value, so the handle cannot be used
/// again once either of them has been called.
#[async_trait]
pub trait Transaction {
/// Commits the transaction, consuming the handle.
///
/// # Errors
/// Returns a `ClientError` if the commit does not succeed.
async fn commit(self) -> Result<(), ClientError>;
/// Rolls the transaction back, consuming the handle.
///
/// # Errors
/// Returns a `ClientError` if the rollback does not succeed.
async fn rollback(self) -> Result<(), ClientError>;
/// Returns the message id associated with this transaction.
fn message_id(&self) -> &str;
/// Returns the transaction id associated with this transaction.
fn transaction_id(&self) -> &str;
}
/// Crate-internal `Transaction` implementation that ends the transaction
/// through an RPC client.
pub(crate) struct TransactionImpl {
/// Client used to send the end-transaction request.
rpc_client: Box<dyn RPCClient + Send + Sync>,
/// Resource sent as the `topic` of the end-transaction request.
topic: Resource,
/// Receipt that supplies the message id and the transaction id.
send_receipt: SendReceipt,
}
impl Debug for TransactionImpl {
    /// Writes the transaction id and message id taken from the send receipt.
    ///
    /// The RPC client and the topic are left out of the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let receipt = &self.send_receipt;
        let mut out = f.debug_struct("TransactionImpl");
        out.field("transaction_id", &receipt.transaction_id());
        out.field("message_id", &receipt.message_id());
        out.finish()
    }
}
impl TransactionImpl {
    /// Assembles a transaction handle from its parts.
    ///
    /// * `rpc_client` - client that later sends the end-transaction request.
    /// * `topic` - resource placed in the request's `topic` field.
    /// * `send_receipt` - source of the message id and the transaction id.
    pub(crate) fn new(
        rpc_client: Box<dyn RPCClient + Send + Sync>,
        topic: Resource,
        send_receipt: SendReceipt,
    ) -> Self {
        Self {
            rpc_client,
            topic,
            send_receipt,
        }
    }

    /// Sends an `EndTransactionRequest` carrying `resolution` and converts the
    /// response status into a `Result`.
    ///
    /// # Errors
    /// Propagates the error of the RPC call itself, or whatever
    /// `Client::handle_response_status` returns for the response status.
    async fn end_transaction(
        mut self,
        resolution: TransactionResolution,
    ) -> Result<(), ClientError> {
        let message_id = self.send_receipt.message_id().to_string();
        let transaction_id = self.send_receipt.transaction_id().to_string();

        // `self.topic` is moved into the request; the remaining fields of
        // `self` stay usable for the call below.
        let request = EndTransactionRequest {
            topic: Some(self.topic),
            message_id,
            transaction_id,
            resolution: resolution as i32,
            source: TransactionSource::SourceClient as i32,
            trace_context: String::new(),
        };

        let reply = self.rpc_client.end_transaction(request).await?;
        Client::handle_response_status(reply.status, "end transaction")
    }
}
#[async_trait]
impl Transaction for TransactionImpl {
    /// Ends the transaction with the `COMMIT` resolution, consuming `self`.
    ///
    /// # Errors
    /// Returns the `ClientError` produced by `end_transaction`.
    async fn commit(self) -> Result<(), ClientError> {
        self.end_transaction(TransactionResolution::COMMIT).await
    }

    /// Ends the transaction with the `ROLLBACK` resolution, consuming `self`.
    ///
    /// # Errors
    /// Returns the `ClientError` produced by `end_transaction`.
    async fn rollback(self) -> Result<(), ClientError> {
        self.end_transaction(TransactionResolution::ROLLBACK).await
    }

    /// Returns the message id recorded in the send receipt.
    fn message_id(&self) -> &str {
        self.send_receipt.message_id()
    }

    /// Returns the transaction id recorded in the send receipt.
    fn transaction_id(&self) -> &str {
        self.send_receipt.transaction_id()
    }
}
/// Outcome requested when a transaction is ended.
///
/// The enum is `#[repr(i32)]` and its discriminant is cast with `as i32` when
/// it is placed in the `resolution` field of an `EndTransactionRequest`.
// NOTE(review): the numeric values presumably mirror the protobuf
// `TransactionResolution` enum; confirm against the generated `pb` module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum TransactionResolution {
    /// Discriminant `1`; sent by `commit`.
    COMMIT = 1,
    /// Discriminant `2`; sent by `rollback`.
    ROLLBACK = 2,
    /// Discriminant `0`.
    UNKNOWN = 0,
}
/// Callback type that maps a `String` and a `MessageView` to a
/// `TransactionResolution`.
///
/// This is an unsized `dyn Fn` trait-object type bounded by `Send + Sync`, so
/// it has to be used behind a pointer such as `Box` or a reference.
// NOTE(review): the `String` argument is presumably the transaction id;
// confirm against the call site.
pub type TransactionChecker = dyn Fn(String, MessageView) -> TransactionResolution + Send + Sync;
#[cfg(test)]
mod tests {
    use crate::error::ClientError;
    use crate::model::common::SendReceipt;
    use crate::model::transaction::{Transaction, TransactionImpl};
    use crate::pb::{Code, EndTransactionResponse, Resource, SendResultEntry, Status};
    use crate::session;

    /// Response whose status carries `Code::Ok` and an empty message.
    fn ok_response() -> EndTransactionResponse {
        let status = Status {
            code: Code::Ok as i32,
            message: String::new(),
        };
        EndTransactionResponse {
            status: Some(status),
        }
    }

    /// Transaction wired to a mock RPC client that answers a single
    /// end-transaction call with `ok_response()`. Topic and receipt fields
    /// are all empty.
    fn transaction_under_test() -> TransactionImpl {
        let mut rpc_client = session::MockRPCClient::new();
        rpc_client
            .expect_end_transaction()
            .return_once(|_| Box::pin(futures::future::ready(Ok(ok_response()))));

        let topic = Resource {
            resource_namespace: String::new(),
            name: String::new(),
        };
        let send_result = SendResultEntry {
            status: None,
            message_id: String::new(),
            transaction_id: String::new(),
            offset: 0,
        };

        TransactionImpl::new(
            Box::new(rpc_client),
            topic,
            SendReceipt::from_pb_send_result(&send_result),
        )
    }

    /// Committing succeeds when the RPC client reports an OK status.
    #[tokio::test]
    async fn transaction_commit() -> Result<(), ClientError> {
        transaction_under_test().commit().await
    }

    /// Rolling back succeeds when the RPC client reports an OK status.
    #[tokio::test]
    async fn transaction_rollback() -> Result<(), ClientError> {
        transaction_under_test().rollback().await
    }
}