kinbox 0.1.3

A simple kafka inbox
Documentation
use crate::config::{ClientConfig, CommonConfig};
use crate::name::Name;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaError;
use std::collections::HashMap;
use std::time::Duration;

/// 元数据
#[derive(Debug, Clone)]
pub struct Meta {
    /// 具体的名字
    pub name: String,
    /// 分区id, 副本数
    pub partitions: Vec<(i32, usize)>,
}

impl Meta {
    pub fn new() -> Self {
        Self {
            name: String::new(),
            partitions: vec![],
        }
    }
}

/// 列表
pub struct List {
    inner: BaseConsumer,
}

impl List {
    pub fn new(cfg: &ClientConfig) -> Result<Self, KafkaError> {
        let consumer: BaseConsumer = cfg.inner.create()?;
        Ok(List { inner: consumer })
    }

    pub fn new_with_brokers<V: AsRef<str>>(brokers: &[V]) -> Result<Self, KafkaError> {
        let mut c = ClientConfig::default();
        c.set_brokers(brokers);
        List::new(&c)
    }

    /// 获取邮箱列表
    pub fn list_inboxes(&self) -> Result<Vec<Meta>, KafkaError> {
        let result = self.inner.fetch_metadata(None, Duration::from_secs(5))?;
        let mut metas = vec![];
        for topic in result.topics() {
            let mut partitions = vec![];
            for p in topic.partitions() {
                partitions.push((p.id(), p.replicas().len()));
            }
            metas.push(Meta {
                name: topic.name().to_string(),
                partitions,
            })
        }
        Ok(metas)
    }

    /// 聚合邮箱列表
    pub fn aggregate_inboxes<S: AsRef<str>>(
        &self,
        names: &[S],
    ) -> Result<HashMap<String, Vec<Meta>>, KafkaError> {
        let mut result = HashMap::new();
        if names.is_empty() {
            return Ok(result);
        }

        // 构建名字映射
        let mut name_map = HashMap::new();
        for name in names {
            let name = Name::new(name.as_ref());
            for n in name.names() {
                name_map.insert(n.to_string(), name.name.clone());
            }
            result.insert(name.name.clone(), vec![]);
        }

        // 读取数据
        let metas = self.list_inboxes()?;
        for meta in metas {
            if let Some(original_name) = name_map.get(&meta.name) {
                if let Some(entry) = result.get_mut(original_name) {
                    entry.push(meta);
                }
            }
        }

        Ok(result)
    }
}