use std::collections::{BTreeMap, HashMap};
use std::ffi::CString;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::os::raw::c_char;
use std::ptr;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::client::ClientContext;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{log_enabled, DEBUG, INFO, WARN};
use crate::util::{ErrBuf, KafkaDrop, NativePtr};
/// Configuration keys whose values are treated as secrets.
///
/// The `Debug` implementation of `ClientConfig` leaves entries with these keys
/// out of its output, and `ClientConfig::config_map` replaces their values with
/// `SANITIZED_VALUE_PLACEHOLDER`. Matching is an exact string comparison.
const SENSITIVE_CONFIG_KEYS: &[&str] = &[
"sasl.password",
"ssl.key.password",
"ssl.keystore.password",
"ssl.truststore.password",
"sasl.oauthbearer.client.secret",
];
/// Text that `ClientConfig::config_map` reports instead of the real value for
/// any key listed in `SENSITIVE_CONFIG_KEYS`.
const SANITIZED_VALUE_PLACEHOLDER: &str = "[sanitized for safety]";
/// Log level stored in a `ClientConfig`.
///
/// Every variant carries an explicit integer discriminant, running from
/// `Emerg` (0) up to `Debug` (7). [`RDKafkaLogLevel::from_int`] performs the
/// reverse mapping from an integer to a variant.
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
    /// Level 0.
    Emerg = 0,
    /// Level 1.
    Alert = 1,
    /// Level 2.
    Critical = 2,
    /// Level 3.
    Error = 3,
    /// Level 4.
    Warning = 4,
    /// Level 5.
    Notice = 5,
    /// Level 6.
    Info = 6,
    /// Level 7.
    Debug = 7,
}

impl RDKafkaLogLevel {
    /// Converts an integer level into the matching variant.
    ///
    /// Values `0` through `6` map to the variant with that discriminant. Any
    /// other value, including negative numbers and everything from `7`
    /// upwards, yields [`RDKafkaLogLevel::Debug`].
    pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
        match level {
            0 => Self::Emerg,
            1 => Self::Alert,
            2 => Self::Critical,
            3 => Self::Error,
            4 => Self::Warning,
            5 => Self::Notice,
            6 => Self::Info,
            // Catch-all: 7 and every out-of-range value.
            _ => Self::Debug,
        }
    }
}
/// Wrapper around a native `RDKafkaConf` configuration object.
///
/// Values are read and written through the `get` and `set` methods, which call
/// `rd_kafka_conf_get` and `rd_kafka_conf_set` on the wrapped pointer.
pub struct NativeClientConfig {
// Pointer to the native configuration, held in a `NativePtr`.
// NOTE(review): `NativePtr` presumably runs `KafkaDrop::DROP`
// (`rd_kafka_conf_destroy`) when dropped — confirm in `crate::util`.
ptr: NativePtr<RDKafkaConf>,
}
// Registers the release function for native `RDKafkaConf` objects.
// NOTE(review): how `TYPE` and `DROP` are consumed is defined by `KafkaDrop` /
// `NativePtr` in `crate::util`, which is not visible here — confirm there.
unsafe impl KafkaDrop for RDKafkaConf {
// Human-readable name for this kind of native object.
const TYPE: &'static str = "client config";
// Native function that destroys an `RDKafkaConf`.
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
}
impl NativeClientConfig {
    /// Wraps a raw `RDKafkaConf` pointer in a `NativeClientConfig`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the precise contract is defined by `NativePtr::from_ptr`.
    /// Presumably `ptr` must point to a valid configuration object whose
    /// ownership passes to the returned value — confirm in `crate::util`.
    ///
    /// # Panics
    ///
    /// Panics if `NativePtr::from_ptr` returns `None`.
    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
        NativeClientConfig {
            ptr: NativePtr::from_ptr(ptr).unwrap(),
        }
    }

    /// Returns the raw pointer to the wrapped native configuration.
    pub fn ptr(&self) -> *mut RDKafkaConf {
        self.ptr.ptr()
    }

    /// Reads the current value of the configuration parameter `key`.
    ///
    /// The value is fetched with two `rd_kafka_conf_get` calls: the first, with
    /// a null destination, obtains the required buffer size; the second fills a
    /// buffer of that size. NUL bytes at either end of the result are stripped
    /// and invalid UTF-8 is replaced lossily.
    ///
    /// # Errors
    ///
    /// Returns an error if `key` contains an interior NUL byte, or a
    /// `KafkaError::ClientConfig` if either native call reports an error.
    pub fn get(&self, key: &str) -> KafkaResult<String> {
        let make_err = |res| {
            KafkaError::ClientConfig(
                res,
                match res {
                    RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
                    RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
                    RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
                }
                .into(),
                key.into(),
                "".into(),
            )
        };

        // `CString::new` accepts `&str` directly, so no intermediate `String`
        // is allocated (same as in `set`).
        let key_c = CString::new(key)?;

        // First pass: a null destination only asks for the required size.
        let mut size = 0_usize;
        // SAFETY: `self.ptr()` is the pointer held by this wrapper, `key_c` is a
        // NUL-terminated string that outlives the call, and `size` is a valid
        // out-pointer. A null destination is the size-query form of
        // `rd_kafka_conf_get` per the librdkafka API.
        let res = unsafe {
            rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
        };
        if res.is_error() {
            return Err(make_err(res));
        }

        // Second pass: fetch the value into a zero-initialised buffer.
        let mut buf = vec![0_u8; size];
        // SAFETY: as above; additionally `buf` is writable for `size` bytes,
        // which is the capacity passed to the callee through `size`.
        let res = unsafe {
            rdsys::rd_kafka_conf_get(
                self.ptr(),
                key_c.as_ptr(),
                buf.as_mut_ptr() as *mut c_char,
                &mut size,
            )
        };
        if res.is_error() {
            return Err(make_err(res));
        }

        // Drop the C string terminator (and any zero padding) before returning.
        Ok(String::from_utf8_lossy(&buf)
            .trim_matches(char::from(0))
            .to_string())
    }

    /// Sets the configuration parameter `key` to `value` on the native object.
    ///
    /// # Errors
    ///
    /// Returns an error if `key` or `value` contains an interior NUL byte, or a
    /// `KafkaError::ClientConfig` carrying the native result code, the message
    /// written to the error buffer, and the offending key and value when
    /// `rd_kafka_conf_set` reports an error.
    pub(crate) fn set(&self, key: &str, value: &str) -> KafkaResult<()> {
        let mut err_buf = ErrBuf::new();
        let key_c = CString::new(key)?;
        let value_c = CString::new(value)?;
        // SAFETY: `self.ptr()` is the pointer held by this wrapper, both C
        // strings are NUL-terminated and outlive the call, and the error buffer
        // is passed together with its own capacity.
        let ret = unsafe {
            rdsys::rd_kafka_conf_set(
                self.ptr(),
                key_c.as_ptr(),
                value_c.as_ptr(),
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            )
        };
        if ret.is_error() {
            return Err(KafkaError::ClientConfig(
                ret,
                err_buf.to_string(),
                key.to_string(),
                value.to_string(),
            ));
        }
        Ok(())
    }
}
/// Client configuration: a set of string key/value parameters plus a log level.
///
/// `create_native_config` applies every stored pair to a freshly created
/// `NativeClientConfig`. `Debug` is implemented by hand so that entries whose
/// key appears in `SENSITIVE_CONFIG_KEYS` are not printed.
#[derive(Clone)]
pub struct ClientConfig {
// Configuration parameters by name; later insertions replace earlier ones.
conf_map: HashMap<String, String>,
/// The log level of the client. Initialised by `ClientConfig::new` from
/// `log_level_from_global_config` and changeable via `set_log_level`.
pub log_level: RDKafkaLogLevel,
}
impl Debug for ClientConfig {
    /// Formats the configuration as a `ClientConfig` struct with the fields
    /// `log_level` and `conf_map`.
    ///
    /// Entries whose key is listed in `SENSITIVE_CONFIG_KEYS` are left out of
    /// `conf_map` entirely, so secret values never reach the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A `BTreeMap` keeps the printed keys in sorted order.
        let mut visible: BTreeMap<&str, &str> = BTreeMap::new();
        for (name, setting) in &self.conf_map {
            let name = name.as_str();
            if !SENSITIVE_CONFIG_KEYS.contains(&name) {
                visible.insert(name, setting.as_str());
            }
        }

        f.debug_struct("ClientConfig")
            .field("log_level", &self.log_level)
            .field("conf_map", &visible)
            .finish()
    }
}
impl Default for ClientConfig {
fn default() -> Self {
Self::new()
}
}
impl ClientConfig {
    /// Creates a configuration with no parameters set.
    ///
    /// The log level is initialised from `log_level_from_global_config`.
    pub fn new() -> ClientConfig {
        let conf_map = HashMap::new();
        let log_level = log_level_from_global_config();
        ClientConfig {
            conf_map,
            log_level,
        }
    }

    /// Returns a sorted view of the stored parameters.
    ///
    /// Values of keys listed in `SENSITIVE_CONFIG_KEYS` are replaced with
    /// `SANITIZED_VALUE_PLACEHOLDER`; all other values are returned as stored.
    pub fn config_map(&self) -> BTreeMap<&str, &str> {
        let mut view = BTreeMap::new();
        for (name, setting) in &self.conf_map {
            let name = name.as_str();
            let shown = if SENSITIVE_CONFIG_KEYS.contains(&name) {
                SANITIZED_VALUE_PLACEHOLDER
            } else {
                setting.as_str()
            };
            view.insert(name, shown);
        }
        view
    }

    /// Returns the stored value for `key`, or `None` if the key is not set.
    ///
    /// Unlike `config_map`, the value is returned exactly as stored.
    pub fn get(&self, key: &str) -> Option<&str> {
        self.conf_map.get(key).map(String::as_str)
    }

    /// Stores `value` under `key`, replacing any previous value.
    ///
    /// Returns `self` so calls can be chained.
    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
    where
        K: Into<String>,
        V: Into<String>,
    {
        let (name, setting) = (key.into(), value.into());
        self.conf_map.insert(name, setting);
        self
    }

    /// Removes `key` from the configuration if present.
    ///
    /// Returns `self` so calls can be chained.
    pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
        let _ = self.conf_map.remove(key);
        self
    }

    /// Replaces the stored log level.
    ///
    /// Returns `self` so calls can be chained.
    pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
        self.log_level = log_level;
        self
    }

    /// Builds a `NativeClientConfig` holding every stored parameter.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `NativeClientConfig::set`; the
    /// remaining parameters are then not applied.
    pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
        let native = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
        self.conf_map
            .iter()
            .try_for_each(|(name, setting)| native.set(name, setting))?;
        Ok(native)
    }

    /// Creates a `T` from this configuration via `FromClientConfig::from_config`.
    pub fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
        T::from_config(self)
    }

    /// Creates a `T` from this configuration and `context` via
    /// `FromClientConfigAndContext::from_config_and_context`.
    pub fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
    where
        C: ClientContext,
        T: FromClientConfigAndContext<C>,
    {
        T::from_config_and_context(self, context)
    }
}
impl FromIterator<(String, String)> for ClientConfig {
    /// Builds a configuration from key/value pairs.
    ///
    /// Starts from `ClientConfig::new` and then inserts every pair through the
    /// `Extend` implementation.
    fn from_iter<I>(iter: I) -> ClientConfig
    where
        I: IntoIterator<Item = (String, String)>,
    {
        let mut built = ClientConfig::new();
        Extend::extend(&mut built, iter);
        built
    }
}
impl Extend<(String, String)> for ClientConfig {
    /// Inserts every key/value pair from `iter` into the configuration.
    ///
    /// A pair whose key is already present replaces the stored value.
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = (String, String)>,
    {
        Extend::extend(&mut self.conf_map, iter);
    }
}
/// Picks a log level based on what is enabled for the `"librdkafka"` log target.
///
/// Checks `DEBUG`, `INFO` and `WARN` in that order and returns the level for
/// the first one that is enabled. Falls back to `RDKafkaLogLevel::Error` when
/// none of them is.
fn log_level_from_global_config() -> RDKafkaLogLevel {
    if log_enabled!(target: "librdkafka", DEBUG) {
        return RDKafkaLogLevel::Debug;
    }
    if log_enabled!(target: "librdkafka", INFO) {
        return RDKafkaLogLevel::Info;
    }
    if log_enabled!(target: "librdkafka", WARN) {
        return RDKafkaLogLevel::Warning;
    }
    RDKafkaLogLevel::Error
}
/// Types that can be constructed from a `ClientConfig`.
///
/// `ClientConfig::create` delegates to this trait.
pub trait FromClientConfig: Sized {
/// Builds a value of the implementing type from the given configuration,
/// or returns an error if construction fails.
fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
}
/// Types that can be constructed from a `ClientConfig` together with a
/// `ClientContext` of type `C`.
///
/// `ClientConfig::create_with_context` delegates to this trait.
pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
/// Builds a value of the implementing type from the given configuration and
/// context, or returns an error if construction fails. The context is taken
/// by value.
fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
}
#[cfg(test)]
mod tests {
    use super::ClientConfig;

    /// Collecting pairs and then extending with more pairs keeps untouched
    /// keys, overwrites repeated keys, and adds new ones.
    #[test]
    fn test_client_config_set_map() {
        let initial: Vec<(String, String)> =
            vec![("a".into(), "1".into()), ("b".into(), "1".into())];
        let mut config: ClientConfig = initial.into_iter().collect();

        let overrides: [(String, String); 2] =
            [("b".into(), "2".into()), ("c".into(), "3".into())];
        config.extend(overrides);

        for &(key, expected) in &[("a", "1"), ("b", "2"), ("c", "3")] {
            assert_eq!(config.get(key), Some(expected));
        }
    }
}