use kafka::producer::{Producer, Record, RequiredAcks};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::time::Duration;
use std::{fs, thread};

use json::{array, JsonValue};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use log::{error, info};
use serde::{Deserialize, Serialize};

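/// A thin client over one named connection from a `Config`, wrapping the
/// `kafka` crate's producer and consumer with simple JSON helpers.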
#[derive(Clone)]
pub struct Kafka {
    connection: Connection,
}

impl Kafka {
    pub fn new(config: Config) -> Result<Self, String> {
        // Resolve the default connection; a missing entry is reported as an
        // error instead of panicking.
        let connection = config
            .connections
            .get(&config.default)
            .cloned()
            .ok_or_else(|| format!("no connection named '{}' in config", config.default))?;
        Ok(Self { connection })
    }
    /// Returns the names of all topics known to the brokers.
    pub fn get_topics(&mut self) -> Vec<String> {
        let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
        client.load_metadata_all().unwrap();
        client.topics().names().map(ToOwned::to_owned).collect()
    }
    /// Loads metadata for `topic` and reports success. Note that this does
    /// not create the topic itself; it only acts as "create" when the
    /// brokers auto-create topics on first reference.
    pub fn create_topic(&mut self, topic: &str) -> bool {
        let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
        client.load_metadata_all().unwrap();
        client.load_metadata(&[topic]).is_ok()
    }
    /// Serializes `data` and sends it to `topic`, keyed by `key` when the
    /// key is non-empty. Returns `true` on success.
    pub fn set(&self, topic: &str, key: &str, data: JsonValue) -> bool {
        let producer = Producer::from_hosts(self.connection.brokers.clone())
            .with_ack_timeout(Duration::from_secs(1))
            .with_required_acks(RequiredAcks::One)
            .create();
        match producer {
            Ok(mut producer) => {
                let value = data.to_string();
                let res = if key.is_empty() {
                    producer.send(&Record::from_value(topic, value))
                } else {
                    producer.send(&Record {
                        topic,
                        partition: -1, // let the client choose the partition
                        key,
                        value: value.as_bytes(),
                    })
                };
                match res {
                    Ok(_) => true,
                    Err(e) => {
                        if e.to_string() == "Kafka Error (MessageSizeTooLarge)" {
                            info!("kafka: message size too large");
                            return false;
                        }
                        info!("kafka: send to topic [{}] failed: {:?}", topic, e);
                        false
                    }
                }
            }
            Err(e) => {
                info!("kafka: message queue connection failed: {:?}", e);
                false
            }
        }
    }
    /// Polls `topic` for messages whose key equals `key`.
    ///
    /// With `only == true` the first match is committed and returned on its
    /// own; otherwise every match from the poll is committed and returned as
    /// a JSON array. Returns an empty JSON string when nothing matches or on
    /// error.
    pub fn get(&self, topic: &str, key: &str, only: bool) -> JsonValue {
        let cons = Consumer::from_hosts(self.connection.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(Some(GroupOffsetStorage::Kafka))
            .with_fetch_max_bytes_per_partition(self.connection.fetch_max)
            .with_retry_max_bytes_limit(self.connection.retry_max)
            .create();
        match cons {
            Ok(mut con) => match con.poll() {
                Ok(message_sets) => {
                    let mut list = array![];
                    for ms in message_sets.iter() {
                        for m in ms.messages() {
                            let body = String::from_utf8(m.value.to_vec()).unwrap();
                            // Fall back to a plain JSON string when the body
                            // is not valid JSON.
                            let res = match json::parse(body.as_str()) {
                                Ok(e) => e,
                                Err(_) => JsonValue::from(body.clone()),
                            };
                            let key_value = String::from_utf8(m.key.to_vec()).unwrap();
                            if key_value.as_str() == key {
                                if only {
                                    con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
                                    con.commit_consumed().unwrap();
                                    return res;
                                }
                                list.push(res).expect("push failed");
                                con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
                                con.commit_consumed().unwrap();
                            }
                        }
                    }
                    if only {
                        JsonValue::String(String::new())
                    } else {
                        list
                    }
                }
                Err(e) => {
                    if e.to_string() == "Kafka Error (MessageSizeTooLarge)" {
                        error!("kafka: fetched message size too large");
                        return JsonValue::String(String::new());
                    }
                    error!("kafka: consume failed: {:?}", e);
                    JsonValue::String(String::new())
                }
            },
            Err(e) => {
                error!("kafka: message queue connection failed: {:?}", e);
                JsonValue::String(String::new())
            }
        }
    }
    /// Consumes `topic` in an endless loop, passing each message body to
    /// `fun`. A message is marked consumed only when `fun` returns `true`.
    pub fn get_client(&self, topic: &str, fun: fn(message: String) -> bool) {
        let mut con = Consumer::from_hosts(self.connection.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(Some(GroupOffsetStorage::Kafka))
            .create()
            .unwrap();

        loop {
            let mss = con.poll().unwrap();
            if mss.is_empty() {
                // Back off briefly instead of busy-polling an empty topic.
                thread::sleep(Duration::from_millis(100));
                continue;
            }
            for ms in mss.iter() {
                for m in ms.messages() {
                    if fun(String::from_utf8(m.value.to_vec()).unwrap()) {
                        con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
                    }
                }
            }
            con.commit_consumed().unwrap();
        }
    }
}
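
// A minimal usage sketch (broker address, topic, key, and config path are
// illustrative, not part of this module):
//
//     let config = Config::create(PathBuf::from("config/kafka.toml"), true);
//     let mut kafka = Kafka::new(config).unwrap();
//     println!("topics: {:?}", kafka.get_topics());
//     kafka.set("events", "user-1", JsonValue::from("hello"));
//     let value = kafka.get("events", "user-1", true);
//     // Blocks forever, acknowledging each message the callback accepts:
//     kafka.get_client("events", |message| {
//         println!("{}", message);
//         true
//     });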

/// Kafka settings: a set of named connections plus the name of the default
/// connection to use.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
    default: String,
    connections: BTreeMap<String, Connection>,
}

impl Default for Config {
    fn default() -> Self {
        Self::new()
    }
}

impl Config {
    /// Reads `config_file`, creating or amending it with defaults when it is
    /// missing or unparsable. With `pkg_name == true` the settings are nested
    /// under a `[br_kafka]` table; otherwise the file is a bare `Config`.
    pub fn create(config_file: PathBuf, pkg_name: bool) -> Config {
        #[derive(Clone, Debug, Deserialize, Serialize)]
        pub struct BrKafka {
            pub br_kafka: Config,
        }
        impl BrKafka {
            pub fn new() -> BrKafka {
                Self {
                    br_kafka: Config::new(),
                }
            }
        }
        match fs::read_to_string(config_file.clone()) {
            Ok(e) => {
                if pkg_name {
                    let data = BrKafka::new();
                    toml::from_str::<BrKafka>(&e)
                        .unwrap_or_else(|_| {
                            // Parse failed: append the default section to the
                            // existing file and fall back to the defaults.
                            let toml = toml::to_string(&data).unwrap();
                            let toml = format!("{}\r\n{}", e, toml);
                            let _ = fs::write(config_file.to_str().unwrap(), toml);
                            data
                        })
                        .br_kafka
                } else {
                    toml::from_str::<Config>(&e).unwrap_or_else(|_| {
                        let data = Config::new();
                        let toml = toml::to_string(&data).unwrap();
                        let toml = format!("{}\r\n{}", e, toml);
                        let _ = fs::write(config_file.to_str().unwrap(), toml);
                        data
                    })
                }
            }
            Err(_) => {
                // File missing: create the parent directory and write defaults.
                fs::create_dir_all(config_file.parent().unwrap()).unwrap();
                if pkg_name {
                    let data = BrKafka::new();
                    let toml = toml::to_string(&data).unwrap();
                    let _ = fs::write(config_file.to_str().unwrap(), toml);
                    data.br_kafka
                } else {
                    let data = Config::new();
                    let toml = toml::to_string(&data).unwrap();
                    let _ = fs::write(config_file.to_str().unwrap(), toml);
                    data
                }
            }
        }
    }

    /// Builds the default config: one empty connection named "my_name".
    pub fn new() -> Self {
        let mut connections = BTreeMap::new();
        connections.insert("my_name".to_string(), Connection::default());
        Self {
            default: "my_name".to_string(),
            connections,
        }
    }
}
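
// For reference, `Config::new()` serializes to TOML shaped like this
// (broker and limit values below are illustrative; the actual defaults are
// an empty broker list and zero limits):
//
//     default = "my_name"
//
//     [connections.my_name]
//     brokers = ["localhost:9092"]
//     fetch_max = 1048576
//     retry_max = 1048576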

/// One Kafka connection: the broker list plus the consumer limits passed to
/// `with_fetch_max_bytes_per_partition` and `with_retry_max_bytes_limit`.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Connection {
    brokers: Vec<String>,
    fetch_max: i32,
    retry_max: i32,
}

impl Default for Connection {
    fn default() -> Connection {
        Self {
            brokers: vec![],
            fetch_max: 0,
            retry_max: 0,
        }
    }
}
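
// When `Config::create` is called with `pkg_name == true`, the same settings
// live under a `[br_kafka]` table in the file, e.g. (values illustrative):
//
//     [br_kafka]
//     default = "my_name"
//
//     [br_kafka.connections.my_name]
//     brokers = ["localhost:9092"]
//     fetch_max = 1048576
//     retry_max = 1048576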