br_kafka/
lib.rs

1use kafka::producer::{Producer, Record, RequiredAcks};
2use std::collections::BTreeMap;
3use std::path::PathBuf;
4use std::time::Duration;
5use std::{fs, thread};
6
7use json::{array, JsonValue};
8use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
9use log::{error, info};
10use serde::{Deserialize, Serialize};
11
12#[derive(Clone)]
13pub struct Kafka {
14    connection: Connection,
15}
16impl Kafka {
17    pub fn new(config: Config) -> Result<Self, String> {
18        let connection = config.connections.get(&*config.default).unwrap().clone();
19        Ok(Self { connection })
20    }
21    /// 获取全部主题
22    pub fn get_topics(&mut self) -> Vec<String> {
23        let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
24        client.load_metadata_all().unwrap();
25        let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
26        topics
27    }
28    /// 创建主题
29    pub fn create_topic(&mut self, topic: &str) -> bool {
30        let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
31        client.load_metadata_all().unwrap();
32        client.load_metadata(&[topic]).is_ok()
33    }
34    /// 生产
35    pub fn set(&self, topic: &str, key: &str, data: JsonValue) -> bool {
36        let producer = Producer::from_hosts(self.connection.brokers.clone())
37            .with_ack_timeout(Duration::from_secs(1))
38            .with_required_acks(RequiredAcks::One)
39            .create();
40        match producer {
41            Ok(mut e) => {
42                let res = {
43                    if key.is_empty() {
44                        e.send(&Record::from_value(topic, data.to_string()))
45                    } else {
46                        e.send(&Record {
47                            topic,
48                            partition: -1,
49                            key,
50                            value: data.to_string().as_bytes(),
51                        })
52                    }
53                };
54                match res {
55                    Ok(_e) => true,
56                    Err(e) => {
57                        if e.to_string() == *"Kafka Error (MessageSizeTooLarge)" {
58                            info!("获取消息尺寸过大");
59                            return false;
60                        }
61                        info!("kafka 发送 主题[{}] 失败:{:?}", topic, e);
62                        false
63                    }
64                }
65            }
66            Err(_e) => {
67                info!("kafka 消息队列连接失败:{:?}", _e);
68                false
69            }
70        }
71    }
72    /// 消费
73    pub fn get(&self, topic: &str, key: &str, only: bool) -> JsonValue {
74        let cons = Consumer::from_hosts(self.connection.brokers.clone())
75            .with_topic(topic.to_string())
76            .with_group(topic.to_string())
77            .with_fallback_offset(FetchOffset::Earliest)
78            .with_offset_storage(Some(GroupOffsetStorage::Kafka))
79            .with_fetch_max_bytes_per_partition(self.connection.fetch_max)
80            .with_retry_max_bytes_limit(self.connection.retry_max)
81            .create();
82        match cons {
83            Ok(mut con) => match con.poll() {
84                Ok(ee) => {
85                    let mut list = array![];
86                    for ms in ee.iter() {
87                        for m in ms.messages() {
88                            let str = String::from_utf8(m.value.to_vec()).unwrap();
89
90                            let res = match json::parse(str.as_str()) {
91                                Ok(e) => e,
92                                Err(_e) => JsonValue::from(str.clone()),
93                            };
94                            let key_value = String::from_utf8(m.key.to_vec()).unwrap();
95
96                            match only {
97                                true => {
98                                    if key_value.as_str() == key {
99                                        con.consume_message(ms.topic(), ms.partition(), m.offset)
100                                            .unwrap();
101                                        con.commit_consumed().unwrap();
102                                        return res;
103                                    }
104                                }
105                                false => {
106                                    if key == key_value.as_str() {
107                                        list.push(res.clone()).expect("加入失败");
108                                        con.consume_message(ms.topic(), ms.partition(), m.offset)
109                                            .unwrap();
110                                        con.commit_consumed().unwrap();
111                                    }
112                                }
113                            }
114                        }
115                    }
116                    if only {
117                        JsonValue::String("".to_string())
118                    } else {
119                        list
120                    }
121                }
122                Err(e) => {
123                    if e.to_string() == *"Kafka Error (MessageSizeTooLarge)" {
124                        error!("kafka:获取消息尺寸过大");
125                        return JsonValue::String("".to_string());
126                    }
127                    error!("消费失败{:?}", e);
128                    JsonValue::String("".to_string())
129                }
130            },
131            Err(e) => {
132                error!("kafka 消息队列连接失败:{:?}", e);
133                JsonValue::String("".to_string())
134            }
135        }
136    }
137    /// 消费 监听消息不退出
138    pub fn get_client(&self, topic: &str, fun: fn(str: String) -> bool) {
139        let mut con = Consumer::from_hosts(self.connection.brokers.clone())
140            .with_topic(topic.to_string())
141            .with_group(topic.to_string())
142            .with_fallback_offset(FetchOffset::Earliest)
143            .with_offset_storage(Some(GroupOffsetStorage::Kafka))
144            .create()
145            .unwrap();
146
147        loop {
148            let mss = con.poll().unwrap();
149            if mss.is_empty() {
150                thread::sleep(Duration::from_micros(100));
151                continue;
152            }
153            for ms in mss.iter() {
154                for m in ms.messages() {
155                    let xf = fun(String::from_utf8(m.value.to_vec()).unwrap());
156                    if xf {
157                        con.consume_message(ms.topic(), ms.partition(), m.offset)
158                            .unwrap();
159                    }
160                }
161            }
162            con.commit_consumed().unwrap();
163        }
164    }
165}
166
167#[derive(Clone, Debug, Deserialize, Serialize)]
168pub struct Config {
169    default: String,
170    connections: BTreeMap<String, Connection>,
171}
172
173impl Default for Config {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179impl Config {
180    pub fn create(config_file: PathBuf) -> Config {
181        let data = Config::new();
182        match fs::read_to_string(config_file.clone()) {
183            Ok(e) => toml::from_str::<Config>(&e).unwrap_or_else(|_| {
184                if e.contains(format!("#{}", env!("CARGO_PKG_NAME")).as_str()) {
185                    return data;
186                }
187                let toml = toml::to_string(&data).unwrap();
188                let toml = format!("\r\n#{}\r\n{}{}", env!("CARGO_PKG_NAME"), toml, e);
189                let _ = fs::write(config_file.to_str().unwrap(), toml);
190                data
191            }),
192            Err(_) => {
193                let toml = toml::to_string(&data).unwrap();
194                let _ = fs::write(config_file.to_str().unwrap(), toml);
195                data
196            }
197        }
198    }
199
200    pub fn new() -> Self {
201        let mut connections = BTreeMap::new();
202        connections.insert("my_name".to_string(), Connection::default());
203        Self {
204            default: "my_name".to_string(),
205            connections,
206        }
207    }
208}
209
210#[derive(Clone, Debug, Deserialize, Serialize)]
211struct Connection {
212    brokers: Vec<String>,
213    fetch_max: i32,
214    retry_max: i32,
215}
216impl Connection {
217    pub fn default() -> Connection {
218        Self {
219            brokers: vec![],
220            fetch_max: 0,
221            retry_max: 0,
222        }
223    }
224}