thalo_kafka/
config.rs

1use rdkafka::{config::RDKafkaLogLevel, ClientConfig};
2
3/// A wrapper around [`rdkafka::ClientConfig`] for convenience.
4///
5/// Provides recommended config with `KafkaClientConfig::new_recommended` which is configured for
6/// at-least-once kafka semantic. Offsets should be manually added to the store.
7/// For more info, see the [`rust-rdkafka/examples/at_least_once.rs`](https://github.com/fede1024/rust-rdkafka/blob/master/examples/at_least_once.rs) example.
8#[derive(Clone, Debug)]
9pub struct KafkaClientConfig(ClientConfig);
10
11impl KafkaClientConfig {
12    /// Create a new `KafkaClientConfig` from an existing [`rdkafka::ClientConfig`].
13    pub fn new(config: ClientConfig) -> Self {
14        KafkaClientConfig(config)
15    }
16
17    /// Create a new `KafkaClientConfig` with a recommended configuration with at-least-once semantics.
18    ///
19    /// Along with `group.id` and `bootstrap.servers`, the following config items are set:
20    ///
21    /// - `enable.partition.eof=false`
22    /// - `session.timeout.ms=6000`
23    /// - `enable.auto.commit=true`
24    /// - `auto.commit.interval.ms=5000`
25    /// - `enable.auto.offset.store=false`
26    pub fn new_recommended(group_id: impl Into<String>, brokers: impl Into<String>) -> Self {
27        let mut config = ClientConfig::new();
28        config
29            .set("group.id", group_id)
30            .set("bootstrap.servers", brokers)
31            .set("enable.partition.eof", "false")
32            .set("session.timeout.ms", "6000")
33            .set("enable.auto.commit", "true")
34            .set("auto.commit.interval.ms", "5000")
35            .set("enable.auto.offset.store", "false")
36            .set("auto.offset.reset", "earliest")
37            .set_log_level(RDKafkaLogLevel::Debug);
38
39        KafkaClientConfig(config)
40    }
41
42    /// Return the inner [`rdkafka::ClientConfig`].
43    pub fn into_inner(self) -> ClientConfig {
44        self.0
45    }
46
47    /// Sets a configuration key on the inner [`rdkafka::ClientConfig`].
48    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut Self
49    where
50        K: Into<String>,
51        V: Into<String>,
52    {
53        self.as_mut().set(key, value);
54        self
55    }
56}
57
58impl From<KafkaClientConfig> for ClientConfig {
59    fn from(config: KafkaClientConfig) -> Self {
60        config.into_inner()
61    }
62}
63
64impl AsRef<ClientConfig> for KafkaClientConfig {
65    fn as_ref(&self) -> &ClientConfig {
66        &self.0
67    }
68}
69
70impl AsMut<ClientConfig> for KafkaClientConfig {
71    fn as_mut(&mut self) -> &mut ClientConfig {
72        &mut self.0
73    }
74}
75
#[cfg(test)]
mod tests {
    use super::KafkaClientConfig;

    /// Every key set by `new_recommended` must be readable from the inner config.
    #[test]
    fn new_recommended() {
        let config = KafkaClientConfig::new_recommended("my_group", "broker1,broker2");
        let inner_config = config.into_inner();
        assert_eq!(inner_config.get("group.id"), Some("my_group"));
        assert_eq!(
            inner_config.get("bootstrap.servers"),
            Some("broker1,broker2")
        );
        assert_eq!(inner_config.get("enable.partition.eof"), Some("false"));
        assert_eq!(inner_config.get("session.timeout.ms"), Some("6000"));
        assert_eq!(inner_config.get("enable.auto.commit"), Some("true"));
        assert_eq!(inner_config.get("auto.commit.interval.ms"), Some("5000"));
        assert_eq!(inner_config.get("enable.auto.offset.store"), Some("false"));
        assert_eq!(inner_config.get("auto.offset.reset"), Some("earliest"));
    }

    /// `set` overwrites an existing key, supports chaining, and leaves other keys untouched.
    #[test]
    fn set() {
        let mut config = KafkaClientConfig::new_recommended("my_group", "broker1,broker2");
        config
            .set("group.id", "overwritten")
            .set("session.timeout.ms", "10000");

        let inner_config = config.into_inner();
        assert_eq!(inner_config.get("group.id"), Some("overwritten"));
        assert_eq!(inner_config.get("session.timeout.ms"), Some("10000"));
        assert_eq!(
            inner_config.get("bootstrap.servers"),
            Some("broker1,broker2")
        );
    }
}