use crate::client::{QueueProvider, SessionProvider};
use crate::error::QueueError;
use crate::message::{
Message, MessageId, QueueName, ReceiptHandle, ReceivedMessage, SessionId, Timestamp,
};
use crate::provider::{ProviderType, SessionSupport};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Duration;
use futures::StreamExt;
use lapin::{
options::{
BasicAckOptions, BasicConsumeOptions, BasicGetOptions, BasicNackOptions,
BasicPublishOptions, BasicQosOptions, QueueDeclareOptions,
},
types::{AMQPValue, FieldTable, LongString, ShortString},
BasicProperties, Channel, Connection, ConnectionProperties,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tracing::{debug, instrument, warn};
// Unit tests live in the sibling file `rabbitmq_tests.rs` and are only compiled for test builds.
#[cfg(test)]
#[path = "rabbitmq_tests.rs"]
mod tests;
/// Connection and behaviour settings for the RabbitMQ provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RabbitMqConfig {
/// AMQP URL passed to `Connection::connect`. Credentials embedded in it are redacted before logging.
pub url: String,
/// Virtual host name.
/// NOTE(review): not read anywhere in the visible part of this file — confirm where it is applied.
pub virtual_host: String,
/// Prefetch count applied via `basic_qos` to every channel this provider opens; `0` skips the QoS call.
pub prefetch_count: u16,
/// How long a received message (and a session) stays settleable before its lock is treated as expired.
pub session_lock_duration: Duration,
/// Optional queue-level TTL; sent as the `x-message-ttl` queue argument when it is a positive number of milliseconds.
pub message_ttl: Option<Duration>,
/// When `true` and `dead_letter_exchange` is set, queues are declared with `x-dead-letter-exchange`.
pub enable_dead_letter: bool,
/// Name of the dead-letter exchange used in queue declarations.
pub dead_letter_exchange: Option<String>,
}
impl Default for RabbitMqConfig {
    /// Defaults: guest credentials on localhost, vhost `/`, prefetch of 10, five-minute locks,
    /// no queue TTL, and dead-lettering enabled against the `dlx` exchange.
    fn default() -> Self {
        let url = String::from("amqp://guest:guest@localhost:5672");
        let virtual_host = String::from("/");
        let dead_letter_exchange = Some(String::from("dlx"));
        Self {
            url,
            virtual_host,
            prefetch_count: 10,
            session_lock_duration: Duration::minutes(5),
            message_ttl: None,
            enable_dead_letter: true,
            dead_letter_exchange,
        }
    }
}
/// Error raised by RabbitMQ-specific operations; carries only a human-readable message.
#[derive(Debug)]
pub struct RabbitMqError {
// Description shown by `Display` and copied into `QueueError::ProviderError` by `to_queue_error`.
message: String,
}
impl RabbitMqError {
    /// Builds an error from anything convertible into a `String`.
    fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Converts this error into the provider-agnostic `QueueError::ProviderError`,
    /// tagged with provider `rabbitmq` and code `AMQP_ERROR`.
    pub fn to_queue_error(&self) -> QueueError {
        let provider = String::from("rabbitmq");
        let code = String::from("AMQP_ERROR");
        let message = self.message.clone();
        QueueError::ProviderError {
            provider,
            code,
            message,
        }
    }
}
impl std::fmt::Display for RabbitMqError {
    /// Renders the error as `RabbitMQ error: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("RabbitMQ error: ")?;
        f.write_str(&self.message)
    }
}
// Marker impl: the default `std::error::Error` methods are sufficient (no source error is tracked).
impl std::error::Error for RabbitMqError {}
/// Bookkeeping for a delivery that has been handed to a caller but not yet settled.
struct InFlightEntry {
/// Channel the delivery arrived on; acks and nacks are issued on this channel.
channel: Channel,
/// Delivery tag passed to `basic_ack` / `basic_nack`.
delivery_tag: u64,
/// Instant after which settlement attempts are rejected as expired.
lock_expires_at: Timestamp,
}
/// Returns `url` in a form that is safe to log.
///
/// When the URL carries a username or a password, both are replaced with `***`.
/// Input that does not parse as a URL yields the placeholder `<invalid-url>`.
fn redact_url(url: &str) -> String {
    let mut parsed = match url::Url::parse(url) {
        Ok(parsed) => parsed,
        Err(_) => return String::from("<invalid-url>"),
    };
    let anonymous = parsed.username().is_empty() && parsed.password().is_none();
    if !anonymous {
        // Setter failures are ignored on purpose: redaction is best-effort.
        let _ = parsed.set_username("***");
        let _ = parsed.set_password(Some("***"));
    }
    parsed.to_string()
}
/// Derives the name of the per-session queue: `<queue>.session.<session id>`,
/// with every `/`, space and `\` in the session id replaced by `_`.
fn session_queue_name(queue: &QueueName, session_id: &SessionId) -> String {
    let sanitized: String = session_id
        .as_str()
        .chars()
        .map(|c| match c {
            '/' | ' ' | '\\' => '_',
            other => other,
        })
        .collect();
    format!("{}.session.{}", queue.as_str(), sanitized)
}
/// Queue provider backed by a single RabbitMQ connection.
pub struct RabbitMqProvider {
/// Shared AMQP connection from which all channels are created.
connection: Arc<Connection>,
/// Settings captured at construction time.
config: RabbitMqConfig,
/// Receipt id -> unsettled delivery. The same map is handed to session providers created by this provider.
in_flight: Arc<Mutex<HashMap<String, InFlightEntry>>>,
/// Lazily opened channel reused for publishing; replaced when no longer connected.
publish_channel: Arc<Mutex<Option<Channel>>>,
/// Lazily opened channel reused for `basic_get` receives; replaced when no longer connected.
receive_channel: Arc<Mutex<Option<Channel>>>,
}
impl RabbitMqProvider {
/// Connects to the broker at `config.url` and builds a provider with no channels opened yet.
///
/// # Errors
/// Returns a `RabbitMqError` describing the connection failure; the URL in the
/// message has its credentials redacted.
pub async fn new(config: RabbitMqConfig) -> Result<Self, RabbitMqError> {
    let attempt = Connection::connect(&config.url, ConnectionProperties::default()).await;
    let conn = match attempt {
        Ok(conn) => conn,
        Err(e) => {
            let detail = format!(
                "failed to connect to RabbitMQ at '{}': {}",
                redact_url(&config.url),
                e
            );
            return Err(RabbitMqError::new(detail));
        }
    };
    debug!(url = %redact_url(&config.url), "Connected to RabbitMQ");
    let provider = Self {
        connection: Arc::new(conn),
        config,
        in_flight: Arc::new(Mutex::new(HashMap::new())),
        publish_channel: Arc::new(Mutex::new(None)),
        receive_channel: Arc::new(Mutex::new(None)),
    };
    Ok(provider)
}
/// Opens a fresh channel on the shared connection and applies the configured prefetch.
///
/// # Errors
/// `QueueError::ConnectionFailed` if the channel cannot be created, or a
/// `ProviderError` with code `QOS_FAILED` if setting the prefetch fails.
async fn open_channel(&self) -> Result<Channel, QueueError> {
    let channel = match self.connection.create_channel().await {
        Ok(channel) => channel,
        Err(e) => {
            return Err(QueueError::ConnectionFailed {
                message: format!("failed to create AMQP channel: {}", e),
            });
        }
    };
    let prefetch = self.config.prefetch_count;
    if prefetch == 0 {
        // A prefetch of zero means "leave the broker default untouched".
        return Ok(channel);
    }
    if let Err(e) = channel
        .basic_qos(prefetch, BasicQosOptions::default())
        .await
    {
        return Err(QueueError::ProviderError {
            provider: "rabbitmq".to_string(),
            code: "QOS_FAILED".to_string(),
            message: format!("failed to set QoS prefetch: {}", e),
        });
    }
    Ok(channel)
}
async fn get_publish_channel(&self) -> Result<Channel, QueueError> {
let mut guard = self.publish_channel.lock().await;
if let Some(ref ch) = *guard {
if ch.status().connected() {
return Ok(ch.clone());
}
}
let ch = self.open_channel().await?;
*guard = Some(ch.clone());
Ok(ch)
}
async fn get_receive_channel(&self) -> Result<Channel, QueueError> {
let mut guard = self.receive_channel.lock().await;
if let Some(ref ch) = *guard {
if ch.status().connected() {
return Ok(ch.clone());
}
}
let ch = self.open_channel().await?;
*guard = Some(ch.clone());
Ok(ch)
}
async fn declare_queue(&self, channel: &Channel, queue: &QueueName) -> Result<(), QueueError> {
let mut args = FieldTable::default();
if self.config.enable_dead_letter {
if let Some(ref dlx) = self.config.dead_letter_exchange {
args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(dlx.as_bytes())),
);
}
}
if let Some(ttl) = self.config.message_ttl {
let ttl_ms = ttl.num_milliseconds();
if ttl_ms > 0 {
args.insert(
ShortString::from("x-message-ttl"),
AMQPValue::LongLongInt(ttl_ms),
);
}
}
let opts = QueueDeclareOptions {
durable: true,
..Default::default()
};
channel
.queue_declare(queue.as_str().into(), opts, args)
.await
.map_err(|e| QueueError::ProviderError {
provider: "rabbitmq".to_string(),
code: "QUEUE_DECLARE_FAILED".to_string(),
message: format!("failed to declare queue '{}': {}", queue.as_str(), e),
})?;
Ok(())
}
/// Declares the durable queue dedicated to `session_id` and returns its name.
///
/// The session queue is declared without any extra arguments (no TTL, no dead-letter exchange).
///
/// # Errors
/// `ProviderError` with code `SESSION_QUEUE_DECLARE_FAILED` if the declaration fails.
async fn declare_session_queue(
    &self,
    channel: &Channel,
    queue: &QueueName,
    session_id: &SessionId,
) -> Result<String, QueueError> {
    let name = session_queue_name(queue, session_id);
    let options = QueueDeclareOptions {
        durable: true,
        ..Default::default()
    };
    let declared = channel
        .queue_declare(name.as_str().into(), options, FieldTable::default())
        .await;
    match declared {
        Ok(_) => Ok(name),
        Err(e) => Err(QueueError::ProviderError {
            provider: "rabbitmq".to_string(),
            code: "SESSION_QUEUE_DECLARE_FAILED".to_string(),
            message: format!("failed to declare session queue '{}': {}", name, e),
        }),
    }
}
fn build_properties(message: &Message) -> BasicProperties {
let mut props = BasicProperties::default().with_delivery_mode(2);
if let Some(ref corr_id) = message.correlation_id {
props = props.with_correlation_id(ShortString::from(corr_id.as_str()));
}
if let Some(ttl) = message.time_to_live {
let ttl_ms = ttl.num_milliseconds();
if ttl_ms > 0 {
props = props.with_expiration(ShortString::from(ttl_ms.to_string().as_str()));
}
}
let mut headers = FieldTable::default();
for (k, v) in &message.attributes {
let header_key = format!("x-attr-{}", k);
headers.insert(
ShortString::from(header_key.as_str()),
AMQPValue::LongString(LongString::from(v.as_bytes())),
);
}
if let Some(ref sid) = message.session_id {
headers.insert(
ShortString::from("x-session-id"),
AMQPValue::LongString(LongString::from(sid.as_str().as_bytes())),
);
}
props.with_headers(headers)
}
/// Collects message attributes from AMQP headers.
///
/// Only headers named `x-attr-<key>` whose value is a long string are kept; the
/// prefix is stripped and the bytes are decoded as UTF-8 (lossily).
fn extract_attributes(headers: &Option<FieldTable>) -> HashMap<String, String> {
    let table = match headers {
        Some(table) => table,
        None => return HashMap::new(),
    };
    table
        .inner()
        .iter()
        .filter_map(|(name, value)| {
            let attr_key = name.as_str().strip_prefix("x-attr-")?;
            match value {
                AMQPValue::LongString(raw) => Some((
                    attr_key.to_string(),
                    String::from_utf8_lossy(raw.as_bytes()).into_owned(),
                )),
                _ => None,
            }
        })
        .collect()
}
/// Reads the `x-session-id` header, if present as a long string, and turns it
/// into a `SessionId`. Returns `None` when the header is missing, has another
/// type, or is rejected by `SessionId::new`.
fn extract_session_id(headers: &Option<FieldTable>) -> Option<SessionId> {
    let table = headers.as_ref()?;
    match table.inner().get("x-session-id") {
        Some(AMQPValue::LongString(raw)) => {
            let id = String::from_utf8_lossy(raw.as_bytes()).into_owned();
            SessionId::new(id).ok()
        }
        _ => None,
    }
}
/// Determines how many times a message has been delivered, counting this delivery.
///
/// When the `x-delivery-count` header is present as a 64-bit integer, the result
/// is that value plus one. The header value is clamped to the `u32` range first,
/// so a negative or oversized value cannot wrap around when narrowed.
/// Without the header, the AMQP `redelivered` flag is used: `2` if set, else `1`.
fn extract_delivery_count(headers: &Option<FieldTable>, redelivered: bool) -> u32 {
    if let Some(ht) = headers {
        if let Some(AMQPValue::LongLongInt(n)) = ht.inner().get("x-delivery-count") {
            // Clamp before the cast: after clamping the `as` conversion is lossless.
            let prior = (*n).max(0).min(i64::from(u32::MAX)) as u32;
            return prior.saturating_add(1);
        }
    }
    if redelivered {
        2
    } else {
        1
    }
}
/// Records a freshly received delivery as in-flight and wraps it in a `ReceivedMessage`.
///
/// A new receipt id is generated and stored in the in-flight map together with
/// the channel and delivery tag, so the message can later be acked or nacked.
/// The lock expiry is "now" plus the configured `session_lock_duration`.
async fn register_delivery(
    &self,
    channel: &Channel,
    delivery_tag: u64,
    data: &[u8],
    headers: Option<FieldTable>,
    correlation_id: Option<String>,
    redelivered: bool,
) -> ReceivedMessage {
    let session_id = Self::extract_session_id(&headers);
    let attributes = Self::extract_attributes(&headers);
    let delivery_count = Self::extract_delivery_count(&headers, redelivered);

    let received_at = Timestamp::now();
    let lock_expires_at =
        Timestamp::from_datetime(received_at.as_datetime() + self.config.session_lock_duration);
    let receipt_id = uuid::Uuid::new_v4().to_string();
    let message_id = MessageId::new();
    let body = Bytes::copy_from_slice(data);

    let tracked = InFlightEntry {
        channel: channel.clone(),
        delivery_tag,
        lock_expires_at,
    };
    self.in_flight
        .lock()
        .await
        .insert(receipt_id.clone(), tracked);

    let receipt_handle = ReceiptHandle::new(receipt_id, lock_expires_at, ProviderType::RabbitMq);
    ReceivedMessage {
        message_id,
        body,
        attributes,
        session_id,
        correlation_id,
        receipt_handle,
        delivery_count,
        first_delivered_at: received_at,
        delivered_at: received_at,
    }
}
/// Settles an in-flight message identified by `receipt`.
///
/// * `requeue == None` — acknowledge (`basic_ack`);
/// * `requeue == Some(flag)` — negatively acknowledge (`basic_nack`) with that requeue flag.
///
/// The in-flight entry is removed whether or not the lock has expired. The
/// in-flight mutex is released before the broker round-trip so that other
/// receives and settlements are not blocked behind network I/O.
///
/// # Errors
/// * `QueueError::MessageNotFound` if the receipt is unknown or its lock has
///   expired (the receipt text is suffixed with `(expired)` in that case);
/// * `ProviderError` with code `BASIC_ACK_FAILED` / `BASIC_NACK_FAILED` if the broker call fails.
async fn settle_message(
    &self,
    receipt: &ReceiptHandle,
    requeue: Option<bool>,
) -> Result<(), QueueError> {
    let entry = {
        let mut in_flight = self.in_flight.lock().await;
        let entry = in_flight.remove(receipt.handle()).ok_or_else(|| {
            QueueError::MessageNotFound {
                receipt: receipt.handle().to_string(),
            }
        })?;
        if Timestamp::now() > entry.lock_expires_at {
            return Err(QueueError::MessageNotFound {
                receipt: format!("{}(expired)", receipt.handle()),
            });
        }
        entry
        // The guard is dropped here, before any network await.
    };
    match requeue {
        None => {
            entry
                .channel
                .basic_ack(entry.delivery_tag, BasicAckOptions::default())
                .await
                .map_err(|e| QueueError::ProviderError {
                    provider: "rabbitmq".to_string(),
                    code: "BASIC_ACK_FAILED".to_string(),
                    message: format!("basic_ack failed: {}", e),
                })?;
        }
        Some(requeue_flag) => {
            entry
                .channel
                .basic_nack(
                    entry.delivery_tag,
                    BasicNackOptions {
                        requeue: requeue_flag,
                        ..Default::default()
                    },
                )
                .await
                .map_err(|e| QueueError::ProviderError {
                    provider: "rabbitmq".to_string(),
                    code: "BASIC_NACK_FAILED".to_string(),
                    message: format!("basic_nack failed: {}", e),
                })?;
        }
    }
    Ok(())
}
}
#[async_trait]
impl QueueProvider for RabbitMqProvider {
/// Publishes one message through the default exchange.
///
/// Messages that carry a session id are routed to the per-session queue (declared
/// on demand); all others go to `queue` (also declared on demand). The returned
/// `MessageId` is generated locally after the publish succeeds.
///
/// # Errors
/// * `QueueError::MessageTooLarge` if the body exceeds the provider's size limit;
/// * channel / queue-declaration errors from the helpers;
/// * `ProviderError` with code `PUBLISH_FAILED`, `PUBLISH_CONFIRM_FAILED`, or
///   `PUBLISH_NACKED` (the broker negatively acknowledged the publish).
#[instrument(skip(self, message), fields(queue = %queue))]
async fn send_message(
    &self,
    queue: &QueueName,
    message: &Message,
) -> Result<MessageId, QueueError> {
    let size = message.body.len();
    let max_size = self.provider_type().max_message_size();
    if size > max_size {
        return Err(QueueError::MessageTooLarge { size, max_size });
    }
    let channel = self.get_publish_channel().await?;
    let routing_key = if let Some(ref sid) = message.session_id {
        self.declare_session_queue(&channel, queue, sid).await?
    } else {
        self.declare_queue(&channel, queue).await?;
        queue.as_str().to_string()
    };
    let props = Self::build_properties(message);
    let confirmation = channel
        .basic_publish(
            "".into(),
            routing_key.as_str().into(),
            BasicPublishOptions::default(),
            &message.body,
            props,
        )
        .await
        .map_err(|e| QueueError::ProviderError {
            provider: "rabbitmq".to_string(),
            code: "PUBLISH_FAILED".to_string(),
            message: format!("failed to publish message to '{}': {}", routing_key, e),
        })?
        .await
        .map_err(|e| QueueError::ProviderError {
            provider: "rabbitmq".to_string(),
            code: "PUBLISH_CONFIRM_FAILED".to_string(),
            message: format!("publish confirmation failed: {}", e),
        })?;
    // A nack means the broker refused responsibility for the message; reporting
    // success here would silently lose it.
    if confirmation.is_nack() {
        return Err(QueueError::ProviderError {
            provider: "rabbitmq".to_string(),
            code: "PUBLISH_NACKED".to_string(),
            message: format!(
                "broker negatively acknowledged message published to '{}'",
                routing_key
            ),
        });
    }
    let message_id = MessageId::new();
    debug!(%message_id, %queue, "Published message to RabbitMQ");
    Ok(message_id)
}
/// Publishes a batch by sending each message in order via `send_message`.
///
/// Sending stops at the first failure; messages published before it stay published.
///
/// # Errors
/// `QueueError::BatchTooLarge` if the batch exceeds `max_batch_size`, otherwise
/// the first error returned by `send_message`.
#[instrument(skip(self, messages), fields(queue = %queue, count = messages.len()))]
async fn send_messages(
    &self,
    queue: &QueueName,
    messages: &[Message],
) -> Result<Vec<MessageId>, QueueError> {
    let batch_len = messages.len();
    if batch_len > self.max_batch_size() as usize {
        return Err(QueueError::BatchTooLarge {
            size: batch_len,
            max_size: self.max_batch_size() as usize,
        });
    }
    let mut published = Vec::with_capacity(batch_len);
    for outgoing in messages.iter() {
        let id = self.send_message(queue, outgoing).await?;
        published.push(id);
    }
    Ok(published)
}
/// Receives at most one message from `queue`, polling with `basic_get` every
/// 50 ms until a message arrives or `timeout` elapses.
///
/// If `timeout` cannot be converted to a std duration, 30 seconds is used.
/// At least one poll is always made. A returned message is registered as
/// in-flight and must be settled through its receipt handle.
///
/// # Errors
/// Channel / declaration errors, or `ProviderError` with code `BASIC_GET_FAILED`.
#[instrument(skip(self), fields(queue = %queue))]
async fn receive_message(
    &self,
    queue: &QueueName,
    timeout: Duration,
) -> Result<Option<ReceivedMessage>, QueueError> {
    let channel = self.get_receive_channel().await?;
    self.declare_queue(&channel, queue).await?;
    let started = std::time::Instant::now();
    let wait_limit = timeout
        .to_std()
        .unwrap_or(std::time::Duration::from_secs(30));
    let poll_interval = std::time::Duration::from_millis(50);
    loop {
        let polled = channel
            .basic_get(queue.as_str().into(), BasicGetOptions { no_ack: false })
            .await
            .map_err(|e| QueueError::ProviderError {
                provider: "rabbitmq".to_string(),
                code: "BASIC_GET_FAILED".to_string(),
                message: format!("basic_get on '{}' failed: {}", queue.as_str(), e),
            })?;
        match polled {
            Some(fetched) => {
                let headers = fetched.delivery.properties.headers().clone();
                let redelivered = fetched.delivery.redelivered;
                let correlation_id = fetched
                    .delivery
                    .properties
                    .correlation_id()
                    .as_ref()
                    .map(|id| id.to_string());
                let received = self
                    .register_delivery(
                        &channel,
                        fetched.delivery.delivery_tag,
                        &fetched.delivery.data,
                        headers,
                        correlation_id,
                        redelivered,
                    )
                    .await;
                return Ok(Some(received));
            }
            None if started.elapsed() >= wait_limit => return Ok(None),
            None => tokio::time::sleep(poll_interval).await,
        }
    }
}
#[instrument(skip(self), fields(queue = %queue, max = max_messages))]
async fn receive_messages(
&self,
queue: &QueueName,
max_messages: u32,
timeout: Duration,
) -> Result<Vec<ReceivedMessage>, QueueError> {
let channel = self.get_receive_channel().await?;
self.declare_queue(&channel, queue).await?;
let mut messages = Vec::new();
let start = std::time::Instant::now();
let timeout_std = timeout
.to_std()
.unwrap_or(std::time::Duration::from_secs(30));
while messages.len() < max_messages as usize {
if start.elapsed() >= timeout_std {
break;
}
let get = channel
.basic_get(queue.as_str().into(), BasicGetOptions { no_ack: false })
.await
.map_err(|e| QueueError::ProviderError {
provider: "rabbitmq".to_string(),
code: "BASIC_GET_FAILED".to_string(),
message: format!("basic_get on '{}' failed: {}", queue.as_str(), e),
})?;
match get {
Some(delivery) => {
let headers = delivery.delivery.properties.headers().clone();
let redelivered = delivery.delivery.redelivered;
let correlation_id = delivery
.delivery
.properties
.correlation_id()
.as_ref()
.map(|s| s.to_string());
let msg = self
.register_delivery(
&channel,
delivery.delivery.delivery_tag,
&delivery.delivery.data,
headers,
correlation_id,
redelivered,
)
.await;
messages.push(msg);
}
None => {
if start.elapsed() >= timeout_std {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
}
Ok(messages)
}
/// Acknowledges the message behind `receipt` (delegates to `settle_message` with no requeue flag).
#[instrument(skip(self, receipt))]
async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
self.settle_message(receipt, None).await
}
/// Negatively acknowledges the message behind `receipt` with `requeue = true`.
#[instrument(skip(self, receipt))]
async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
self.settle_message(receipt, Some(true)).await
}
/// Negatively acknowledges the message behind `receipt` with `requeue = false`.
///
/// `reason` is only recorded in the tracing span and debug log; it is not sent to the broker.
/// NOTE(review): whether the broker then dead-letters the message presumably depends on the
/// queue having been declared with a dead-letter exchange — confirm against the queue setup.
#[instrument(skip(self, receipt), fields(reason = %reason))]
async fn dead_letter_message(
&self,
receipt: &ReceiptHandle,
reason: &str,
) -> Result<(), QueueError> {
debug!(reason, "Dead-lettering RabbitMQ message");
self.settle_message(receipt, Some(false)).await
}
/// Opens a session client bound to the given session id.
///
/// A dedicated channel is opened, the per-session queue is declared, and an
/// exclusive consumer is started on it. A background task forwards deliveries
/// from the consumer into an in-memory channel read by the returned provider;
/// the task stops on the first consumer error or once the receiver is dropped.
///
/// # Errors
/// * `QueueError::SessionNotFound` (with id `<any>`) when no session id is supplied;
/// * channel / declaration errors from the helpers;
/// * `ProviderError` with code `CONSUME_FAILED` if the exclusive consumer cannot be started.
#[instrument(skip(self), fields(queue = %queue))]
async fn create_session_client(
    &self,
    queue: &QueueName,
    session_id: Option<SessionId>,
) -> Result<Box<dyn SessionProvider>, QueueError> {
    let sid = session_id.ok_or_else(|| QueueError::SessionNotFound {
        session_id: "<any>".to_string(),
    })?;
    let channel = self.open_channel().await?;
    let session_queue = self.declare_session_queue(&channel, queue, &sid).await?;

    let consumer_tag = format!("session-{}", uuid::Uuid::new_v4());
    let consume_options = BasicConsumeOptions {
        exclusive: true,
        no_ack: false,
        ..Default::default()
    };
    let consumer = match channel
        .basic_consume(
            session_queue.as_str().into(),
            consumer_tag.as_str().into(),
            consume_options,
            FieldTable::default(),
        )
        .await
    {
        Ok(consumer) => consumer,
        Err(e) => {
            return Err(QueueError::ProviderError {
                provider: "rabbitmq".to_string(),
                code: "CONSUME_FAILED".to_string(),
                message: format!(
                    "failed to start exclusive consumer on '{}': {}",
                    session_queue, e
                ),
            });
        }
    };

    let opened_at = Timestamp::now();
    let lock_expires_at =
        Timestamp::from_datetime(opened_at.as_datetime() + self.config.session_lock_duration);

    let (tx, rx) = mpsc::unbounded_channel::<lapin::message::Delivery>();
    tokio::spawn(async move {
        let mut stream = consumer;
        loop {
            match stream.next().await {
                Some(Ok(delivery)) => {
                    // A send error means the session provider is gone; stop pumping.
                    if tx.send(delivery).is_err() {
                        break;
                    }
                }
                Some(Err(e)) => {
                    warn!(error = %e, "RabbitMQ session consumer error");
                    break;
                }
                None => break,
            }
        }
    });

    let provider = RabbitMqSessionProvider {
        channel,
        deliveries: Arc::new(Mutex::new(rx)),
        session_id: sid,
        in_flight: self.in_flight.clone(),
        lock_expires_at: Arc::new(std::sync::Mutex::new(lock_expires_at)),
        config: self.config.clone(),
    };
    Ok(Box::new(provider))
}
/// Identifies this provider as `ProviderType::RabbitMq`.
fn provider_type(&self) -> ProviderType {
ProviderType::RabbitMq
}
/// Reports session support as `SessionSupport::Emulated`.
fn supports_sessions(&self) -> SessionSupport {
SessionSupport::Emulated
}
/// Batching is reported as supported; in this file `send_messages` publishes a batch one message at a time.
fn supports_batching(&self) -> bool {
true
}
/// Largest batch `send_messages` accepts before returning `QueueError::BatchTooLarge`.
fn max_batch_size(&self) -> u32 {
100
}
}
/// Session client created by `RabbitMqProvider::create_session_client`; reads from one per-session queue.
pub struct RabbitMqSessionProvider {
/// Dedicated channel carrying the session's exclusive consumer; also used to ack/nack its deliveries.
channel: Channel,
/// Receiving end of the in-memory channel fed by the background consumer task.
deliveries: Arc<Mutex<mpsc::UnboundedReceiver<lapin::message::Delivery>>>,
/// Session this client is bound to.
session_id: SessionId,
/// In-flight map shared with the parent provider (receipt id -> unsettled delivery).
in_flight: Arc<Mutex<HashMap<String, InFlightEntry>>>,
/// Current session lock expiry; advanced by `renew_session_lock`.
lock_expires_at: Arc<std::sync::Mutex<Timestamp>>,
/// Copy of the provider configuration (supplies `session_lock_duration`).
config: RabbitMqConfig,
}
#[async_trait]
impl SessionProvider for RabbitMqSessionProvider {
/// Waits up to `timeout` for the next delivery on this session.
///
/// Returns `Ok(None)` both when the wait times out and when the background
/// consumer has stopped and no more deliveries are buffered. If `timeout`
/// cannot be converted to a std duration, 30 seconds is used.
///
/// # Errors
/// Fails with the error from `check_lock` when the session lock has lapsed.
#[instrument(skip(self), fields(session_id = %self.session_id))]
async fn receive_message(
    &self,
    timeout: Duration,
) -> Result<Option<ReceivedMessage>, QueueError> {
    self.check_lock()?;
    let wait_limit = timeout
        .to_std()
        .unwrap_or(std::time::Duration::from_secs(30));
    let mut inbox = self.deliveries.lock().await;
    let outcome = tokio::time::timeout(wait_limit, inbox.recv()).await;
    if let Ok(Some(delivery)) = outcome {
        let received = self.register_session_delivery(delivery).await;
        return Ok(Some(received));
    }
    // Either the timer fired or the sender side has been dropped.
    Ok(None)
}
/// Acknowledges the message behind `receipt`, after verifying the session lock via `check_lock`.
#[instrument(skip(self, receipt))]
async fn complete_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
self.check_lock()?;
self.settle(receipt, None).await
}
/// Negatively acknowledges the message behind `receipt` with `requeue = true`,
/// after verifying the session lock via `check_lock`.
#[instrument(skip(self, receipt))]
async fn abandon_message(&self, receipt: &ReceiptHandle) -> Result<(), QueueError> {
self.check_lock()?;
self.settle(receipt, Some(true)).await
}
/// Negatively acknowledges the message behind `receipt` with `requeue = false`,
/// after verifying the session lock via `check_lock`.
///
/// `reason` is only recorded in the tracing span and debug log; it is not sent to the broker.
#[instrument(skip(self, receipt), fields(reason = %reason))]
async fn dead_letter_message(
&self,
receipt: &ReceiptHandle,
reason: &str,
) -> Result<(), QueueError> {
self.check_lock()?;
debug!(reason, "Dead-lettering session message");
self.settle(receipt, Some(false)).await
}
/// Pushes the session lock expiry to "now" plus the configured `session_lock_duration`.
///
/// The current lock state is not checked first, so an already-expired lock is renewed as well.
///
/// # Errors
/// `ProviderError` with code `INTERNAL_ERROR` if the lock mutex is poisoned.
async fn renew_session_lock(&self) -> Result<(), QueueError> {
    let extension = self.config.session_lock_duration;
    let _renewed_until = advance_session_lock(&self.lock_expires_at, extension)?;
    debug!(session_id = %self.session_id, "RabbitMQ session lock renewed");
    Ok(())
}
/// Closes the session's channel (best-effort) and forgets its unsettled deliveries.
///
/// Entries in the shared in-flight map that belong to this session's channel are
/// removed first. Once the channel is closed they can no longer be acked or
/// nacked, and would otherwise stay in the map until someone tried to settle them.
/// A failure to close the channel is logged and otherwise ignored, so this
/// method always returns `Ok(())`.
async fn close_session(&self) -> Result<(), QueueError> {
    // Purge before closing: while the channel is still open its id is unique on
    // the connection, so no other channel's entries can be removed by mistake.
    let channel_id = self.channel.id();
    {
        let mut in_flight = self.in_flight.lock().await;
        in_flight.retain(|_, entry| entry.channel.id() != channel_id);
    }
    if let Err(e) = self.channel.close(200, "session closed".into()).await {
        warn!(error = %e, "Failed to cleanly close RabbitMQ session channel");
    }
    Ok(())
}
/// Returns the id of the session this client is bound to.
fn session_id(&self) -> &SessionId {
&self.session_id
}
/// Returns the current session lock expiry.
fn session_expires_at(&self) -> Timestamp {
    // A poisoned mutex still holds a usable timestamp, so read it instead of panicking.
    match self.lock_expires_at.lock() {
        Ok(guard) => *guard,
        Err(poisoned) => *poisoned.into_inner(),
    }
}
}
/// Verifies that the session lock has not expired.
///
/// # Errors
/// * `ProviderError` with code `INTERNAL_ERROR` if the mutex is poisoned;
/// * `QueueError::SessionLocked` (carrying the stored expiry) when the current
///   time is past that expiry.
fn check_session_lock(
    lock_expires_at: &std::sync::Mutex<Timestamp>,
    session_id: &SessionId,
) -> Result<(), QueueError> {
    let expires = match lock_expires_at.lock() {
        Ok(guard) => *guard,
        Err(_) => {
            return Err(QueueError::ProviderError {
                provider: "rabbitmq".to_string(),
                code: "INTERNAL_ERROR".to_string(),
                message: "session lock mutex poisoned".to_string(),
            });
        }
    };
    if Timestamp::now() > expires {
        Err(QueueError::SessionLocked {
            session_id: session_id.as_str().to_string(),
            locked_until: expires,
        })
    } else {
        Ok(())
    }
}
/// Sets the session lock expiry to "now" plus `duration` and returns the new expiry.
///
/// # Errors
/// `ProviderError` with code `INTERNAL_ERROR` if the mutex is poisoned.
fn advance_session_lock(
    lock_expires_at: &std::sync::Mutex<Timestamp>,
    duration: Duration,
) -> Result<Timestamp, QueueError> {
    let new_expiry = Timestamp::from_datetime(Timestamp::now().as_datetime() + duration);
    match lock_expires_at.lock() {
        Ok(mut guard) => {
            *guard = new_expiry;
            Ok(new_expiry)
        }
        Err(_) => Err(QueueError::ProviderError {
            provider: "rabbitmq".to_string(),
            code: "INTERNAL_ERROR".to_string(),
            message: "session lock mutex poisoned".to_string(),
        }),
    }
}
impl RabbitMqSessionProvider {
/// Checks this session's lock via `check_session_lock`; errors if the lock has expired.
fn check_lock(&self) -> Result<(), QueueError> {
check_session_lock(&self.lock_expires_at, &self.session_id)
}
/// Records a session delivery as in-flight and wraps it in a `ReceivedMessage`.
///
/// The message's session id is always this client's session id (the header is
/// not consulted). The lock expiry is "now" plus the configured `session_lock_duration`.
async fn register_session_delivery(
    &self,
    delivery: lapin::message::Delivery,
) -> ReceivedMessage {
    let headers = delivery.properties.headers().clone();
    let attributes = RabbitMqProvider::extract_attributes(&headers);
    let delivery_count =
        RabbitMqProvider::extract_delivery_count(&headers, delivery.redelivered);
    let correlation_id = delivery
        .properties
        .correlation_id()
        .as_ref()
        .map(|id| id.to_string());

    let received_at = Timestamp::now();
    let lock_expires_at =
        Timestamp::from_datetime(received_at.as_datetime() + self.config.session_lock_duration);
    let receipt_id = uuid::Uuid::new_v4().to_string();
    let message_id = MessageId::new();
    let body = Bytes::copy_from_slice(&delivery.data);

    let tracked = InFlightEntry {
        channel: self.channel.clone(),
        delivery_tag: delivery.delivery_tag,
        lock_expires_at,
    };
    self.in_flight
        .lock()
        .await
        .insert(receipt_id.clone(), tracked);

    let receipt_handle = ReceiptHandle::new(receipt_id, lock_expires_at, ProviderType::RabbitMq);
    ReceivedMessage {
        message_id,
        body,
        attributes,
        session_id: Some(self.session_id.clone()),
        correlation_id,
        receipt_handle,
        delivery_count,
        first_delivered_at: received_at,
        delivered_at: received_at,
    }
}
/// Settles an in-flight session message identified by `receipt`.
///
/// * `requeue == None` — acknowledge (`basic_ack`);
/// * `requeue == Some(flag)` — negatively acknowledge (`basic_nack`) with that requeue flag.
///
/// The in-flight entry is removed whether or not the lock has expired. The
/// in-flight mutex (shared with the parent provider) is released before the
/// broker round-trip so that other receives and settlements are not blocked
/// behind network I/O.
///
/// # Errors
/// * `QueueError::MessageNotFound` if the receipt is unknown or its lock has
///   expired (the receipt text is suffixed with `(expired)` in that case);
/// * `ProviderError` with code `BASIC_ACK_FAILED` / `BASIC_NACK_FAILED` if the broker call fails.
async fn settle(
    &self,
    receipt: &ReceiptHandle,
    requeue: Option<bool>,
) -> Result<(), QueueError> {
    let entry = {
        let mut in_flight = self.in_flight.lock().await;
        let entry = in_flight.remove(receipt.handle()).ok_or_else(|| {
            QueueError::MessageNotFound {
                receipt: receipt.handle().to_string(),
            }
        })?;
        if Timestamp::now() > entry.lock_expires_at {
            return Err(QueueError::MessageNotFound {
                receipt: format!("{}(expired)", receipt.handle()),
            });
        }
        entry
        // The guard is dropped here, before any network await.
    };
    match requeue {
        None => {
            entry
                .channel
                .basic_ack(entry.delivery_tag, BasicAckOptions::default())
                .await
                .map_err(|e| QueueError::ProviderError {
                    provider: "rabbitmq".to_string(),
                    code: "BASIC_ACK_FAILED".to_string(),
                    message: format!("basic_ack failed: {}", e),
                })?;
        }
        Some(requeue_flag) => {
            entry
                .channel
                .basic_nack(
                    entry.delivery_tag,
                    BasicNackOptions {
                        requeue: requeue_flag,
                        ..Default::default()
                    },
                )
                .await
                .map_err(|e| QueueError::ProviderError {
                    provider: "rabbitmq".to_string(),
                    code: "BASIC_NACK_FAILED".to_string(),
                    message: format!("basic_nack failed: {}", e),
                })?;
        }
    }
    Ok(())
}
}