use crate::ack::{AckHandle, AckSubscriber};
use crate::kafka::KafkaError;
use crate::Message;
use async_trait::async_trait;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::BorrowedMessage;
use rdkafka::{ClientConfig, TopicPartitionList, Message as KafkaMessage};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;
use uuid::Uuid;
/// Acknowledgment handle for a single received Kafka message.
///
/// Captures the (topic, partition, offset) coordinates needed to commit the
/// message's offset later, plus bookkeeping metadata about the delivery.
#[derive(Debug, Clone)]
pub struct KafkaAckHandle {
// Application-level message id (taken from `Message::uuid` on receive).
message_id: String,
// Topic the message was consumed from.
topic: String,
// Time the handle was created (i.e. when the message was received).
timestamp: SystemTime,
// Number of delivery attempts; 1 on first delivery.
delivery_count: u32,
// Kafka partition the message came from.
partition: i32,
// Kafka offset of the message within its partition.
offset: i64,
// Unique id of this handle instance (UUID v4), independent of message_id.
handle_id: String,
}
impl KafkaAckHandle {
/// Creates a handle for a received Kafka message, assigning it a fresh
/// UUID-v4 `handle_id` that is unique per handle instance.
pub fn new(
message_id: String,
topic: String,
timestamp: SystemTime,
delivery_count: u32,
partition: i32,
offset: i64,
) -> Self {
let handle_id = Uuid::new_v4().to_string();
Self {
handle_id,
message_id,
topic,
timestamp,
delivery_count,
partition,
offset,
}
}
/// Unique identifier of this handle instance.
pub fn handle_id(&self) -> &str {
self.handle_id.as_str()
}
/// Kafka partition the message was read from.
pub fn partition(&self) -> i32 {
self.partition
}
/// Kafka offset of the message within its partition.
pub fn offset(&self) -> i64 {
self.offset
}
}
impl AckHandle for KafkaAckHandle {
/// Application-level id of the underlying message.
fn message_id(&self) -> &str {
self.message_id.as_str()
}
/// Topic the message was consumed from.
fn topic(&self) -> &str {
self.topic.as_str()
}
/// When this handle was created (message receipt time).
fn timestamp(&self) -> SystemTime {
self.timestamp
}
/// How many times the message has been delivered.
fn delivery_count(&self) -> u32 {
self.delivery_count
}
}
/// Kafka subscriber with manual, per-message acknowledgment.
///
/// The underlying consumer is created with `enable.auto.commit=false`, so
/// offsets advance only when messages are explicitly acked (or nacked
/// without requeue); unacknowledged messages are redelivered after a
/// rebalance or restart.
pub struct KafkaAckSubscriber {
// rdkafka streaming consumer (auto-commit disabled).
consumer: StreamConsumer,
// Mutable bookkeeping, behind an async mutex so it can be held across awaits.
state: Mutex<SubscriberState>,
// Consumer group id the consumer was created with.
group_id: String,
// Broker list the consumer was created with.
brokers: Vec<String>,
#[cfg(feature = "logging")]
logger: Arc<dyn crate::logging::Logger>,
}
/// Internal mutable state of a `KafkaAckSubscriber`.
struct SubscriberState {
// Topics this subscriber has been asked to consume.
topics: Vec<String>,
// Messages received but not yet acked/nacked, keyed by (topic, partition, offset).
pending_acks: HashMap<(String, i32, i64), KafkaAckHandle>,
// Highest offset committed per (topic, partition) by this subscriber.
committed_offsets: HashMap<(String, i32), i64>,
}
impl KafkaAckSubscriber {
/// Builds the rdkafka `StreamConsumer` shared by both `new` variants.
///
/// Auto-commit is disabled so offsets are committed only on explicit
/// ack/nack; `auto.offset.reset=earliest` makes a brand-new group start
/// from the beginning of each partition.
fn build_consumer(brokers: &[String], group_id: &str) -> Result<StreamConsumer, KafkaError> {
ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers.join(","))
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
.create()
.map_err(KafkaError::Kafka)
}
/// Creates a subscriber connected to `brokers` under consumer group
/// `group_id`.
///
/// # Errors
/// Returns `KafkaError::Kafka` when the underlying consumer cannot be
/// created (e.g. invalid configuration).
#[cfg(not(feature = "logging"))]
pub async fn new(
brokers: Vec<String>,
group_id: String,
) -> Result<Self, KafkaError> {
let consumer = Self::build_consumer(&brokers, &group_id)?;
Ok(Self {
consumer,
state: Mutex::new(SubscriberState {
topics: Vec::new(),
pending_acks: HashMap::new(),
committed_offsets: HashMap::new(),
}),
group_id,
brokers,
})
}
/// Creates a subscriber connected to `brokers` under consumer group
/// `group_id`, with a no-op logger (replace it via [`Self::with_logger`]).
///
/// # Errors
/// Returns `KafkaError::Kafka` when the underlying consumer cannot be
/// created (e.g. invalid configuration).
#[cfg(feature = "logging")]
pub async fn new(
brokers: Vec<String>,
group_id: String,
) -> Result<Self, KafkaError> {
let consumer = Self::build_consumer(&brokers, &group_id)?;
let logger = Arc::new(crate::logging::NoOpLogger::new());
Ok(Self {
consumer,
state: Mutex::new(SubscriberState {
topics: Vec::new(),
pending_acks: HashMap::new(),
committed_offsets: HashMap::new(),
}),
group_id,
brokers,
logger,
})
}
/// Replaces the subscriber's logger (builder style).
#[cfg(feature = "logging")]
pub fn with_logger(mut self, logger: Arc<dyn crate::logging::Logger>) -> Self {
self.logger = logger;
self
}
/// Consumer group id this subscriber was created with.
pub fn group_id(&self) -> &str {
&self.group_id
}
/// Broker list this subscriber was created with.
pub fn brokers(&self) -> &[String] {
&self.brokers
}
/// Topics this subscriber is currently tracking.
pub async fn subscribed_topics(&self) -> Vec<String> {
let state = self.state.lock().await;
state.topics.clone()
}
/// Whether at least one topic subscription has been requested.
pub async fn is_subscribed(&self) -> bool {
let state = self.state.lock().await;
!state.topics.is_empty()
}
/// Synchronously commits a single offset and records it in
/// `committed_offsets`.
///
/// Kafka's commit convention is "offset of the next message to consume",
/// hence `offset + 1`.
async fn commit_offset(&self, topic: &str, partition: i32, offset: i64) -> Result<(), KafkaError> {
let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(topic, partition, rdkafka::Offset::Offset(offset + 1))
.map_err(|e| KafkaError::ConfigurationError(format!("Failed to add partition offset: {:?}", e)))?;
self.consumer
.commit(&tpl, rdkafka::consumer::CommitMode::Sync)
.map_err(KafkaError::Kafka)?;
let mut state = self.state.lock().await;
state.committed_offsets.insert((topic.to_string(), partition), offset);
Ok(())
}
/// Synchronously commits a batch of offsets and records them in
/// `committed_offsets`.
///
/// Entries are first reduced to the highest offset per (topic, partition):
/// committing offset N implies all lower offsets on that partition, and a
/// `TopicPartitionList` must not carry conflicting duplicate entries.
async fn commit_offsets_batch(&self, offsets: Vec<(String, i32, i64)>) -> Result<(), KafkaError> {
if offsets.is_empty() {
return Ok(());
}
// Deduplicate: keep only the maximum offset per (topic, partition).
let mut max_offsets: HashMap<(String, i32), i64> = HashMap::new();
for (topic, partition, offset) in offsets {
let entry = max_offsets.entry((topic, partition)).or_insert(offset);
if offset > *entry {
*entry = offset;
}
}
let mut tpl = TopicPartitionList::new();
for ((topic, partition), offset) in &max_offsets {
// `offset + 1`: commit the next-to-consume position.
tpl.add_partition_offset(topic, *partition, rdkafka::Offset::Offset(offset + 1))
.map_err(|e| KafkaError::ConfigurationError(format!("Failed to add partition offset: {:?}", e)))?;
}
self.consumer
.commit(&tpl, rdkafka::consumer::CommitMode::Sync)
.map_err(KafkaError::Kafka)?;
let mut state = self.state.lock().await;
for ((topic, partition), offset) in max_offsets {
state.committed_offsets.insert((topic, partition), offset);
}
Ok(())
}
/// Converts a raw Kafka message into a crate-level `Message`.
///
/// Tries to deserialize the payload as a JSON-encoded `Message`; when that
/// fails, wraps the raw bytes in a fresh `Message` instead. A missing
/// payload (e.g. a tombstone record) is reported as a serialization error.
fn convert_message(&self, kafka_msg: &BorrowedMessage) -> Result<Message, KafkaError> {
let payload = kafka_msg.payload()
.ok_or_else(|| KafkaError::Serialization("Empty message payload".to_string()))?
.to_vec();
match serde_json::from_slice::<Message>(&payload) {
Ok(message) => Ok(message),
Err(_) => {
// Not JSON-serialized: treat the payload as opaque bytes.
Ok(Message::new(payload))
}
}
}
}
#[async_trait]
impl AckSubscriber for KafkaAckSubscriber {
type Error = Box<dyn std::error::Error + Send + Sync>;
type AckHandle = KafkaAckHandle;
/// Adds `topic` to the subscription.
///
/// rdkafka's `Consumer::subscribe` REPLACES the current subscription
/// rather than extending it, so the consumer is always (re)subscribed with
/// the full accumulated topic list; previously-subscribed topics keep
/// flowing after a new topic is added.
async fn subscribe(&self, topic: &str) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!("Subscribing to Kafka topic {} with acknowledgment support", topic))
.await;
let mut state = self.state.lock().await;
let already_tracked = state.topics.iter().any(|t| t == topic);
// Build the complete topic list, including the new topic, before
// mutating state so a failed subscribe leaves state unchanged.
let mut topics: Vec<&str> = state.topics.iter().map(String::as_str).collect();
if !already_tracked {
topics.push(topic);
}
self.consumer
.subscribe(&topics)
.map_err(|e| Box::new(KafkaError::Kafka(e)) as Box<dyn std::error::Error + Send + Sync>)?;
if !already_tracked {
state.topics.push(topic.to_string());
}
drop(state);
#[cfg(feature = "logging")]
self.logger
.info(&format!("Successfully subscribed to Kafka topic {}", topic))
.await;
Ok(())
}
/// Waits for the next message and returns it together with an ack handle.
///
/// The handle is also recorded in `pending_acks` until acked/nacked.
async fn receive_with_ack(&mut self) -> Result<(Message, Self::AckHandle), Self::Error> {
#[cfg(feature = "logging")]
self.logger.info("Waiting to receive Kafka message with acknowledgment").await;
use futures::StreamExt;
let kafka_msg = self.consumer
.stream()
.next()
.await
.ok_or_else(|| {
Box::new(KafkaError::Kafka(rdkafka::error::KafkaError::NoMessageReceived))
as Box<dyn std::error::Error + Send + Sync>
})?
.map_err(|e| {
Box::new(KafkaError::Kafka(e)) as Box<dyn std::error::Error + Send + Sync>
})?;
let message = self.convert_message(&kafka_msg)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
// NOTE(review): delivery_count is always reported as 1 — Kafka itself
// does not track redelivery attempts; derive from pending_acks if
// retry tracking is ever needed.
let ack_handle = KafkaAckHandle::new(
message.uuid.clone(),
kafka_msg.topic().to_string(),
SystemTime::now(),
1,
kafka_msg.partition(),
kafka_msg.offset(),
);
let mut state = self.state.lock().await;
let key = (
kafka_msg.topic().to_string(),
kafka_msg.partition(),
kafka_msg.offset(),
);
state.pending_acks.insert(key, ack_handle.clone());
#[cfg(feature = "logging")]
self.logger
.info(&format!(
"Received Kafka message: topic={}, partition={}, offset={}, message_id={}",
kafka_msg.topic(),
kafka_msg.partition(),
kafka_msg.offset(),
message.uuid
))
.await;
Ok((message, ack_handle))
}
/// Acknowledges a message by committing its offset.
///
/// NOTE(review): committing offset N+1 implicitly acknowledges every
/// earlier offset on the same partition — out-of-order acks can skip
/// messages still pending. Confirm callers ack in order per partition.
async fn ack(&self, handle: Self::AckHandle) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!(
"Acknowledging Kafka message: topic={}, partition={}, offset={}, message_id={}",
handle.topic(),
handle.partition(),
handle.offset(),
handle.message_id()
))
.await;
self.commit_offset(handle.topic(), handle.partition(), handle.offset())
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
let mut state = self.state.lock().await;
let key = (handle.topic().to_string(), handle.partition(), handle.offset());
state.pending_acks.remove(&key);
Ok(())
}
/// Negatively acknowledges a message.
///
/// With `requeue == true` the offset is NOT committed, so the message is
/// redelivered after a rebalance/restart; with `requeue == false` the
/// offset is committed, discarding the message.
async fn nack(&self, handle: Self::AckHandle, requeue: bool) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!(
"Negatively acknowledging Kafka message: topic={}, partition={}, offset={}, message_id={}, requeue={}",
handle.topic(),
handle.partition(),
handle.offset(),
handle.message_id(),
requeue
))
.await;
if requeue {
#[cfg(feature = "logging")]
self.logger
.info(&format!(
"Message will be redelivered: topic={}, partition={}, offset={}",
handle.topic(),
handle.partition(),
handle.offset()
))
.await;
} else {
self.commit_offset(handle.topic(), handle.partition(), handle.offset())
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
}
let mut state = self.state.lock().await;
let key = (handle.topic().to_string(), handle.partition(), handle.offset());
state.pending_acks.remove(&key);
Ok(())
}
/// Acknowledges a batch of messages with a single synchronous commit,
/// committing only the highest offset per (topic, partition).
async fn ack_batch(&self, handles: Vec<Self::AckHandle>) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!("Batch acknowledging {} Kafka messages", handles.len()))
.await;
if handles.is_empty() {
return Ok(());
}
// Committing offset N covers all lower offsets on the same partition,
// so only the per-partition maximum needs to be committed.
let mut max_offsets: HashMap<(String, i32), i64> = HashMap::new();
for handle in &handles {
let key = (handle.topic().to_string(), handle.partition());
let current_max = max_offsets.get(&key).copied().unwrap_or(-1);
if handle.offset() > current_max {
max_offsets.insert(key, handle.offset());
}
}
let offsets: Vec<(String, i32, i64)> = max_offsets
.into_iter()
.map(|((topic, partition), offset)| (topic, partition, offset))
.collect();
self.commit_offsets_batch(offsets)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
let mut state = self.state.lock().await;
for handle in handles {
let key = (handle.topic().to_string(), handle.partition(), handle.offset());
state.pending_acks.remove(&key);
}
Ok(())
}
/// Negatively acknowledges a batch of messages.
///
/// With `requeue == false` the offsets are committed (messages discarded);
/// mirroring `ack_batch`, the commit list is reduced to the highest offset
/// per (topic, partition) so it never carries conflicting duplicates.
async fn nack_batch(&self, handles: Vec<Self::AckHandle>, requeue: bool) -> Result<(), Self::Error> {
#[cfg(feature = "logging")]
self.logger
.info(&format!(
"Batch negatively acknowledging {} Kafka messages (requeue: {})",
handles.len(),
requeue
))
.await;
if handles.is_empty() {
return Ok(());
}
if !requeue {
let mut max_offsets: HashMap<(String, i32), i64> = HashMap::new();
for handle in &handles {
let key = (handle.topic().to_string(), handle.partition());
let entry = max_offsets.entry(key).or_insert(-1);
if handle.offset() > *entry {
*entry = handle.offset();
}
}
let offsets: Vec<(String, i32, i64)> = max_offsets
.into_iter()
.map(|((topic, partition), offset)| (topic, partition, offset))
.collect();
self.commit_offsets_batch(offsets)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
}
let mut state = self.state.lock().await;
for handle in handles {
let key = (handle.topic().to_string(), handle.partition(), handle.offset());
state.pending_acks.remove(&key);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::SystemTime;
/// A freshly created handle exposes exactly the values it was built from
/// and receives a non-empty generated handle id.
#[test]
fn test_kafka_ack_handle_creation() {
let created_at = SystemTime::now();
let handle = KafkaAckHandle::new(
"msg-123".to_string(),
"test-topic".to_string(),
created_at,
1,
0,
12345,
);
assert_eq!("msg-123", handle.message_id());
assert_eq!("test-topic", handle.topic());
assert_eq!(1, handle.delivery_count());
assert!(!handle.is_retry());
assert_eq!(0, handle.partition());
assert_eq!(12345, handle.offset());
assert!(!handle.handle_id().is_empty());
}
/// A delivery count greater than one marks the handle as a retry.
#[test]
fn test_kafka_ack_handle_retry() {
let handle = KafkaAckHandle::new(
"msg-456".to_string(),
"test-topic".to_string(),
SystemTime::now(),
3,
1,
67890,
);
assert_eq!(3, handle.delivery_count());
assert!(handle.is_retry());
assert_eq!(1, handle.partition());
assert_eq!(67890, handle.offset());
}
}