br_kafka/
lib.rs

1use kafka::producer::{Producer, Record, RequiredAcks};
2use std::collections::BTreeMap;
3use std::path::PathBuf;
4use std::time::Duration;
5use std::{fs, thread};
6
7use json::{array, JsonValue};
8use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
9use log::{error, info};
10use serde::{Deserialize, Serialize};
11
12#[derive(Clone)]
13pub struct Kafka {
14    connection: Connection,
15}
16impl Kafka {
17    pub fn new(config: Config) -> Result<Self, String> {
18        let connection = config.connections.get(&*config.default).unwrap().clone();
19        Ok(Self { connection })
20    }
21    /// 获取全部主题
22    pub fn get_topics(&mut self) -> Vec<String> {
23        let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
24        client.load_metadata_all().unwrap();
25        let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
26        topics
27    }
28    /// 创建主题
29    pub fn create_topic(&mut self, topic: &str) -> bool {
30        let mut client = kafka::client::KafkaClient::new(self.connection.brokers.clone());
31        client.load_metadata_all().unwrap();
32        client.load_metadata(&[topic]).is_ok()
33    }
34    /// 生产
35    pub fn set(&self, topic: &str, key: &str, data: JsonValue) -> bool {
36        let producer = Producer::from_hosts(self.connection.brokers.clone()).with_ack_timeout(Duration::from_secs(1)).with_required_acks(RequiredAcks::One).create();
37        match producer {
38            Ok(mut e) => {
39                let res = {
40                    if key.is_empty() {
41                        e.send(&Record::from_value(topic, data.to_string()))
42                    } else {
43                        e.send(&Record {
44                            topic,
45                            partition: -1,
46                            key,
47                            value: data.to_string().as_bytes(),
48                        })
49                    }
50                };
51                match res {
52                    Ok(_e) => true,
53                    Err(e) => {
54                        if e.to_string() == *"Kafka Error (MessageSizeTooLarge)" {
55                            info!("获取消息尺寸过大");
56                            return false;
57                        }
58                        info!("kafka 发送 主题[{}] 失败:{:?}", topic, e);
59                        false
60                    }
61                }
62            }
63            Err(_e) => {
64                info!("kafka 消息队列连接失败:{:?}", _e);
65                false
66            }
67        }
68    }
69    /// 消费
70    pub fn get(&self, topic: &str, key: &str, only: bool) -> JsonValue {
71        let cons = Consumer::from_hosts(self.connection.brokers.clone()).with_topic(topic.to_string()).with_group(topic.to_string()).with_fallback_offset(FetchOffset::Earliest).with_offset_storage(Some(GroupOffsetStorage::Kafka)).with_fetch_max_bytes_per_partition(self.connection.fetch_max).with_retry_max_bytes_limit(self.connection.retry_max).create();
72        match cons {
73            Ok(mut con) => match con.poll() {
74                Ok(ee) => {
75                    let mut list = array![];
76                    for ms in ee.iter() {
77                        for m in ms.messages() {
78                            let str = String::from_utf8(m.value.to_vec()).unwrap();
79
80                            let res = match json::parse(str.as_str()) {
81                                Ok(e) => e,
82                                Err(_e) => JsonValue::from(str.clone()),
83                            };
84                            let key_value = String::from_utf8(m.key.to_vec()).unwrap();
85
86                            match only {
87                                true => {
88                                    if key_value.as_str() == key {
89                                        con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
90                                        con.commit_consumed().unwrap();
91                                        return res;
92                                    }
93                                }
94                                false => {
95                                    if key == key_value.as_str() {
96                                        list.push(res.clone()).expect("加入失败");
97                                        con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
98                                        con.commit_consumed().unwrap();
99                                    }
100                                }
101                            }
102                        }
103                    }
104                    if only {
105                        JsonValue::String("".to_string())
106                    } else {
107                        list
108                    }
109                }
110                Err(e) => {
111                    if e.to_string() == *"Kafka Error (MessageSizeTooLarge)" {
112                        error!("kafka:获取消息尺寸过大");
113                        return JsonValue::String("".to_string());
114                    }
115                    error!("消费失败{:?}", e);
116                    JsonValue::String("".to_string())
117                }
118            },
119            Err(e) => {
120                error!("kafka 消息队列连接失败:{:?}", e);
121                JsonValue::String("".to_string())
122            }
123        }
124    }
125    /// 消费 监听消息不退出
126    pub fn get_client(&self, topic: &str, fun: fn(str: String) -> bool) {
127        let mut con = Consumer::from_hosts(self.connection.brokers.clone()).with_topic(topic.to_string()).with_group(topic.to_string()).with_fallback_offset(FetchOffset::Earliest).with_offset_storage(Some(GroupOffsetStorage::Kafka)).create().unwrap();
128
129        loop {
130            let mss = con.poll().unwrap();
131            if mss.is_empty() {
132                thread::sleep(Duration::from_micros(100));
133                continue;
134            }
135            for ms in mss.iter() {
136                for m in ms.messages() {
137                    let xf = fun(String::from_utf8(m.value.to_vec()).unwrap());
138                    if xf {
139                        con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
140                    }
141                }
142            }
143            con.commit_consumed().unwrap();
144        }
145    }
146}
147
148#[derive(Clone, Debug, Deserialize, Serialize)]
149pub struct Config {
150    default: String,
151    connections: BTreeMap<String, Connection>,
152}
153
154impl Default for Config {
155    fn default() -> Self {
156        Self::new()
157    }
158}
159
160impl Config {
161    pub fn create(config_file: PathBuf, pkg_name: bool) -> Config {
162        #[derive(Clone, Debug, Deserialize, Serialize)]
163        pub struct BrKafka {
164            pub br_kafka: Config,
165        }
166        impl BrKafka {
167            pub fn new() -> BrKafka {
168                let mut connections = BTreeMap::new();
169                connections.insert("my_name".to_string(), Connection::default());
170                Self {
171                    br_kafka: Config {
172                        default: "my_name".to_string(),
173                        connections,
174                    },
175                }
176            }
177        }
178        match fs::read_to_string(config_file.clone()) {
179            Ok(e) => {
180                if pkg_name {
181                    let data = BrKafka::new();
182                    toml::from_str::<BrKafka>(&e).unwrap_or_else(|_| {
183                        let toml = toml::to_string(&data).unwrap();
184                        let toml = format!("{}\r\n{}", e, toml);
185                        let _ = fs::write(config_file.to_str().unwrap(), toml);
186                        data
187                    }).br_kafka
188                } else {
189                    Config::new()
190                }
191            }
192            Err(_) => {
193                if pkg_name {
194                    let data = BrKafka::new();
195                    fs::create_dir_all(config_file.parent().unwrap()).unwrap();
196                    let toml = toml::to_string(&data).unwrap();
197                    let _ = fs::write(config_file.to_str().unwrap(), toml);
198                    data.br_kafka
199                } else {
200                    let data = Config::new();
201                    fs::create_dir_all(config_file.parent().unwrap()).unwrap();
202                    let toml = toml::to_string(&data).unwrap();
203                    let _ = fs::write(config_file.to_str().unwrap(), toml);
204                    data
205                }
206            }
207        }
208    }
209
210    pub fn new() -> Self {
211        let mut connections = BTreeMap::new();
212        connections.insert("my_name".to_string(), Connection::default());
213        Self {
214            default: "my_name".to_string(),
215            connections,
216        }
217    }
218}
219
220#[derive(Clone, Debug, Deserialize, Serialize)]
221struct Connection {
222    brokers: Vec<String>,
223    fetch_max: i32,
224    retry_max: i32,
225}
226impl Connection {
227    pub fn default() -> Connection {
228        Self {
229            brokers: vec![],
230            fetch_max: 0,
231            retry_max: 0,
232        }
233    }
234}