use super::channel::{BasicAckArguments, Channel};
use crate::frame::{BasicProperties, Deliver};
use async_trait::async_trait;
#[cfg(feature = "traces")]
use tracing::info;
/// Handler interface for consuming deliveries from asynchronous code.
///
/// Implementors receive every delivery through [`AsyncConsumer::consume`].
#[async_trait]
pub trait AsyncConsumer {
    /// Handles a single delivery.
    ///
    /// * `channel` - channel reference handed to the handler; the default
    ///   implementation in this file uses it to send the acknowledgement.
    /// * `deliver` - delivery metadata (exposes `delivery_tag()`).
    /// * `basic_properties` - properties accompanying the delivery.
    /// * `content` - raw content bytes of the delivery.
    async fn consume(
        &mut self,
        channel: &Channel,
        deliver: Deliver,
        basic_properties: BasicProperties,
        content: Vec<u8>,
    );
}
/// Minimal asynchronous consumer.
///
/// Its [`AsyncConsumer`] implementation acknowledges each delivery unless
/// the consumer was created with `no_ack == true`.
#[derive(Debug)]
pub struct DefaultConsumer {
    /// When `true`, deliveries are not acknowledged by this consumer.
    no_ack: bool,
}

impl DefaultConsumer {
    /// Creates a consumer.
    ///
    /// Pass `no_ack = true` to skip sending an acknowledgement for each
    /// delivery; pass `false` to have every delivery acknowledged.
    pub fn new(no_ack: bool) -> Self {
        Self { no_ack }
    }
}
#[async_trait]
impl AsyncConsumer for DefaultConsumer {
    /// Traces the delivery (only with the `traces` feature) and, unless
    /// `no_ack` is set, acknowledges it on `channel`.
    ///
    /// # Panics
    ///
    /// Panics if `Channel::basic_ack` returns an error.
    async fn consume(
        &mut self,
        channel: &Channel,
        deliver: Deliver,
        _basic_properties: BasicProperties,
        content: Vec<u8>,
    ) {
        #[cfg(feature = "traces")]
        info!(
            "consume delivery {} on channel {}, content size: {}",
            deliver,
            channel,
            content.len()
        );

        // Nothing to acknowledge when the consumer was created with `no_ack`.
        if self.no_ack {
            return;
        }

        #[cfg(feature = "traces")]
        info!("ack to delivery {} on channel {}", deliver, channel);

        // NOTE(review): the `false` argument is presumably the AMQP `multiple`
        // flag (ack only this delivery) - confirm against `BasicAckArguments::new`.
        channel
            .basic_ack(BasicAckArguments::new(deliver.delivery_tag(), false))
            .await
            .unwrap();
    }
}
/// Handler interface for consuming deliveries from synchronous code.
///
/// This is the non-`async` counterpart of `AsyncConsumer`: the same
/// arguments are passed, but `consume` is an ordinary function.
pub trait BlockingConsumer {
    /// Handles a single delivery.
    ///
    /// * `channel` - channel reference handed to the handler; the default
    ///   implementation in this file uses it to send the acknowledgement.
    /// * `deliver` - delivery metadata (exposes `delivery_tag()`).
    /// * `basic_properties` - properties accompanying the delivery.
    /// * `content` - raw content bytes of the delivery.
    fn consume(
        &mut self,
        channel: &Channel,
        deliver: Deliver,
        basic_properties: BasicProperties,
        content: Vec<u8>,
    );
}
/// Minimal blocking consumer.
///
/// Its [`BlockingConsumer`] implementation acknowledges each delivery
/// unless the consumer was created with `no_ack == true`.
#[derive(Debug)]
pub struct DefaultBlockingConsumer {
    /// When `true`, deliveries are not acknowledged by this consumer.
    no_ack: bool,
}

impl DefaultBlockingConsumer {
    /// Creates a blocking consumer.
    ///
    /// Pass `no_ack = true` to skip sending an acknowledgement for each
    /// delivery; pass `false` to have every delivery acknowledged.
    pub fn new(no_ack: bool) -> Self {
        Self { no_ack }
    }
}
impl BlockingConsumer for DefaultBlockingConsumer {
    /// Traces the delivery (only with the `traces` feature) and, unless
    /// `no_ack` is set, acknowledges it through the blocking ack API.
    ///
    /// # Panics
    ///
    /// Panics if `Channel::basic_ack_blocking` returns an error.
    fn consume(
        &mut self,
        channel: &Channel,
        deliver: Deliver,
        _basic_properties: BasicProperties,
        content: Vec<u8>,
    ) {
        #[cfg(feature = "traces")]
        info!(
            "consume delivery {} on channel {}, content size: {}",
            deliver,
            channel,
            content.len()
        );

        // Nothing to acknowledge when the consumer was created with `no_ack`.
        if self.no_ack {
            return;
        }

        #[cfg(feature = "traces")]
        info!("ack to delivery {} on channel {}", deliver, channel);

        // NOTE(review): the `false` argument is presumably the AMQP `multiple`
        // flag (ack only this delivery) - confirm against `BasicAckArguments::new`.
        let ack_args = BasicAckArguments::new(deliver.delivery_tag(), false);
        let ack_outcome = channel.basic_ack_blocking(ack_args);
        ack_outcome.unwrap();
    }
}