use crate::{
api::{error::Error, Result},
frame::{Frame, TxCommit, TxCommitOk, TxRollback, TxRollbackOk, TxSelect, TxSelectOk},
};
use super::Channel;
impl Channel {
    /// Sends a `TxSelect` frame on this channel and waits for the matching
    /// `TxSelectOk` reply.
    ///
    /// The responder for the reply is registered before the request is sent.
    ///
    /// # Errors
    ///
    /// Propagates any error from registering the responder or from the
    /// synchronous request; `Error::ChannelUseError` is the error constructor
    /// handed to the request macro.
    pub async fn tx_select(&self) -> Result<()> {
        let reply_rx = self.register_responder(TxSelectOk::header()).await?;
        // The reply payload carries nothing the caller needs; only success matters.
        let _select_ok = synchronous_request!(
            self.shared.outgoing_tx,
            (self.shared.channel_id, TxSelect.into_frame()),
            reply_rx,
            Frame::TxSelectOk,
            Error::ChannelUseError
        )?;
        Ok(())
    }

    /// Sends a `TxCommit` frame on this channel and waits for the matching
    /// `TxCommitOk` reply.
    ///
    /// The responder for the reply is registered before the request is sent.
    ///
    /// # Errors
    ///
    /// Propagates any error from registering the responder or from the
    /// synchronous request; `Error::ChannelUseError` is the error constructor
    /// handed to the request macro.
    pub async fn tx_commit(&self) -> Result<()> {
        let reply_rx = self.register_responder(TxCommitOk::header()).await?;
        // The reply payload carries nothing the caller needs; only success matters.
        let _commit_ok = synchronous_request!(
            self.shared.outgoing_tx,
            (self.shared.channel_id, TxCommit.into_frame()),
            reply_rx,
            Frame::TxCommitOk,
            Error::ChannelUseError
        )?;
        Ok(())
    }

    /// Sends a `TxRollback` frame on this channel and waits for the matching
    /// `TxRollbackOk` reply.
    ///
    /// The responder for the reply is registered before the request is sent.
    ///
    /// # Errors
    ///
    /// Propagates any error from registering the responder or from the
    /// synchronous request; `Error::ChannelUseError` is the error constructor
    /// handed to the request macro.
    pub async fn tx_rollback(&self) -> Result<()> {
        let reply_rx = self.register_responder(TxRollbackOk::header()).await?;
        // The reply payload carries nothing the caller needs; only success matters.
        let _rollback_ok = synchronous_request!(
            self.shared.outgoing_tx,
            (self.shared.channel_id, TxRollback.into_frame()),
            reply_rx,
            Frame::TxRollbackOk,
            Error::ChannelUseError
        )?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::{
        callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
        channel::BasicPublishArguments,
        connection::{Connection, OpenConnectionArguments},
        BasicProperties,
    };

    /// Exercises `tx_select`, `tx_commit` and `tx_rollback` against a live
    /// server: select, publish one message, commit, then roll back.
    ///
    /// Opens a real connection using the hard-coded arguments below, so a
    /// server accepting them must be reachable for this test to pass.
    #[tokio::test]
    async fn test_tx_apis() {
        // Connection and channel setup, each with the default callbacks.
        let conn_args = OpenConnectionArguments::new("localhost", 5672, "user", "bitnami");
        let connection = Connection::open(&conn_args).await.unwrap();
        connection
            .register_callback(DefaultConnectionCallback)
            .await
            .unwrap();

        let channel = connection.open_channel(None).await.unwrap();
        channel
            .register_callback(DefaultChannelCallback)
            .await
            .unwrap();

        // Issue tx_select before publishing.
        channel.tx_select().await.unwrap();

        // Publish a single message with persistence enabled.
        let payload = "AMQPRS test transactions".as_bytes().to_vec();
        let props = BasicProperties::default().with_persistence(true).finish();
        let publish_args = BasicPublishArguments::new("amq.topic", "amqprs.test.transaction");
        channel
            .basic_publish(props, payload, publish_args)
            .await
            .unwrap();

        // Commit the publish, then also exercise the rollback call.
        channel.tx_commit().await.unwrap();
        channel.tx_rollback().await.unwrap();

        // Tear down: channel first, then the connection.
        channel.close().await.unwrap();
        connection.close().await.unwrap();
    }
}