kmailbox 0.1.2

A simple Kafka mailbox.
Documentation
use rdkafka::admin::AdminClient;
use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::producer::FutureProducer;
use rdkafka::{ClientConfig, TopicPartitionList};
use std::sync::{Arc, RwLock};

/// Entry point for building mailboxes, writers and readers against a set of
/// Kafka brokers.
///
/// The broker list and the mutable state are both held behind `Arc`s, so every
/// clone of a `Builder` shares the same timeouts and the same lazily created
/// admin client / producer.
#[derive(Clone)]
pub struct Builder {
    // Broker addresses; joined with "," to form `bootstrap.servers`.
    brokers: Arc<Vec<String>>,
    // Timeouts plus the cached admin client and producer, shared by all clones.
    inner: Arc<RwLock<Inner>>,
}

/// Mutable state stored behind the `RwLock` of a `Builder`.
struct Inner {
    // Admin client; `None` until it is first requested, then reused.
    client: Option<Arc<AdminClient<DefaultClientContext>>>,
    // Producer; `None` until it is first requested, then reused.
    producer: Option<FutureProducer>,
    // Value written to the `socket.timeout.ms` client setting.
    socket_timeout: i32,
    // Value written to the `request.timeout.ms` client setting.
    request_timeout: i32,
}

impl Builder {
    /// Creates a builder for the given broker addresses.
    ///
    /// Nothing is connected here: the admin client and the producer start out
    /// empty and are only created the first time they are requested. Both
    /// timeouts start at 10000 (later passed as `socket.timeout.ms` and
    /// `request.timeout.ms`).
    pub fn new<V: AsRef<str>>(brokers: &[V]) -> Self {
        let addresses: Vec<String> = brokers
            .iter()
            .map(|broker| broker.as_ref().to_owned())
            .collect();
        let state = Inner {
            client: None,
            producer: None,
            socket_timeout: 10000,
            request_timeout: 10000,
        };

        Builder {
            brokers: Arc::new(addresses),
            inner: Arc::new(RwLock::new(state)),
        }
    }

    /// Sets the socket timeout (the value passed as `socket.timeout.ms`) and
    /// returns the previous setting.
    ///
    /// The new value is only read when a client is built; an admin client or
    /// producer that has already been created and cached is not rebuilt.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    pub fn socket_timeout(&mut self, to: i32) -> i32 {
        let mut state = self.inner.write().unwrap();
        std::mem::replace(&mut state.socket_timeout, to)
    }

    /// Sets the request timeout (the value passed as `request.timeout.ms`) and
    /// returns the previous setting.
    ///
    /// The new value is only read when a client is built; an admin client or
    /// producer that has already been created and cached is not rebuilt.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    pub fn request_timeout(&mut self, to: i32) -> i32 {
        let mut state = self.inner.write().unwrap();
        std::mem::replace(&mut state.request_timeout, to)
    }

    fn get_timeout(&self) -> (i32, i32) {
        let g = self.inner.read().unwrap();
        (g.socket_timeout, g.request_timeout)
    }

    /// Returns the shared admin client, creating and caching it on first use.
    ///
    /// The lookup first takes only the read lock. On a miss the write lock is
    /// taken and the slot is checked again, since another thread may have
    /// filled it between the two lock acquisitions.
    ///
    /// # Errors
    /// Returns the `KafkaError` produced by `ClientConfig::create`; nothing is
    /// cached in that case, so a later call tries again.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    fn get_client(&self) -> Result<Arc<AdminClient<DefaultClientContext>>, KafkaError> {
        // Fast path: the client already exists, a read lock is enough.
        {
            let state = self.inner.read().unwrap();
            if let Some(client) = &state.client {
                return Ok(Arc::clone(client));
            }
        }

        // Slow path: re-check under the write lock before building.
        let mut state = self.inner.write().unwrap();
        if let Some(client) = &state.client {
            return Ok(Arc::clone(client));
        }

        let client: AdminClient<DefaultClientContext> = ClientConfig::new()
            // Broker list
            .set("bootstrap.servers", self.brokers.join(","))
            // Network I/O timeout
            .set("socket.timeout.ms", state.socket_timeout.to_string())
            // Request timeout
            .set("request.timeout.ms", state.request_timeout.to_string())
            .create()?;

        let client = Arc::new(client);
        state.client = Some(Arc::clone(&client));
        Ok(client)
    }

    /// Returns the shared producer, creating and caching it on first use.
    ///
    /// The lookup first takes only the read lock. On a miss the write lock is
    /// taken and the slot is checked again, since another thread may have
    /// filled it between the two lock acquisitions. The returned value is a
    /// clone of the cached `FutureProducer`.
    ///
    /// # Errors
    /// Returns the `KafkaError` produced by `ClientConfig::create`; nothing is
    /// cached in that case, so a later call tries again.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    fn get_producer(&self) -> Result<FutureProducer, KafkaError> {
        // Fast path: the producer already exists, a read lock is enough.
        {
            let state = self.inner.read().unwrap();
            if let Some(producer) = &state.producer {
                return Ok(producer.clone());
            }
        }

        // Slow path: re-check under the write lock before building.
        let mut state = self.inner.write().unwrap();
        if let Some(producer) = &state.producer {
            return Ok(producer.clone());
        }

        let producer: FutureProducer = ClientConfig::new()
            // Number of retries when sending fails
            .set("retries", "3")
            // Broker list
            .set("bootstrap.servers", self.brokers.join(","))
            // Network I/O timeout
            .set("socket.timeout.ms", state.socket_timeout.to_string())
            // Request timeout
            .set("request.timeout.ms", state.request_timeout.to_string())
            // Strongest acknowledgement guarantee
            .set("acks", "all")
            // Enable idempotence
            .set("enable.idempotence", "true")
            // Maximum time the producer waits for a successful delivery
            .set("delivery.timeout.ms", "3000")
            // Maximum size of a single message, in bytes
            .set("message.max.bytes", "62464")
            .create()?;

        state.producer = Some(producer.clone());
        Ok(producer)
    }

    /// Creates a new `StreamConsumer` in the consumer group `group_id`.
    ///
    /// Unlike the admin client and the producer, consumers are not cached:
    /// every call builds a fresh one using the timeouts configured at that
    /// moment. The consumer is returned without any subscription or
    /// assignment.
    ///
    /// # Errors
    /// Returns the `KafkaError` produced by `ClientConfig::create`.
    fn create_consumer(&self, group_id: &str) -> Result<StreamConsumer, KafkaError> {
        let (socket_timeout, request_timeout) = self.get_timeout();

        ClientConfig::new()
            // Consumer group id
            .set("group.id", group_id)
            // Disable auto commit
            .set("enable.auto.commit", "false")
            // Receive an EOF event when no messages are left
            .set("enable.partition.eof", "true")
            // Offset at which consumption starts
            .set("auto.offset.reset", "earliest")
            // Minimum number of bytes per fetch
            .set("fetch.min.bytes", "1")
            // Maximum number of bytes per fetch
            .set("fetch.max.bytes", "62464")
            // Maximum wait time when fetching data
            .set("fetch.wait.max.ms", "500")
            // Maximum size of a single message, in bytes
            .set("message.max.bytes", "62464")
            // Broker list
            .set("bootstrap.servers", self.brokers.join(","))
            // Network I/O timeout
            .set("socket.timeout.ms", socket_timeout.to_string())
            // Request timeout
            .set("request.timeout.ms", request_timeout.to_string())
            .create()
    }

    /// Builds a shared mailbox.
    ///
    /// `name` and `fixed_replication` are handed unchanged to
    /// `crate::shared::Mailbox::new`, together with the shared admin client.
    ///
    /// # Errors
    /// Returns the `KafkaError` raised while obtaining the admin client.
    pub fn shared_mailbox<V: Into<String>>(
        &self,
        name: V,
        fixed_replication: i32,
    ) -> Result<crate::shared::Mailbox, KafkaError> {
        self.get_client()
            .map(|admin| crate::shared::Mailbox::new(admin, name.into(), fixed_replication))
    }

    /// Shorthand for [`Builder::shared_mailbox`]; forwards both arguments
    /// unchanged and returns its result.
    pub fn shared<V: Into<String>>(
        &self,
        name: V,
        fixed_replication: i32,
    ) -> Result<crate::shared::Mailbox, KafkaError> {
        self.shared_mailbox(name, fixed_replication)
    }

    /// Builds an exclusive mailbox.
    ///
    /// `name`, `num_partitions` and `fixed_replication` are handed unchanged
    /// to `crate::exclusive::Mailbox::new`, together with the shared admin
    /// client.
    ///
    /// # Errors
    /// Returns the `KafkaError` raised while obtaining the admin client.
    pub fn exclusive_mailbox<V: Into<String>>(
        &self,
        name: V,
        num_partitions: i32,
        fixed_replication: i32,
    ) -> Result<crate::exclusive::Mailbox, KafkaError> {
        self.get_client().map(|admin| {
            crate::exclusive::Mailbox::new(admin, name.into(), num_partitions, fixed_replication)
        })
    }

    /// Shorthand for [`Builder::exclusive_mailbox`]; forwards all arguments
    /// unchanged and returns its result.
    pub fn exclusive<V: Into<String>>(
        &self,
        name: V,
        num_partitions: i32,
        fixed_replication: i32,
    ) -> Result<crate::exclusive::Mailbox, KafkaError> {
        self.exclusive_mailbox(name, num_partitions, fixed_replication)
    }

    /// Builds a writer backed by the shared producer.
    ///
    /// # Errors
    /// Returns the `KafkaError` raised while obtaining the producer.
    pub fn writer(&self) -> Result<crate::write::Writer, KafkaError> {
        self.get_producer().map(crate::write::Writer::new)
    }

    /// Builds a shared reader for the mailbox `name`.
    ///
    /// Two consumers are created, each in its own group: one subscribes to the
    /// mailbox itself, the other to its retry mailbox. Both are handed to
    /// `crate::read::Reader::new` together with the dead-letter name and the
    /// shared producer.
    ///
    /// # Errors
    /// Returns the `KafkaError` from creating either consumer, from obtaining
    /// the producer, or from either `subscribe` call.
    pub fn shared_reader<V: Into<String>>(
        &self,
        name: V,
    ) -> Result<crate::read::Reader, KafkaError> {
        // Derive every name the reader needs from the mailbox name.
        let mailbox = name.into();
        let dead_letter_box = crate::dead_letter(mailbox.clone());
        let retry_box = crate::retry_mailbox(mailbox.clone());
        let main_group = crate::group(mailbox.clone());
        let retry_group = crate::group(retry_box.clone());

        // Create all Kafka handles before subscribing anything.
        let main_consumer = self.create_consumer(&main_group)?;
        let retry_consumer = self.create_consumer(&retry_group)?;
        let producer = self.get_producer()?;

        main_consumer.subscribe(&[&mailbox])?;
        retry_consumer.subscribe(&[&retry_box])?;

        let reader = crate::read::Reader::new(
            mailbox,
            dead_letter_box,
            retry_box,
            main_consumer,
            retry_consumer,
            producer,
        );
        Ok(reader)
    }

    /// Builds an exclusive reader for partition `id` of the mailbox `name`.
    ///
    /// Two consumers are created, with group names derived from the mailbox /
    /// retry mailbox name and `id`. Instead of subscribing, each consumer is
    /// assigned exactly one partition: partition `id` of the mailbox and
    /// partition `id` of the retry mailbox respectively. Both are handed to
    /// `crate::read::Reader::new` together with the dead-letter name and the
    /// shared producer.
    ///
    /// # Errors
    /// Returns the `KafkaError` from creating either consumer, from obtaining
    /// the producer, or from either `assign` call.
    pub fn exclusive_reader<V: Into<String>>(
        &self,
        name: V,
        id: i32,
    ) -> Result<crate::read::Reader, KafkaError> {
        // Derive every name the reader needs from the mailbox name.
        let mailbox = name.into();
        let dead_letter_box = crate::dead_letter(mailbox.clone());
        let retry_box = crate::retry_mailbox(mailbox.clone());
        let main_group = crate::group_with_no(mailbox.clone(), id);
        let retry_group = crate::group_with_no(retry_box.clone(), id);

        // Create all Kafka handles before assigning any partition.
        let main_consumer = self.create_consumer(&main_group)?;
        let retry_consumer = self.create_consumer(&retry_group)?;
        let producer = self.get_producer()?;

        let mut main_assignment = TopicPartitionList::new();
        main_assignment.add_partition(&mailbox, id);
        main_consumer.assign(&main_assignment)?;

        let mut retry_assignment = TopicPartitionList::new();
        retry_assignment.add_partition(&retry_box, id);
        retry_consumer.assign(&retry_assignment)?;

        let reader = crate::read::Reader::new(
            mailbox,
            dead_letter_box,
            retry_box,
            main_consumer,
            retry_consumer,
            producer,
        );
        Ok(reader)
    }
}