1use std::collections::HashMap;
26use std::ffi::CString;
27use std::iter::FromIterator;
28use std::os::raw::c_char;
29use std::ptr;
30
31use rdkafka_sys as rdsys;
32use rdkafka_sys::types::*;
33
34use crate::client::ClientContext;
35use crate::error::{IsError, KafkaError, KafkaResult};
36use crate::log::{log_enabled, DEBUG, INFO, WARN};
37use crate::util::{ErrBuf, KafkaDrop, NativePtr};
38
/// Log severity levels used when configuring librdkafka logging.
///
/// The numeric discriminants run from `0` (most severe) to `7` (most
/// verbose), matching the syslog severity numbering.
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
    /// Severity `0`: emergency.
    Emerg = 0,
    /// Severity `1`: alert.
    Alert = 1,
    /// Severity `2`: critical condition.
    Critical = 2,
    /// Severity `3`: error.
    Error = 3,
    /// Severity `4`: warning.
    Warning = 4,
    /// Severity `5`: notice.
    Notice = 5,
    /// Severity `6`: informational message.
    Info = 6,
    /// Severity `7`: debug output.
    Debug = 7,
}
63
64impl RDKafkaLogLevel {
65 pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
66 match level {
67 0 => RDKafkaLogLevel::Emerg,
68 1 => RDKafkaLogLevel::Alert,
69 2 => RDKafkaLogLevel::Critical,
70 3 => RDKafkaLogLevel::Error,
71 4 => RDKafkaLogLevel::Warning,
72 5 => RDKafkaLogLevel::Notice,
73 6 => RDKafkaLogLevel::Info,
74 _ => RDKafkaLogLevel::Debug,
75 }
76 }
77}
78
79pub struct NativeClientConfig {
85 ptr: NativePtr<RDKafkaConf>,
86}
87
88unsafe impl KafkaDrop for RDKafkaConf {
89 const TYPE: &'static str = "client config";
90 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
91}
92
93impl NativeClientConfig {
94 pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
96 NativeClientConfig {
97 ptr: NativePtr::from_ptr(ptr).unwrap(),
98 }
99 }
100
101 pub fn ptr(&self) -> *mut RDKafkaConf {
103 self.ptr.ptr()
104 }
105
106 pub fn get(&self, key: &str) -> KafkaResult<String> {
113 let make_err = |res| {
114 KafkaError::ClientConfig(
115 res,
116 match res {
117 RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
118 RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
119 RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
120 }
121 .into(),
122 key.into(),
123 "".into(),
124 )
125 };
126 let key_c = CString::new(key.to_string())?;
127
128 let mut size = 0_usize;
130 let res = unsafe {
131 rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
132 };
133 if res.is_error() {
134 return Err(make_err(res));
135 }
136
137 let mut buf = vec![0_u8; size];
140 let res = unsafe {
141 rdsys::rd_kafka_conf_get(
142 self.ptr(),
143 key_c.as_ptr(),
144 buf.as_mut_ptr() as *mut c_char,
145 &mut size,
146 )
147 };
148 if res.is_error() {
149 return Err(make_err(res));
150 }
151
152 Ok(String::from_utf8_lossy(&buf)
154 .trim_matches(char::from(0))
155 .to_string())
156 }
157
158 pub(crate) fn set(&self, key: &str, value: &str) -> KafkaResult<()> {
159 let mut err_buf = ErrBuf::new();
160 let key_c = CString::new(key)?;
161 let value_c = CString::new(value)?;
162 let ret = unsafe {
163 rdsys::rd_kafka_conf_set(
164 self.ptr(),
165 key_c.as_ptr(),
166 value_c.as_ptr(),
167 err_buf.as_mut_ptr(),
168 err_buf.capacity(),
169 )
170 };
171 if ret.is_error() {
172 return Err(KafkaError::ClientConfig(
173 ret,
174 err_buf.to_string(),
175 key.to_string(),
176 value.to_string(),
177 ));
178 }
179 Ok(())
180 }
181}
182
183#[derive(Clone, Debug)]
185pub struct ClientConfig {
186 conf_map: HashMap<String, String>,
187 pub log_level: RDKafkaLogLevel,
190}
191
192impl Default for ClientConfig {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198impl ClientConfig {
199 pub fn new() -> ClientConfig {
201 ClientConfig {
202 conf_map: HashMap::new(),
203 log_level: log_level_from_global_config(),
204 }
205 }
206
207 pub fn config_map(&self) -> &HashMap<String, String> {
209 &self.conf_map
210 }
211
212 pub fn get(&self, key: &str) -> Option<&str> {
222 self.conf_map.get(key).map(|val| val.as_str())
223 }
224
225 pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
230 where
231 K: Into<String>,
232 V: Into<String>,
233 {
234 self.conf_map.insert(key.into(), value.into());
235 self
236 }
237
238 pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
240 self.conf_map.remove(key);
241 self
242 }
243
244 pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
247 self.log_level = log_level;
248 self
249 }
250
251 pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
253 let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
254 for (key, value) in &self.conf_map {
255 conf.set(key, value)?;
256 }
257 Ok(conf)
258 }
259
260 pub fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
262 T::from_config(self)
263 }
264
265 pub fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
267 where
268 C: ClientContext,
269 T: FromClientConfigAndContext<C>,
270 {
271 T::from_config_and_context(self, context)
272 }
273}
274
275impl FromIterator<(String, String)> for ClientConfig {
276 fn from_iter<I>(iter: I) -> ClientConfig
277 where
278 I: IntoIterator<Item = (String, String)>,
279 {
280 let mut config = ClientConfig::new();
281 config.extend(iter);
282 config
283 }
284}
285
286impl Extend<(String, String)> for ClientConfig {
287 fn extend<I>(&mut self, iter: I)
288 where
289 I: IntoIterator<Item = (String, String)>,
290 {
291 self.conf_map.extend(iter)
292 }
293}
294
295fn log_level_from_global_config() -> RDKafkaLogLevel {
297 if log_enabled!(target: "librdkafka", DEBUG) {
298 RDKafkaLogLevel::Debug
299 } else if log_enabled!(target: "librdkafka", INFO) {
300 RDKafkaLogLevel::Info
301 } else if log_enabled!(target: "librdkafka", WARN) {
302 RDKafkaLogLevel::Warning
303 } else {
304 RDKafkaLogLevel::Error
305 }
306}
307
308pub trait FromClientConfig: Sized {
310 fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
313}
314
315pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
317 fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
319}
320
#[cfg(test)]
mod tests {
    use super::ClientConfig;

    /// Collecting pairs and then extending must keep untouched keys,
    /// overwrite existing keys and add new ones.
    #[test]
    fn test_client_config_set_map() {
        let initial: Vec<(String, String)> =
            vec![("a".into(), "1".into()), ("b".into(), "1".into())];
        let mut config: ClientConfig = initial.into_iter().collect();

        // "b" is overwritten, "c" is newly added.
        config.extend([("b".into(), "2".into()), ("c".into(), "3".into())]);

        assert_eq!(config.get("a"), Some("1"));
        assert_eq!(config.get("b"), Some("2"));
        assert_eq!(config.get("c"), Some("3"));
    }
}