br-cache 0.3.22

This is a cache and message queue abstraction layer
Documentation
use crate::config::KafkaConnection;
use json::{array, JsonValue};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafka::producer::{Producer, Record, RequiredAcks};
use std::sync::mpsc::Sender;
use std::thread;
use std::time::Duration;

/// Kafka-backed message queue handle.
///
/// Stores only the connection settings; each method below builds its own
/// client, producer or consumer from them when it is called.
#[derive(Clone)]
pub struct Kafka {
    /// Connection settings (broker list and consumer fetch limits) used to
    /// build Kafka clients.
    connection: KafkaConnection,
}

impl Kafka {
    /// Wraps the given connection settings in a [`Kafka`] handle.
    ///
    /// No network traffic happens here; the only validation performed is
    /// that the broker list is not empty.
    ///
    /// # Errors
    ///
    /// Returns an error string when `connection.brokers` is empty.
    pub fn connect(connection: KafkaConnection) -> Result<Self, String> {
        let has_brokers = !connection.brokers.is_empty();
        if has_brokers {
            Ok(Self { connection })
        } else {
            Err(String::from("Kafka brokers 未配置"))
        }
    }

    /// Probes the cluster by loading metadata for all topics with a
    /// freshly created client.
    ///
    /// Returns `Ok(true)` when the metadata request succeeds.
    ///
    /// # Errors
    ///
    /// Returns the metadata failure formatted as a string.
    pub fn ping(&self) -> Result<bool, String> {
        let brokers = self.connection.brokers.clone();
        let mut probe = kafka::client::KafkaClient::new(brokers);
        match probe.load_metadata_all() {
            Ok(_) => Ok(true),
            Err(err) => Err(format!("Kafka ping failed: {}", err)),
        }
    }

    /// Lists the names of all topics reported by the cluster metadata.
    ///
    /// A new client is created for the call and all metadata is loaded
    /// before the topic names are copied out.
    ///
    /// # Errors
    ///
    /// Returns an error string when the metadata cannot be loaded.
    pub fn get_topics(&self) -> Result<Vec<String>, String> {
        let brokers = self.connection.brokers.clone();
        let mut client = kafka::client::KafkaClient::new(brokers);

        if let Err(err) = client.load_metadata_all() {
            return Err(format!("加载 Kafka 元数据失败: {}", err));
        }

        let names: Vec<String> = client.topics().names().map(String::from).collect();
        Ok(names)
    }

    /// Requests metadata for `topic` and reports whether that request
    /// succeeded.
    ///
    /// The method first loads metadata for every topic and then asks for
    /// the named topic specifically. No explicit "create" call is made.
    // NOTE(review): presumably this relies on the broker auto-creating
    // topics on a metadata request — confirm against the broker config.
    ///
    /// # Errors
    ///
    /// Returns an error string when either metadata request fails.
    pub fn create_topic(&self, topic: &str) -> Result<bool, String> {
        let brokers = self.connection.brokers.clone();
        let mut client = kafka::client::KafkaClient::new(brokers);

        if let Err(err) = client.load_metadata_all() {
            return Err(format!("加载 Kafka 元数据失败: {}", err));
        }

        match client.load_metadata(&[topic]) {
            Ok(_) => Ok(true),
            Err(err) => Err(format!("创建主题失败: {}", err)),
        }
    }

    /// Publishes `value` to `topic` without a record key.
    ///
    /// Delegates to `publish_with_key` with an empty key, which that
    /// method treats as "send a value-only record".
    ///
    /// # Errors
    ///
    /// Propagates the error string produced by `publish_with_key`.
    pub fn publish(&self, topic: &str, value: JsonValue) -> Result<bool, String> {
        let no_key = "";
        self.publish_with_key(topic, no_key, value)
    }

    /// Serializes `data` to a JSON string and sends it to `topic`.
    ///
    /// A new producer is created for every call, configured with a
    /// one-second ack timeout and `RequiredAcks::One`. When `key` is empty
    /// a value-only record is sent; otherwise the record carries `key` and
    /// leaves partition selection to the producer (`partition: -1`).
    ///
    /// Returns `Ok(true)` when the send succeeds.
    ///
    /// # Errors
    ///
    /// Returns an error string when the producer cannot be created or the
    /// send fails. A failure whose text mentions `MessageSizeTooLarge` is
    /// reported with a dedicated message.
    pub fn publish_with_key(
        &self,
        topic: &str,
        key: &str,
        data: JsonValue,
    ) -> Result<bool, String> {
        let brokers = self.connection.brokers.clone();
        let mut producer = match Producer::from_hosts(brokers)
            .with_ack_timeout(Duration::from_secs(1))
            .with_required_acks(RequiredAcks::One)
            .create()
        {
            Ok(created) => created,
            Err(err) => return Err(format!("Kafka 连接失败: {}", err)),
        };

        let payload = data.to_string();
        let outcome = if key.is_empty() {
            producer.send(&Record::from_value(topic, payload))
        } else {
            let keyed = Record {
                topic,
                partition: -1,
                key,
                value: payload.as_bytes(),
            };
            producer.send(&keyed)
        };

        match outcome {
            Ok(_) => Ok(true),
            Err(err) => {
                // The oversized-message case is recognised from the error text.
                let too_large = err.to_string().contains("MessageSizeTooLarge");
                if too_large {
                    Err("消息尺寸过大".to_string())
                } else {
                    Err(format!("Kafka 发送失败: {}", err))
                }
            }
        }
    }

    pub fn consume(&self, topic: &str, key: &str, only_one: bool) -> Result<JsonValue, String> {
        let mut consumer = Consumer::from_hosts(self.connection.brokers.clone())
            .with_topic(topic.to_string())
            .with_group(topic.to_string())
            .with_fallback_offset(FetchOffset::Earliest)
            .with_offset_storage(Some(GroupOffsetStorage::Kafka))
            .with_fetch_max_bytes_per_partition(self.connection.fetch_max)
            .with_retry_max_bytes_limit(self.connection.retry_max)
            .create()
            .map_err(|e| format!("Kafka 消费者创建失败: {}", e))?;

        let messages = consumer
            .poll()
            .map_err(|e| format!("Kafka 消费失败: {}", e))?;

        let mut list = array![];
        for ms in messages.iter() {
            for m in ms.messages() {
                let str_value = String::from_utf8(m.value.to_vec())
                    .map_err(|e| format!("消息解码失败: {}", e))?;
                let json_value =
                    json::parse(&str_value).unwrap_or_else(|_| JsonValue::from(str_value.clone()));
                let key_value = String::from_utf8(m.key.to_vec()).unwrap_or_default();

                if key.is_empty() || key == key_value {
                    if only_one {
                        let _ = consumer.consume_message(ms.topic(), ms.partition(), m.offset);
                        let _ = consumer.commit_consumed();
                        return Ok(json_value);
                    }
                    let _ = list.push(json_value);
                    let _ = consumer.consume_message(ms.topic(), ms.partition(), m.offset);
                }
            }
        }
        let _ = consumer.commit_consumed();

        if only_one {
            Ok(JsonValue::Null)
        } else {
            Ok(list)
        }
    }

    /// Starts a background thread that forwards every message of `topic`
    /// to `tx`.
    ///
    /// The thread builds its own consumer (group id equal to the topic
    /// name, offsets stored in Kafka) using the same fetch limits from the
    /// connection settings that `consume` applies, then polls in a loop:
    ///
    /// * an empty poll sleeps 100 ms before retrying;
    /// * a poll error is logged and retried after one second;
    /// * each payload is parsed as JSON, falling back to a JSON string,
    ///   and sent on `tx`; payloads that are not valid UTF-8 are logged
    ///   and skipped;
    /// * every handled message is marked consumed and offsets are
    ///   committed after each batch.
    ///
    /// The thread stops when sending on `tx` fails (receiver dropped); the
    /// batch in progress is not committed in that case. The thread handle
    /// is not kept, so the thread cannot be joined by the caller.
    ///
    /// # Errors
    ///
    /// Always returns `Ok(())`: a consumer-creation failure happens inside
    /// the thread and is only logged.
    pub fn subscribe(&self, topic: &str, tx: Sender<JsonValue>) -> Result<(), String> {
        let brokers = self.connection.brokers.clone();
        // Copy the fetch limits so the subscriber honours the same
        // configured message-size bounds as `consume`.
        let fetch_max = self.connection.fetch_max;
        let retry_max = self.connection.retry_max;
        let topic = topic.to_string();

        thread::spawn(move || {
            let consumer = Consumer::from_hosts(brokers)
                .with_topic(topic.clone())
                .with_group(topic)
                .with_fallback_offset(FetchOffset::Earliest)
                .with_offset_storage(Some(GroupOffsetStorage::Kafka))
                .with_fetch_max_bytes_per_partition(fetch_max)
                .with_retry_max_bytes_limit(retry_max)
                .create();

            let mut con = match consumer {
                Ok(c) => c,
                Err(e) => {
                    log::error!("Kafka 订阅失败: {}", e);
                    return;
                }
            };

            loop {
                let messages = match con.poll() {
                    Ok(messages) => messages,
                    Err(e) => {
                        log::error!("Kafka 消费错误: {}", e);
                        thread::sleep(Duration::from_secs(1));
                        continue;
                    }
                };

                if messages.is_empty() {
                    thread::sleep(Duration::from_millis(100));
                    continue;
                }

                for ms in messages.iter() {
                    for m in ms.messages() {
                        match std::str::from_utf8(m.value) {
                            Ok(text) => {
                                let json_value =
                                    json::parse(text).unwrap_or_else(|_| JsonValue::from(text));
                                if tx.send(json_value).is_err() {
                                    log::warn!("Kafka 订阅通道已关闭");
                                    return;
                                }
                            }
                            Err(e) => {
                                // Undecodable payloads are dropped, but no longer silently.
                                log::warn!("Kafka 消息解码失败，已跳过: {}", e);
                            }
                        }
                        if let Err(e) = con.consume_message(ms.topic(), ms.partition(), m.offset)
                        {
                            log::warn!("Kafka 标记消息已消费失败: {}", e);
                        }
                    }
                }

                if let Err(e) = con.commit_consumed() {
                    log::warn!("Kafka 提交偏移量失败: {}", e);
                }
            }
        });

        Ok(())
    }
}