#![forbid(unsafe_code)]
use lapin::message::Delivery;
use lapin::options::{BasicNackOptions, ConfirmSelectOptions};
use lapin::protocol::{AMQPError, AMQPErrorKind, AMQPHardError};
use lapin::types::{LongString, ShortString};
use mobc_lapin::*;
use crate::prelude::{AMQPValue, FieldTable};
use crate::session::AmqpSession;
use lapin::Channel;
pub mod prelude;
mod ops;
mod session;
/// Result alias for AMQP operations; the error type is `lapin::Error`
/// (this is a direct alias of `lapin::Result<T>`).
pub type AmqpResult<T> = lapin::Result<T>;
/// Result alias for consumer-side handling, failing with [`AmqpConsumerError`].
pub type AmqpConsumerResult<T> = std::result::Result<T, AmqpConsumerError>;
/// Failure reported by a consumer while handling a delivery.
///
/// Every variant carries a `String` message. [`AmqpConsumerError::nack_options`]
/// maps each variant to the nack flags to use for the delivery.
#[derive(Clone, Debug)]
pub enum AmqpConsumerError {
/// Mapped by `nack_options` to `requeue: true`.
/// NOTE(review): presumably meant for transient failures worth retrying — confirm with callers.
RecoverableError(String),
/// Mapped by `nack_options` to `requeue: false`.
UnrecoverableError(String),
/// Mapped by `nack_options` to `requeue: false`, the same flags as `UnrecoverableError`.
DuplicatedEventError(String),
}
impl AmqpConsumerError {
    /// Returns the `BasicNackOptions` to use when nacking a delivery that
    /// failed with `error`.
    ///
    /// `multiple` is always `false`. `requeue` is `true` only for
    /// `RecoverableError`; `UnrecoverableError` and `DuplicatedEventError`
    /// both yield `requeue: false`.
    pub fn nack_options(error: &Self) -> BasicNackOptions {
        // Kept as an exhaustive match so that adding a variant forces an
        // explicit requeue decision here.
        let requeue = match error {
            Self::RecoverableError(_) => true,
            Self::UnrecoverableError(_) | Self::DuplicatedEventError(_) => false,
        };

        BasicNackOptions {
            multiple: false,
            requeue,
        }
    }
}
/// Hands out AMQP sessions backed by a `mobc` pool of
/// `RMQConnectionManager` connections.
#[derive(Clone)]
pub struct AmqpManager {
// Pool that `get_channel` borrows a connection from for each new channel.
conn_pool: mobc::Pool<mobc_lapin::RMQConnectionManager>,
}
impl AmqpManager {
pub fn new(conn_pool: mobc::Pool<mobc_lapin::RMQConnectionManager>) -> AmqpResult<Self> {
Ok(Self { conn_pool })
}
pub async fn get_session(&self) -> AmqpResult<AmqpSession> {
Ok(AmqpSession::new(self.get_channel().await?))
}
pub async fn get_session_with_confirm_select(&self) -> AmqpResult<AmqpSession> {
let channel = self.get_channel().await?;
channel.confirm_select(ConfirmSelectOptions::default()).await?;
Ok(AmqpSession::new(channel))
}
async fn get_channel(&self) -> AmqpResult<Channel> {
let conn = self.conn_pool.get().await.map_err(|e| {
lapin::Error::ProtocolError(AMQPError::new(
AMQPErrorKind::Hard(AMQPHardError::CONNECTIONFORCED),
ShortString::from(e.to_string()),
))
})?;
conn.create_channel().await
}
pub fn dead_letter_args(args: FieldTable, dead_letter_exchange_name: &str) -> FieldTable {
let mut args = args;
args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(dead_letter_exchange_name)),
);
args
}
pub fn deserialize_json_delivery<'de, T: serde::de::Deserialize<'de>>(delivery: &'de Delivery) -> AmqpConsumerResult<T> {
match serde_json::from_slice(&delivery.data) {
Ok(x) => Ok(x),
Err(_) => {
let msg = "Failed deserializing delivery data into struct".to_string();
Err(AmqpConsumerError::UnrecoverableError(msg))
}
}
}
}