kinbox 0.1.3

A simple Kafka inbox
Documentation
use crate::config::{ClientConfig, CommonConfig};
use crate::name::Name;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};

/// Options used when creating an inbox.
///
/// Derives the common value-type traits so callers can log, compare and
/// reuse an option set without rebuilding it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CreateOption {
    /// Number of partitions.
    pub partitions: i32,
    /// Number of replicas.
    pub replication: i32,
}

impl CreateOption {
    pub fn new(partitions: i32, replication: i32) -> Self {
        Self {
            partitions,
            replication,
        }
    }
}

impl Default for CreateOption {
    fn default() -> Self {
        Self::new(1, 1)
    }
}

/// An inbox manager.
///
/// Holds the rdkafka [`AdminClient`] through which the inbox's topics are
/// created, deleted and resized.
pub struct Inbox {
    client: AdminClient<DefaultClientContext>,
}

impl Inbox {
    /// Builds an [`Inbox`] whose admin client is created from `config`.
    ///
    /// # Errors
    ///
    /// Returns the [`KafkaError`] reported when the admin client cannot be
    /// created from the configuration.
    pub fn new(config: &ClientConfig) -> Result<Self, KafkaError> {
        let client = config.inner.create()?;
        Ok(Inbox { client })
    }

    /// Builds an [`Inbox`] from a default [`ClientConfig`] that has `brokers`
    /// applied via `set_brokers`.
    ///
    /// # Errors
    ///
    /// Same as [`Inbox::new`].
    pub fn new_with_brokers<V: AsRef<str>>(brokers: &[V]) -> Result<Self, KafkaError> {
        let mut config = ClientConfig::default();
        config.set_brokers(brokers);
        Inbox::new(&config)
    }

    /// Creates the inbox: one topic per entry of `Name::names()`.
    ///
    /// Topics for which `Name::is_dead_letter` returns `true` are always
    /// created with a single partition; every other topic uses
    /// `opt.partitions`. All topics use `opt.replication` as a fixed
    /// replication factor.
    ///
    /// # Returns
    ///
    /// * `Ok(Ok(()))` - every topic was created or already existed.
    /// * `Ok(Err(code))` - the first per-topic error other than
    ///   `TopicAlreadyExists`.
    ///
    /// # Errors
    ///
    /// Returns the [`KafkaError`] if the admin request itself fails.
    pub async fn create_inbox<S: AsRef<str>>(
        &self,
        name: S,
        opt: CreateOption,
    ) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let name = Name::new(name.as_ref());
        let mut topics = Vec::new();
        for n in name.names() {
            let partitions = if name.is_dead_letter(n) {
                1
            } else {
                opt.partitions
            };
            topics.push(NewTopic::new(
                n,
                partitions,
                TopicReplication::Fixed(opt.replication),
            ));
        }
        let results = self
            .client
            .create_topics(&topics[..], &AdminOptions::new())
            .await?;
        // An already-existing topic is not treated as a failure.
        Ok(check_topic_result(
            results,
            RDKafkaErrorCode::TopicAlreadyExists,
            topics.len(),
        ))
    }

    /// Deletes the inbox: every topic listed by `Name::names()`.
    ///
    /// # Returns
    ///
    /// * `Ok(Ok(()))` - every topic was deleted or did not exist.
    /// * `Ok(Err(code))` - the first per-topic error other than
    ///   `UnknownTopicOrPartition`.
    ///
    /// # Errors
    ///
    /// Returns the [`KafkaError`] if the admin request itself fails.
    pub async fn delete_inbox<S: AsRef<str>>(
        &self,
        name: S,
    ) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let name = Name::new(name.as_ref());
        let names: Vec<&str> = name.names().iter().map(|n| n as &str).collect();
        let results = self
            .client
            .delete_topics(&names, &AdminOptions::new())
            .await?;
        // A topic that is already gone is not treated as a failure.
        Ok(check_topic_result(
            results,
            RDKafkaErrorCode::UnknownTopicOrPartition,
            names.len(),
        ))
    }

    /// Adjusts the inbox by requesting `partitions` total partitions for
    /// every topic that is not a dead-letter topic.
    ///
    /// Every per-topic result is inspected, so a failure on any topic is
    /// reported, and an empty result list yields `Ok(Ok(()))` rather than
    /// panicking.
    ///
    /// # Returns
    ///
    /// * `Ok(Ok(()))` - no topic reported an error.
    /// * `Ok(Err(code))` - the error code of the first topic that failed.
    ///
    /// # Errors
    ///
    /// Returns the [`KafkaError`] if the admin request itself fails.
    pub async fn adjust_inbox<S: AsRef<str>>(
        &self,
        name: S,
        partitions: i32,
    ) -> Result<Result<(), RDKafkaErrorCode>, KafkaError> {
        let name = Name::new(name.as_ref());
        let mut parts = Vec::new();
        for n in name.names() {
            if name.is_dead_letter(n) {
                continue;
            }
            // NOTE(review): a negative `partitions` wraps to a very large
            // count in this cast; presumably rejected by the client library
            // - TODO confirm.
            parts.push(rdkafka::admin::NewPartitions::new(n, partitions as usize));
        }
        let results = self
            .client
            .create_partitions(&parts, &AdminOptions::new())
            .await?;
        // Stop at the first failing topic and surface its error code.
        Ok(results
            .into_iter()
            .try_for_each(|r| r.map(drop).map_err(|(_, code)| code)))
    }
}

/// Checks the per-topic results of an admin operation.
///
/// Returns the code of the first error that differs from `not`; errors equal
/// to `not` are tolerated. Returns `Ok(())` when no other error is present.
///
/// # Panics
///
/// Panics if the number of results in `v` differs from `count`.
fn check_topic_result(
    v: Vec<rdkafka::admin::TopicResult>,
    not: RDKafkaErrorCode,
    count: usize,
) -> Result<(), RDKafkaErrorCode> {
    assert_eq!(v.len(), count);
    let first_unexpected = v
        .iter()
        .filter_map(|outcome| outcome.as_ref().err())
        .map(|failure| failure.1)
        .find(|code| *code != not);
    match first_unexpected {
        Some(code) => Err(code),
        None => Ok(()),
    }
}