#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
use std::{fmt::Debug, marker::PhantomData, str::FromStr};
use lapin::options::{BasicAckOptions, BasicNackOptions};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace};
use uuid::Uuid;
mod chan;
mod error;
pub use chan::*;
pub use error::*;
pub type Result<T> = std::result::Result<T, Error>;
pub struct Connection {
inner: lapin::Connection,
}
impl Connection {
pub async fn connect(mq_url: &str) -> Result<Self> {
let connection = lapin::Connection::connect(mq_url, Default::default()).await?;
debug!("Connected to RabbitMQ instance");
Ok(Self { inner: connection })
}
}
#[derive(Debug)]
pub struct Delivery<B> {
inner: lapin::message::Delivery,
_marker: PhantomData<B>,
}
impl<'p, 'r, B> Delivery<B>
where
B: Bus,
B::PublishPayload: Deserialize<'p> + Serialize,
{
pub fn get_payload(&'p self) -> Result<B::PublishPayload> {
B::deserialize_payload(&self.inner.data)
}
pub fn get_uuid(&self) -> Option<Result<Uuid>> {
delivery_uuid(&self.inner, 0)
}
pub fn get_reply_uuid(&self) -> Option<Result<Uuid>> {
delivery_uuid(&self.inner, 1)
}
pub fn routing_key(&self) -> &str {
self.inner.routing_key.as_str()
}
pub async fn ack(&self, multiple: bool) -> Result<()> {
self.inner.ack(BasicAckOptions { multiple }).await?;
if let Some(Ok(uuid)) = self.get_uuid() {
trace!("Acked message with correlation UUID {uuid}");
}
Ok(())
}
pub fn redelivered(&self) -> bool {
self.inner.redelivered
}
pub async fn nack(&self, multiple: bool, requeue: bool) -> Result<()> {
self.inner
.nack(BasicNackOptions { multiple, requeue })
.await?;
if let Some(Ok(uuid)) = self.get_uuid() {
trace!("Nacked message with correlation UUID {uuid}");
}
Ok(())
}
}
impl<B> From<lapin::message::Delivery> for Delivery<B> {
fn from(delivery: lapin::message::Delivery) -> Self {
Self {
inner: delivery,
_marker: PhantomData,
}
}
}
fn delivery_uuid(delivery: &lapin::message::Delivery, index: usize) -> Option<Result<Uuid>> {
let Some(correlation_id) = delivery.properties.correlation_id() else {
return None;
};
let mut parts = correlation_id.as_str().split(':');
parts
.nth(index)
.map(|uuid| Uuid::from_str(uuid).map_err(Into::into))
}
fn fmt_correlation_id(correlation_uuid: Uuid, reply_uuid: Option<Uuid>) -> String {
format!(
"{correlation_uuid}:{}",
reply_uuid.map(|r| r.to_string()).unwrap_or_default()
)
}
#[derive(Debug, Deserialize, Serialize, Clone)]
#[doc(hidden)]
pub enum Never {}