skafka 0.1.3

A simple Kafka wrapper for rdkafka.
Documentation
use crate::config::Config;

/// Wrapper that owns an rdkafka `AdminClient` using the default client context.
pub struct Client {
    // The wrapped rdkafka admin client; every method of this type delegates to it.
    client: rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext>,
}

impl Client {
    pub fn build(config: &Config) -> Result<Client, rdkafka::error::KafkaError> {
        let client: rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext> =
            config.inner.create()?;
        Ok(Self { client })
    }

    /// 需要复杂操作,可以使用原生client
    pub fn client(&self) -> &rdkafka::admin::AdminClient<rdkafka::client::DefaultClientContext> {
        &self.client
    }

    /// Creates a topic using default admin options.
    ///
    /// * `topic` - name of the topic to create.
    /// * `num_partitions` - number of partitions.
    /// * `fixed_replication` - fixed number of replicas for each partition.
    ///
    /// Delegates to `create_topic`; the return value is passed through
    /// unchanged.
    pub async fn simple_create_topic(
        &self,
        topic: &str,
        num_partitions: i32,
        fixed_replication: i32,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let replication = rdkafka::admin::TopicReplication::Fixed(fixed_replication);
        let spec = rdkafka::admin::NewTopic::new(topic, num_partitions, replication);
        let default_opts = rdkafka::admin::AdminOptions::new();
        self.create_topic(spec, &default_opts).await
    }

    /// 创建topic
    /// @NewTopic: 设置retention.ms属性可控制消息的保留时间
    pub async fn create_topic<'a>(
        &self,
        topic: rdkafka::admin::NewTopic<'a>,
        opts: &rdkafka::admin::AdminOptions,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        self.client
            .create_topics(&[topic], opts)
            .await
            .map(|r| match r[0] {
                Ok(_) => Ok(()),
                Err(ref t) => Err(t.1),
            })
    }

    /// Deletes a topic using default admin options.
    ///
    /// Delegates to `delete_topic`; the return value is passed through
    /// unchanged.
    pub async fn simple_delete_topic(
        &self,
        topic: &str,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let default_opts = rdkafka::admin::AdminOptions::new();
        self.delete_topic(topic, &default_opts).await
    }

    pub async fn delete_topic(
        &self,
        topic: &str,
        opts: &rdkafka::admin::AdminOptions,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        self.client
            .delete_topics(&[topic], opts)
            .await
            .map(|r| match r[0] {
                Ok(_) => Ok(()),
                Err(ref t) => Err(t.1),
            })
    }

    /// Deletes a consumer group using default admin options.
    ///
    /// Delegates to `delete_group`; the return value is passed through
    /// unchanged.
    pub async fn simple_delete_group(
        &self,
        group_name: &str,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let default_opts = rdkafka::admin::AdminOptions::new();
        self.delete_group(group_name, &default_opts).await
    }

    pub async fn delete_group(
        &self,
        group_name: &str,
        opts: &rdkafka::admin::AdminOptions,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        self.client
            .delete_groups(&[group_name], opts)
            .await
            .map(|r| match r[0] {
                Ok(_) => Ok(()),
                Err(ref t) => Err(t.1),
            })
    }

    /// Adds partitions to a topic using default admin options.
    ///
    /// * `topic` - name of the topic to grow.
    /// * `new_partition_count` - total number of partitions after the change;
    ///   the partition count can only increase.
    ///
    /// Delegates to `create_partition`; the return value is passed through
    /// unchanged.
    pub async fn simple_create_partition(
        &self,
        topic: &str,
        new_partition_count: usize,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        let growth = rdkafka::admin::NewPartitions::new(topic, new_partition_count);
        let default_opts = rdkafka::admin::AdminOptions::new();
        self.create_partition(growth, &default_opts).await
    }

    pub async fn create_partition<'a>(
        &self,
        partition: rdkafka::admin::NewPartitions<'a>,
        opts: &rdkafka::admin::AdminOptions,
    ) -> Result<Result<(), rdkafka::error::RDKafkaErrorCode>, rdkafka::error::KafkaError> {
        self.client
            .create_partitions(&[partition], opts)
            .await
            .map(|r| match r[0] {
                Ok(_) => Ok(()),
                Err(ref t) => Err(t.1),
            })
    }

    /// Deletes message records from one partition of a topic, using default
    /// admin options.
    ///
    /// * `topic` - name of the topic.
    /// * `partition_id` - partition whose records are deleted.
    /// * `offset` - offset passed to the delete request.
    ///   NOTE(review): presumably records before this offset are removed —
    ///   confirm against the rdkafka `delete_records` documentation.
    ///
    /// # Errors
    ///
    /// The outer `Err` is returned if the partition/offset cannot be added
    /// to the request list or if the admin request itself fails. The inner
    /// `Result` is the error status of the first element of the returned
    /// partition list.
    ///
    /// # Panics
    ///
    /// Indexing the first element panics if the returned list is empty.
    pub async fn simple_delete_record(
        &self,
        topic: &str,
        partition_id: i32,
        offset: i64,
    ) -> Result<Result<(), rdkafka::error::KafkaError>, rdkafka::error::KafkaError> {
        let mut targets = rdkafka::TopicPartitionList::new();
        targets.add_partition_offset(topic, partition_id, rdkafka::Offset::Offset(offset))?;

        let default_opts = rdkafka::admin::AdminOptions::new();
        let outcome = self.client.delete_records(&targets, &default_opts).await?;
        // Only one partition was requested, so only the first element is inspected.
        let per_partition = outcome.elements();
        Ok(per_partition[0].error())
    }
}