use async_trait::async_trait;
use celers_protocol::Message;
use std::collections::HashMap;
use std::time::Duration;
use crate::{QueueMode, Result};
/// A [`Message`] bundled with the delivery metadata that accompanies it.
#[derive(Debug, Clone)]
pub struct Envelope {
/// The wrapped message.
pub message: Message,
/// Tag identifying this particular delivery.
/// NOTE(review): presumably the value passed to `Consumer::ack` / `Consumer::reject` — confirm.
pub delivery_tag: String,
/// Redelivery flag; `Envelope::new` initializes it to `false`.
pub redelivered: bool,
}
impl Envelope {
pub fn new(message: Message, delivery_tag: String) -> Self {
Self {
message,
delivery_tag,
redelivered: false,
}
}
pub fn is_redelivered(&self) -> bool {
self.redelivered
}
pub fn task_id(&self) -> uuid::Uuid {
self.message.task_id()
}
pub fn task_name(&self) -> &str {
self.message.task_name()
}
}
impl std::fmt::Display for Envelope {
    /// Renders a one-line summary of the form
    /// `Envelope[tag=<tag>] task=<name> id=<first 8 bytes of the id>`, followed by
    /// ` (redelivered)` when the `redelivered` flag is set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only a short prefix of the id's string form is shown. The slice panics if
        // that string is shorter than 8 bytes.
        // NOTE(review): assumes `uuid::Uuid`'s `Display` output is always longer — confirm.
        let full_id = self.task_id().to_string();
        let short_id = &full_id[..8];

        let redelivery_note = match self.redelivered {
            true => " (redelivered)",
            false => "",
        };

        write!(
            f,
            "Envelope[tag={}] task={} id={}{}",
            self.delivery_tag,
            self.task_name(),
            short_id,
            redelivery_note
        )
    }
}
/// Base connection interface that the producer and consumer traits in this file build on.
///
/// Implementors must be `Send + Sync`. The `#[async_trait]` attribute is what allows
/// the `async fn` items in the trait body.
#[async_trait]
pub trait Transport: Send + Sync {
/// Asynchronously connects; requires exclusive access (`&mut self`).
/// NOTE(review): what "connected" entails is left to each implementation — not visible here.
async fn connect(&mut self) -> Result<()>;
/// Asynchronously disconnects; requires exclusive access (`&mut self`).
async fn disconnect(&mut self) -> Result<()>;
/// Synchronous query of the connection state.
fn is_connected(&self) -> bool;
/// Name of this transport as a borrowed string.
/// NOTE(review): presumably a backend identifier — confirm against implementations.
fn name(&self) -> &str;
}
/// Publishing side of a [`Transport`].
#[async_trait]
pub trait Producer: Transport {
/// Publishes `message`, taken by value, to the queue named `queue`.
async fn publish(&mut self, queue: &str, message: Message) -> Result<()>;
/// Publishes `message` addressed by `exchange` and `routing_key` instead of a queue name.
/// NOTE(review): how exchanges and routing keys map onto queues is backend-specific —
/// confirm per implementation.
async fn publish_with_routing(
&mut self,
exchange: &str,
routing_key: &str,
message: Message,
) -> Result<()>;
}
/// Consuming side of a [`Transport`].
#[async_trait]
pub trait Consumer: Transport {
/// Attempts to fetch one message from `queue`, yielding at most one [`Envelope`].
/// NOTE(review): presumably `Ok(None)` means nothing arrived within `timeout` — confirm.
async fn consume(&mut self, queue: &str, timeout: Duration) -> Result<Option<Envelope>>;
/// Acknowledges the delivery identified by `delivery_tag`.
/// NOTE(review): presumably the tag comes from `Envelope::delivery_tag` — confirm.
async fn ack(&mut self, delivery_tag: &str) -> Result<()>;
/// Rejects the delivery identified by `delivery_tag`.
/// NOTE(review): `requeue` presumably controls whether the message goes back on the queue — confirm.
async fn reject(&mut self, delivery_tag: &str, requeue: bool) -> Result<()>;
/// Reports a size for `queue` as a `usize`.
/// NOTE(review): presumably the number of pending messages — confirm.
async fn queue_size(&mut self, queue: &str) -> Result<usize>;
}
/// Full broker interface: a [`Producer`] and [`Consumer`] that also offers
/// queue-management operations.
#[async_trait]
pub trait Broker: Producer + Consumer + Transport {
/// Purges `queue`.
/// NOTE(review): the returned `usize` is presumably the number of messages removed — confirm.
async fn purge(&mut self, queue: &str) -> Result<usize>;
/// Creates a queue named `queue` with the given [`QueueMode`].
async fn create_queue(&mut self, queue: &str, mode: QueueMode) -> Result<()>;
/// Deletes the queue named `queue`.
async fn delete_queue(&mut self, queue: &str) -> Result<()>;
/// Lists queue names as owned strings.
async fn list_queues(&mut self) -> Result<Vec<String>>;
}
/// Settings describing a queue, assembled through the chained `with_*` builders.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Queue name.
    pub name: String,
    /// Queue mode; `new` selects `QueueMode::Fifo`.
    pub mode: QueueMode,
    /// Durability flag; `new` sets it to `true`.
    pub durable: bool,
    /// Auto-delete flag; `new` sets it to `false`.
    pub auto_delete: bool,
    /// Optional message-size limit; `None` unless set via `with_max_message_size`.
    // NOTE(review): unit is presumably bytes — confirm with the code that enforces it.
    pub max_message_size: Option<usize>,
    /// Optional message time-to-live; `None` unless set via `with_ttl`.
    pub message_ttl: Option<Duration>,
}

impl QueueConfig {
    /// Creates a configuration for `name` with the defaults: FIFO mode, durable,
    /// not auto-deleted, no size limit and no TTL.
    pub fn new(name: String) -> Self {
        QueueConfig {
            message_ttl: None,
            max_message_size: None,
            auto_delete: false,
            durable: true,
            mode: QueueMode::Fifo,
            name,
        }
    }

    /// Returns the configuration with `mode` replaced.
    pub fn with_mode(self, mode: QueueMode) -> Self {
        Self { mode, ..self }
    }

    /// Returns the configuration with the message TTL set to `ttl`.
    pub fn with_ttl(self, ttl: Duration) -> Self {
        Self {
            message_ttl: Some(ttl),
            ..self
        }
    }

    /// Returns the configuration with the `durable` flag replaced.
    pub fn with_durable(self, durable: bool) -> Self {
        Self { durable, ..self }
    }

    /// Returns the configuration with the `auto_delete` flag replaced.
    pub fn with_auto_delete(self, auto_delete: bool) -> Self {
        Self {
            auto_delete,
            ..self
        }
    }

    /// Returns the configuration with the maximum message size set to `size`.
    pub fn with_max_message_size(self, size: usize) -> Self {
        Self {
            max_message_size: Some(size),
            ..self
        }
    }
}
/// Outcome summary of a batch publish: success and failure counts plus error text
/// keyed by `usize`.
#[derive(Debug, Clone)]
pub struct BatchPublishResult {
    /// Count of successful publishes.
    pub succeeded: usize,
    /// Count of failed publishes.
    pub failed: usize,
    /// Error messages keyed by `usize`.
    // NOTE(review): the key is presumably the message's index within the batch — confirm.
    pub errors: HashMap<usize, String>,
}

impl BatchPublishResult {
    /// Builds a result in which all `count` publishes succeeded: no failures and an
    /// empty error map.
    pub fn success(count: usize) -> Self {
        let errors = HashMap::new();
        BatchPublishResult {
            succeeded: count,
            failed: 0,
            errors,
        }
    }

    /// Returns `true` exactly when `failed` is zero. The error map is not consulted.
    pub fn is_complete_success(&self) -> bool {
        matches!(self.failed, 0)
    }

    /// Returns the sum of `succeeded` and `failed`.
    pub fn total(&self) -> usize {
        let Self {
            succeeded, failed, ..
        } = self;
        succeeded + failed
    }
}
/// Extension of [`Producer`] that publishes several messages per call.
#[async_trait]
pub trait BatchProducer: Producer {
/// Publishes every message in `messages` to the queue named `queue`, reporting the
/// outcome as a [`BatchPublishResult`].
/// NOTE(review): whether a partial failure yields `Ok` with `failed > 0` or an `Err`
/// is up to the implementation — confirm.
async fn publish_batch(
&mut self,
queue: &str,
messages: Vec<Message>,
) -> Result<BatchPublishResult>;
/// Publishes every message in `messages` addressed by `exchange` and `routing_key`,
/// reporting the outcome as a [`BatchPublishResult`].
async fn publish_batch_with_routing(
&mut self,
exchange: &str,
routing_key: &str,
messages: Vec<Message>,
) -> Result<BatchPublishResult>;
}
/// Extension of [`Consumer`] that handles several deliveries per call.
#[async_trait]
pub trait BatchConsumer: Consumer {
/// Fetches messages from `queue` and returns them as a vector of [`Envelope`]s.
/// NOTE(review): presumably returns at most `max_messages` items and waits no longer
/// than `timeout`; an empty vector presumably means nothing was available — confirm.
async fn consume_batch(
&mut self,
queue: &str,
max_messages: usize,
timeout: Duration,
) -> Result<Vec<Envelope>>;
/// Acknowledges every delivery whose tag appears in `delivery_tags`.
async fn ack_batch(&mut self, delivery_tags: &[String]) -> Result<()>;
/// Rejects every delivery whose tag appears in `delivery_tags`.
/// NOTE(review): `requeue` presumably applies to all of them alike — confirm.
async fn reject_batch(&mut self, delivery_tags: &[String], requeue: bool) -> Result<()>;
}