1use rdkafka::{config::RDKafkaLogLevel, ClientConfig};
2
3#[derive(Clone, Debug)]
9pub struct KafkaClientConfig(ClientConfig);
10
11impl KafkaClientConfig {
12 pub fn new(config: ClientConfig) -> Self {
14 KafkaClientConfig(config)
15 }
16
17 pub fn new_recommended(group_id: impl Into<String>, brokers: impl Into<String>) -> Self {
27 let mut config = ClientConfig::new();
28 config
29 .set("group.id", group_id)
30 .set("bootstrap.servers", brokers)
31 .set("enable.partition.eof", "false")
32 .set("session.timeout.ms", "6000")
33 .set("enable.auto.commit", "true")
34 .set("auto.commit.interval.ms", "5000")
35 .set("enable.auto.offset.store", "false")
36 .set("auto.offset.reset", "earliest")
37 .set_log_level(RDKafkaLogLevel::Debug);
38
39 KafkaClientConfig(config)
40 }
41
42 pub fn into_inner(self) -> ClientConfig {
44 self.0
45 }
46
47 pub fn set<K, V>(&mut self, key: K, value: V) -> &mut Self
49 where
50 K: Into<String>,
51 V: Into<String>,
52 {
53 self.as_mut().set(key, value);
54 self
55 }
56}
57
58impl From<KafkaClientConfig> for ClientConfig {
59 fn from(config: KafkaClientConfig) -> Self {
60 config.into_inner()
61 }
62}
63
64impl AsRef<ClientConfig> for KafkaClientConfig {
65 fn as_ref(&self) -> &ClientConfig {
66 &self.0
67 }
68}
69
70impl AsMut<ClientConfig> for KafkaClientConfig {
71 fn as_mut(&mut self) -> &mut ClientConfig {
72 &mut self.0
73 }
74}
75
76#[cfg(test)]
77mod tests {
78 use super::KafkaClientConfig;
79
80 #[test]
81 fn new_recommended() {
82 let config = KafkaClientConfig::new_recommended("my_group", "broker1,broker2");
83 let inner_config = config.into_inner();
84 assert_eq!(inner_config.get("group.id"), Some("my_group"));
85 assert_eq!(
86 inner_config.get("bootstrap.servers"),
87 Some("broker1,broker2")
88 );
89 assert_eq!(inner_config.get("enable.partition.eof"), Some("false"));
90 assert_eq!(inner_config.get("session.timeout.ms"), Some("6000"));
91 assert_eq!(inner_config.get("enable.auto.commit"), Some("true"));
92 assert_eq!(inner_config.get("auto.commit.interval.ms"), Some("5000"));
93 assert_eq!(inner_config.get("enable.auto.offset.store"), Some("false"));
94 }
95
96 #[test]
97 fn set() {
98 let mut config = KafkaClientConfig::new_recommended("my_group", "broker1,broker2");
99 config.set("group.id", "overwritten");
100
101 let inner_config = config.into_inner();
102 assert_eq!(inner_config.get("group.id"), Some("overwritten"));
103 }
104}