skafka 0.1.3

A simple Kafka client wrapper built on rdkafka
Documentation
use std::collections::HashMap;

use crate::Config;
use crate::config::OffsetReset;
/// Thin wrapper that owns an rdkafka `StreamConsumer`.
pub struct Consumer {
    /// The wrapped rdkafka stream consumer that every method forwards to.
    inner: rdkafka::consumer::StreamConsumer,
}

/// Methods for building a [`Consumer`] and driving the wrapped rdkafka stream consumer.
///
/// Note from the original author: while the consumer is in the
/// `PREPARING_REBALANCE` state it does not fetch any data.
impl Consumer {
    /// Creates a consumer from the client configuration held in `config`.
    ///
    /// # Errors
    ///
    /// Propagates the `KafkaError` reported when creating the underlying
    /// `StreamConsumer` fails.
    pub fn build(config: &Config) -> Result<Consumer, rdkafka::error::KafkaError> {
        let inner: rdkafka::consumer::StreamConsumer = config.inner.create()?;
        Ok(Self { inner })
    }

    /// Borrows the wrapped rdkafka `StreamConsumer`.
    pub fn consumer(&self) -> &rdkafka::consumer::StreamConsumer {
        &self.inner
    }

    /// Creates a consumer using this crate's default settings.
    ///
    /// The caller's `config` is left untouched; a clone of it is adjusted with:
    ///
    /// - partition EOF enabled (`enable.partition.eof`)
    /// - auto commit enabled (`enable.auto.commit`)
    /// - offset reset set to [`OffsetReset::Earliest`]
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Consumer::build`] reports.
    pub fn build_with_default(config: &Config) -> Result<Consumer, rdkafka::error::KafkaError> {
        // Work on a copy so the defaults never leak into the caller's configuration.
        let mut defaults = config.clone();
        defaults
            .set_partition_eof(true)
            .set_auto_commit(true)
            .set_offset_reset(OffsetReset::Earliest);
        Self::build(&defaults)
    }

    /// Subscribes to a single topic; shorthand for [`Consumer::subscribe`]
    /// with a one-element slice.
    pub fn simple_subscribe(&self, topic: &str) -> Result<(), rdkafka::error::KafkaError> {
        let single = [topic];
        self.subscribe(&single)
    }

    /// Subscribes the wrapped consumer to the given topics.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` produced by the underlying rdkafka `subscribe` call.
    pub fn subscribe(&self, topics: &[&str]) -> Result<(), rdkafka::error::KafkaError> {
        // The rdkafka trait shares its name with this struct, so alias it locally.
        use rdkafka::consumer::Consumer as KafkaConsumer;

        KafkaConsumer::subscribe(&self.inner, topics)
    }

    /// Forwards to the underlying rdkafka `unsubscribe` call on the wrapped consumer.
    pub fn unsubscribe(&self) {
        use rdkafka::consumer::Consumer as KafkaConsumer;

        KafkaConsumer::unsubscribe(&self.inner)
    }

    /// Assigns the wrapped consumer to explicit topic/partition pairs.
    ///
    /// `topics` maps a topic name to the partition to assign. Because the map
    /// is keyed by topic name, each topic contributes exactly one partition
    /// per call.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` produced by the underlying rdkafka `assign` call.
    pub fn simple_assign(
        &self,
        topics: HashMap<&str, i32>,
    ) -> Result<(), rdkafka::error::KafkaError> {
        use rdkafka::consumer::Consumer as KafkaConsumer;

        let mut assignment = rdkafka::TopicPartitionList::new();
        topics.into_iter().for_each(|(topic, partition)| {
            assignment.add_partition(topic, partition);
        });
        KafkaConsumer::assign(&self.inner, &assignment)
    }

    /// Awaits the wrapped consumer's `recv` and returns its result unchanged.
    ///
    /// The returned message borrows from this consumer.
    pub async fn recv(
        &self,
    ) -> Result<rdkafka::message::BorrowedMessage<'_>, rdkafka::error::KafkaError> {
        let pending = self.inner.recv();
        pending.await
    }
}