use super::{Message, MessageConsumer, MessageProducer};
use async_trait::async_trait;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::config::ClientConfig;
use rdkafka::consumer::base_consumer::BaseConsumer;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaError;
use rdkafka::message::Message as RdKafkaMessage;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::{ClientContext, TopicPartitionList};
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, warn};
pub use crate::config::{KafkaConsumerConfig, KafkaProducerConfig};
struct KafkaConsumerContext;
impl ClientContext for KafkaConsumerContext {}
impl ConsumerContext for KafkaConsumerContext {
fn pre_rebalance(&self, _consumer: &BaseConsumer<KafkaConsumerContext>, rebalance: &Rebalance) {
info!("Kafka rebalance: {:?}", rebalance);
}
fn post_rebalance(
&self,
_consumer: &BaseConsumer<KafkaConsumerContext>,
rebalance: &Rebalance,
) {
info!("Kafka rebalance completed: {:?}", rebalance);
}
fn commit_callback(
&self,
result: rdkafka::error::KafkaResult<()>,
_offsets: &TopicPartitionList,
) {
if let Err(e) = result {
warn!("Kafka commit error: {}", e);
}
}
}
pub struct KafkaProducer {
producer: FutureProducer,
}
impl KafkaProducer {
pub fn new(brokers: &str, config: &KafkaProducerConfig) -> Result<Self, KafkaError> {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("acks", &config.acks)
.set("retries", config.retries.to_string())
.set("enable.idempotence", config.enable_idempotence.to_string())
.set("compression.type", "snappy")
.set("linger.ms", "10") .set("batch.size", "32768") .create()?;
info!("✅ Kafka Producer 创建成功");
Ok(Self { producer })
}
}
#[async_trait]
impl MessageProducer for KafkaProducer {
async fn publish(
&self,
topic: &str,
message: Message,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let payload = serde_json::to_vec(&message)?;
let key = message.topic.as_bytes();
let record = FutureRecord::to(topic).key(key).payload(&payload);
self.producer
.send(record, Duration::from_secs(5))
.await
.map_err(|(e, _)| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
Ok(())
}
}
pub struct KafkaConsumer {
consumer: Arc<StreamConsumer<KafkaConsumerContext>>,
}
impl KafkaConsumer {
pub fn new_with_group_id(
brokers: &str,
config: &KafkaConsumerConfig,
group_id: &str,
) -> Result<Self, KafkaError> {
let consumer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("group.id", group_id)
.set("enable.auto.commit", config.enable_auto_commit.to_string())
.set("auto.offset.reset", "earliest")
.set("session.timeout.ms", "30000")
.set("heartbeat.interval.ms", "10000")
.create_with_context(KafkaConsumerContext)?;
info!("✅ Kafka Consumer 创建成功 (group: {})", group_id);
Ok(Self {
consumer: Arc::new(consumer),
})
}
}
#[async_trait]
impl MessageConsumer for KafkaConsumer {
async fn subscribe(
&self,
topic: &str,
handler: Arc<dyn Fn(Message) + Send + Sync>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.subscribe_topics(vec![topic.to_string()], handler)
.await
}
async fn subscribe_topics(
&self,
topics: Vec<String>,
handler: Arc<dyn Fn(Message) + Send + Sync>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let consumer = self.consumer.clone();
let topics_str: Vec<&str> = topics.iter().map(|s| s.as_str()).collect();
consumer.subscribe(&topics_str)?;
info!("✅ Kafka Consumer 开始订阅 topics: {:?}", topics);
tokio::spawn(async move {
loop {
match consumer.recv().await {
Ok(kafka_message) => {
if let Some(payload) = kafka_message.payload() {
let message = match serde_json::from_slice::<Message>(payload) {
Ok(m) => Some(m),
Err(e) => {
match serde_json::from_slice::<serde_json::Value>(payload) {
Ok(raw_json) => {
Some(Message {
topic: kafka_message.topic().to_string(),
from: "external".to_string(),
data: raw_json,
timestamp: chrono::Utc::now().timestamp_millis(),
})
}
Err(je) => {
error!(
"Failed to deserialize message: {} \n Raw JSON fallback also failed: {}",
e, je
);
None
}
}
}
};
if let Some(message) = message {
handler(message);
if let Err(e) =
consumer.store_offset_from_message(&kafka_message)
{
error!("Failed to store offset: {}", e);
}
if let Err(e) = consumer.commit_consumer_state(CommitMode::Sync)
{
error!("Failed to commit offset: {}", e);
}
}
} else {
warn!("Received empty Kafka message");
}
}
Err(e) => {
error!("Kafka consumer error: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
Ok(())
}
}
#[cfg(feature = "consumer")]
pub fn create_consumers_from_handlers(
brokers: &str,
config: &KafkaConsumerConfig,
handlers: &[std::sync::Arc<dyn crate::messaging::KafkaMessageHandler>],
) -> Result<std::collections::HashMap<String, (KafkaConsumer, Vec<String>)>, KafkaError> {
use std::collections::HashMap;
let mut result: HashMap<String, (KafkaConsumer, Vec<String>)> = HashMap::new();
let mut group_topics: HashMap<String, Vec<String>> = HashMap::new();
if handlers.is_empty() {
return Ok(result);
}
for handler in handlers {
let group_id = handler.group_id();
for topic in handler.topics() {
group_topics
.entry(group_id.clone())
.or_insert_with(Vec::new)
.push(topic);
}
}
for (group_id, topics) in group_topics {
if topics.is_empty() {
continue;
}
let consumer = KafkaConsumer::new_with_group_id(brokers, config, &group_id)?;
result.insert(group_id, (consumer, topics));
}
Ok(result)
}
pub async fn ensure_topic_exists(
brokers: &str,
topic: &str,
partitions: i32,
replication_factor: i32,
) -> Result<(), KafkaError> {
let admin_client: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.create()?;
let new_topic = NewTopic::new(
topic,
partitions,
TopicReplication::Fixed(replication_factor),
);
let results = admin_client
.create_topics(&[new_topic], &AdminOptions::default())
.await?;
for result in results {
match result {
Ok(topic_name) => {
info!("✅ Kafka Topic 创建成功: {}", topic_name);
}
Err((topic_name, err)) => {
if err.to_string().contains("already exists") {
info!("ℹ️ Kafka Topic 已存在: {}", topic_name);
} else {
warn!("⚠️ Kafka Topic 创建失败: {} - {}", topic_name, err);
}
}
}
}
Ok(())
}