#![forbid(unsafe_code)]
use lapin::message::Delivery;
use lapin::options::{BasicNackOptions, ConfirmSelectOptions};
use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError};
use lapin::types::{LongString, ShortString};
use lapin::{Channel, Connection, ConnectionProperties, ConnectionState, Error as LapinError};
use mobc::async_trait;
use mobc::Manager;
use crate::prelude::{AMQPValue, FieldTable};
use crate::session::AmqpSession;
pub mod prelude;
mod ops;
mod session;
/// Result alias for operations that fail with a `lapin` error
/// (a re-export of [`lapin::Result`]).
pub type AmqpResult<T> = lapin::Result<T>;
/// Result alias for consumer-side message handling, whose failures are
/// classified by [`AmqpConsumerError`].
pub type AmqpConsumerResult<T> = std::result::Result<T, AmqpConsumerError>;
/// Classification of a failure that occurred while consuming a delivery.
///
/// Each variant carries a `String` payload, rendered as the error message by
/// the `Display` implementation. The variant determines how the delivery is
/// negatively acknowledged, see [`AmqpConsumerError::nack_options`].
#[derive(Clone, Debug)]
pub enum AmqpConsumerError {
    /// Failure for which the delivery is nack'ed with `requeue: true`.
    RecoverableError(String),
    /// Failure for which the delivery is nack'ed with `requeue: false`.
    UnrecoverableError(String),
    /// Failure reported for a duplicated event; nack'ed with `requeue: false`.
    DuplicatedEventError(String),
}

impl std::fmt::Display for AmqpConsumerError {
    /// Writes the error category followed by the message carried by the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, message) = match self {
            Self::RecoverableError(message) => ("recoverable error", message),
            Self::UnrecoverableError(message) => ("unrecoverable error", message),
            Self::DuplicatedEventError(message) => ("duplicated event error", message),
        };
        write!(f, "{}: {}", category, message)
    }
}

// Implementing `Error` lets callers propagate this type with `?` into
// `Box<dyn Error>` and similar generic error containers.
impl std::error::Error for AmqpConsumerError {}
impl AmqpConsumerError {
    /// Builds the `basic.nack` options that correspond to a consumer error.
    ///
    /// Only [`AmqpConsumerError::RecoverableError`] requests a requeue; the
    /// other variants reject the delivery without requeueing it. `multiple`
    /// is always `false`, so only a single delivery is affected.
    pub fn nack_options(error: &Self) -> BasicNackOptions {
        let requeue = match error {
            Self::RecoverableError(_) => true,
            Self::UnrecoverableError(_) | Self::DuplicatedEventError(_) => false,
        };
        BasicNackOptions {
            multiple: false,
            requeue,
        }
    }
}
/// Entry point for obtaining AMQP sessions.
///
/// Sessions are built on channels created from connections checked out of
/// the wrapped `mobc` pool.
#[derive(Clone)]
pub struct AmqpManager {
/// Pool of connections managed by [`RMQConnectionManager`].
conn_pool: mobc::Pool<RMQConnectionManager>,
}
impl AmqpManager {
    /// Creates a manager around an already configured connection pool.
    ///
    /// This currently cannot fail; the `AmqpResult` return type is kept so
    /// existing callers continue to compile.
    pub fn new(conn_pool: mobc::Pool<RMQConnectionManager>) -> AmqpResult<Self> {
        Ok(Self { conn_pool })
    }

    /// Opens a new channel on a pooled connection and wraps it in an
    /// [`AmqpSession`].
    ///
    /// # Errors
    /// Returns an error if no connection can be obtained from the pool or if
    /// the channel cannot be created.
    pub async fn get_session(&self) -> AmqpResult<AmqpSession> {
        let channel = self.get_channel().await?;
        Ok(AmqpSession::new(channel))
    }

    /// Same as [`AmqpManager::get_session`], but issues `confirm.select`
    /// (with default options) on the channel before wrapping it.
    ///
    /// # Errors
    /// Returns an error if the channel cannot be obtained or if
    /// `confirm_select` fails.
    pub async fn get_session_with_confirm_select(&self) -> AmqpResult<AmqpSession> {
        let channel = self.get_channel().await?;
        channel
            .confirm_select(ConfirmSelectOptions::default())
            .await?;
        Ok(AmqpSession::new(channel))
    }

    /// Checks a connection out of the pool and creates a channel on it.
    async fn get_channel(&self) -> AmqpResult<Channel> {
        let conn = self.conn_pool.get().await.map_err(|e| {
            // The pool error is not a `lapin::Error`, so it is reported as a
            // hard CONNECTION-FORCED protocol error carrying the pool error's
            // text as its message.
            lapin::Error::ProtocolError(AMQPError::new(
                AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
                ShortString::from(e.to_string()),
            ))
        })?;
        conn.create_channel().await
    }

    /// Returns `args` with the `x-dead-letter-exchange` argument set to
    /// `dead_letter_exchange_name`.
    pub fn dead_letter_args(mut args: FieldTable, dead_letter_exchange_name: &str) -> FieldTable {
        args.insert(
            ShortString::from("x-dead-letter-exchange"),
            AMQPValue::LongString(LongString::from(dead_letter_exchange_name)),
        );
        args
    }

    /// Deserializes the JSON payload of `delivery` into `T`.
    ///
    /// # Errors
    /// Returns [`AmqpConsumerError::UnrecoverableError`] when the payload is
    /// not valid JSON for `T`. The message includes the underlying
    /// `serde_json` error so the cause is not lost.
    pub fn deserialize_json_delivery<'de, T: serde::de::Deserialize<'de>>(
        delivery: &'de Delivery,
    ) -> AmqpConsumerResult<T> {
        serde_json::from_slice(&delivery.data).map_err(|e| {
            AmqpConsumerError::UnrecoverableError(format!(
                "Failed deserializing delivery data into struct: {}",
                e
            ))
        })
    }
}
/// `mobc` connection manager that opens `lapin` connections.
#[derive(Clone, Debug)]
pub struct RMQConnectionManager {
/// Address passed to `Connection::connect` when opening a connection.
addr: String,
/// Properties cloned for every new connection.
connection_properties: ConnectionProperties,
}
impl RMQConnectionManager {
pub fn new(addr: String, connection_properties: ConnectionProperties) -> Self {
Self {
addr,
connection_properties,
}
}
}
#[async_trait]
impl Manager for RMQConnectionManager {
    type Connection = Connection;
    type Error = LapinError;

    /// Opens a new connection to the configured address with a clone of the
    /// configured connection properties.
    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        let properties = self.connection_properties.clone();
        Connection::connect(self.addr.as_str(), properties).await
    }

    /// Hands the connection back if its state is `Connected`; any other state
    /// is reported as `InvalidConnectionState`.
    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
        let state = conn.status().state();
        if matches!(state, ConnectionState::Connected) {
            return Ok(conn);
        }
        Err(LapinError::InvalidConnectionState(state))
    }
}