use crate::config::KafkaConnection;
use json::{array, JsonValue};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafka::producer::{Producer, Record, RequiredAcks};
use std::sync::mpsc::Sender;
use std::thread;
use std::time::Duration;
/// Cloneable handle around a Kafka connection configuration.
///
/// No connection is kept open: every operation (`ping`, `publish`,
/// `consume`, ...) builds its own client/producer/consumer from the
/// stored broker list, so cloning this struct is cheap and safe.
#[derive(Clone)]
pub struct Kafka {
// Broker addresses plus consumer tuning (`fetch_max` / `retry_max`);
// see `crate::config::KafkaConnection` for the field definitions.
connection: KafkaConnection,
}
impl Kafka {
/// Builds a `Kafka` handle from the given connection settings.
///
/// Only validates that at least one broker address is configured; no
/// network contact happens here — brokers are dialed lazily by each
/// individual operation.
pub fn connect(connection: KafkaConnection) -> Result<Self, String> {
    match connection.brokers.is_empty() {
        true => Err("Kafka brokers 未配置".to_string()),
        false => Ok(Self { connection }),
    }
}
/// Checks broker reachability by requesting the full cluster metadata.
///
/// Returns `Ok(true)` when the metadata load succeeds, otherwise an
/// error string describing the failure.
pub fn ping(&self) -> Result<bool, String> {
    let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
    match client.load_metadata_all() {
        Ok(_) => Ok(true),
        Err(e) => Err(format!("Kafka ping failed: {}", e)),
    }
}
/// Returns the names of all topics known to the cluster.
///
/// Loads the full metadata from the brokers on every call; fails with a
/// descriptive message when the metadata request fails.
pub fn get_topics(&self) -> Result<Vec<String>, String> {
    let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
    if let Err(e) = client.load_metadata_all() {
        return Err(format!("加载 Kafka 元数据失败: {}", e));
    }
    let names: Vec<String> = client.topics().names().map(String::from).collect();
    Ok(names)
}
/// Ensures `topic` exists by requesting its metadata from the brokers.
///
/// The `kafka` crate exposes no admin API; requesting metadata for an
/// unknown topic makes the broker create it when
/// `auto.create.topics.enable` is on. Unlike before, this now verifies
/// the topic is actually present afterwards instead of unconditionally
/// reporting success (previously `Ok(true)` was returned even when the
/// broker refused to auto-create the topic). The redundant
/// `load_metadata_all` round-trip was also dropped — the per-topic
/// metadata request already establishes the connection.
pub fn create_topic(&self, topic: &str) -> Result<bool, String> {
    let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
    client
        .load_metadata(&[topic])
        .map_err(|e| format!("创建主题失败: {}", e))?;
    // Confirm the broker actually created (or already had) the topic.
    if client.topics().contains(topic) {
        Ok(true)
    } else {
        Err(format!("创建主题失败: {}", "主题未自动创建"))
    }
}
pub fn publish(&self, topic: &str, value: JsonValue) -> Result<bool, String> {
self.publish_with_key(topic, "", value)
}
pub fn publish_with_key(
&self,
topic: &str,
key: &str,
data: JsonValue,
) -> Result<bool, String> {
let mut producer = Producer::from_hosts(self.connection.brokers.clone())
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.create()
.map_err(|e| format!("Kafka 连接失败: {}", e))?;
let result = if key.is_empty() {
producer.send(&Record::from_value(topic, data.to_string()))
} else {
producer.send(&Record {
topic,
partition: -1,
key,
value: data.to_string().as_bytes(),
})
};
result.map(|_| true).map_err(|e| {
if e.to_string().contains("MessageSizeTooLarge") {
"消息尺寸过大".to_string()
} else {
format!("Kafka 发送失败: {}", e)
}
})
}
/// Polls `topic` once and returns the matching message(s) as JSON.
///
/// Payloads are decoded as UTF-8 and parsed as JSON; payloads that are
/// not valid JSON are wrapped as JSON strings. When `key` is non-empty,
/// only messages whose key matches are considered. With
/// `only_one == true` the first match is returned immediately (or
/// `JsonValue::Null` when this poll produced no match); otherwise all
/// matches are returned as a JSON array. Matched offsets are marked
/// consumed and committed to the group.
pub fn consume(&self, topic: &str, key: &str, only_one: bool) -> Result<JsonValue, String> {
    // The group is named after the topic, so repeated calls resume from
    // the offset committed by the previous call (earliest on first use).
    let mut consumer = Consumer::from_hosts(self.connection.brokers.clone())
        .with_topic(topic.to_string())
        .with_group(topic.to_string())
        .with_fallback_offset(FetchOffset::Earliest)
        .with_offset_storage(Some(GroupOffsetStorage::Kafka))
        .with_fetch_max_bytes_per_partition(self.connection.fetch_max)
        .with_retry_max_bytes_limit(self.connection.retry_max)
        .create()
        .map_err(|e| format!("Kafka 消费者创建失败: {}", e))?;
    let messages = consumer
        .poll()
        .map_err(|e| format!("Kafka 消费失败: {}", e))?;
    let mut list = array![];
    for ms in messages.iter() {
        for m in ms.messages() {
            let str_value = String::from_utf8(m.value.to_vec())
                .map_err(|e| format!("消息解码失败: {}", e))?;
            // Fall back to a plain JSON string for non-JSON payloads.
            // The previous `str_value.clone()` here was redundant: the
            // borrow taken by `parse` ends before the closure can move
            // `str_value` (same pattern `subscribe` already uses).
            let json_value =
                json::parse(&str_value).unwrap_or_else(|_| JsonValue::from(str_value));
            // A missing or non-UTF-8 key is treated as the empty key.
            let key_value = String::from_utf8(m.key.to_vec()).unwrap_or_default();
            if key.is_empty() || key == key_value {
                if only_one {
                    // Commit just this message's offset and return it.
                    let _ = consumer.consume_message(ms.topic(), ms.partition(), m.offset);
                    let _ = consumer.commit_consumed();
                    return Ok(json_value);
                }
                let _ = list.push(json_value);
                let _ = consumer.consume_message(ms.topic(), ms.partition(), m.offset);
            }
        }
    }
    // Persist the offsets of everything marked consumed above.
    let _ = consumer.commit_consumed();
    if only_one {
        Ok(JsonValue::Null)
    } else {
        Ok(list)
    }
}
/// Spawns a background thread that polls `topic` forever and forwards
/// each message, parsed as JSON, through `tx`.
///
/// Returns `Ok(())` immediately after spawning; consumer-creation or
/// poll errors inside the thread are only logged. The thread exits when
/// the receiving end of `tx` is dropped.
pub fn subscribe(&self, topic: &str, tx: Sender<JsonValue>) -> Result<(), String> {
// Move owned copies into the thread; `self` is not captured.
let brokers = self.connection.brokers.clone();
let topic = topic.to_string();
thread::spawn(move || {
// Group named after the topic: a restarted subscriber resumes from
// the last committed offset, falling back to the earliest message.
let consumer = Consumer::from_hosts(brokers)
.with_topic(topic.clone())
.with_group(topic.clone())
.with_fallback_offset(FetchOffset::Earliest)
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create();
let mut con = match consumer {
Ok(c) => c,
Err(e) => {
// Creation failure is only logged — the caller already got Ok(()).
log::error!("Kafka 订阅失败: {}", e);
return;
}
};
loop {
match con.poll() {
Ok(messages) => {
if messages.is_empty() {
// Nothing fetched: back off briefly to avoid a busy loop.
thread::sleep(Duration::from_millis(100));
continue;
}
for ms in messages.iter() {
for m in ms.messages() {
// Non-UTF-8 payloads are skipped (but still marked consumed below).
if let Ok(str_value) = String::from_utf8(m.value.to_vec()) {
let json_value = json::parse(&str_value)
.unwrap_or_else(|_| JsonValue::from(str_value));
// A closed receiver means the subscriber is gone: stop the thread.
if tx.send(json_value).is_err() {
log::warn!("Kafka 订阅通道已关闭");
return;
}
}
let _ = con.consume_message(ms.topic(), ms.partition(), m.offset);
}
}
// Persist consumed offsets after each poll batch.
let _ = con.commit_consumed();
}
Err(e) => {
// Transient poll error: log and retry after a pause.
log::error!("Kafka 消费错误: {}", e);
thread::sleep(Duration::from_secs(1));
}
}
}
});
Ok(())
}
}