use log::LogLevel;
use rdsys::types::*;
use rdsys;
use client::Context;
use error::{KafkaError, KafkaResult, IsError};
use util::bytes_cstr_to_owned;
use std::collections::HashMap;
use std::ffi::CString;
use std::mem;
const ERR_LEN: usize = 256;
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
Emerg = 0,
Alert = 1,
Critical = 2,
Error = 3,
Warning = 4,
Notice = 5,
Info = 6,
Debug = 7,
}
impl RDKafkaLogLevel {
pub fn from_int(level: i32) -> RDKafkaLogLevel {
match level {
0 => RDKafkaLogLevel::Emerg,
1 => RDKafkaLogLevel::Alert,
2 => RDKafkaLogLevel::Critical,
3 => RDKafkaLogLevel::Error,
4 => RDKafkaLogLevel::Warning,
5 => RDKafkaLogLevel::Notice,
6 => RDKafkaLogLevel::Info,
_ => RDKafkaLogLevel::Debug,
}
}
}
pub struct NativeClientConfig {
ptr: *mut RDKafkaConf,
}
impl NativeClientConfig {
pub fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
NativeClientConfig {ptr: ptr}
}
pub fn ptr(&self) -> *mut RDKafkaConf {
self.ptr
}
pub fn ptr_move(self) -> *mut RDKafkaConf {
let ptr = self.ptr;
mem::forget(self);
ptr
}
}
impl Drop for NativeClientConfig {
fn drop(&mut self) {
trace!("Drop NativeClientConfig {:p}", self.ptr());
unsafe { rdsys::rd_kafka_conf_destroy(self.ptr) }
}
}
#[derive(Clone)]
pub struct ClientConfig {
conf_map: HashMap<String, String>,
default_topic_config: Option<TopicConfig>,
pub log_level: RDKafkaLogLevel,
}
impl Default for ClientConfig {
fn default() -> Self {
Self::new()
}
}
impl ClientConfig {
pub fn new() -> ClientConfig {
ClientConfig {
conf_map: HashMap::new(),
default_topic_config: None,
log_level: log_level_from_global_config(),
}
}
pub fn set<'a>(&'a mut self, key: &str, value: &str) -> &'a mut ClientConfig {
self.conf_map.insert(key.to_string(), value.to_string());
self
}
pub fn set_default_topic_config(&mut self, default_topic_config: TopicConfig) -> &mut ClientConfig {
self.default_topic_config = Some(default_topic_config);
self
}
pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
self.log_level = log_level;
self
}
pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
let conf = unsafe { rdsys::rd_kafka_conf_new() };
let errstr = [0; ERR_LEN];
for (key, value) in &self.conf_map {
let key_c = CString::new(key.to_string())?;
let value_c = CString::new(value.to_string())?;
let ret = unsafe {
rdsys::rd_kafka_conf_set(conf, key_c.as_ptr(), value_c.as_ptr(),
errstr.as_ptr() as *mut i8, errstr.len())
};
if ret.is_error() {
let descr = unsafe { bytes_cstr_to_owned(&errstr) };
return Err(KafkaError::ClientConfig(ret, descr, key.to_string(), value.to_string()));
}
}
if let Some(ref topic_config) = self.default_topic_config {
let native_topic_config = topic_config.create_native_config()?;
unsafe { rdsys::rd_kafka_conf_set_default_topic_conf(conf, native_topic_config.ptr_move()) };
};
Ok(NativeClientConfig::from_ptr(conf))
}
pub fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
T::from_config(self)
}
pub fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
where C: Context,
T: FromClientConfigAndContext<C> {
T::from_config_and_context(self, context)
}
}
fn log_level_from_global_config() -> RDKafkaLogLevel {
if log_enabled!(LogLevel::Debug) {
RDKafkaLogLevel::Debug
} else if log_enabled!(LogLevel::Info) {
RDKafkaLogLevel::Info
} else if log_enabled!(LogLevel::Warn) {
RDKafkaLogLevel::Warning
} else {
RDKafkaLogLevel::Error
}
}
pub trait FromClientConfig: Sized {
fn from_config(&ClientConfig) -> KafkaResult<Self>;
}
pub trait FromClientConfigAndContext<C: Context>: Sized {
fn from_config_and_context(&ClientConfig, C) -> KafkaResult<Self>;
}
pub struct NativeTopicConfig {
ptr: *mut RDKafkaTopicConf,
}
impl NativeTopicConfig {
pub fn from_ptr(ptr: *mut RDKafkaTopicConf) -> NativeTopicConfig {
NativeTopicConfig {ptr: ptr}
}
pub fn ptr(&self) -> *mut RDKafkaTopicConf {
self.ptr
}
pub fn ptr_move(self) -> *mut RDKafkaTopicConf {
let ptr = self.ptr;
mem::forget(self);
ptr
}
}
impl Drop for NativeTopicConfig {
fn drop(&mut self) {
trace!("Drop NativeTopicConfig {:p}", self.ptr);
unsafe { rdsys::rd_kafka_topic_conf_destroy(self.ptr) }
}
}
#[derive(Clone, Default)]
pub struct TopicConfig {
conf_map: HashMap<String, String>,
}
impl TopicConfig {
pub fn new() -> TopicConfig {
TopicConfig {
conf_map: HashMap::new(),
}
}
pub fn set(&mut self, key: &str, value: &str) -> &mut TopicConfig {
self.conf_map.insert(key.to_string(), value.to_string());
self
}
pub fn finalize(&self) -> TopicConfig {
TopicConfig { conf_map: self.conf_map.clone() }
}
pub fn create_native_config(&self) -> KafkaResult<NativeTopicConfig> {
let config_ptr = unsafe { rdsys::rd_kafka_topic_conf_new() };
let errstr = [0; ERR_LEN];
for (name, value) in &self.conf_map {
let name_c = CString::new(name.to_string())?;
let value_c = CString::new(value.to_string())?;
let ret = unsafe {
rdsys::rd_kafka_topic_conf_set(config_ptr, name_c.as_ptr(), value_c.as_ptr(),
errstr.as_ptr() as *mut i8, errstr.len())
};
if ret.is_error() {
let descr = unsafe { bytes_cstr_to_owned(&errstr) };
return Err(KafkaError::TopicConfig(ret, descr, name.to_string(), value.to_string()));
}
}
Ok(NativeTopicConfig::from_ptr(config_ptr))
}
}