use crate::ack::{AckHandle, AckSubscriber};
use crate::router::HandlerFunc;
use crate::{Message, Publisher};
use std::error::Error;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::time::timeout;
#[derive(Debug, Clone)]
pub struct RouterAckConfig {
pub strategy: AckStrategy,
pub processing_timeout: Option<Duration>,
pub max_retries: u32,
pub requeue_on_failure: bool,
pub batch_size: Option<usize>,
}
impl Default for RouterAckConfig {
fn default() -> Self {
Self {
strategy: AckStrategy::AutoAckOnSuccess,
processing_timeout: Some(Duration::from_secs(30)),
max_retries: 3,
requeue_on_failure: true,
batch_size: None,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum AckStrategy {
AutoAckOnSuccess,
Manual,
AlwaysAck,
NeverAck,
}
#[derive(Debug, Clone, Default)]
pub struct RouterAckStats {
pub messages_processed: u64,
pub messages_acked: u64,
pub messages_nacked: u64,
pub messages_timed_out: u64,
pub messages_max_retries_exceeded: u64,
pub avg_processing_time_ms: f64,
total_processing_time_ms: u64,
}
impl RouterAckStats {
pub fn update_processing_time(&mut self, duration: Duration) {
self.total_processing_time_ms += duration.as_millis() as u64;
self.avg_processing_time_ms = if self.messages_processed > 0 {
self.total_processing_time_ms as f64 / self.messages_processed as f64
} else {
0.0
};
}
pub fn ack_rate(&self) -> f64 {
if self.messages_processed > 0 {
(self.messages_acked as f64 / self.messages_processed as f64) * 100.0
} else {
0.0
}
}
pub fn nack_rate(&self) -> f64 {
if self.messages_processed > 0 {
(self.messages_nacked as f64 / self.messages_processed as f64) * 100.0
} else {
0.0
}
}
}
#[cfg(feature = "logging")]
pub struct AckRouter<S, H>
where
S: AckSubscriber + Send + Sync,
H: AckHandle + Send + Sync,
{
logger: Arc<dyn crate::logging::Logger>,
consume_topic: String,
publish_topic: String,
subscriber: Arc<Mutex<S>>,
publisher: Arc<dyn Publisher<Error = Box<dyn Error + Send + Sync>>>,
handler: HandlerFunc,
config: RouterAckConfig,
stats: Arc<Mutex<RouterAckStats>>,
_phantom: PhantomData<H>,
}
#[cfg(not(feature = "logging"))]
pub struct AckRouter<S, H>
where
S: AckSubscriber + Send + Sync,
H: AckHandle + Send + Sync,
{
consume_topic: String,
publish_topic: String,
subscriber: Arc<Mutex<S>>,
publisher: Arc<dyn Publisher<Error = Box<dyn Error + Send + Sync>>>,
handler: HandlerFunc,
config: RouterAckConfig,
stats: Arc<Mutex<RouterAckStats>>,
_phantom: PhantomData<H>,
}
#[cfg(feature = "logging")]
impl<S, H> AckRouter<S, H>
where
S: AckSubscriber<AckHandle = H> + Send + Sync + 'static,
H: AckHandle + Send + Sync + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>>,
{
pub fn new(
logger: Arc<dyn crate::logging::Logger>,
consume_topic: String,
publish_topic: String,
subscriber: Arc<Mutex<S>>,
publisher: Arc<dyn Publisher<Error = Box<dyn Error + Send + Sync>>>,
handler: HandlerFunc,
config: RouterAckConfig,
) -> Self {
Self {
logger,
consume_topic,
publish_topic,
subscriber,
publisher,
handler,
config,
stats: Arc::new(Mutex::new(RouterAckStats::default())),
_phantom: PhantomData,
}
}
pub fn with_default_config(
logger: Arc<dyn crate::logging::Logger>,
consume_topic: String,
publish_topic: String,
subscriber: Arc<Mutex<S>>,
publisher: Arc<dyn Publisher<Error = Box<dyn Error + Send + Sync>>>,
handler: HandlerFunc,
) -> Self {
Self::new(
logger,
consume_topic,
publish_topic,
subscriber,
publisher,
handler,
RouterAckConfig::default(),
)
}
}
#[cfg(not(feature = "logging"))]
impl<S, H> AckRouter<S, H>
where
S: AckSubscriber<AckHandle = H> + Send + Sync + 'static,
H: AckHandle + Send + Sync + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>>,
{
pub fn new(
consume_topic: String,
publish_topic: String,
subscriber: Arc<Mutex<S>>,
publisher: Arc<dyn Publisher<Error = Box<dyn Error + Send + Sync>>>,
handler: HandlerFunc,
config: RouterAckConfig,
) -> Self {
Self {
consume_topic,
publish_topic,
subscriber,
publisher,
handler,
config,
stats: Arc::new(Mutex::new(RouterAckStats::default())),
_phantom: PhantomData,
}
}
pub fn with_default_config(
consume_topic: String,
publish_topic: String,
subscriber: Arc<Mutex<S>>,
publisher: Arc<dyn Publisher<Error = Box<dyn Error + Send + Sync>>>,
handler: HandlerFunc,
) -> Self {
Self::new(
consume_topic,
publish_topic,
subscriber,
publisher,
handler,
RouterAckConfig::default(),
)
}
}
impl<S, H> AckRouter<S, H>
where
S: AckSubscriber<AckHandle = H> + Send + Sync + 'static,
H: AckHandle + Send + Sync + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>>,
{
pub async fn stats(&self) -> RouterAckStats {
let stats = self.stats.lock().await;
stats.clone()
}
pub async fn reset_stats(&self) {
let mut stats = self.stats.lock().await;
*stats = RouterAckStats::default();
}
pub fn with_config(mut self, config: RouterAckConfig) -> Self {
self.config = config;
self
}
pub async fn run(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
#[cfg(feature = "logging")]
self.logger.info("Starting acknowledgment-aware router...").await;
{
let subscriber = self.subscriber.lock().await;
subscriber.subscribe(&self.consume_topic).await
.map_err(|e| e.into())?;
}
#[cfg(feature = "logging")]
self.logger.info(&format!("Subscribed to topic: {}", self.consume_topic)).await;
loop {
match self.process_single_message().await {
Ok(_) => {
}
Err(e) => {
#[cfg(feature = "logging")]
self.logger.error(&format!("Error in message processing loop: {}", e)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
pub async fn process_single_message(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
let start_time = Instant::now();
let (message, ack_handle) = {
let mut subscriber = self.subscriber.lock().await;
subscriber.receive_with_ack().await.map_err(|e| e.into())?
};
#[cfg(feature = "logging")]
self.logger.info(&format!(
"Received message: id={}, topic={}, delivery_count={}",
ack_handle.message_id(),
ack_handle.topic(),
ack_handle.delivery_count()
)).await;
{
let mut stats = self.stats.lock().await;
stats.messages_processed += 1;
}
let processing_result = if let Some(timeout_duration) = self.config.processing_timeout {
match timeout(timeout_duration, (self.handler)(message.clone())).await {
Ok(result) => result,
Err(_) => {
#[cfg(feature = "logging")]
self.logger.error(&format!(
"Message processing timed out after {:?} for message: {}",
timeout_duration,
ack_handle.message_id()
)).await;
{
let mut stats = self.stats.lock().await;
stats.messages_timed_out += 1;
}
return self.handle_processing_failure(ack_handle, "Processing timeout").await;
}
}
} else {
(self.handler)(message.clone()).await
};
let processing_time = start_time.elapsed();
{
let mut stats = self.stats.lock().await;
stats.update_processing_time(processing_time);
}
match processing_result {
Ok(processed_messages) => {
self.handle_processing_success(message, ack_handle, processed_messages).await
}
Err(e) => {
#[cfg(feature = "logging")]
self.logger.error(&format!(
"Message processing failed for message {}: {}",
ack_handle.message_id(),
e
)).await;
self.handle_processing_failure(ack_handle, &e.to_string()).await
}
}
}
async fn handle_processing_success(
&self,
_original_message: Message,
ack_handle: H,
processed_messages: Vec<Message>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
if !processed_messages.is_empty() {
let message_count = processed_messages.len();
self.publisher
.publish(&self.publish_topic, processed_messages)
.await?;
#[cfg(feature = "logging")]
self.logger.info(&format!(
"Published {} processed messages to topic: {}",
message_count,
self.publish_topic
)).await;
}
match self.config.strategy {
AckStrategy::AutoAckOnSuccess | AckStrategy::AlwaysAck => {
let message_id = ack_handle.message_id().to_string();
let subscriber = self.subscriber.lock().await;
subscriber.ack(ack_handle).await.map_err(|e| e.into())?;
#[cfg(feature = "logging")]
self.logger.info(&format!(
"Message acknowledged: {}",
message_id
)).await;
{
let mut stats = self.stats.lock().await;
stats.messages_acked += 1;
}
}
AckStrategy::Manual => {
#[cfg(feature = "logging")]
self.logger.info(&format!(
"Manual acknowledgment mode - handler should handle ack for message: {}",
ack_handle.message_id()
)).await;
}
AckStrategy::NeverAck => {
#[cfg(feature = "logging")]
self.logger.info(&format!(
"Never acknowledge mode - message not acknowledged: {}",
ack_handle.message_id()
)).await;
}
}
Ok(())
}
async fn handle_processing_failure(
&self,
ack_handle: H,
error_message: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let delivery_count = ack_handle.delivery_count();
let message_id = ack_handle.message_id().to_string();
let should_retry = delivery_count <= self.config.max_retries;
if should_retry && self.config.requeue_on_failure {
let subscriber = self.subscriber.lock().await;
subscriber.nack(ack_handle, true).await.map_err(|e| e.into())?;
#[cfg(feature = "logging")]
self.logger.info(&format!(
"Message negatively acknowledged with requeue (attempt {}): {} - {}",
delivery_count,
message_id,
error_message
)).await;
} else {
let subscriber = self.subscriber.lock().await;
subscriber.nack(ack_handle, false).await.map_err(|e| e.into())?;
#[cfg(feature = "logging")]
if delivery_count > self.config.max_retries {
self.logger.error(&format!(
"Max retries exceeded - message discarded: {} (attempts: {})",
message_id,
delivery_count
)).await;
{
let mut stats = self.stats.lock().await;
stats.messages_max_retries_exceeded += 1;
}
} else {
self.logger.info(&format!(
"Message negatively acknowledged without requeue: {} - {}",
message_id,
error_message
)).await;
}
}
{
let mut stats = self.stats.lock().await;
stats.messages_nacked += 1;
}
Ok(())
}
pub async fn run_with_batching(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
if let Some(batch_size) = self.config.batch_size {
self.run_batched(batch_size).await
} else {
self.run().await
}
}
async fn run_batched(&self, batch_size: usize) -> Result<(), Box<dyn Error + Send + Sync>> {
#[cfg(feature = "logging")]
self.logger.info(&format!(
"Starting acknowledgment-aware router with batch size: {}",
batch_size
)).await;
{
let subscriber = self.subscriber.lock().await;
subscriber.subscribe(&self.consume_topic).await
.map_err(|e| e.into())?;
}
let mut batch_messages = Vec::new();
let mut batch_handles = Vec::new();
loop {
for _ in 0..batch_size {
match self.receive_single_message().await {
Ok((message, handle)) => {
batch_messages.push(message);
batch_handles.push(handle);
}
Err(e) => {
#[cfg(feature = "logging")]
self.logger.error(&format!("Error receiving message: {}", e)).await;
break;
}
}
}
if !batch_messages.is_empty() {
self.process_message_batch(batch_messages, batch_handles).await?;
batch_messages = Vec::new();
batch_handles = Vec::new();
}
}
}
async fn receive_single_message(&self) -> Result<(Message, H), Box<dyn Error + Send + Sync>> {
let mut subscriber = self.subscriber.lock().await;
subscriber.receive_with_ack().await.map_err(|e| e.into())
}
async fn process_message_batch(
&self,
messages: Vec<Message>,
handles: Vec<H>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut success_handles = Vec::new();
let mut failed_handles = Vec::new();
let mut all_processed_messages = Vec::new();
for (message, handle) in messages.into_iter().zip(handles.into_iter()) {
match (self.handler)(message).await {
Ok(mut processed_messages) => {
all_processed_messages.append(&mut processed_messages);
success_handles.push(handle);
}
Err(_) => {
failed_handles.push(handle);
}
}
}
if !all_processed_messages.is_empty() {
self.publisher
.publish(&self.publish_topic, all_processed_messages)
.await?;
}
if !success_handles.is_empty() {
let subscriber = self.subscriber.lock().await;
subscriber.ack_batch(success_handles).await.map_err(|e| e.into())?;
}
if !failed_handles.is_empty() {
let subscriber = self.subscriber.lock().await;
subscriber.nack_batch(failed_handles, self.config.requeue_on_failure)
.await
.map_err(|e| e.into())?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ack_config_default() {
let config = RouterAckConfig::default();
assert_eq!(config.strategy, AckStrategy::AutoAckOnSuccess);
assert_eq!(config.processing_timeout, Some(Duration::from_secs(30)));
assert_eq!(config.max_retries, 3);
assert!(config.requeue_on_failure);
assert_eq!(config.batch_size, None);
}
#[test]
fn test_ack_stats_calculations() {
let mut stats = RouterAckStats::default();
assert_eq!(stats.ack_rate(), 0.0);
assert_eq!(stats.nack_rate(), 0.0);
assert_eq!(stats.avg_processing_time_ms, 0.0);
stats.messages_processed = 10;
stats.messages_acked = 8;
stats.messages_nacked = 2;
stats.update_processing_time(Duration::from_millis(100));
stats.update_processing_time(Duration::from_millis(200));
assert_eq!(stats.ack_rate(), 80.0);
assert_eq!(stats.nack_rate(), 20.0);
assert!(stats.avg_processing_time_ms > 0.0);
}
#[test]
fn test_ack_strategy_variants() {
let strategies = vec![
AckStrategy::AutoAckOnSuccess,
AckStrategy::Manual,
AckStrategy::AlwaysAck,
AckStrategy::NeverAck,
];
for strategy in strategies {
let config = RouterAckConfig {
strategy: strategy.clone(),
..Default::default()
};
assert_eq!(config.strategy, strategy);
}
}
}