use std::thread;
use std::time::Duration;

use json::{array, object, JsonValue};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafka::producer::{Producer, Record, RequiredAcks};
use log::info;
/// A small Kafka helper driven by a `json::JsonValue` configuration,
/// wrapping the `kafka` crate's producer and consumer.
#[derive(Clone)]
pub struct Kafka {
    /// Name of the active entry in `connections`.
    default: String,
    /// All named connection configs, keyed by name.
    connections: JsonValue,
    /// Broker addresses of the active connection.
    brokers: Vec<String>,
    /// Maximum bytes fetched per partition per request.
    fetch_max: i32,
    /// Maximum bytes when retrying a partition fetch.
    retry_max: i32,
}
impl Kafka {
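    /// Builds a `Kafka` handle from a JSON config; an empty config yields an
    /// unconnected handle with empty defaults. A sketch of the expected
    /// shape, inferred from the fields read below (the broker address and
    /// limits are illustrative):
    ///
    /// ```no_run
    /// use json::object;
    ///
    /// let kafka = Kafka::connect(object! {
    ///     "default": "main",
    ///     "connections": {
    ///         "main": {
    ///             "brokers": ["localhost:9092"],
    ///             "fetch_max": 1_000_000,
    ///             "retry_max": 1_000_000
    ///         }
    ///     }
    /// });
    /// ```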
    pub fn connect(config: JsonValue) -> Self {
        let mut brokers = Vec::new();
        if config.is_empty() {
            return Self {
                default: "".to_string(),
                connections: object! {},
                brokers,
                fetch_max: 0,
                retry_max: 0,
            };
        }
        let default = config["default"].to_string();
        let connections = config["connections"].clone();
        let connection = connections[default.as_str()].clone();
        // Panics if the active connection lacks these limits.
        let fetch_max = connection["fetch_max"].as_i32().unwrap();
        let retry_max = connection["retry_max"].as_i32().unwrap();
        for item in connection["brokers"].members() {
            brokers.push(item.to_string());
        }
        Self {
            default,
            connections,
            brokers,
            fetch_max,
            retry_max,
        }
    }
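    /// Merges extra named connections into the stored `connections` object.
    /// A sketch, assuming the argument is keyed by connection name just like
    /// the original config ("backup" and its address are illustrative):
    ///
    /// ```no_run
    /// # use json::object;
    /// # let mut kafka = Kafka::connect(object! {});
    /// kafka.add_config(object! {
    ///     "backup": {
    ///         "brokers": ["localhost:9093"],
    ///         "fetch_max": 1_000_000,
    ///         "retry_max": 1_000_000
    ///     }
    /// });
    /// ```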
    pub fn add_config(&mut self, connection: JsonValue) -> &mut Self {
        // `connections` is an object keyed by name, so `JsonValue::push`
        // (arrays only) would always fail here; merge the entries instead.
        for (name, conn) in connection.entries() {
            self.connections[name] = conn.clone();
        }
        self
    }
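    /// Makes a different named connection active and reloads its brokers and
    /// fetch/retry limits. A sketch ("backup" assumes a connection added via
    /// the config or `add_config`):
    ///
    /// ```no_run
    /// # let mut kafka = Kafka::connect(json::object! {});
    /// kafka.switch_config("backup");
    /// ```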
    pub fn switch_config(&mut self, default: &str) -> &mut Self {
        self.default = default.to_string();
        let connection = self.connections[self.default.as_str()].clone();
        // Rebuild the broker list for the newly selected connection.
        self.brokers = Vec::new();
        for item in connection["brokers"].members() {
            self.brokers.push(item.to_string());
        }
        // Panics if the selected connection lacks these limits.
        self.fetch_max = connection["fetch_max"].as_i32().unwrap();
        self.retry_max = connection["retry_max"].as_i32().unwrap();
        self
    }
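    /// Lists all topic names known to the cluster.
    ///
    /// ```no_run
    /// # let mut kafka = Kafka::connect(json::object! {});
    /// for topic in kafka.get_topics() {
    ///     println!("{}", topic);
    /// }
    /// ```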
    pub fn get_topics(&mut self) -> Vec<String> {
        let mut client = kafka::client::KafkaClient::new(self.brokers.clone());
        client.load_metadata_all().unwrap();
        client.topics().names().map(ToOwned::to_owned).collect()
    }
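    /// Returns whether a metadata request for `topic` succeeds. The topic
    /// name in the sketch is illustrative:
    ///
    /// ```no_run
    /// # let mut kafka = Kafka::connect(json::object! {});
    /// if !kafka.create_topic("orders") {
    ///     eprintln!("topic metadata request failed");
    /// }
    /// ```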
    pub fn create_topic(&mut self, topic: &str) -> bool {
        let mut client = kafka::client::KafkaClient::new(self.brokers.clone());
        client.load_metadata_all().unwrap();
        // The `kafka` crate has no admin API; requesting metadata for the
        // topic only creates it when the broker has auto-topic creation
        // enabled (`auto.create.topics.enable`).
        client.load_metadata(&[topic]).is_ok()
    }
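    /// Serializes `data` and publishes it to `topic`, keyed when `key` is
    /// non-empty. Returns `false` on connection or send failure. Topic, key,
    /// and payload in the sketch are illustrative:
    ///
    /// ```no_run
    /// # use json::object;
    /// # let kafka = Kafka::connect(object! {});
    /// if !kafka.set("orders", "order-42", object! { "qty": 3 }) {
    ///     eprintln!("send failed");
    /// }
    /// ```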
    pub fn set(&self, topic: &str, key: &str, data: JsonValue) -> bool {
        let producer = Producer::from_hosts(self.brokers.clone())
            .with_ack_timeout(Duration::from_secs(1))
            .with_required_acks(RequiredAcks::One)
            .create();
        match producer {
            Ok(mut producer) => {
                // Bind the serialized payload so the byte slice handed to
                // `Record` stays alive for the duration of the send.
                let payload = data.to_string();
                let res = if key.is_empty() {
                    producer.send(&Record::from_value(topic, payload.as_str()))
                } else {
                    producer.send(&Record {
                        topic,
                        partition: -1,
                        key,
                        value: payload.as_bytes(),
                    })
                };
                match res {
                    Ok(_) => true,
                    Err(e) => {
                        // Matching the rendered message is brittle, but
                        // mirrors the original size check.
                        if e.to_string() == "Kafka Error (MessageSizeTooLarge)" {
                            info!("kafka: message too large for topic [{}]", topic);
                        } else {
                            info!("kafka: failed to send to topic [{}]: {:?}", topic, e);
                        }
                        false
                    }
                }
            }
            Err(e) => {
                info!("kafka: failed to connect to message queue: {:?}", e);
                false
            }
        }
    }
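    /// Polls `topic` once and returns messages whose key equals `key`: the
    /// first match as a single value when `only` is true, otherwise a JSON
    /// array of every match from the poll. Failures and misses yield an
    /// empty JSON string. Topic and key in the sketch are illustrative:
    ///
    /// ```no_run
    /// # let kafka = Kafka::connect(json::object! {});
    /// let first = kafka.get("orders", "order-42", true);
    /// let all = kafka.get("orders", "order-42", false);
    /// ```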
    pub fn get(&self, topic: &str, key: &str, only: bool) -> JsonValue {
        let cons = Consumer::from_hosts(self.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(GroupOffsetStorage::Kafka)
            .with_fetch_max_bytes_per_partition(self.fetch_max)
            .with_retry_max_bytes_limit(self.retry_max)
            .create();
        let mut con = match cons {
            Ok(con) => con,
            Err(e) => {
                info!("kafka: failed to connect to message queue: {:?}", e);
                return JsonValue::String(String::new());
            }
        };
        // A single poll: the original wrapped this in a `loop` whose body
        // always returned on the first iteration.
        match con.poll() {
            Ok(sets) => {
                let mut list = array![];
                for ms in sets.iter() {
                    for m in ms.messages() {
                        let text = String::from_utf8(m.value.to_vec()).unwrap();
                        // Fall back to a plain JSON string when the payload
                        // is not valid JSON.
                        let res = match json::parse(text.as_str()) {
                            Ok(value) => value,
                            Err(_) => JsonValue::from(text.clone()),
                        };
                        let key_value = String::from_utf8(m.key.to_vec()).unwrap();
                        if key_value == key {
                            con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
                            con.commit_consumed().unwrap();
                            if only {
                                // Only the first matching message is wanted.
                                return res;
                            }
                            list.push(res).expect("failed to push message");
                        }
                    }
                }
                if only {
                    JsonValue::String(String::new())
                } else {
                    list
                }
            }
            Err(e) => {
                // Matching the rendered message is brittle, but mirrors the
                // original size check.
                if e.to_string() == "Kafka Error (MessageSizeTooLarge)" {
                    info!("kafka: fetched message too large");
                } else {
                    info!("kafka: consume failed: {:?}", e);
                }
                JsonValue::String(String::new())
            }
        }
    }
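    /// Blocks forever, passing each message body to `fun` and marking a
    /// message consumed only when `fun` returns `true`. A sketch with an
    /// illustrative handler:
    ///
    /// ```no_run
    /// # let kafka = Kafka::connect(json::object! {});
    /// fn handle(message: String) -> bool {
    ///     println!("got: {}", message);
    ///     true // acknowledge so the offset can be committed
    /// }
    /// kafka.get_client("orders", handle);
    /// ```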
    pub fn get_client(&self, topic: &str, fun: fn(String) -> bool) {
        let mut con = Consumer::from_hosts(self.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(GroupOffsetStorage::Kafka)
            .create()
            .unwrap();
        loop {
            let mss = con.poll().unwrap();
            if mss.is_empty() {
                // Nothing fetched; pause briefly before polling again.
                thread::sleep(Duration::from_micros(100));
                continue;
            }
            for ms in mss.iter() {
                for m in ms.messages() {
                    // Mark the message consumed only when the callback
                    // reports success, so failed messages are redelivered.
                    if fun(String::from_utf8(m.value.to_vec()).unwrap()) {
                        con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
                    }
                }
            }
            con.commit_consumed().unwrap();
        }
    }
}