kinbox 0.1.3

A simple kafka inbox
Documentation
/// 公共配置 trait
pub trait CommonConfig {
    fn inner_mut(&mut self) -> &mut rdkafka::ClientConfig;

    fn inner(&self) -> &rdkafka::ClientConfig;

    /// 设置配置项
    fn set<K, V>(&mut self, key: K, value: V) -> &mut Self
    where
        K: Into<String>,
        V: Into<String>,
    {
        self.inner_mut().set(key, value);
        self
    }

    /// 移除配置项
    fn remove(&mut self, key: &str) -> &mut Self {
        self.inner_mut().remove(key);
        self
    }

    /// 获取配置项
    fn get(&self, key: &str) -> Option<&str> {
        self.inner().get(key)
    }

    /// 设置日志等级
    fn set_log_level(&mut self, log_level: i32) -> &mut Self {
        let level = match log_level {
            0 => rdkafka::config::RDKafkaLogLevel::Emerg,
            1 => rdkafka::config::RDKafkaLogLevel::Alert,
            2 => rdkafka::config::RDKafkaLogLevel::Critical,
            3 => rdkafka::config::RDKafkaLogLevel::Error,
            4 => rdkafka::config::RDKafkaLogLevel::Warning,
            5 => rdkafka::config::RDKafkaLogLevel::Notice,
            6 => rdkafka::config::RDKafkaLogLevel::Info,
            _ => rdkafka::config::RDKafkaLogLevel::Debug,
        };
        self.inner_mut().set_log_level(level);
        self
    }

    /// 设置代理的地址
    fn set_brokers<V: AsRef<str>>(&mut self, brokers: &[V]) -> &mut Self {
        let brokers = brokers
            .iter()
            .map(|v| v.as_ref())
            .collect::<Vec<&str>>()
            .join(",");
        self.inner_mut().set("bootstrap.servers", brokers);
        self
    }

    /// 建立tcp连接的时间
    fn set_connection_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        self.inner_mut()
            .set("socket.connection.setup.timeout.ms", timeout.to_string());
        self
    }

    /// 网络 I/O 超时时间
    fn set_socket_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        self.inner_mut()
            .set("socket.timeout.ms", timeout.to_string());
        self
    }

    /// 请求超时时间
    fn set_request_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        self.inner_mut()
            .set("request.timeout.ms", timeout.to_string());
        self
    }

    /// 设置所有的默认超时
    fn set_all_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        self.set_connection_timeout_ms(timeout)
            .set_socket_timeout_ms(timeout)
            .set_request_timeout_ms(timeout);
        self
    }
}

/// 客户端配置
#[derive(Clone, Debug)]
pub struct ClientConfig {
    pub(crate) inner: rdkafka::ClientConfig,
}

impl ClientConfig {
    pub fn new() -> Self {
        ClientConfig {
            inner: rdkafka::ClientConfig::new(),
        }
    }
}

impl CommonConfig for ClientConfig {
    fn inner_mut(&mut self) -> &mut rdkafka::ClientConfig {
        &mut self.inner
    }

    fn inner(&self) -> &rdkafka::ClientConfig {
        &self.inner
    }
}

impl Default for ClientConfig {
    fn default() -> Self {
        Self::new()
    }
}

/// 生成者配置
#[derive(Clone, Debug)]
pub struct WriteConfig {
    pub(crate) inner: rdkafka::ClientConfig,
}

impl WriteConfig {
    /// 会有默认的设置
    pub fn new() -> Self {
        let mut c = Self {
            inner: rdkafka::ClientConfig::new(),
        };

        c.set_all_timeout_ms(3000)
            // 最高的可靠性确认机制
            .set_ack(Ack::All)
            // 开启幂等
            .set_idempotence(true)
            // 生产者等待成功的最大时间
            .set_delivery_timeout_ms(3000)
            // 单条消息最大字节数(服务端必须支持)
            .set_message_max_bytes(1048576)
            .set_retries(2);
        c
    }

    /// 设置投递消息时确认的机制, 默认是one
    /// 不同的确认方式速度,可靠性不一样
    pub fn set_ack(&mut self, ack: Ack) -> &mut Self {
        let ack = match ack {
            Ack::Zero => "0",
            Ack::One => "1",
            Ack::All => "all",
        };
        self.inner.set("acks", ack);
        self
    }

    /// 是否开启幂等, 如果开启则通过自动添加sequence number保证消息只写入一次
    /// 可能得配合Ack::All使用
    pub fn set_idempotence(&mut self, val: bool) -> &mut Self {
        self.inner
            .set("enable.idempotence", if val { "true" } else { "false" });
        self
    }

    /// 生产者等待成功的最大时间
    pub fn set_delivery_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        self.inner.set("delivery.timeout.ms", timeout.to_string());
        self
    }

    /// 单条消息最大字节数(服务端必须支持)
    pub fn set_message_max_bytes(&mut self, max: i32) -> &mut Self {
        self.inner.set("message.max.bytes", max.to_string());
        self
    }

    /// 发送失败重试次数
    pub fn set_retries(&mut self, retries: i32) -> &mut Self {
        self.inner.set("retries", retries.to_string());
        self
    }
}

impl CommonConfig for WriteConfig {
    fn inner_mut(&mut self) -> &mut rdkafka::ClientConfig {
        &mut self.inner
    }

    fn inner(&self) -> &rdkafka::ClientConfig {
        &self.inner
    }
}

impl Default for WriteConfig {
    fn default() -> Self {
        Self::new()
    }
}

/// 消费者配置
#[derive(Clone, Debug)]
pub struct ReadConfig {
    pub(crate) inner: rdkafka::ClientConfig,
}

impl ReadConfig {
    /// 会有默认的设置
    pub fn new() -> Self {
        let mut c = ReadConfig {
            inner: rdkafka::ClientConfig::new(),
        };

        c.set_all_timeout_ms(3000)
            // 关闭自动提交
            .set_auto_commit(false)
            // 队列里没消息时收到eof
            .set_partition_eof(true)
            // 消费开始的offset
            .set_offset_reset(OffsetReset::Earliest)
            // 每次拉取最小字节数
            .set_fetch_min_bytes(1)
            // 每次拉取最大字节数
            .set_fetch_max_bytes(1048576)
            .set_message_max_bytes(1048576)
            // 拉取数据时最大的延迟时间
            .set_fetch_wait_max_ms(500)
            // consumer session 失效超时时间
            .set_session_timeout_ms(10000);
        c
    }

    /// 设置消费组id
    pub fn set_group_id<V: Into<String>>(&mut self, group_id: V) -> &mut Self {
        self.inner.set("group.id", group_id);
        self
    }

    /// 设置是否自动确认提交, 默认是false
    pub fn set_auto_commit(&mut self, auto: bool) -> &mut Self {
        self.inner
            .set("enable.auto.commit", if auto { "true" } else { "false" });
        self
    }

    /// 设置是否当队列里没消息时收到eof, 默认是false
    pub fn set_partition_eof(&mut self, eof: bool) -> &mut Self {
        self.inner
            .set("enable.partition.eof", if eof { "true" } else { "false" });
        self
    }

    /// 设置从哪里开始消费, 默认是Latest
    pub fn set_offset_reset(&mut self, reset: OffsetReset) -> &mut Self {
        let reset = match reset {
            OffsetReset::Earliest => "earliest",
            OffsetReset::Latest => "latest",
            OffsetReset::None => "none",
        };
        self.inner.set("auto.offset.reset", reset);
        self
    }

    /// 每次拉取最小字节数
    pub fn set_fetch_min_bytes(&mut self, min: i32) -> &mut Self {
        self.inner.set("fetch.min.bytes", min.to_string());
        self
    }

    /// 每次拉取最大字节数
    pub fn set_fetch_max_bytes(&mut self, max: i32) -> &mut Self {
        self.inner.set("fetch.max.bytes", max.to_string());
        self
    }

    /// 单条消息最大字节数(服务端必须支持)
    pub fn set_message_max_bytes(&mut self, max: i32) -> &mut Self {
        self.inner.set("message.max.bytes", max.to_string());
        self
    }

    /// consumer session 失效超时时间
    pub fn set_session_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        self.inner.set("session.timeout.ms", timeout.to_string());
        self
    }

    /// 拉取数据时最大的延迟时间,默认可能是500
    pub fn set_fetch_wait_max_ms(&mut self, wait: i32) -> &mut Self {
        self.inner.set("fetch.wait.max.ms", wait.to_string());
        self
    }
}

impl CommonConfig for ReadConfig {
    fn inner_mut(&mut self) -> &mut rdkafka::ClientConfig {
        &mut self.inner
    }

    fn inner(&self) -> &rdkafka::ClientConfig {
        &self.inner
    }
}

impl Default for ReadConfig {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Copy, Clone, Debug)]
pub enum Ack {
    /// 不需要等broker确认
    Zero,
    /// broker确认
    One,
    /// 所有副本持久化才确认
    All,
}

#[derive(Copy, Clone, Debug)]
pub enum OffsetReset {
    /// 从最早的地方开始消费
    Earliest,
    /// 消费最新的
    Latest,
    None,
}