use async_trait::async_trait;
use celers_protocol::Message;
use std::collections::HashMap;
use std::time::Duration;
use crate::{Envelope, Result};
/// Settings describing a dead-letter queue (DLQ).
///
/// Start from [`DlqConfig::new`] and refine the value with the chainable
/// `with_*` / `without_*` methods. This type only stores the settings; how
/// they are enforced is up to whatever consumes the config.
#[derive(Clone, Debug)]
pub struct DlqConfig {
    /// Name of the dead-letter queue.
    pub queue_name: String,
    /// Configured retry limit; `None` means no limit is set.
    pub max_retries: Option<u32>,
    /// Configured time-to-live; `None` means no TTL is set.
    // NOTE(review): presumably the lifetime of messages inside the DLQ —
    // confirm against the broker implementation that reads this field.
    pub ttl: Option<Duration>,
    /// Whether metadata should be included.
    // NOTE(review): presumably failure details stored with each dead-lettered
    // message — confirm against the broker implementation.
    pub include_metadata: bool,
}

impl DlqConfig {
    /// Creates a configuration for the queue called `queue_name`.
    ///
    /// Defaults: a retry limit of 3, no TTL, and metadata included.
    pub fn new(queue_name: String) -> Self {
        DlqConfig {
            queue_name,
            max_retries: Some(3),
            ttl: None,
            include_metadata: true,
        }
    }

    /// Returns the config with the retry limit set to `max_retries`.
    pub fn with_max_retries(self, max_retries: u32) -> Self {
        DlqConfig {
            max_retries: Some(max_retries),
            ..self
        }
    }

    /// Returns the config with the retry limit removed (`max_retries = None`).
    pub fn without_retry_limit(self) -> Self {
        DlqConfig {
            max_retries: None,
            ..self
        }
    }

    /// Returns the config with the TTL set to `ttl`.
    pub fn with_ttl(self, ttl: Duration) -> Self {
        DlqConfig {
            ttl: Some(ttl),
            ..self
        }
    }

    /// Returns the config with the metadata flag set to `include`.
    pub fn with_metadata(self, include: bool) -> Self {
        DlqConfig {
            include_metadata: include,
            ..self
        }
    }
}
/// Operations a broker exposes for working with dead-letter queues (DLQs).
///
/// Only the contract is declared here. The per-method descriptions below are
/// inferred from names and signatures and should be confirmed against the
/// concrete implementations. Every method takes `&mut self` and returns the
/// crate's `Result`.
#[async_trait]
pub trait DeadLetterQueue: Send + Sync {
    /// Sends `message` to a dead-letter queue.
    ///
    /// `original_queue` and `reason` are passed alongside the message;
    /// presumably they record where the message came from and why it was
    /// dead-lettered — confirm with the implementation.
    async fn send_to_dlq(
        &mut self,
        message: &Message,
        original_queue: &str,
        reason: &str,
    ) -> Result<()>;

    /// Fetches envelopes from the DLQ named `dlq_name`.
    ///
    /// `limit` is presumably an upper bound on the number of envelopes
    /// returned — confirm with the implementation.
    async fn get_from_dlq(
        &mut self,
        dlq_name: &str,
        limit: usize,
    ) -> Result<Vec<Envelope>>;

    /// Retries the message identified by `delivery_tag` from `dlq_name`,
    /// directing it at `target_queue`.
    // NOTE(review): whether the message is removed from the DLQ on success is
    // not visible from this declaration.
    async fn retry_from_dlq(
        &mut self,
        dlq_name: &str,
        delivery_tag: &str,
        target_queue: &str,
    ) -> Result<()>;

    /// Purges the DLQ named `dlq_name`.
    ///
    /// The returned `usize` is presumably the number of messages removed —
    /// confirm with the implementation.
    async fn purge_dlq(&mut self, dlq_name: &str) -> Result<usize>;

    /// Returns a [`DlqStats`] value for the DLQ named `dlq_name`.
    async fn dlq_stats(&mut self, dlq_name: &str) -> Result<DlqStats>;
}
/// Summary statistics for a dead-letter queue.
///
/// `Default` yields an empty summary: zero messages, no per-reason counts and
/// no timestamps.
#[derive(Debug, Clone, Default)]
pub struct DlqStats {
    /// Number of messages counted in the queue.
    pub message_count: usize,
    /// Message counts keyed by a reason string.
    pub by_reason: HashMap<String, usize>,
    /// Timestamp of the oldest message, in seconds since the Unix epoch
    /// (the unit [`DlqStats::oldest_message_age_secs`] compares it against).
    pub oldest_message_time: Option<u64>,
    /// Timestamp of the newest message.
    // NOTE(review): presumably also seconds since the Unix epoch — nothing in
    // this type reads the field, so confirm with the producer of these stats.
    pub newest_message_time: Option<u64>,
}

impl DlqStats {
    /// Returns `true` when `message_count` is zero.
    pub fn is_empty(&self) -> bool {
        self.message_count == 0
    }

    /// Returns the age of the oldest message in whole seconds.
    ///
    /// Yields `None` when `oldest_message_time` is unset. The age is clamped
    /// to `0` when the recorded timestamp lies in the future relative to the
    /// system clock.
    ///
    /// This method never panics: if the system clock reports a time before
    /// the Unix epoch, the current time is treated as `0`, which results in
    /// an age of `0` under the same clamping rule.
    pub fn oldest_message_age_secs(&self) -> Option<u64> {
        let oldest = self.oldest_message_time?;

        // `duration_since` fails when the clock is set before the epoch; a
        // read-only statistics accessor should not abort the process for a
        // misconfigured clock, so fall back to 0 instead of unwrapping.
        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());

        Some(now_secs.saturating_sub(oldest))
    }
}
/// Isolation level requested when a message transaction is started.
///
/// The variant names match the four standard SQL isolation levels. The exact
/// guarantees a broker provides for each level are not defined here — confirm
/// with the `MessageTransaction` implementation in use.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IsolationLevel {
    /// "Read uncommitted" level.
    ReadUncommitted,
    /// "Read committed" level.
    ReadCommitted,
    /// "Repeatable read" level.
    RepeatableRead,
    /// "Serializable" level.
    Serializable,
}
/// Lifecycle state of a message transaction, as reported by
/// `MessageTransaction::transaction_state`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// The transaction is active (neither committed nor rolled back).
    Active,
    /// The transaction has been committed.
    Committed,
    /// The transaction has been rolled back.
    RolledBack,
}
/// Transactional publish/consume operations offered by a broker.
///
/// Only the contract is declared here. The per-method descriptions below are
/// inferred from names and signatures and should be confirmed against the
/// concrete implementations. Transactions are addressed by a string
/// identifier (`tx_id`).
#[async_trait]
pub trait MessageTransaction: Send + Sync {
    /// Begins a transaction at the requested `isolation` level.
    ///
    /// The returned `String` is presumably the transaction id expected as
    /// `tx_id` by the other methods — confirm with the implementation.
    async fn begin_transaction(&mut self, isolation: IsolationLevel) -> Result<String>;

    /// Publishes `message` to `queue` within transaction `tx_id`.
    // NOTE(review): presumably the message only becomes visible to consumers
    // after a commit — not verifiable from this declaration.
    async fn publish_transactional(
        &mut self,
        tx_id: &str,
        queue: &str,
        message: Message,
    ) -> Result<()>;

    /// Consumes from `queue` within transaction `tx_id`.
    ///
    /// Returns `Ok(None)` when no envelope is produced; presumably that is
    /// the case when nothing arrives within `timeout` — confirm with the
    /// implementation.
    async fn consume_transactional(
        &mut self,
        tx_id: &str,
        queue: &str,
        timeout: Duration,
    ) -> Result<Option<Envelope>>;

    /// Commits transaction `tx_id`.
    async fn commit_transaction(&mut self, tx_id: &str) -> Result<()>;

    /// Rolls back transaction `tx_id`.
    async fn rollback_transaction(&mut self, tx_id: &str) -> Result<()>;

    /// Reports the [`TransactionState`] of transaction `tx_id`.
    ///
    /// Unlike the other methods, this one takes `&self`.
    async fn transaction_state(&self, tx_id: &str) -> Result<TransactionState>;
}