use std::collections::BTreeMap;
use std::path::PathBuf;
use std::time::Duration;
use std::{fs, thread};

use json::{array, JsonValue};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafka::producer::{Producer, Record, RequiredAcks};
use log::{error, info};
use serde::{Deserialize, Serialize};

#[derive(Clone)]
pub struct Kafka {
    connection: Connection,
}

impl Kafka {
    /// Build a client from the configured default connection.
    pub fn new(config: Config) -> Result<Self, String> {
        // Look up the default connection and report a proper error instead of
        // panicking when the entry is missing.
        let connection = config
            .connections
            .get(&config.default)
            .ok_or_else(|| format!("connection '{}' not found in config", config.default))?
            .clone();
        Ok(Self { connection })
    }
    /// Fetch the names of all topics known to the brokers.
    pub fn get_topics(&mut self) -> Vec<String> {
        let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
        client.load_metadata_all().unwrap();
        client.topics().names().map(ToOwned::to_owned).collect()
    }
    /// The `kafka` crate exposes no admin API, so this only requests metadata
    /// for `topic`; the topic is created as a side effect when the broker has
    /// `auto.create.topics.enable` turned on.
    pub fn create_topic(&mut self, topic: &str) -> bool {
        let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
        client.load_metadata_all().unwrap();
        client.load_metadata(&[topic]).is_ok()
    }
    /// Produce `data` to `topic`, optionally keyed by `key`; returns `true`
    /// on success.
    pub fn set(&self, topic: &str, key: &str, data: JsonValue) -> bool {
        let producer = Producer::from_hosts(self.connection.brokers.clone())
            .with_ack_timeout(Duration::from_secs(1))
            .with_required_acks(RequiredAcks::One)
            .create();
        match producer {
            Ok(mut producer) => {
                let payload = data.to_string();
                let res = if key.is_empty() {
                    producer.send(&Record::from_value(topic, payload.as_bytes()))
                } else {
                    producer.send(&Record {
                        topic,
                        partition: -1, // let the client choose the partition
                        key,
                        value: payload.as_bytes(),
                    })
                };
                match res {
                    Ok(_) => true,
                    Err(e) => {
                        // Matching the display string is brittle, but mirrors the
                        // original check against the crate's error formatting.
                        if e.to_string().contains("MessageSizeTooLarge") {
                            info!("kafka: message for topic [{}] is too large", topic);
                            return false;
                        }
                        info!("kafka: send to topic [{}] failed: {:?}", topic, e);
                        false
                    }
                }
            }
            Err(e) => {
                info!("kafka: failed to connect to the message queue: {:?}", e);
                false
            }
        }
    }
    /// Poll `topic` and return the messages whose key equals `key`. With
    /// `only == true` the first match is returned as a single value;
    /// otherwise all matches are collected into a JSON array.
    pub fn get(&self, topic: &str, key: &str, only: bool) -> JsonValue {
        let consumer = Consumer::from_hosts(self.connection.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(Some(GroupOffsetStorage::Kafka))
            .with_fetch_max_bytes_per_partition(self.connection.fetch_max)
            .with_retry_max_bytes_limit(self.connection.retry_max)
            .create();
        match consumer {
            Ok(mut con) => match con.poll() {
                Ok(resp) => {
                    let mut list = array![];
                    for ms in resp.iter() {
                        for m in ms.messages() {
                            let raw = String::from_utf8(m.value.to_vec()).unwrap();
                            // Fall back to a plain string when the payload is
                            // not valid JSON.
                            let res = match json::parse(raw.as_str()) {
                                Ok(v) => v,
                                Err(_) => JsonValue::from(raw.clone()),
                            };
                            let key_value = String::from_utf8(m.key.to_vec()).unwrap();
                            if key_value.as_str() == key {
                                con.consume_message(ms.topic(), ms.partition(), m.offset)
                                    .unwrap();
                                con.commit_consumed().unwrap();
                                if only {
                                    // Return the first matching message only.
                                    return res;
                                }
                                list.push(res).expect("failed to append to result list");
                            }
                        }
                    }
                    if only {
                        JsonValue::String(String::new())
                    } else {
                        list
                    }
                }
                Err(e) => {
                    if e.to_string().contains("MessageSizeTooLarge") {
                        error!("kafka: fetched message is too large");
                        return JsonValue::String(String::new());
                    }
                    error!("kafka: consume failed: {:?}", e);
                    JsonValue::String(String::new())
                }
            },
            Err(e) => {
                error!("kafka: failed to connect to the message queue: {:?}", e);
                JsonValue::String(String::new())
            }
        }
    }
    /// Blocking consume loop: passes each message body to `fun` and marks the
    /// message as consumed when `fun` returns `true`.
    pub fn get_client(&self, topic: &str, fun: fn(String) -> bool) {
        let mut con = Consumer::from_hosts(self.connection.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(Some(GroupOffsetStorage::Kafka))
            .create()
            .unwrap();

        loop {
            let mss = con.poll().unwrap();
            if mss.is_empty() {
                // Back off briefly (100 µs) when the poll returns nothing.
                thread::sleep(Duration::from_micros(100));
                continue;
            }
            for ms in mss.iter() {
                for m in ms.messages() {
                    let ack = fun(String::from_utf8(m.value.to_vec()).unwrap());
                    if ack {
                        con.consume_message(ms.topic(), ms.partition(), m.offset)
                            .unwrap();
                    }
                }
            }
            con.commit_consumed().unwrap();
        }
    }
}
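
// A minimal usage sketch, not part of the library: wires `Config` and `Kafka`
// together for a produce/consume round trip. The broker address, topic, and
// key below are assumptions for illustration, and the test is ignored by
// default because it needs a reachable broker.
#[cfg(test)]
mod usage_example {
    use super::*;

    // Example callback for `get_client`: acknowledge every message.
    #[allow(dead_code)]
    fn handle(message: String) -> bool {
        println!("received: {message}");
        true
    }

    #[test]
    #[ignore] // requires a Kafka broker listening on localhost:9092
    fn produce_then_consume() {
        let mut config = Config::new();
        if let Some(conn) = config.connections.get_mut("my_name") {
            conn.brokers = vec!["localhost:9092".to_string()];
            conn.fetch_max = 1024 * 1024;
            conn.retry_max = 1024 * 1024;
        }
        let kafka = Kafka::new(config).expect("default connection missing");
        let payload = json::parse(r#"{"hello":"world"}"#).unwrap();
        assert!(kafka.set("example-topic", "example-key", payload));
        let value = kafka.get("example-topic", "example-key", true);
        assert_eq!(value["hello"], "world");
        // `kafka.get_client("example-topic", handle)` would consume forever.
    }
}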

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
    default: String,
    connections: BTreeMap<String, Connection>,
}

impl Default for Config {
    fn default() -> Self {
        Self::new()
    }
}

impl Config {
    /// Load configuration from `config_file`; when the file is missing or does
    /// not parse, fall back to defaults and write them to disk.
    pub fn create(config_file: PathBuf) -> Config {
        let data = Config::new();
        match fs::read_to_string(&config_file) {
            Ok(e) => toml::from_str::<Config>(&e).unwrap_or_else(|_| {
                // The "#<package>" marker shows the defaults were already
                // appended on a previous run; do not append them twice.
                if e.contains(format!("#{}", env!("CARGO_PKG_NAME")).as_str()) {
                    return data;
                }
                let toml = toml::to_string(&data).unwrap();
                let toml = format!("\r\n#{}\r\n{}{}", env!("CARGO_PKG_NAME"), toml, e);
                let _ = fs::write(&config_file, toml);
                data
            }),
            Err(_) => {
                // No config file yet: write the defaults.
                let toml = toml::to_string(&data).unwrap();
                let _ = fs::write(&config_file, toml);
                data
            }
        }
    }

    pub fn new() -> Self {
        let mut connections = BTreeMap::new();
        connections.insert("my_name".to_string(), Connection::default());
        Self {
            default: "my_name".to_string(),
            connections,
        }
    }
}
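
// For reference, a config file that `Config::create` parses looks roughly like
// this; the field names come from the structs in this file, the values are
// placeholders:
//
//   default = "my_name"
//
//   [connections.my_name]
//   brokers = ["localhost:9092"]
//   fetch_max = 1048576
//   retry_max = 1048576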

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct Connection {
    brokers: Vec<String>,
    fetch_max: i32,
    retry_max: i32,
}
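
// Sketch of the intended startup flow, assuming a `config.toml` next to the
// binary (the path is an assumption for illustration):
//
//   let config = Config::create(PathBuf::from("config.toml"));
//   let kafka = Kafka::new(config)?;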