#![forbid(unsafe_code)]
use lapin::options::ConfirmSelectOptions;
use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError};
use lapin::types::{LongString, ShortString};
use lapin::{Channel, Connection, ConnectionProperties, ConnectionState, Error as LapinError};
use mobc::async_trait;
use mobc::Manager;
use crate::prelude::{AMQPValue, FieldTable};
use crate::session::AmqpSession;
pub mod prelude;
mod ops;
mod session;
pub type AmqpResult<T> = lapin::Result<T>;
#[derive(Clone)]
pub struct AmqpManager {
conn_pool: mobc::Pool<AmqpConnectionManager>,
}
impl AmqpManager {
pub fn new(conn_pool: mobc::Pool<AmqpConnectionManager>) -> AmqpResult<Self> {
Ok(Self { conn_pool })
}
pub async fn get_session(&self) -> AmqpResult<AmqpSession> {
Ok(AmqpSession::new(self.get_channel().await?))
}
pub async fn get_session_with_confirm_select(&self) -> AmqpResult<AmqpSession> {
let channel = self.get_channel().await?;
channel.confirm_select(ConfirmSelectOptions::default()).await?;
Ok(AmqpSession::new(channel))
}
async fn get_channel(&self) -> AmqpResult<Channel> {
let conn = self.conn_pool.get().await.map_err(|e| {
lapin::Error::ProtocolError(AMQPError::new(
AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
ShortString::from(e.to_string()),
))
})?;
conn.create_channel().await
}
pub fn dead_letter_args(args: FieldTable, dead_letter_exchange_name: &str) -> FieldTable {
let mut args = args;
args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(dead_letter_exchange_name)),
);
args
}
}
#[derive(Clone, Debug)]
pub struct AmqpConnectionManager {
addr: String,
connection_properties: ConnectionProperties,
}
impl AmqpConnectionManager {
pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self {
Self {
addr,
connection_properties,
}
}
}
#[async_trait]
impl Manager for AmqpConnectionManager {
type Connection = Connection;
type Error = LapinError;
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
Connection::connect(self.addr.as_str(), self.connection_properties.clone()).await
}
async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
match conn.status().state() {
ConnectionState::Connected => Ok(conn),
other_state => Err(LapinError::InvalidConnectionState(other_state)),
}
}
}