use rdsys;
use rdsys::types::*;
use std::ffi::{CString, CStr};
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use serde_json;
use config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
use error::{IsError, KafkaError, KafkaResult};
use groups::GroupList;
use metadata::Metadata;
use statistics::Statistics;
use util::bytes_cstr_to_owned;
pub trait Context: Send + Sync {
fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
match level {
RDKafkaLogLevel::Emerg => error!("librdkafka: {} {}", fac, log_message),
RDKafkaLogLevel::Alert => error!("librdkafka: {} {}", fac, log_message),
RDKafkaLogLevel::Critical => error!("librdkafka: {} {}", fac, log_message),
RDKafkaLogLevel::Error => error!("librdkafka: {} {}", fac, log_message),
RDKafkaLogLevel::Warning => warn!("librdkafka: {} {}", fac, log_message),
RDKafkaLogLevel::Notice => info!("librdkafka: {} {}", fac, log_message),
RDKafkaLogLevel::Info => info!("librdkafka: {} {}", fac, log_message),
RDKafkaLogLevel::Debug => debug!("librdkafka: {} {}", fac, log_message),
}
}
fn stats(&self, statistics: Statistics) {
info!("Client stats: {:?}", statistics);
}
}
#[derive(Clone, Default)]
pub struct EmptyContext;
impl Context for EmptyContext { }
impl EmptyContext {
pub fn new() -> EmptyContext {
EmptyContext {}
}
}
pub struct NativeClient {
ptr: *mut RDKafka,
}
unsafe impl Sync for NativeClient {}
unsafe impl Send for NativeClient {}
impl NativeClient {
pub fn from_ptr(ptr: *mut RDKafka) -> NativeClient {
trace!("Create new NativeClient {:p}", ptr);
NativeClient {ptr: ptr}
}
pub fn ptr(&self) -> *mut RDKafka {
self.ptr
}
}
impl Drop for NativeClient {
fn drop(&mut self) {
trace!("Destroy rd_kafka {:p}", self.ptr);
unsafe {
rdsys::rd_kafka_destroy(self.ptr);
rdsys::rd_kafka_wait_destroyed(1000);
}
}
}
pub struct Client<C: Context> {
native: NativeClient,
context: Box<C>,
}
impl<C: Context> Client<C> {
pub fn new(config: &ClientConfig, native_config: NativeClientConfig, rd_kafka_type: RDKafkaType,
context: C)
-> KafkaResult<Client<C>> {
let errstr = [0i8; 1024];
let mut boxed_context = Box::new(context);
unsafe { rdsys::rd_kafka_conf_set_opaque(
native_config.ptr(), (&mut *boxed_context) as *mut C as *mut c_void) };
unsafe { rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::<C>)) };
unsafe { rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::<C>)) };
let client_ptr = unsafe {
rdsys::rd_kafka_new(rd_kafka_type, native_config.ptr_move(), errstr.as_ptr() as *mut i8, errstr.len())
};
if client_ptr.is_null() {
let descr = unsafe { bytes_cstr_to_owned(&errstr) };
return Err(KafkaError::ClientCreation(descr));
}
unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };
Ok(Client {
native: NativeClient::from_ptr(client_ptr),
context: boxed_context,
})
}
pub fn native_ptr(&self) -> *mut RDKafka {
self.native.ptr
}
pub fn context(&self) -> &C {
self.context.as_ref()
}
pub fn fetch_metadata(&self, timeout_ms: i32) -> KafkaResult<Metadata> {
let mut metadata_ptr: *const RDKafkaMetadata = ptr::null_mut();
trace!("Starting metadata fetch");
let ret = unsafe {
rdsys::rd_kafka_metadata(
self.native_ptr(),
1, ptr::null::<u8>() as *mut RDKafkaTopic,
&mut metadata_ptr as *mut *const RDKafkaMetadata,
timeout_ms)
};
trace!("Metadata fetch completed");
if ret.is_error() {
return Err(KafkaError::MetadataFetch(ret));
}
Ok(Metadata::from_ptr(metadata_ptr))
}
pub fn fetch_watermarks(&self, topic: &str, partition: i32, timeout_ms: i32) -> KafkaResult<(i64, i64)> {
let mut low = -1;
let mut high = -1;
let topic_c = CString::new(topic.to_string())?;
let ret = unsafe {
rdsys::rd_kafka_query_watermark_offsets(self.native_ptr(), topic_c.as_ptr(), partition,
&mut low as *mut i64, &mut high as *mut i64, timeout_ms)
};
if ret.is_error() {
return Err(KafkaError::MetadataFetch(ret));
}
Ok((low, high))
}
pub fn fetch_group_list(&self, group: Option<&str>, timeout_ms: i32) -> KafkaResult<GroupList> {
let group_c = CString::new(group.map_or("".to_string(), |g| g.to_string()))?;
let group_c_ptr = if group.is_some() {
group_c.as_ptr()
} else {
ptr::null_mut()
};
let mut group_list_ptr: *const RDKafkaGroupList = ptr::null_mut();
trace!("Starting group list fetch");
let ret = unsafe {
rdsys::rd_kafka_list_groups(
self.native_ptr(),
group_c_ptr,
&mut group_list_ptr as *mut *const RDKafkaGroupList,
timeout_ms)
};
trace!("Group list fetch completed");
if ret.is_error() {
return Err(KafkaError::GroupListFetch(ret));
}
Ok(GroupList::from_ptr(group_list_ptr))
}
}
pub unsafe extern "C" fn native_log_cb<C: Context>(
client: *const RDKafka, level: i32,
fac: *const i8, buf: *const i8) {
let fac = CStr::from_ptr(fac).to_string_lossy();
let log_message = CStr::from_ptr(buf).to_string_lossy();
let context = Box::from_raw(rdsys::rd_kafka_opaque(client) as *mut C);
(*context).log(RDKafkaLogLevel::from_int(level), fac.trim(), log_message.trim());
mem::forget(context); }
pub unsafe extern "C" fn native_stats_cb<C: Context>(
_conf: *mut RDKafka, json: *mut i8, json_len: usize,
opaque: *mut c_void) -> i32 {
let context = Box::from_raw(opaque as *mut C);
let statistics_json = String::from_raw_parts(json as *mut u8, json_len, json_len);
match serde_json::from_str(&statistics_json) {
Ok(stats) => (*context).stats(stats),
Err(e) => error!("Could not parse statistics json: {}", e)
}
mem::forget(context);
1 }
#[cfg(test)]
mod tests {
extern crate rdkafka_sys as rdsys;
use rdsys::types::*;
use config::ClientConfig;
use super::*;
#[test]
fn test_client() {
let config = ClientConfig::new();
let native_config = config.create_native_config().unwrap();
let client = Client::new(&config, native_config, RDKafkaType::RD_KAFKA_PRODUCER,
EmptyContext::new()).unwrap();
assert!(!client.native_ptr().is_null());
}
}