use std::thread;
use kafka::producer::{Producer, Record, RequiredAcks};
use std::time::Duration;
use json::{array, JsonValue, object};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use log::{error, info};
/// Kafka connection settings: the broker list plus the consumer's
/// fetch/retry byte limits (passed through to `kafka::consumer::Consumer`).
#[derive(Clone, Debug)]
pub struct Kafka {
    /// Broker addresses; presumably `host:port` strings — TODO confirm against config source.
    brokers: Vec<String>,
    /// Max bytes fetched per partition per request (consumer side).
    fetch_max: i32,
    /// Retry max-bytes limit for oversized messages (consumer side).
    retry_max: i32,
}
impl Default for Kafka {
fn default() -> Self {
Self::new()
}
}
impl Kafka {
pub fn new() -> Self {
Self {
brokers: vec![],
fetch_max: 0,
retry_max: 0,
}
}
pub fn json(&self) -> JsonValue {
let data = object! {
brokers:self.brokers.clone(),
fetch_max:self.fetch_max,
retry_max:self.retry_max,
};
data
}
fn from(data: JsonValue) -> Self {
Self {
brokers: data["brokers"].members().map(|x| x.to_string().clone()).collect(),
fetch_max: data["fetch_max"].as_i32().unwrap(),
retry_max: data["retry_max"].as_i32().unwrap(),
}
}
pub fn form(config: JsonValue) -> Result<Self, String> {
if config.is_empty() {
return Ok(Self::new());
}
Ok(Self::from(config.clone()))
}
pub fn connect(config: JsonValue) -> Self {
if config.is_empty() {
return Self::new();
}
Self::from(config.clone())
}
pub fn get_topics(&mut self) -> Vec<String> {
let mut client = kafka::client::KafkaClient::new(self.brokers.clone());
client.load_metadata_all().unwrap();
let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
topics
}
pub fn create_topic(&mut self, topic: &str) -> bool {
let mut client = kafka::client::KafkaClient::new(self.brokers.clone());
client.load_metadata_all().unwrap();
client.load_metadata(&[topic]).is_ok()
}
pub fn set(&self, topic: &str, key: &str, data: JsonValue) -> bool {
let producer = Producer::from_hosts(self.brokers.clone())
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(RequiredAcks::One)
.create();
match producer {
Ok(mut e) => {
let res = {
if key.is_empty() {
e.send(&Record::from_value(topic, data.to_string()))
} else {
e.send(&Record {
topic,
partition: -1,
key,
value: data.to_string().as_bytes(),
})
}
};
match res {
Ok(_e) => {
true
}
Err(e) => {
if e.to_string() == *"Kafka Error (MessageSizeTooLarge)" {
info!("获取消息尺寸过大");
return false;
}
info!("kafka 发送 主题[{}] 失败:{:?}", topic, e);
false
}
}
}
Err(_e) => {
info!("kafka 消息队列连接失败:{:?}", _e);
false
}
}
}
pub fn get(&self, topic: &str, key: &str, only: bool) -> JsonValue {
let cons = Consumer::from_hosts(self.brokers.clone())
.with_topic(topic.to_string())
.with_group(topic.to_string())
.with_fallback_offset(FetchOffset::Earliest)
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.with_fetch_max_bytes_per_partition(self.fetch_max)
.with_retry_max_bytes_limit(self.retry_max)
.create();
match cons {
Ok(mut con) => {
match con.poll() {
Ok(ee) => {
let mut list = array![];
for ms in ee.iter() {
for m in ms.messages() {
let str = String::from_utf8(m.value.to_vec()).unwrap();
let res = match json::parse(str.as_str()) {
Ok(e) => e,
Err(_e) => JsonValue::from(str.clone())
};
let key_value = String::from_utf8(m.key.to_vec()).unwrap();
match only {
true => {
if key_value.as_str() == key {
con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
con.commit_consumed().unwrap();
return res;
}
}
false => {
if key == key_value.as_str() {
list.push(res.clone()).expect("加入失败");
con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
con.commit_consumed().unwrap();
}
}
}
}
}
if only {
JsonValue::String("".to_string())
} else {
list
}
}
Err(e) => {
if e.to_string() == *"Kafka Error (MessageSizeTooLarge)" {
error!("kafka:获取消息尺寸过大");
return JsonValue::String("".to_string());
}
error!("消费失败{:?}", e);
JsonValue::String("".to_string())
}
}
}
Err(e) => {
error!("kafka 消息队列连接失败:{:?}", e);
JsonValue::String("".to_string())
}
}
}
pub fn get_client(&self, topic: &str, fun: fn(str: String) -> bool) {
let mut con = Consumer::from_hosts(self.brokers.clone())
.with_topic(topic.to_string())
.with_group(topic.to_string())
.with_fallback_offset(FetchOffset::Earliest)
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create().unwrap();
loop {
let mss = con.poll().unwrap();
if mss.is_empty() {
thread::sleep(Duration::from_micros(100));
continue;
}
for ms in mss.iter() {
for m in ms.messages() {
let xf = fun(String::from_utf8(m.value.to_vec()).unwrap());
if xf {
con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
}
}
}
con.commit_consumed().unwrap();
}
}
}