#![forbid(unsafe_code)]
use mobc::async_trait;
use mobc::Manager;
use crate::prelude::lapin::options::ConfirmSelectOptions;
use crate::prelude::lapin::types::{LongString, ShortString};
use crate::prelude::{AMQPValue, Channel, ConnectionProperties, FieldTable};
use crate::session::AmqpSession;
pub mod prelude;
mod ops;
mod session;
pub type AmqpResult<T> = lapin::Result<T>;
#[derive(Clone)]
pub struct AmqpManager {
conn_pool: mobc::Pool<AmqpConnectionManager>,
}
impl AmqpManager {
pub fn new(conn_pool: mobc::Pool<AmqpConnectionManager>) -> AmqpResult<Self> {
Ok(Self { conn_pool })
}
pub async fn get_session(&self) -> AmqpResult<AmqpSession> {
Ok(AmqpSession::new(self.get_channel().await?))
}
pub async fn get_session_with_confirm_select(&self) -> AmqpResult<AmqpSession> {
let channel = self.get_channel().await?;
channel.confirm_select(ConfirmSelectOptions::default()).await?;
Ok(AmqpSession::new(channel))
}
async fn get_channel(&self) -> AmqpResult<Channel> {
let conn = self
.conn_pool
.get()
.await
.map_err(|_mobc_err| lapin::Error::InvalidConnectionState(lapin::ConnectionState::Error))?;
conn.create_channel().await
}
pub fn dead_letter_args(args: FieldTable, dead_letter_exchange_name: &str) -> FieldTable {
let mut args = args;
args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(dead_letter_exchange_name)),
);
args
}
}
#[derive(Clone, Debug)]
pub struct AmqpConnectionManager {
addr: String,
connection_properties: ConnectionProperties,
}
impl AmqpConnectionManager {
pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self {
Self {
addr,
connection_properties,
}
}
}
#[async_trait]
impl Manager for AmqpConnectionManager {
type Connection = lapin::Connection;
type Error = lapin::Error;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
lapin::Connection::connect(self.addr.as_str(), self.connection_properties.clone()).await
}
async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
match conn.status().state() {
lapin::ConnectionState::Connected => Ok(conn),
other_state => Err(lapin::Error::InvalidConnectionState(other_state)),
}
}
}