/// Kafka client configuration: a thin wrapper around [`rdkafka::ClientConfig`].
///
/// Values are stored in the wrapped rdkafka configuration; the methods on this
/// type forward to it.
#[derive(Debug, Clone)]
pub struct Config {
    /// The wrapped rdkafka configuration, reachable from within this crate only.
    pub(crate) inner: rdkafka::ClientConfig,
}
impl Config {
pub fn new() -> Self {
Config {
inner: rdkafka::ClientConfig::new(),
}
}
pub fn set<K, V>(&mut self, key: K, value: V) -> &mut Config
where
K: Into<String>,
V: Into<String>,
{
self.inner.set(key, value);
self
}
pub fn get(&self, key: &str) -> Option<&str> {
self.inner.get(key)
}
pub fn remove(&mut self, key: &str) -> &mut Config {
self.inner.remove(key);
self
}
pub fn set_log_level(&mut self, log_level: i32) -> &mut Config {
let level = match log_level {
0 => rdkafka::config::RDKafkaLogLevel::Emerg,
1 => rdkafka::config::RDKafkaLogLevel::Alert,
2 => rdkafka::config::RDKafkaLogLevel::Critical,
3 => rdkafka::config::RDKafkaLogLevel::Error,
4 => rdkafka::config::RDKafkaLogLevel::Warning,
5 => rdkafka::config::RDKafkaLogLevel::Notice,
6 => rdkafka::config::RDKafkaLogLevel::Info,
_ => rdkafka::config::RDKafkaLogLevel::Debug,
};
self.inner.set_log_level(level);
self
}
pub fn set_brokers<V: AsRef<str>>(&mut self, brokers: &[V]) -> &mut Config {
let brokers = brokers
.iter()
.map(|v| v.as_ref())
.collect::<Vec<&str>>()
.join(",");
self.inner.set("bootstrap.servers", brokers);
self
}
pub fn set_group_id<V: Into<String>>(&mut self, group_id: V) -> &mut Config {
self.inner.set("group.id", group_id);
self
}
pub fn set_auto_commit(&mut self, auto: bool) -> &mut Config {
self.inner
.set("enable.auto.commit", if auto { "true" } else { "false" });
self
}
pub fn set_partition_eof(&mut self, eof: bool) -> &mut Config {
self.inner
.set("enable.partition.eof", if eof { "true" } else { "false" });
self
}
pub fn set_offset_reset(&mut self, reset: OffsetReset) -> &mut Config {
let reset = match reset {
OffsetReset::Earliest => "earliest",
OffsetReset::Latest => "latest",
OffsetReset::None => "none",
};
self.inner.set("auto.offset.reset", reset);
self
}
pub fn set_message_max_bytes(&mut self, max: i32) -> &mut Config {
self.inner.set("message.max.bytes", max.to_string());
self
}
pub fn set_ack(&mut self, ack: Ack) -> &mut Config {
let ack = match ack {
Ack::Zero => "0",
Ack::One => "1",
Ack::All => "all",
};
self.inner.set("acks", ack);
self
}
pub fn set_delivery_timeout_ms(&mut self, timeout: i32) -> &mut Config {
self.inner.set("delivery.timeout.ms", timeout.to_string());
self
}
pub fn set_fetch_min_bytes(&mut self, min: i32) -> &mut Config {
self.inner.set("fetch.min.bytes", min.to_string());
self
}
pub fn set_fetch_max_bytes(&mut self, max: i32) -> &mut Config {
self.inner.set("fetch.max.bytes", max.to_string());
self
}
pub fn set_session_timeout_ms(&mut self, timeout: i32) -> &mut Config {
self.inner.set("session.timeout.ms", timeout.to_string());
self
}
pub fn set_connection_timeout_ms(&mut self, timeout: i32) -> &mut Config {
self.inner.set("socket.connection.setup.timeout.ms", timeout.to_string());
self
}
pub fn set_socket_timeout_ms(&mut self, timeout: i32) -> &mut Config {
self.inner.set("socket.timeout.ms", timeout.to_string());
self
}
pub fn set_request_timeout_ms(&mut self, timeout: i32) -> &mut Config {
self.inner.set("request.timeout.ms", timeout.to_string());
self
}
pub fn set_idempotence(&mut self, val: bool) -> &mut Config {
self.inner
.set("enable.idempotence", if val { "true" } else { "false" });
self
}
pub fn set_retries(&mut self, retries: i32) -> &mut Config {
self.inner.set("retries", retries.to_string());
self
}
pub fn set_fetch_wait_max_ms(&mut self, wait: i32) -> &mut Config {
self.inner.set("fetch.wait.max.ms", wait.to_string());
self
}
}
/// Choices for the `auto.offset.reset` property, as written by
/// `Config::set_offset_reset`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum OffsetReset {
    /// Stored as `"earliest"`.
    Earliest,
    /// Stored as `"latest"`.
    Latest,
    /// Stored as `"none"`.
    None,
}
/// Choices for the `acks` property, as written by `Config::set_ack`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Ack {
    /// Stored as `"0"`.
    Zero,
    /// Stored as `"1"`.
    One,
    /// Stored as `"all"`.
    All,
}