skafka 0.1.2

A simple kafka wrapper for rdkafka
Documentation
//! 配置参数, 它可能影响生产者和消费者的速度
//!
//! 1)下面是一些跟拉取有关的参数, consumer在订阅topic时,采用的是pull机制
//!
//! fetch.min.bytes	拉取的最小消息大小,低于这个大小不会立即返回
//!
//! fetch.max.bytes	单次拉取的最大大小,限制批次总大小
//!
//! fetch.wait.max.ms	Broker 在无数据时等待最大时间(长轮询时长)
//!
//! max.partition.fetch.bytes	每个分区最大拉取的字节数
//!
//! queued.min.messages	本地消息缓冲区中最少消息数
//!
//! queued.max.messages.kbytes	本地消息缓冲区最大大小(单位 KB)
//!
//! 2)下面是一些跟生产者有关的参数
//!
//! batch.size 控制每个 batch 的最大大小	增大可以提高吞吐(更少 request 次数),但也会增加延迟
//!
//! linger.ms  消息在发送前最多等待时间,等待更多消息凑成一个 batch	适当增加可提高吞吐,轻微增加延迟
//!
//! compression.type 消息压缩方式(gzip, snappy, lz4, zstd)	吞吐更高,但增加 CPU 使用;lz4 和 zstd 性能最好
//!
//! acks Broker 返回成功的条件(0, 1, all)0 最快但不可靠;all 最慢但最安全
//!
//! enable.idempotence 幂等性开启,防止消息重复	开启后可能限制并发与吞吐(同时启用时 max.in.flight.requests 必须 ≤ 5)
//!
//! max.in.flight.requests.per.connection 单个连接上未确认请求数量	增加可提高并发,但和幂等性冲突(必须 ≤ 5)
//!
//! retries	失败时自动重试次数	增加可提升可靠性,但重试可能导致延迟升高
//!
//! delivery.timeout.ms	发送失败前最多等待时间(包括重试)	限制消息最大等待时间
//!
//! request.timeout.ms 等待 Broker响应时间上限	Broker 不响应时快速失败
//!
//! buffer.memory 用于缓存待发送消息的内存,太小会阻塞发送;足够大才能并发批量发消息
//!
//! message.timeout.ms 单条消息的过期时间(老版本参数)不再推荐使用,推荐用 delivery.timeout.ms

#[derive(Clone, Debug)]
pub struct Config {
    pub(crate) inner: rdkafka::config::ClientConfig,
}

impl Config {
    pub fn new() -> Self {
        Config {
            inner: rdkafka::ClientConfig::new(),
        }
    }

    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut Config
    where
        K: Into<String>,
        V: Into<String>,
    {
        self.inner.set(key, value);
        self
    }

    pub fn get(&self, key: &str) -> Option<&str> {
        self.inner.get(key)
    }

    pub fn remove(&mut self, key: &str) -> &mut Config {
        self.inner.remove(key);
        self
    }

    /// 设置日志等级
    pub fn set_log_level(&mut self, log_level: i32) -> &mut Config {
        let level = match log_level {
            0 => rdkafka::config::RDKafkaLogLevel::Emerg,
            1 => rdkafka::config::RDKafkaLogLevel::Alert,
            2 => rdkafka::config::RDKafkaLogLevel::Critical,
            3 => rdkafka::config::RDKafkaLogLevel::Error,
            4 => rdkafka::config::RDKafkaLogLevel::Warning,
            5 => rdkafka::config::RDKafkaLogLevel::Notice,
            6 => rdkafka::config::RDKafkaLogLevel::Info,
            _ => rdkafka::config::RDKafkaLogLevel::Debug,
        };

        self.inner.set_log_level(level);
        self
    }

    /// 设置代理的地址
    pub fn set_brokers<V: AsRef<str>>(&mut self, brokers: &[V]) -> &mut Config {
        let brokers = brokers
            .iter()
            .map(|v| v.as_ref())
            .collect::<Vec<&str>>()
            .join(",");
        self.inner.set("bootstrap.servers", brokers);
        self
    }

    /// consumer用
    /// 设置消费组id
    pub fn set_group_id<V: Into<String>>(&mut self, group_id: V) -> &mut Config {
        self.inner.set("group.id", group_id);
        self
    }

    /// consumer用
    /// 设置是否自动确认提交, 默认是false
    pub fn set_auto_commit(&mut self, auto: bool) -> &mut Config {
        self.inner
            .set("enable.auto.commit", if auto { "true" } else { "false" });
        self
    }

    /// consumer用
    /// 设置是否当队列里没消息时收到eof, 默认是false
    pub fn set_partition_eof(&mut self, eof: bool) -> &mut Config {
        self.inner
            .set("enable.partition.eof", if eof { "true" } else { "false" });
        self
    }

    /// consumer用
    /// 设置从哪里开始消费, 默认是Latest
    pub fn set_offset_reset(&mut self, reset: OffsetReset) -> &mut Config {
        let reset = match reset {
            OffsetReset::Earliest => "earliest",
            OffsetReset::Latest => "latest",
            OffsetReset::None => "none",
        };
        self.inner.set("auto.offset.reset", reset);
        self
    }

    /// 单条消息最大字节数(服务端必须支持)
    pub fn set_message_max_bytes(&mut self, max: i32) -> &mut Config {
        self.inner.set("message.max.bytes", max.to_string());
        self
    }

    /// producer用
    /// 设置投递消息时确认的机制, 默认是one
    /// 不同的确认方式速度,可靠性不一样
    pub fn set_ack(&mut self, ack: Ack) -> &mut Config {
        let ack = match ack {
            Ack::Zero => "0",
            Ack::One => "1",
            Ack::All => "all",
        };
        self.inner.set("acks", ack);
        self
    }

    /// producer用
    /// 生产者等待成功的最大时间
    pub fn set_delivery_timeout_ms(&mut self, timeout: i32) -> &mut Config {
        self.inner.set("delivery.timeout.ms", timeout.to_string());
        self
    }

    /// consumer用
    /// 每次拉取最小字节数
    pub fn set_fetch_min_bytes(&mut self, min: i32) -> &mut Config {
        self.inner.set("fetch.min.bytes", min.to_string());
        self
    }

    /// consumer用
    /// 每次拉取最大字节数
    pub fn set_fetch_max_bytes(&mut self, max: i32) -> &mut Config {
        self.inner.set("fetch.max.bytes", max.to_string());
        self
    }

    /// consumer用
    /// consumer session 失效超时时间
    pub fn set_session_timeout_ms(&mut self, timeout: i32) -> &mut Config {
        self.inner.set("session.timeout.ms", timeout.to_string());
        self
    }

    /// admin client用
    /// 网络 I/O 超时时间
    pub fn set_socket_timeout_ms(&mut self, timeout: i32) -> &mut Config {
        self.inner.set("socket.timeout.ms", timeout.to_string());
        self
    }

    /// admin client用
    /// 请求超时时间
    pub fn set_request_timeout_ms(&mut self, timeout: i32) -> &mut Config {
        self.inner.set("request.timeout.ms", timeout.to_string());
        self
    }

    /// producer用
    /// 是否开启幂等, 如果开启则通过自动添加sequence number保证消息只写入一次
    /// 可能得配合Ack::All使用
    pub fn set_idempotence(&mut self, val: bool) -> &mut Config {
        self.inner
            .set("enable.idempotence", if val { "true" } else { "false" });
        self
    }

    /// producer用
    /// 发送失败重试次数
    pub fn set_retries(&mut self, retries: i32) -> &mut Config {
        self.inner.set("retries", retries.to_string());
        self
    }

    /// consumer用
    /// 拉取数据时最大的延迟时间,默认可能是500
    pub fn set_fetch_wait_max_ms(&mut self, wait: i32) -> &mut Config {
        self.inner.set("fetch.wait.max.ms", wait.to_string());
        self
    }
}

#[derive(Copy, Clone, Debug)]
pub enum OffsetReset {
    /// 从最早的地方开始消费
    Earliest,
    /// 消费最新的
    Latest,
    None,
}

#[derive(Copy, Clone, Debug)]
pub enum Ack {
    /// 不需要等broker确认
    Zero,
    /// broker确认
    One,
    /// 所有副本持久化才确认
    All,
}