use crate::ack::{AckHandle, AckSubscriber};
use crate::rabbitmq::RabbitMQError;
use crate::Message;
use async_trait::async_trait;
use futures::StreamExt;
use lapin::options::{BasicAckOptions, BasicNackOptions, BasicConsumeOptions, QueueDeclareOptions};
use lapin::types::FieldTable;
use lapin::{Connection, Consumer};
use std::sync::Arc;
use std::time::SystemTime;
use uuid::Uuid;
/// Acknowledgment handle for a single message received from RabbitMQ.
///
/// Exposes the message metadata required by the [`AckHandle`] trait and
/// additionally carries the RabbitMQ delivery tag that the subscriber passes
/// to `basic_ack` / `basic_nack` when the message is settled.
#[derive(Debug, Clone)]
pub struct RabbitMQAckHandle {
/// Identifier of the message this handle refers to.
message_id: String,
/// Topic the message was received on.
topic: String,
/// Timestamp supplied when the handle was created.
timestamp: SystemTime,
/// Number of delivery attempts reported for the message.
delivery_count: u32,
/// Delivery tag used to acknowledge or reject the message on the broker.
delivery_tag: u64,
/// Identifier of this handle instance; a v4 UUID string generated in `new`.
handle_id: String,
}
impl RabbitMQAckHandle {
pub fn new(
message_id: String,
topic: String,
timestamp: SystemTime,
delivery_count: u32,
delivery_tag: u64,
) -> Self {
Self {
message_id,
topic,
timestamp,
delivery_count,
delivery_tag,
handle_id: Uuid::new_v4().to_string(),
}
}
pub fn delivery_tag(&self) -> u64 {
self.delivery_tag
}
pub fn handle_id(&self) -> &str {
&self.handle_id
}
}
impl AckHandle for RabbitMQAckHandle {
    /// Identifier of the message this handle refers to.
    fn message_id(&self) -> &str {
        self.message_id.as_str()
    }

    /// Topic the message was received on.
    fn topic(&self) -> &str {
        self.topic.as_str()
    }

    /// Timestamp stored in the handle at construction.
    fn timestamp(&self) -> SystemTime {
        let Self { timestamp, .. } = self;
        *timestamp
    }

    /// Delivery count stored in the handle at construction.
    fn delivery_count(&self) -> u32 {
        let Self { delivery_count, .. } = self;
        *delivery_count
    }
}
/// RabbitMQ subscriber that delivers messages together with acknowledgment
/// handles, so the caller can explicitly ack or nack each message.
pub struct RabbitMQAckSubscriber {
/// Connection to the broker; `subscribe` creates its channel from it.
connection: Connection,
/// Current subscription (topic, consumer and channel) behind an async mutex.
state: tokio::sync::Mutex<SubscriberState>,
/// Logger used for informational messages; only present with the `logging` feature.
#[cfg(feature = "logging")]
logger: Arc<dyn crate::logging::Logger>,
}
/// Mutable subscription state of a `RabbitMQAckSubscriber`.
///
/// All three fields start as `None` and are filled in together by `subscribe`.
#[derive(Debug)]
struct SubscriberState {
/// Name of the subscribed topic (also used as the queue name), if any.
topic: Option<String>,
/// Consumer stream that yields incoming deliveries.
consumer: Option<Consumer>,
/// Channel the consumer was created on; acks and nacks are sent over it.
channel: Option<lapin::Channel>,
}
impl RabbitMQAckSubscriber {
#[cfg(not(feature = "logging"))]
pub async fn new(uri: &str) -> Result<Self, RabbitMQError> {
let connection = Connection::connect(uri, lapin::ConnectionProperties::default())
.await
.map_err(RabbitMQError::RabbitMQ)?;
Ok(Self {
connection,
state: tokio::sync::Mutex::new(SubscriberState {
topic: None,
consumer: None,
channel: None,
}),
})
}
#[cfg(feature = "logging")]
pub async fn new(uri: &str) -> Result<Self, RabbitMQError> {
let connection = Connection::connect(uri, lapin::ConnectionProperties::default())
.await
.map_err(RabbitMQError::RabbitMQ)?;
let logger = Arc::new(crate::logging::NoOpLogger::new());
Ok(Self {
connection,
state: tokio::sync::Mutex::new(SubscriberState {
topic: None,
consumer: None,
channel: None,
}),
logger,
})
}
#[cfg(feature = "logging")]
pub fn with_logger(mut self, logger: Arc<dyn crate::logging::Logger>) -> Self {
self.logger = logger;
self
}
pub async fn subscribed_topic(&self) -> Option<String> {
let state = self.state.lock().await;
state.topic.clone()
}
pub async fn is_subscribed(&self) -> bool {
let state = self.state.lock().await;
state.topic.is_some()
}
async fn ack_delivery(&self, delivery_tag: u64) -> Result<(), RabbitMQError> {
let state = self.state.lock().await;
if let Some(channel) = &state.channel {
channel
.basic_ack(delivery_tag, BasicAckOptions::default())
.await
.map_err(RabbitMQError::RabbitMQ)?;
}
Ok(())
}
async fn nack_delivery(&self, delivery_tag: u64, requeue: bool) -> Result<(), RabbitMQError> {
let state = self.state.lock().await;
if let Some(channel) = &state.channel {
channel
.basic_nack(
delivery_tag,
BasicNackOptions {
multiple: false,
requeue,
},
)
.await
.map_err(RabbitMQError::RabbitMQ)?;
}
Ok(())
}
}
#[async_trait]
impl AckSubscriber for RabbitMQAckSubscriber {
type Error = Box<dyn std::error::Error + Send + Sync>;
type AckHandle = RabbitMQAckHandle;
async fn subscribe(&self, topic: &str) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!("Subscribing to topic {} with acknowledgment support", topic))
.await;
let channel = self.connection.create_channel().await.map_err(|e| {
Box::new(RabbitMQError::RabbitMQ(e)) as Box<dyn std::error::Error + Send + Sync>
})?;
channel
.queue_declare(topic, QueueDeclareOptions::default(), FieldTable::default())
.await
.map_err(|e| {
Box::new(RabbitMQError::RabbitMQ(e)) as Box<dyn std::error::Error + Send + Sync>
})?;
let consumer = channel
.basic_consume(
topic,
"kincir-ack-consumer",
BasicConsumeOptions {
no_local: false,
no_ack: false, exclusive: false,
nowait: false,
},
FieldTable::default(),
)
.await
.map_err(|e| {
Box::new(RabbitMQError::RabbitMQ(e)) as Box<dyn std::error::Error + Send + Sync>
})?;
let mut state = self.state.lock().await;
state.topic = Some(topic.to_string());
state.consumer = Some(consumer);
state.channel = Some(channel);
#[cfg(feature = "logging")]
self.logger
.info(&format!("Successfully subscribed to topic {}", topic))
.await;
Ok(())
}
async fn receive_with_ack(&mut self) -> Result<(Message, Self::AckHandle), Self::Error> {
#[cfg(feature = "logging")]
self.logger.info("Waiting to receive message with acknowledgment").await;
let mut consumer = {
let mut state = self.state.lock().await;
state.consumer.take()
};
if let Some(ref mut consumer_ref) = consumer {
if let Some(delivery_result) = consumer_ref.next().await {
let delivery = delivery_result.map_err(|e| {
Box::new(RabbitMQError::RabbitMQ(e)) as Box<dyn std::error::Error + Send + Sync>
})?;
let message: Message = serde_json::from_slice(&delivery.data).map_err(|e| {
Box::new(RabbitMQError::Serialization(e))
as Box<dyn std::error::Error + Send + Sync>
})?;
let topic = {
let state = self.state.lock().await;
state.topic.clone().unwrap_or_default()
};
let ack_handle = RabbitMQAckHandle::new(
message.uuid.clone(),
topic,
SystemTime::now(),
1, delivery.delivery_tag,
);
let mut state = self.state.lock().await;
state.consumer = consumer;
#[cfg(feature = "logging")]
self.logger
.info(&format!("Received message with ID: {}", message.uuid))
.await;
Ok((message, ack_handle))
} else {
let mut state = self.state.lock().await;
state.consumer = consumer;
Err(Box::new(RabbitMQError::RabbitMQ(
lapin::Error::InvalidChannelState(lapin::ChannelState::Error),
)) as Box<dyn std::error::Error + Send + Sync>)
}
} else {
Err(Box::new(RabbitMQError::RabbitMQ(
lapin::Error::InvalidChannelState(lapin::ChannelState::Error),
)) as Box<dyn std::error::Error + Send + Sync>)
}
}
async fn ack(&self, handle: Self::AckHandle) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!("Acknowledging message: {}", handle.message_id()))
.await;
self.ack_delivery(handle.delivery_tag())
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
}
async fn nack(&self, handle: Self::AckHandle, requeue: bool) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!(
"Negatively acknowledging message: {} (requeue: {})",
handle.message_id(),
requeue
))
.await;
self.nack_delivery(handle.delivery_tag(), requeue)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
}
async fn ack_batch(&self, handles: Vec<Self::AckHandle>) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!("Batch acknowledging {} messages", handles.len()))
.await;
if let Some(max_handle) = handles.iter().max_by_key(|h| h.delivery_tag()) {
let state = self.state.lock().await;
if let Some(channel) = &state.channel {
channel
.basic_ack(
max_handle.delivery_tag(),
BasicAckOptions { multiple: true },
)
.await
.map_err(|e| {
Box::new(RabbitMQError::RabbitMQ(e))
as Box<dyn std::error::Error + Send + Sync>
})?;
}
}
Ok(())
}
async fn nack_batch(&self, handles: Vec<Self::AckHandle>, requeue: bool) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!(
"Batch negatively acknowledging {} messages (requeue: {})",
handles.len(),
requeue
))
.await;
if let Some(max_handle) = handles.iter().max_by_key(|h| h.delivery_tag()) {
let state = self.state.lock().await;
if let Some(channel) = &state.channel {
channel
.basic_nack(
max_handle.delivery_tag(),
BasicNackOptions {
multiple: true,
requeue,
},
)
.await
.map_err(|e| {
Box::new(RabbitMQError::RabbitMQ(e))
as Box<dyn std::error::Error + Send + Sync>
})?;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::SystemTime;

    /// Builds a handle on "test-topic", stamped with the current time.
    fn handle_for(message_id: &str, delivery_count: u32, delivery_tag: u64) -> RabbitMQAckHandle {
        RabbitMQAckHandle::new(
            message_id.to_string(),
            "test-topic".to_string(),
            SystemTime::now(),
            delivery_count,
            delivery_tag,
        )
    }

    /// A first-delivery handle exposes its metadata and is not a retry.
    #[test]
    fn test_rabbitmq_ack_handle_creation() {
        let handle = handle_for("msg-123", 1, 12345);

        assert_eq!(handle.message_id(), "msg-123");
        assert_eq!(handle.topic(), "test-topic");
        assert_eq!(handle.delivery_count(), 1);
        assert!(!handle.is_retry());
        assert_eq!(handle.delivery_tag(), 12345);
    }

    /// A handle with a delivery count above one is reported as a retry.
    #[test]
    fn test_rabbitmq_ack_handle_retry() {
        let handle = handle_for("msg-456", 3, 67890);

        assert_eq!(handle.delivery_count(), 3);
        assert!(handle.is_retry());
    }
}