/// Configuration operations shared by every config wrapper in this module.
///
/// Implementors supply only the two accessors, `inner` and `inner_mut`; every
/// other method is a default built on top of them. All setters hand back
/// `&mut Self` so that calls can be chained.
pub trait CommonConfig {
    /// Returns a mutable reference to the wrapped `rdkafka::ClientConfig`.
    fn inner_mut(&mut self) -> &mut rdkafka::ClientConfig;

    /// Returns a shared reference to the wrapped `rdkafka::ClientConfig`.
    fn inner(&self) -> &rdkafka::ClientConfig;

    /// Stores `value` under `key` in the wrapped configuration.
    fn set<K, V>(&mut self, key: K, value: V) -> &mut Self
    where
        K: Into<String>,
        V: Into<String>,
    {
        let config = self.inner_mut();
        config.set(key, value);
        self
    }

    /// Removes `key` from the wrapped configuration.
    fn remove(&mut self, key: &str) -> &mut Self {
        let config = self.inner_mut();
        config.remove(key);
        self
    }

    /// Looks up `key` in the wrapped configuration.
    fn get(&self, key: &str) -> Option<&str> {
        let config = self.inner();
        config.get(key)
    }

    /// Sets the log level from a numeric value.
    ///
    /// `0` through `6` select `Emerg`, `Alert`, `Critical`, `Error`,
    /// `Warning`, `Notice` and `Info` respectively. Every other value,
    /// including negative numbers, selects `Debug`.
    fn set_log_level(&mut self, log_level: i32) -> &mut Self {
        let severity = match log_level {
            0 => rdkafka::config::RDKafkaLogLevel::Emerg,
            1 => rdkafka::config::RDKafkaLogLevel::Alert,
            2 => rdkafka::config::RDKafkaLogLevel::Critical,
            3 => rdkafka::config::RDKafkaLogLevel::Error,
            4 => rdkafka::config::RDKafkaLogLevel::Warning,
            5 => rdkafka::config::RDKafkaLogLevel::Notice,
            6 => rdkafka::config::RDKafkaLogLevel::Info,
            // Catch-all: anything outside 0..=6 is treated as Debug.
            _ => rdkafka::config::RDKafkaLogLevel::Debug,
        };
        self.inner_mut().set_log_level(severity);
        self
    }

    /// Sets `bootstrap.servers` to the given brokers joined with commas.
    ///
    /// An empty slice results in an empty string being stored.
    fn set_brokers<V: AsRef<str>>(&mut self, brokers: &[V]) -> &mut Self {
        let mut servers = String::new();
        for (position, broker) in brokers.iter().enumerate() {
            // Separator goes between entries only, never before the first.
            if position > 0 {
                servers.push(',');
            }
            servers.push_str(broker.as_ref());
        }
        self.inner_mut().set("bootstrap.servers", servers);
        self
    }

    /// Sets `socket.connection.setup.timeout.ms` to `timeout`.
    fn set_connection_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        let millis = timeout.to_string();
        self.inner_mut()
            .set("socket.connection.setup.timeout.ms", millis);
        self
    }

    /// Sets `socket.timeout.ms` to `timeout`.
    fn set_socket_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        let millis = timeout.to_string();
        self.inner_mut().set("socket.timeout.ms", millis);
        self
    }

    /// Sets `request.timeout.ms` to `timeout`.
    fn set_request_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        let millis = timeout.to_string();
        self.inner_mut().set("request.timeout.ms", millis);
        self
    }

    /// Applies the same `timeout` to the connection, socket and request
    /// timeouts, in that order.
    fn set_all_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        self.set_connection_timeout_ms(timeout);
        self.set_socket_timeout_ms(timeout);
        self.set_request_timeout_ms(timeout);
        self
    }
}
/// General-purpose configuration wrapper around an `rdkafka::ClientConfig`.
///
/// `ClientConfig::new` starts from `rdkafka::ClientConfig::new()` and sets
/// nothing on top of it.
#[derive(Clone, Debug)]
pub struct ClientConfig {
/// The wrapped rdkafka configuration; visible within this crate only.
pub(crate) inner: rdkafka::ClientConfig,
}
impl ClientConfig {
    /// Creates a configuration backed by a fresh `rdkafka::ClientConfig::new()`
    /// with no additional properties set.
    pub fn new() -> Self {
        let inner = rdkafka::ClientConfig::new();
        Self { inner }
    }
}
/// Exposes the wrapped configuration so the shared `CommonConfig` setters
/// operate on `ClientConfig`.
impl CommonConfig for ClientConfig {
    /// Shared access to the wrapped configuration.
    fn inner(&self) -> &rdkafka::ClientConfig {
        &self.inner
    }

    /// Mutable access to the wrapped configuration.
    fn inner_mut(&mut self) -> &mut rdkafka::ClientConfig {
        &mut self.inner
    }
}
impl Default for ClientConfig {
fn default() -> Self {
Self::new()
}
}
/// Configuration wrapper used for writing.
///
/// `WriteConfig::new` pre-populates a set of defaults (timeouts, `acks`,
/// `enable.idempotence`, `delivery.timeout.ms`, `message.max.bytes`,
/// `retries`).
#[derive(Clone, Debug)]
pub struct WriteConfig {
/// The wrapped rdkafka configuration; visible within this crate only.
pub(crate) inner: rdkafka::ClientConfig,
}
impl WriteConfig {
pub fn new() -> Self {
let mut c = Self {
inner: rdkafka::ClientConfig::new(),
};
c.set_all_timeout_ms(3000)
.set_ack(Ack::All)
.set_idempotence(true)
.set_delivery_timeout_ms(3000)
.set_message_max_bytes(1048576)
.set_retries(2);
c
}
pub fn set_ack(&mut self, ack: Ack) -> &mut Self {
let ack = match ack {
Ack::Zero => "0",
Ack::One => "1",
Ack::All => "all",
};
self.inner.set("acks", ack);
self
}
pub fn set_idempotence(&mut self, val: bool) -> &mut Self {
self.inner
.set("enable.idempotence", if val { "true" } else { "false" });
self
}
pub fn set_delivery_timeout_ms(&mut self, timeout: i32) -> &mut Self {
self.inner.set("delivery.timeout.ms", timeout.to_string());
self
}
pub fn set_message_max_bytes(&mut self, max: i32) -> &mut Self {
self.inner.set("message.max.bytes", max.to_string());
self
}
pub fn set_retries(&mut self, retries: i32) -> &mut Self {
self.inner.set("retries", retries.to_string());
self
}
}
/// Exposes the wrapped configuration so the shared `CommonConfig` setters
/// operate on `WriteConfig`.
impl CommonConfig for WriteConfig {
    /// Shared access to the wrapped configuration.
    fn inner(&self) -> &rdkafka::ClientConfig {
        &self.inner
    }

    /// Mutable access to the wrapped configuration.
    fn inner_mut(&mut self) -> &mut rdkafka::ClientConfig {
        &mut self.inner
    }
}
impl Default for WriteConfig {
fn default() -> Self {
Self::new()
}
}
/// Configuration wrapper used for reading.
///
/// `ReadConfig::new` pre-populates a set of defaults (timeouts, auto-commit,
/// partition EOF, offset reset, fetch sizes, fetch wait and session timeout).
#[derive(Clone, Debug)]
pub struct ReadConfig {
/// The wrapped rdkafka configuration; visible within this crate only.
pub(crate) inner: rdkafka::ClientConfig,
}
impl ReadConfig {
    /// Creates a read configuration with this module's defaults applied.
    ///
    /// Defaults, applied in this order:
    /// - connection, socket and request timeouts: 3000 ms
    /// - `enable.auto.commit`: `false`
    /// - `enable.partition.eof`: `true`
    /// - `auto.offset.reset`: `earliest`
    /// - `fetch.min.bytes`: 1
    /// - `fetch.max.bytes`: 1048576
    /// - `message.max.bytes`: 1048576
    /// - `fetch.wait.max.ms`: 500
    /// - `session.timeout.ms`: 10000
    ///
    /// `group.id` is not set here; use `set_group_id`.
    pub fn new() -> Self {
        let mut config = ReadConfig {
            inner: rdkafka::ClientConfig::new(),
        };
        // NOTE(review): this also sets `request.timeout.ms`, which librdkafka
        // appears to document as a producer property — confirm it is intended
        // for a read-side configuration.
        config.set_all_timeout_ms(3000);
        config.set_auto_commit(false);
        config.set_partition_eof(true);
        config.set_offset_reset(OffsetReset::Earliest);
        config.set_fetch_min_bytes(1);
        config.set_fetch_max_bytes(1_048_576);
        config.set_message_max_bytes(1_048_576);
        config.set_fetch_wait_max_ms(500);
        config.set_session_timeout_ms(10_000);
        config
    }

    /// Sets `group.id` to `group_id`.
    pub fn set_group_id<V: Into<String>>(&mut self, group_id: V) -> &mut Self {
        let inner = &mut self.inner;
        inner.set("group.id", group_id);
        self
    }

    /// Sets `enable.auto.commit` to `"true"` or `"false"`.
    pub fn set_auto_commit(&mut self, auto: bool) -> &mut Self {
        let flag = match auto {
            true => "true",
            false => "false",
        };
        self.inner.set("enable.auto.commit", flag);
        self
    }

    /// Sets `enable.partition.eof` to `"true"` or `"false"`.
    pub fn set_partition_eof(&mut self, eof: bool) -> &mut Self {
        let flag = match eof {
            true => "true",
            false => "false",
        };
        self.inner.set("enable.partition.eof", flag);
        self
    }

    /// Sets `auto.offset.reset` to `"earliest"`, `"latest"` or `"none"`
    /// according to `reset`.
    pub fn set_offset_reset(&mut self, reset: OffsetReset) -> &mut Self {
        let policy = match reset {
            OffsetReset::None => "none",
            OffsetReset::Latest => "latest",
            OffsetReset::Earliest => "earliest",
        };
        self.inner.set("auto.offset.reset", policy);
        self
    }

    /// Sets `fetch.min.bytes` to `min`.
    pub fn set_fetch_min_bytes(&mut self, min: i32) -> &mut Self {
        let bytes = min.to_string();
        self.inner.set("fetch.min.bytes", bytes);
        self
    }

    /// Sets `fetch.max.bytes` to `max`.
    pub fn set_fetch_max_bytes(&mut self, max: i32) -> &mut Self {
        let bytes = max.to_string();
        self.inner.set("fetch.max.bytes", bytes);
        self
    }

    /// Sets `message.max.bytes` to `max`.
    pub fn set_message_max_bytes(&mut self, max: i32) -> &mut Self {
        let bytes = max.to_string();
        self.inner.set("message.max.bytes", bytes);
        self
    }

    /// Sets `session.timeout.ms` to `timeout`.
    pub fn set_session_timeout_ms(&mut self, timeout: i32) -> &mut Self {
        let millis = timeout.to_string();
        self.inner.set("session.timeout.ms", millis);
        self
    }

    /// Sets `fetch.wait.max.ms` to `wait`.
    pub fn set_fetch_wait_max_ms(&mut self, wait: i32) -> &mut Self {
        let millis = wait.to_string();
        self.inner.set("fetch.wait.max.ms", millis);
        self
    }
}
/// Exposes the wrapped configuration so the shared `CommonConfig` setters
/// operate on `ReadConfig`.
impl CommonConfig for ReadConfig {
    /// Shared access to the wrapped configuration.
    fn inner(&self) -> &rdkafka::ClientConfig {
        &self.inner
    }

    /// Mutable access to the wrapped configuration.
    fn inner_mut(&mut self) -> &mut rdkafka::ClientConfig {
        &mut self.inner
    }
}
impl Default for ReadConfig {
fn default() -> Self {
Self::new()
}
}
/// Acknowledgement setting written to the `acks` property by
/// `WriteConfig::set_ack`.
///
/// Derives `PartialEq`, `Eq` and `Hash` so values can be compared with `==`
/// and used as map or set keys.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Ack {
    /// Written as `"0"`.
    Zero,
    /// Written as `"1"`.
    One,
    /// Written as `"all"`.
    All,
}
/// Offset-reset policy written to the `auto.offset.reset` property by
/// `ReadConfig::set_offset_reset`.
///
/// Derives `PartialEq`, `Eq` and `Hash` so values can be compared with `==`
/// and used as map or set keys.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum OffsetReset {
    /// Written as `"earliest"`.
    Earliest,
    /// Written as `"latest"`.
    Latest,
    /// Written as `"none"`. Always refer to it as `OffsetReset::None` to keep
    /// it distinct from `Option::None`.
    None,
}