rdkafka/
config.rs

1//! Producer and consumer configuration.
2//!
3//! ## C library configuration
4//!
5//! The Rust library will forward all the configuration to the C library. The
6//! most frequently used parameters are listed here.
7//!
8//! ### Frequently used parameters
9//!
10//! For producer-specific and consumer-specific parameters check the producer
11//! and consumer modules documentation. The full list of available parameters is
12//! available in the [librdkafka documentation][librdkafka-config].
13//!
14//! - `client.id`: Client identifier. Default: `rdkafka`.
15//! - `bootstrap.servers`: Initial list of brokers as a CSV list of broker host
16//!   or host:port. Default: empty.
17//! - `message.max.bytes`: Maximum message size. Default: 1000000.
18//! - `debug`: A comma-separated list of debug contexts to enable. Use 'all' to
19//!   print all the debugging information. Default: empty (off).
//! - `statistics.interval.ms`: How often the statistics callback
//!   specified in the [`ClientContext`] will be called. Default: 0 (disabled).
22//!
23//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
24
25use std::collections::HashMap;
26use std::ffi::CString;
27use std::iter::FromIterator;
28use std::os::raw::c_char;
29use std::ptr;
30
31use rdkafka_sys as rdsys;
32use rdkafka_sys::types::*;
33
34use crate::client::ClientContext;
35use crate::error::{IsError, KafkaError, KafkaResult};
36use crate::log::{log_enabled, DEBUG, INFO, WARN};
37use crate::util::{ErrBuf, KafkaDrop, NativePtr};
38
/// The log levels supported by librdkafka.
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Emerg = 0,
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Alert = 1,
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Critical = 2,
    /// Equivalent to [`Level::Error`](log::Level::Error) from the log crate.
    Error = 3,
    /// Equivalent to [`Level::Warn`](log::Level::Warn) from the log crate.
    Warning = 4,
    /// Higher priority than [`Level::Info`](log::Level::Info) from the log
    /// crate.
    Notice = 5,
    /// Equivalent to [`Level::Info`](log::Level::Info) from the log crate.
    Info = 6,
    /// Equivalent to [`Level::Debug`](log::Level::Debug) from the log crate.
    Debug = 7,
}
63
64impl RDKafkaLogLevel {
65    pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
66        match level {
67            0 => RDKafkaLogLevel::Emerg,
68            1 => RDKafkaLogLevel::Alert,
69            2 => RDKafkaLogLevel::Critical,
70            3 => RDKafkaLogLevel::Error,
71            4 => RDKafkaLogLevel::Warning,
72            5 => RDKafkaLogLevel::Notice,
73            6 => RDKafkaLogLevel::Info,
74            _ => RDKafkaLogLevel::Debug,
75        }
76    }
77}
78
79//
80// ********** CLIENT CONFIG **********
81//
82
/// A native rdkafka-sys client config.
///
/// Owns the underlying `RDKafkaConf` pointer; the configuration object is
/// destroyed when this value is dropped (see the `KafkaDrop` impl for
/// `RDKafkaConf` below).
pub struct NativeClientConfig {
    // Non-null, owned pointer to the librdkafka configuration object.
    ptr: NativePtr<RDKafkaConf>,
}
87
// Teaches `NativePtr<RDKafkaConf>` how to release the config object.
// SAFETY: `rd_kafka_conf_destroy` is librdkafka's deallocation routine paired
// with `rd_kafka_conf_new` (used in `create_native_config` below), so it is
// the correct drop function for this type.
unsafe impl KafkaDrop for RDKafkaConf {
    const TYPE: &'static str = "client config";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
}
92
93impl NativeClientConfig {
94    /// Wraps a pointer to an `RDKafkaConfig` object and returns a new `NativeClientConfig`.
95    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
96        NativeClientConfig {
97            ptr: NativePtr::from_ptr(ptr).unwrap(),
98        }
99    }
100
101    /// Returns the pointer to the librdkafka RDKafkaConf structure.
102    pub fn ptr(&self) -> *mut RDKafkaConf {
103        self.ptr.ptr()
104    }
105
106    /// Gets the value of a parameter in the configuration.
107    ///
108    /// This method reflects librdkafka's view of the current value of the
109    /// parameter. If the parameter was overridden by the user, it returns the
110    /// user-specified value. Otherwise, it returns librdkafka's default value
111    /// for the parameter.
112    pub fn get(&self, key: &str) -> KafkaResult<String> {
113        let make_err = |res| {
114            KafkaError::ClientConfig(
115                res,
116                match res {
117                    RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
118                    RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
119                    RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
120                }
121                .into(),
122                key.into(),
123                "".into(),
124            )
125        };
126        let key_c = CString::new(key.to_string())?;
127
128        // Call with a `NULL` buffer to determine the size of the string.
129        let mut size = 0_usize;
130        let res = unsafe {
131            rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
132        };
133        if res.is_error() {
134            return Err(make_err(res));
135        }
136
137        // Allocate a buffer of that size and call again to get the actual
138        // string.
139        let mut buf = vec![0_u8; size];
140        let res = unsafe {
141            rdsys::rd_kafka_conf_get(
142                self.ptr(),
143                key_c.as_ptr(),
144                buf.as_mut_ptr() as *mut c_char,
145                &mut size,
146            )
147        };
148        if res.is_error() {
149            return Err(make_err(res));
150        }
151
152        // Convert the C string to a Rust string.
153        Ok(String::from_utf8_lossy(&buf)
154            .trim_matches(char::from(0))
155            .to_string())
156    }
157
158    pub(crate) fn set(&self, key: &str, value: &str) -> KafkaResult<()> {
159        let mut err_buf = ErrBuf::new();
160        let key_c = CString::new(key)?;
161        let value_c = CString::new(value)?;
162        let ret = unsafe {
163            rdsys::rd_kafka_conf_set(
164                self.ptr(),
165                key_c.as_ptr(),
166                value_c.as_ptr(),
167                err_buf.as_mut_ptr(),
168                err_buf.capacity(),
169            )
170        };
171        if ret.is_error() {
172            return Err(KafkaError::ClientConfig(
173                ret,
174                err_buf.to_string(),
175                key.to_string(),
176                value.to_string(),
177            ));
178        }
179        Ok(())
180    }
181}
182
/// Client configuration.
///
/// A map of librdkafka parameter names to values, plus the log level for the
/// client. Parameters are stored as strings and forwarded verbatim to
/// librdkafka when a native configuration is built.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    // Parameter name -> value pairs, exactly as supplied via `set`.
    conf_map: HashMap<String, String>,
    /// The librdkafka logging level. Refer to [`RDKafkaLogLevel`] for the list
    /// of available levels.
    pub log_level: RDKafkaLogLevel,
}
191
192impl Default for ClientConfig {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198impl ClientConfig {
199    /// Creates a new empty configuration.
200    pub fn new() -> ClientConfig {
201        ClientConfig {
202            conf_map: HashMap::new(),
203            log_level: log_level_from_global_config(),
204        }
205    }
206
207    /// Gets a reference to the underlying config map
208    pub fn config_map(&self) -> &HashMap<String, String> {
209        &self.conf_map
210    }
211
212    /// Gets the value of a parameter in the configuration.
213    ///
214    /// Returns the current value set for `key`, or `None` if no value for `key`
215    /// exists.
216    ///
217    /// Note that this method will only ever return values that were installed
218    /// by a call to [`ClientConfig::set`]. To retrieve librdkafka's default
219    /// value for a parameter, build a [`NativeClientConfig`] and then call
220    /// [`NativeClientConfig::get`] on the resulting object.
221    pub fn get(&self, key: &str) -> Option<&str> {
222        self.conf_map.get(key).map(|val| val.as_str())
223    }
224
225    /// Sets a parameter in the configuration.
226    ///
227    /// If there is an existing value for `key` in the configuration, it is
228    /// overridden with the new `value`.
229    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
230    where
231        K: Into<String>,
232        V: Into<String>,
233    {
234        self.conf_map.insert(key.into(), value.into());
235        self
236    }
237
238    /// Removes a parameter from the configuration.
239    pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
240        self.conf_map.remove(key);
241        self
242    }
243
244    /// Sets the log level of the client. If not specified, the log level will be calculated based
245    /// on the global log level of the log crate.
246    pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
247        self.log_level = log_level;
248        self
249    }
250
251    /// Builds a native librdkafka configuration.
252    pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
253        let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
254        for (key, value) in &self.conf_map {
255            conf.set(key, value)?;
256        }
257        Ok(conf)
258    }
259
260    /// Uses the current configuration to create a new Consumer or Producer.
261    pub fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
262        T::from_config(self)
263    }
264
265    /// Uses the current configuration and the provided context to create a new Consumer or Producer.
266    pub fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
267    where
268        C: ClientContext,
269        T: FromClientConfigAndContext<C>,
270    {
271        T::from_config_and_context(self, context)
272    }
273}
274
275impl FromIterator<(String, String)> for ClientConfig {
276    fn from_iter<I>(iter: I) -> ClientConfig
277    where
278        I: IntoIterator<Item = (String, String)>,
279    {
280        let mut config = ClientConfig::new();
281        config.extend(iter);
282        config
283    }
284}
285
286impl Extend<(String, String)> for ClientConfig {
287    fn extend<I>(&mut self, iter: I)
288    where
289        I: IntoIterator<Item = (String, String)>,
290    {
291        self.conf_map.extend(iter)
292    }
293}
294
295/// Return the log level
296fn log_level_from_global_config() -> RDKafkaLogLevel {
297    if log_enabled!(target: "librdkafka", DEBUG) {
298        RDKafkaLogLevel::Debug
299    } else if log_enabled!(target: "librdkafka", INFO) {
300        RDKafkaLogLevel::Info
301    } else if log_enabled!(target: "librdkafka", WARN) {
302        RDKafkaLogLevel::Warning
303    } else {
304        RDKafkaLogLevel::Error
305    }
306}
307
/// Create a new client based on the provided configuration.
///
/// Implemented by client types that can be built via [`ClientConfig::create`].
pub trait FromClientConfig: Sized {
    /// Creates a client from a client configuration. The default client context
    /// will be used.
    fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
}
314
/// Create a new client based on the provided configuration and context.
///
/// Implemented by client types that can be built via
/// [`ClientConfig::create_with_context`] with a custom [`ClientContext`].
pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
    /// Creates a client from a client configuration and a client context.
    fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
}
320
#[cfg(test)]
mod tests {
    use super::ClientConfig;

    /// `FromIterator` collects initial pairs and `extend` overrides or adds
    /// entries, mirroring repeated calls to `set`.
    #[test]
    fn test_client_config_set_map() {
        let initial = vec![
            ("a".to_string(), "1".to_string()),
            ("b".to_string(), "1".to_string()),
        ];
        let mut config = initial.into_iter().collect::<ClientConfig>();

        let updates = vec![
            ("b".to_string(), "2".to_string()),
            ("c".to_string(), "3".to_string()),
        ];
        config.extend(updates);

        assert_eq!(config.get("a"), Some("1"));
        assert_eq!(config.get("b"), Some("2"));
        assert_eq!(config.get("c"), Some("3"));
    }
}
336}