use std::time::{Duration, SystemTime};
use async_trait::async_trait;
#[derive(Debug, Clone)]
pub struct AckConfig {
pub mode: AckMode,
pub timeout: Duration,
pub max_retries: u32,
pub retry_delay: Duration,
pub dead_letter_topic: Option<String>,
}
impl Default for AckConfig {
fn default() -> Self {
Self {
mode: AckMode::Manual,
timeout: Duration::from_secs(30),
max_retries: 3,
retry_delay: Duration::from_secs(1),
dead_letter_topic: None,
}
}
}
impl AckConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_mode(mut self, mode: AckMode) -> Self {
self.mode = mode;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
self.max_retries = max_retries;
self
}
pub fn with_retry_delay(mut self, retry_delay: Duration) -> Self {
self.retry_delay = retry_delay;
self
}
pub fn with_dead_letter_topic(mut self, topic: Option<String>) -> Self {
self.dead_letter_topic = topic;
self
}
pub fn auto() -> Self {
Self::new().with_mode(AckMode::Auto)
}
pub fn manual() -> Self {
Self::new().with_mode(AckMode::Manual)
}
pub fn client_auto() -> Self {
Self::new().with_mode(AckMode::ClientAuto)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AckMode {
Auto,
Manual,
ClientAuto,
}
pub trait AckHandle: Send + Sync + Clone {
fn message_id(&self) -> &str;
fn topic(&self) -> &str;
fn timestamp(&self) -> SystemTime;
fn delivery_count(&self) -> u32 {
1
}
fn is_retry(&self) -> bool {
self.delivery_count() > 1
}
}
#[async_trait]
pub trait AckSubscriber {
type Error;
type AckHandle: AckHandle;
async fn subscribe(&self, topic: &str) -> Result<(), Self::Error>;
async fn receive_with_ack(&mut self) -> Result<(crate::Message, Self::AckHandle), Self::Error>;
async fn ack(&self, handle: Self::AckHandle) -> Result<(), Self::Error>;
async fn nack(&self, handle: Self::AckHandle, requeue: bool) -> Result<(), Self::Error>;
async fn ack_batch(&self, handles: Vec<Self::AckHandle>) -> Result<(), Self::Error> {
for handle in handles {
self.ack(handle).await?;
}
Ok(())
}
async fn nack_batch(&self, handles: Vec<Self::AckHandle>, requeue: bool) -> Result<(), Self::Error> {
for handle in handles {
self.nack(handle, requeue).await?;
}
Ok(())
}
}
pub struct CompatSubscriber<S> {
inner: S,
config: AckConfig,
}
impl<S> CompatSubscriber<S> {
pub fn new(subscriber: S, config: AckConfig) -> Self {
Self {
inner: subscriber,
config,
}
}
pub fn config(&self) -> &AckConfig {
&self.config
}
pub fn inner(&self) -> &S {
&self.inner
}
pub fn inner_mut(&mut self) -> &mut S {
&mut self.inner
}
pub fn into_inner(self) -> S {
self.inner
}
}
#[async_trait]
impl<S> crate::Subscriber for CompatSubscriber<S>
where
S: crate::Subscriber + Send + Sync,
{
type Error = S::Error;
async fn subscribe(&self, topic: &str) -> Result<(), Self::Error> {
self.inner.subscribe(topic).await
}
async fn receive(&mut self) -> Result<crate::Message, Self::Error> {
self.inner.receive().await
}
}
#[derive(Debug, Clone, Default)]
pub struct AckStats {
pub acked: u64,
pub nacked: u64,
pub requeued: u64,
pub dead_lettered: u64,
pub timeouts: u64,
pub errors: u64,
}
impl AckStats {
pub fn new() -> Self {
Self::default()
}
pub fn increment_acked(&mut self) {
self.acked += 1;
}
pub fn increment_nacked(&mut self) {
self.nacked += 1;
}
pub fn increment_requeued(&mut self) {
self.requeued += 1;
}
pub fn increment_dead_lettered(&mut self) {
self.dead_lettered += 1;
}
pub fn increment_timeouts(&mut self) {
self.timeouts += 1;
}
pub fn increment_errors(&mut self) {
self.errors += 1;
}
pub fn total_processed(&self) -> u64 {
self.acked + self.nacked
}
pub fn success_rate(&self) -> f64 {
let total = self.total_processed();
if total == 0 {
0.0
} else {
self.acked as f64 / total as f64
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ack_config_builder() {
let config = AckConfig::new()
.with_mode(AckMode::Auto)
.with_timeout(Duration::from_secs(60))
.with_max_retries(5)
.with_retry_delay(Duration::from_secs(2))
.with_dead_letter_topic(Some("dead-letters".to_string()));
assert_eq!(config.mode, AckMode::Auto);
assert_eq!(config.timeout, Duration::from_secs(60));
assert_eq!(config.max_retries, 5);
assert_eq!(config.retry_delay, Duration::from_secs(2));
assert_eq!(config.dead_letter_topic, Some("dead-letters".to_string()));
}
#[test]
fn test_ack_config_presets() {
let auto_config = AckConfig::auto();
assert_eq!(auto_config.mode, AckMode::Auto);
let manual_config = AckConfig::manual();
assert_eq!(manual_config.mode, AckMode::Manual);
let client_auto_config = AckConfig::client_auto();
assert_eq!(client_auto_config.mode, AckMode::ClientAuto);
}
#[test]
fn test_ack_stats() {
let mut stats = AckStats::new();
stats.increment_acked();
stats.increment_acked();
stats.increment_nacked();
assert_eq!(stats.acked, 2);
assert_eq!(stats.nacked, 1);
assert_eq!(stats.total_processed(), 3);
assert!((stats.success_rate() - 0.6666666666666666).abs() < f64::EPSILON);
}
}