df-helper 0.2.26

df helper tools db cache
Documentation
use kafka::producer::{Producer, Record, RequiredAcks};
use std::time::Duration;

use json::{array, JsonValue};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use crate::datetime::timer::Timer;

/// Kafka helper holding a set of named connection configurations and the
/// settings of the currently selected one.
pub struct Kafka {
    /// Name of the currently selected connection (a key into `connections`).
    default: String,
    /// JSON value holding every known connection configuration.
    connections: JsonValue,
    /// Broker addresses of the currently selected connection.
    brokers: Vec<String>,
    /// Value handed to the consumer's `with_fetch_max_bytes_per_partition`.
    fetch_max: i32,
    /// Value handed to the consumer's `with_retry_max_bytes_limit`.
    retry_max: i32,
}

impl Kafka {
    /// Builds a `Kafka` helper from a JSON configuration.
    ///
    /// `config["default"]` names the active connection and
    /// `config["connections"]` holds the settings of every connection,
    /// keyed by name:
    ///
    /// ```json
    /// {
    ///   "default": "mac",
    ///   "connections": {
    ///     "mac": {
    ///       "brokers": [
    ///         "0.0.0.0:9092"
    ///       ],
    ///       "fetch_max": 900000000,
    ///       "retry_max": 900000000
    ///     }
    ///   }
    /// }
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `fetch_max` or `retry_max` of the active connection cannot
    /// be read as an `i32` (for example when the connection is missing).
    pub fn connect(config: JsonValue) -> Self {
        let default = config["default"].to_string();
        let connections = config["connections"].clone();

        // Borrow the active connection instead of copying it out.
        let active = &connections[default.as_str()];
        let fetch_max = active["fetch_max"].as_i32().unwrap();
        let retry_max = active["retry_max"].as_i32().unwrap();
        let brokers = active["brokers"]
            .members()
            .map(|broker| broker.to_string())
            .collect();

        Self {
            default,
            connections,
            brokers,
            fetch_max,
            retry_max,
        }
    }
    /// Adds connection configurations to the stored set.
    ///
    /// The stored connections are normally a JSON object keyed by connection
    /// name, on which `JsonValue::push` always fails. `connection` is
    /// therefore expected to be a JSON object of the same shape
    /// (`{"name": {"brokers": [...], "fetch_max": .., "retry_max": ..}}`);
    /// each of its entries is inserted into the stored object, replacing an
    /// existing entry with the same name.
    /// NOTE(review): the expected shape of `connection` is an assumption —
    /// confirm against callers.
    ///
    /// If the stored connections happen to be a JSON array, `connection` is
    /// appended to it unchanged.
    ///
    /// # Panics
    ///
    /// Panics with "增加配置 失败" when the configuration cannot be stored
    /// (e.g. `connection` is not an object while the stored connections are
    /// not an array).
    pub fn add_config(&mut self, connection: JsonValue) -> &mut Self {
        if self.connections.is_array() {
            self.connections.push(connection).expect("增加配置 失败");
        } else {
            assert!(connection.is_object(), "增加配置 失败");
            for (name, settings) in connection.entries() {
                self.connections
                    .insert(name, settings.clone())
                    .expect("增加配置 失败");
            }
        }
        self
    }
    /// Selects the connection named `default` and reloads the broker list,
    /// `fetch_max` and `retry_max` from it.
    ///
    /// # Panics
    ///
    /// Panics if `fetch_max` or `retry_max` of the selected connection cannot
    /// be read as an `i32` (for example when no connection has that name).
    pub fn switch_config(&mut self, default: &str) -> &mut Self {
        self.default = default.to_string();

        // Only `connections` is borrowed here, so the other fields stay assignable.
        let active = &self.connections[default];
        self.brokers = active["brokers"]
            .members()
            .map(|broker| broker.to_string())
            .collect();
        self.fetch_max = active["fetch_max"].as_i32().unwrap();
        self.retry_max = active["retry_max"].as_i32().unwrap();
        self
    }

    /// Produces one message on `topic`.
    ///
    /// A new producer is created for every call, using the active brokers,
    /// a one second ack timeout and `RequiredAcks::One`. `data` is sent in
    /// its serialized string form. With an empty `key` the record is sent
    /// without a key; otherwise it is sent with `key` and partition `-1`
    /// (presumably "unspecified" — confirm against the kafka crate docs).
    ///
    /// Returns `true` when the message was sent. On a connection or send
    /// failure the error is printed and `false` is returned.
    pub fn set(&self, topic: &str, key: &str, data: JsonValue) -> bool {
        let created = Producer::from_hosts(self.brokers.clone())
            .with_ack_timeout(Duration::from_secs(1))
            .with_required_acks(RequiredAcks::One)
            .create();
        let mut producer = match created {
            Ok(producer) => producer,
            Err(err) => {
                println!("kafka 消息队列连接失败:{:?}", err);
                return false;
            }
        };

        let payload = data.to_string();
        let outcome = if key.is_empty() {
            producer.send(&Record::from_value(topic, payload))
        } else {
            producer.send(&Record {
                topic,
                partition: -1,
                key,
                value: payload.as_bytes(),
            })
        };

        match outcome {
            Ok(_) => true,
            Err(err) => {
                if err.to_string() == "Kafka Error (MessageSizeTooLarge)" {
                    println!("获取消息尺寸过大");
                }
                println!("kafka 发送 主题[{}] 失败:{:?}", topic, err);
                false
            }
        }
    }
    /// Consumes the messages of `topic` whose key equals `key`.
    ///
    /// A new consumer is created for every call (group id = topic name,
    /// earliest fallback offset, offsets stored in Kafka, fetch/retry limits
    /// taken from the active connection) and polled exactly once.
    ///
    /// * `only == true`: returns the first matching message of the polled
    ///   batch, or an empty JSON string when none matches.
    /// * `only == false`: returns a JSON array with every matching message
    ///   of the polled batch (possibly empty).
    ///
    /// Each matching message is marked as consumed and committed right away.
    /// A payload that parses as JSON is returned parsed; any other payload is
    /// returned as a JSON string. Keys are compared as raw bytes and payloads
    /// are decoded lossily, so a message that is not valid UTF-8 no longer
    /// panics the consumer (such a message would otherwise be re-fetched and
    /// panic on every subsequent call).
    ///
    /// On a connection or poll failure the error is printed and an empty
    /// JSON string is returned.
    ///
    /// # Panics
    ///
    /// Panics if marking a message as consumed or committing the offsets
    /// fails.
    pub fn get(&self, topic: &str, key: &str, only: bool) -> JsonValue {
        let created = Consumer::from_hosts(self.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(GroupOffsetStorage::Kafka)
            .with_fetch_max_bytes_per_partition(self.fetch_max)
            .with_retry_max_bytes_limit(self.retry_max)
            .create();
        let mut con = match created {
            Ok(con) => con,
            Err(e) => {
                println!("{:?}", e);
                println!("kafka 消息队列连接失败:{:?}", e);
                return JsonValue::String("".to_string());
            }
        };

        let message_sets = match con.poll() {
            Ok(sets) => sets,
            Err(e) => {
                if e.to_string() == "Kafka Error (MessageSizeTooLarge)" {
                    println!("kafka:获取消息尺寸过大");
                    return JsonValue::String("".to_string());
                }
                println!("消费失败{:?}", e);
                return JsonValue::String("".to_string());
            }
        };

        let mut list = array![];
        for ms in message_sets.iter() {
            for m in ms.messages() {
                // Byte comparison is exact for valid UTF-8 keys and never
                // panics on invalid ones (which cannot equal a `&str`).
                if m.key != key.as_bytes() {
                    continue;
                }

                let text = String::from_utf8_lossy(m.value).into_owned();
                let res = match json::parse(&text) {
                    Ok(parsed) => parsed,
                    Err(_) => JsonValue::String(text),
                };

                con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
                con.commit_consumed().unwrap();

                if only {
                    return res;
                }
                list.push(res).expect("加入失败");
            }
        }

        if only {
            JsonValue::String("".to_string())
        } else {
            list
        }
    }

    /// Listens on `topic` forever, handing every message payload to `fun`.
    ///
    /// The consumer uses the topic name as group id, falls back to the
    /// earliest offset and stores offsets in Kafka. When a poll returns
    /// nothing, `Timer::sleep(100)` is called before polling again. A message
    /// is marked as consumed only when `fun` returns `true`; consumed offsets
    /// are committed after each polled batch.
    ///
    /// Payloads are decoded lossily, so a message that is not valid UTF-8 is
    /// still delivered to `fun` (with replacement characters) instead of
    /// panicking and stopping the listener.
    ///
    /// This function does not return.
    ///
    /// # Panics
    ///
    /// Panics if the consumer cannot be created, or if polling, marking a
    /// message as consumed, or committing fails.
    pub fn get_client(&self, topic: &str, fun: fn(str: String) -> bool) {
        let mut con = Consumer::from_hosts(self.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(GroupOffsetStorage::Kafka)
            .create().unwrap();

        loop {
            let mss = con.poll().unwrap();
            if mss.is_empty() {
                Timer::sleep(100);
                continue;
            }
            for ms in mss.iter() {
                for m in ms.messages() {
                    let payload = String::from_utf8_lossy(m.value).into_owned();
                    if fun(payload) {
                        con.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
                    }
                }
            }
            con.commit_consumed().unwrap();
        }
    }
}