skafka 0.1.2

A simple Kafka wrapper for rdkafka
Documentation
use crate::Config;
use crate::config::Ack;
use rdkafka::message::ToBytes;
use std::collections::HashMap;
use std::time::Duration;

/// Kafka producer handle that wraps an rdkafka `FutureProducer`.
///
/// The type derives `Clone`, so a handle can be duplicated and passed around.
#[derive(Clone)]
pub struct Producer {
    /// The underlying rdkafka future-based producer.
    inner: rdkafka::producer::FutureProducer,
}

/// Producer.
///
/// Sending data may cause a topic that does not exist yet to be created automatically.
impl Producer {
    /// Creates a producer from the client settings held in `config`.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` reported when the underlying `FutureProducer`
    /// cannot be created.
    pub fn build(config: &Config) -> Result<Producer, rdkafka::error::KafkaError> {
        Ok(Self {
            inner: config.inner.create()?,
        })
    }

    /// Creates a producer configured for high reliability.
    ///
    /// The caller's `config` is left untouched: a clone of it is adjusted and
    /// then handed to [`Producer::build`].
    ///
    /// # Errors
    ///
    /// Same as [`Producer::build`].
    pub fn build_with_reliability(config: &Config) -> Result<Producer, rdkafka::error::KafkaError> {
        let mut reliable = config.clone();
        // Turn on idempotence and require the `All` acknowledgement level.
        reliable.set_idempotence(true).set_ack(Ack::All);
        Self::build(&reliable)
    }

    /// Returns a reference to the wrapped rdkafka `FutureProducer`.
    pub fn producer(&self) -> &rdkafka::producer::FutureProducer {
        &self.inner
    }

    /// Sends a single record.
    ///
    /// The record is sent through [`Producer::simple_send_with_key`] with the
    /// empty string `""` as its key.
    /// NOTE(review): that is an empty key rather than an absent one — confirm
    /// this is what consumers expect.
    ///
    /// * `timeout` - `None` means never time out; if the queue is full the
    ///   call keeps waiting.
    ///
    /// Returns the offset reported for the delivered record.
    pub async fn simple_send<'a, P: ToBytes + ?Sized>(
        &self,
        topic: &'a str,
        payload: &'a P,
        timeout: Option<Duration>,
    ) -> Result<i64, rdkafka::error::KafkaError> {
        let empty_key = "";
        self.simple_send_with_key(topic, empty_key, payload, timeout)
            .await
    }

    /// Sends a single record with an explicit key and no headers.
    ///
    /// * `timeout` - `None` means never time out; if the queue is full the
    ///   call keeps waiting.
    ///
    /// Returns the offset reported for the delivered record.
    pub async fn simple_send_with_key<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized>(
        &self,
        topic: &'a str,
        key: &'a K,
        payload: &'a P,
        timeout: Option<Duration>,
    ) -> Result<i64, rdkafka::error::KafkaError> {
        // The annotation on `None` pins the header value type, which cannot
        // be inferred when no header map is supplied.
        self.simple_send_with_header(
            topic,
            key,
            payload,
            None::<HashMap<&'a str, &'a str>>,
            timeout,
        )
        .await
    }

    /// Sends a single record, optionally carrying headers.
    ///
    /// * `header` - when `Some`, every map entry is attached to the record as
    ///   one header; entries are added in the map's iteration order.
    /// * `timeout` - `None` means never time out; if the queue is full the
    ///   call keeps waiting.
    ///
    /// Returns the offset reported for the delivered record.
    ///
    /// # Errors
    ///
    /// On a failed send only the error part of the failure value is returned;
    /// the rest of that value is dropped.
    pub async fn simple_send_with_header<
        'a,
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
        V: ToBytes + ?Sized,
    >(
        &self,
        topic: &'a str,
        key: &'a K,
        payload: &'a P,
        header: Option<HashMap<&'a str, &V>>,
        timeout: Option<Duration>,
    ) -> Result<i64, rdkafka::error::KafkaError> {
        let base = rdkafka::producer::FutureRecord::to(topic)
            .key(key)
            .payload(payload);
        let record = match header {
            Some(entries) => {
                // `insert` consumes and returns the header set, so thread it
                // through a fold instead of reassigning a mutable binding.
                let headers = entries.into_iter().fold(
                    rdkafka::message::OwnedHeaders::new(),
                    |acc, (name, value)| {
                        acc.insert(rdkafka::message::Header {
                            key: name,
                            value: Some(value),
                        })
                    },
                );
                base.headers(headers)
            }
            None => base,
        };
        self.inner
            .send(record, timeout)
            .await
            .map(|delivery| delivery.offset)
            .map_err(|failure| failure.0)
    }
}