// kinbox 0.1.3
//
// A simple Kafka inbox.
// Documentation
use crate::config::{CommonConfig, WriteConfig};
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::FutureRecord;
use rdkafka::util::Timeout;
use std::time::Duration;

/// Inbox writer: the sending side of the inbox.
///
/// Wraps an rdkafka `FutureProducer` and derives `Clone`.
#[derive(Clone)]
pub struct Writer {
    /// Underlying rdkafka producer that every write is sent through.
    producer: rdkafka::producer::FutureProducer,
}

impl Writer {
    /// Timeout handed to the producer by [`Writer::write`] when the caller
    /// does not pick one.
    const DEFAULT_QUEUE_TIMEOUT: Duration = Duration::from_secs(3);

    /// Builds a writer from an existing write configuration.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` reported while creating the producer from
    /// `cfg.inner`.
    pub fn new(cfg: &WriteConfig) -> Result<Self, KafkaError> {
        let producer: rdkafka::producer::FutureProducer = cfg.inner.create()?;
        Ok(Self { producer })
    }

    /// Builds a writer from a default `WriteConfig` with only the broker
    /// list set.
    ///
    /// # Errors
    ///
    /// Same as [`Writer::new`].
    pub fn new_with_brokers<V: AsRef<str>>(brokers: &[V]) -> Result<Self, KafkaError> {
        let mut cfg = WriteConfig::default();
        cfg.set_brokers(brokers);
        Self::new(&cfg)
    }

    /// Writes one mail (record) using a fixed 3-second timeout.
    ///
    /// Equivalent to calling [`Writer::write_timeout`] with
    /// `Duration::from_secs(3)`.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` from a failed send.
    pub async fn write<K, P>(&self, record: FutureRecord<'_, K, P>) -> Result<(), KafkaError>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
    {
        self.write_timeout(record, Self::DEFAULT_QUEUE_TIMEOUT).await
    }

    /// Writes one mail (record) with a caller-supplied timeout.
    ///
    /// `queue_timeout` is forwarded unchanged to the producer's `send`.
    /// NOTE(review): per rdkafka this presumably bounds how long enqueueing is
    /// retried while the producer queue is full — confirm against the rdkafka
    /// version in use.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` from a failed send; the rest of the failure
    /// value is dropped.
    pub async fn write_timeout<K, P, T>(
        &self,
        record: FutureRecord<'_, K, P>,
        queue_timeout: T,
    ) -> Result<(), KafkaError>
    where
        K: ToBytes + ?Sized,
        P: ToBytes + ?Sized,
        T: Into<Timeout>,
    {
        let outcome = self.producer.send(record, queue_timeout).await;
        match outcome {
            // The success payload is not exposed to callers.
            Ok(_) => Ok(()),
            // Only the error half of the failure value is surfaced.
            Err(failure) => Err(failure.0),
        }
    }
}