use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::iter::FromIterator;
use std::os::raw::c_char;
use std::ptr;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::client::ClientContext;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{log_enabled, DEBUG, INFO, WARN};
use crate::util::{ErrBuf, KafkaDrop, NativePtr};
/// Log levels that can be requested from librdkafka.
///
/// Each variant carries an explicit integer discriminant from 0 to 7.
/// NOTE(review): the numbering looks like the syslog severity scale
/// (0 = most severe) — confirm against the librdkafka documentation.
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
    /// Level 0.
    Emerg = 0,
    /// Level 1.
    Alert = 1,
    /// Level 2.
    Critical = 2,
    /// Level 3.
    Error = 3,
    /// Level 4.
    Warning = 4,
    /// Level 5.
    Notice = 5,
    /// Level 6.
    Info = 6,
    /// Level 7.
    Debug = 7,
}

impl RDKafkaLogLevel {
    /// Converts a raw integer level into the matching variant.
    ///
    /// Values `0..=6` map to the variant with that discriminant. Every other
    /// value (including 7, negative numbers and anything above 7) maps to
    /// [`RDKafkaLogLevel::Debug`].
    pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
        match level {
            0 => Self::Emerg,
            1 => Self::Alert,
            2 => Self::Critical,
            3 => Self::Error,
            4 => Self::Warning,
            5 => Self::Notice,
            6 => Self::Info,
            // Deliberate catch-all: unknown levels are treated as the most
            // verbose one rather than rejected.
            _ => Self::Debug,
        }
    }
}
/// A wrapper around a native librdkafka configuration object.
///
/// The raw `RDKafkaConf` pointer is held through `NativePtr`.
/// NOTE(review): presumably the configuration is destroyed when this value is
/// dropped (see the `KafkaDrop` impl for `RDKafkaConf`) — confirm against
/// `NativePtr`.
pub struct NativeClientConfig {
    // Handle to the underlying native configuration.
    ptr: NativePtr<RDKafkaConf>,
}
// Describes how a native `RDKafkaConf` is released.
// NOTE(review): presumably consumed by `NativePtr` when the wrapper is
// dropped — confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaConf {
    // Human-readable name of the wrapped native type.
    const TYPE: &'static str = "client config";
    // Native destructor for the configuration object.
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
}
impl NativeClientConfig {
    /// Wraps a raw `RDKafkaConf` pointer in a `NativeClientConfig`.
    ///
    /// # Safety
    ///
    /// The caller passes a raw pointer that is handed to `NativePtr`.
    /// NOTE(review): presumably it must be a valid configuration pointer that
    /// is not owned elsewhere — confirm against `NativePtr::from_ptr`.
    ///
    /// # Panics
    ///
    /// Panics if `NativePtr::from_ptr` does not yield a pointer for `ptr`
    /// (presumably when `ptr` is null).
    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
        NativeClientConfig {
            ptr: NativePtr::from_ptr(ptr).unwrap(),
        }
    }

    /// Returns the raw pointer to the native configuration.
    pub fn ptr(&self) -> *mut RDKafkaConf {
        self.ptr.ptr()
    }

    /// Reads the value of the configuration parameter `key` from the native
    /// configuration.
    ///
    /// Bytes that are not valid UTF-8 are replaced lossily.
    ///
    /// # Errors
    ///
    /// Returns an error if `key` contains an interior nul byte, or a
    /// `KafkaError::ClientConfig` if librdkafka reports a failure for the
    /// lookup (for example an unknown configuration name).
    pub fn get(&self, key: &str) -> KafkaResult<String> {
        let make_err = |res| {
            KafkaError::ClientConfig(
                res,
                match res {
                    RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
                    RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
                    RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
                }
                .into(),
                key.into(),
                "".into(),
            )
        };
        let key_c = CString::new(key)?;

        // First call with a null destination: only the required buffer size
        // is written to `size`.
        let mut size = 0_usize;
        let res = unsafe {
            rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
        };
        if res.is_error() {
            return Err(make_err(res));
        }

        // Second call with a zero-initialized buffer of that size to fetch
        // the actual value.
        let mut buf = vec![0_u8; size];
        let res = unsafe {
            rdsys::rd_kafka_conf_get(
                self.ptr(),
                key_c.as_ptr(),
                buf.as_mut_ptr() as *mut c_char,
                &mut size,
            )
        };
        if res.is_error() {
            return Err(make_err(res));
        }

        // The value ends at the first nul byte. Nothing here guarantees that
        // the terminator is the *last* byte of the buffer (the size reported
        // by the first call may be larger than the string), so cut the buffer
        // at the first nul instead of requiring the whole buffer to be one
        // exact C string, which would panic otherwise.
        let value = match buf.iter().position(|&byte| byte == 0) {
            Some(nul) => CStr::from_bytes_with_nul(&buf[..=nul])
                .expect("slice ends at its first and only nul byte")
                .to_string_lossy()
                .into_owned(),
            // No terminator at all (e.g. an empty buffer): use what is there.
            None => String::from_utf8_lossy(&buf).into_owned(),
        };
        Ok(value)
    }
}
/// Client configuration: a set of string key/value parameters plus a log
/// level.
///
/// The key/value pairs are passed to librdkafka one by one when
/// `create_native_config` is called.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    // Configuration parameters, keyed by parameter name.
    conf_map: HashMap<String, String>,
    /// The log level associated with this configuration. It is initialized
    /// by `ClientConfig::new` from the global logging configuration; how it
    /// is consumed is not visible in this file.
    pub log_level: RDKafkaLogLevel,
}
impl Default for ClientConfig {
fn default() -> Self {
Self::new()
}
}
impl ClientConfig {
    /// Creates a configuration with no parameters set. The log level is
    /// taken from `log_level_from_global_config`.
    pub fn new() -> ClientConfig {
        let conf_map = HashMap::new();
        let log_level = log_level_from_global_config();
        ClientConfig {
            conf_map,
            log_level,
        }
    }

    /// Returns the value stored for `key`, or `None` if it has not been set.
    pub fn get(&self, key: &str) -> Option<&str> {
        self.conf_map.get(key).map(String::as_str)
    }

    /// Stores `value` under `key`, replacing any previous value, and returns
    /// `self` so calls can be chained.
    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
    where
        K: Into<String>,
        V: Into<String>,
    {
        let (name, setting) = (key.into(), value.into());
        self.conf_map.insert(name, setting);
        self
    }

    /// Removes `key` (if present) and returns `self` so calls can be chained.
    pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
        let _previous = self.conf_map.remove(key);
        self
    }

    /// Overrides the log level and returns `self` so calls can be chained.
    pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
        self.log_level = log_level;
        self
    }

    /// Builds a native librdkafka configuration holding every parameter
    /// stored in this `ClientConfig`.
    ///
    /// # Errors
    ///
    /// Fails if a key or value contains an interior nul byte, or with
    /// `KafkaError::ClientConfig` (carrying librdkafka's error text, the key
    /// and the value) as soon as librdkafka rejects a parameter.
    pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
        let native = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
        // One error buffer shared by all `rd_kafka_conf_set` calls.
        let mut err_buf = ErrBuf::new();

        for (name, setting) in self.conf_map.iter() {
            let name_c = CString::new(name.as_str())?;
            let setting_c = CString::new(setting.as_str())?;
            let outcome = unsafe {
                rdsys::rd_kafka_conf_set(
                    native.ptr(),
                    name_c.as_ptr(),
                    setting_c.as_ptr(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            if !outcome.is_error() {
                continue;
            }
            return Err(KafkaError::ClientConfig(
                outcome,
                err_buf.to_string(),
                name.clone(),
                setting.clone(),
            ));
        }

        Ok(native)
    }

    /// Builds a `T` from this configuration via `FromClientConfig`.
    pub fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
        <T as FromClientConfig>::from_config(self)
    }

    /// Builds a `T` from this configuration and the given client context via
    /// `FromClientConfigAndContext`.
    pub fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
    where
        C: ClientContext,
        T: FromClientConfigAndContext<C>,
    {
        <T as FromClientConfigAndContext<C>>::from_config_and_context(self, context)
    }
}
impl FromIterator<(String, String)> for ClientConfig {
    /// Builds a fresh configuration (as `ClientConfig::new` does) and fills
    /// it with the key/value pairs yielded by `iter`.
    fn from_iter<I>(iter: I) -> ClientConfig
    where
        I: IntoIterator<Item = (String, String)>,
    {
        let mut built = ClientConfig::new();
        Extend::extend(&mut built, iter);
        built
    }
}
impl Extend<(String, String)> for ClientConfig {
    /// Inserts every key/value pair from `iter`; a pair whose key is already
    /// present replaces the stored value.
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = (String, String)>,
    {
        for (name, setting) in iter {
            self.conf_map.insert(name, setting);
        }
    }
}
/// Picks the log level from what the global logger has enabled for the
/// `"librdkafka"` target.
///
/// The most verbose enabled level among DEBUG, INFO and WARN wins; if none of
/// them is enabled the result is `RDKafkaLogLevel::Error`.
fn log_level_from_global_config() -> RDKafkaLogLevel {
    // Checked from most to least verbose; the first enabled level decides.
    if log_enabled!(target: "librdkafka", DEBUG) {
        return RDKafkaLogLevel::Debug;
    }
    if log_enabled!(target: "librdkafka", INFO) {
        return RDKafkaLogLevel::Info;
    }
    if log_enabled!(target: "librdkafka", WARN) {
        return RDKafkaLogLevel::Warning;
    }
    RDKafkaLogLevel::Error
}
/// Types that can be constructed from a [`ClientConfig`].
///
/// This is the trait `ClientConfig::create` dispatches to.
pub trait FromClientConfig: Sized {
    /// Builds `Self` from the given configuration, or returns an error.
    fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
}
/// Types that can be constructed from a [`ClientConfig`] together with a
/// client context of type `C`.
///
/// This is the trait `ClientConfig::create_with_context` dispatches to.
pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
    /// Builds `Self` from the given configuration and context, or returns an
    /// error. The context is taken by value.
    fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
}
#[cfg(test)]
mod tests {
    use super::ClientConfig;

    /// Collecting pairs populates the configuration, and extending it both
    /// overwrites existing keys and adds new ones.
    #[test]
    fn test_client_config_set_map() {
        let initial = vec![
            (String::from("a"), String::from("1")),
            (String::from("b"), String::from("1")),
        ];
        let mut config: ClientConfig = initial.into_iter().collect();

        config.extend([
            (String::from("b"), String::from("2")),
            (String::from("c"), String::from("3")),
        ]);

        for (key, expected) in [("a", "1"), ("b", "2"), ("c", "3")] {
            assert_eq!(config.get(key), Some(expected));
        }
    }
}