use std::error::Error;
use std::ffi::{CStr, CString};
use std::mem::ManuallyDrop;
use std::os::raw::c_char;
use std::ptr;
use std::string::ToString;
use std::sync::Arc;
use libc::c_void;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::admin::NativeEvent;
use crate::config::{ClientConfig, NativeClientConfig, RDKafkaLogLevel};
use crate::consumer::RebalanceProtocol;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::log::{debug, error, info, trace, warn};
use crate::metadata::Metadata;
use crate::mocking::MockCluster;
use crate::statistics::Statistics;
use crate::util::{self, ErrBuf, KafkaDrop, NativePtr, Timeout};
pub trait ClientContext: Send + Sync {
    /// Whether librdkafka's SASL `OAUTHBEARER` token-refresh events should be
    /// handled by calling [`ClientContext::generate_oauth_token`]. Defaults to
    /// `false`, in which case refresh events are silently discarded.
    const ENABLE_REFRESH_OAUTH_TOKEN: bool = false;

    /// Receives log lines from librdkafka and forwards them to the `log`
    /// crate's macros at an equivalent severity.
    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        match level {
            // Everything at error severity or above maps to `error!`.
            RDKafkaLogLevel::Emerg
            | RDKafkaLogLevel::Alert
            | RDKafkaLogLevel::Critical
            | RDKafkaLogLevel::Error => {
                error!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Warning => {
                warn!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            // `log` has no "notice" level, so notice is folded into info.
            RDKafkaLogLevel::Notice | RDKafkaLogLevel::Info => {
                info!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
            RDKafkaLogLevel::Debug => {
                debug!(target: "librdkafka", "librdkafka: {} {}", fac, log_message)
            }
        }
    }

    /// Receives decoded statistics from librdkafka; the default logs them at
    /// info level.
    fn stats(&self, statistics: Statistics) {
        info!("Client stats: {:?}", statistics);
    }

    /// Receives the raw JSON statistics payload, decodes it, and delegates to
    /// [`ClientContext::stats`]; a parse failure is reported at error level.
    fn stats_raw(&self, statistics: &[u8]) {
        match serde_json::from_slice::<Statistics>(statistics) {
            Ok(stats) => self.stats(stats),
            Err(e) => error!("Could not parse statistics JSON: {}", e),
        }
    }

    /// Receives global errors from the client; the default logs them at error
    /// level.
    fn error(&self, error: KafkaError, reason: &str) {
        error!("librdkafka: {}: {}", error, reason);
    }

    /// Produces a fresh OAuth token for the client. Must be overridden by any
    /// context that sets `ENABLE_REFRESH_OAUTH_TOKEN` to `true`; the default
    /// always fails.
    fn generate_oauth_token(
        &self,
        _oauthbearer_config: Option<&str>,
    ) -> Result<OAuthToken, Box<dyn Error>> {
        Err("Default implementation of generate_oauth_token must be overridden".into())
    }
}
/// An empty [`ClientContext`] that can be used when no customization is
/// needed; it relies entirely on the trait's default implementations.
#[derive(Clone, Debug, Default)]
pub struct DefaultClientContext;
impl ClientContext for DefaultClientContext {}
/// Owning wrapper around a native librdkafka client handle (`rd_kafka_t`).
/// The handle is destroyed via `rd_kafka_destroy` when this is dropped
/// (see the `KafkaDrop` impl for `RDKafka`).
pub struct NativeClient {
    // Owning pointer to the native handle.
    ptr: NativePtr<RDKafka>,
}
// Ties `RDKafka` handles to their librdkafka destructor so that `NativePtr`
// releases them with `rd_kafka_destroy` on drop.
unsafe impl KafkaDrop for RDKafka {
    const TYPE: &'static str = "client";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_destroy;
}
// SAFETY: relies on librdkafka client handles being safe to use from
// multiple threads concurrently. NOTE(review): this is a documented
// guarantee of the librdkafka C API — confirm against its threading docs.
unsafe impl Sync for NativeClient {}
unsafe impl Send for NativeClient {}
impl NativeClient {
    /// Wraps a raw librdkafka client handle.
    ///
    /// # Safety
    /// `ptr` must be a valid, non-null `rd_kafka_t` pointer whose ownership
    /// is transferred to the returned `NativeClient`.
    ///
    /// # Panics
    /// Panics if `ptr` is null.
    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafka) -> NativeClient {
        let ptr = NativePtr::from_ptr(ptr).unwrap();
        NativeClient { ptr }
    }

    /// Returns the raw pointer to the wrapped client handle.
    pub fn ptr(&self) -> *mut RDKafka {
        self.ptr.ptr()
    }

    /// Queries librdkafka for the rebalance protocol currently in use.
    pub(crate) fn rebalance_protocol(&self) -> RebalanceProtocol {
        let protocol = unsafe { rdsys::rd_kafka_rebalance_protocol(self.ptr()) };
        // A null pointer means no rebalance protocol is active.
        if protocol.is_null() {
            return RebalanceProtocol::None;
        }
        let name = unsafe { CStr::from_ptr(protocol) };
        match name.to_bytes() {
            b"NONE" => RebalanceProtocol::None,
            b"EAGER" => RebalanceProtocol::Eager,
            b"COOPERATIVE" => RebalanceProtocol::Cooperative,
            // librdkafka only ever reports the three names above.
            _ => unreachable!(),
        }
    }
}
/// Outcome of polling an event queue once.
pub(crate) enum EventPollResult<T> {
    /// No event was available before the poll timeout expired.
    None,
    /// An event was received but fully handled internally (logs, stats,
    /// OAuth refresh), so nothing is surfaced to the caller.
    EventConsumed,
    /// An event was received and is handed to the caller.
    Event(T),
}
impl<T> From<EventPollResult<T>> for Option<T> {
    /// Yields `Some` only for an event delivered to the caller; absent or
    /// internally consumed events become `None`.
    fn from(val: EventPollResult<T>) -> Self {
        if let EventPollResult::Event(evt) = val {
            Some(evt)
        } else {
            None
        }
    }
}
/// A native rdkafka client paired with the user-provided context that
/// receives its callbacks and events.
pub struct Client<C: ClientContext = DefaultClientContext> {
    // Owning handle to the native librdkafka client.
    native: NativeClient,
    // User context; a raw pointer to it is registered with librdkafka as the
    // opaque callback pointer (see `new_context_arc`), so this `Arc` must
    // stay alive as long as the native client.
    context: Arc<C>,
}
impl<C: ClientContext> Client<C> {
    /// Creates a new `Client` given a configuration, a client type, and a
    /// context.
    pub fn new(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        rd_kafka_type: RDKafkaType,
        context: C,
    ) -> KafkaResult<Client<C>> {
        Self::new_context_arc(config, native_config, rd_kafka_type, Arc::new(context))
    }

    /// Like [`Client::new`], but takes the context pre-wrapped in an `Arc` so
    /// it can be shared with other owners.
    pub(crate) fn new_context_arc(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        rd_kafka_type: RDKafkaType,
        context: Arc<C>,
    ) -> KafkaResult<Client<C>> {
        let mut err_buf = ErrBuf::new();
        // Expose the context to librdkafka callbacks as an opaque pointer.
        // The `Arc` stored in the returned `Client` keeps the context alive
        // at least as long as the native client.
        unsafe {
            rdsys::rd_kafka_conf_set_opaque(
                native_config.ptr(),
                Arc::as_ptr(&context) as *mut c_void,
            )
        };
        // Deliver librdkafka logs through a queue (handled in `poll_event`)
        // rather than via a synchronous log callback.
        native_config.set("log.queue", "true")?;
        let client_ptr = unsafe {
            // On success, rd_kafka_new takes ownership of the configuration,
            // so it must not be dropped here. NOTE(review): ownership on the
            // *failure* path depends on the librdkafka version — confirm
            // whether the config object leaks when creation fails.
            let native_config = ManuallyDrop::new(native_config);
            rdsys::rd_kafka_new(
                rd_kafka_type,
                native_config.ptr(),
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            )
        };
        trace!("Create new librdkafka client {:p}", client_ptr);
        if client_ptr.is_null() {
            return Err(KafkaError::ClientCreation(err_buf.to_string()));
        }
        // Wrap the handle immediately so it is destroyed (via Drop) if any of
        // the remaining setup steps below fail, instead of leaking.
        let native = unsafe { NativeClient::from_ptr(client_ptr) };
        let ret = unsafe {
            rdsys::rd_kafka_set_log_queue(
                native.ptr(),
                rdsys::rd_kafka_queue_get_main(native.ptr()),
            )
        };
        if ret.is_error() {
            return Err(KafkaError::Global(ret.into()));
        }
        unsafe { rdsys::rd_kafka_set_log_level(native.ptr(), config.log_level as i32) };
        Ok(Client { native, context })
    }

    /// Returns a reference to the wrapped native client.
    pub fn native_client(&self) -> &NativeClient {
        &self.native
    }

    /// Returns the raw pointer to the native client handle.
    pub fn native_ptr(&self) -> *mut RDKafka {
        self.native.ptr.ptr()
    }

    /// Returns a reference to the user context.
    pub fn context(&self) -> &Arc<C> {
        &self.context
    }

    /// Polls `queue` for a single event, waiting at most `timeout`.
    ///
    /// Log, statistics, and (when `C::ENABLE_REFRESH_OAUTH_TOKEN` is set)
    /// OAuth refresh events are dispatched to the context and reported as
    /// `EventConsumed`. Error events are forwarded to the context *and*
    /// returned; all other events are returned to the caller.
    pub(crate) fn poll_event<T: Into<Timeout>>(
        &self,
        queue: &NativeQueue,
        timeout: T,
    ) -> EventPollResult<NativeEvent> {
        let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) };
        if let Some(ev) = event {
            let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
            match evtype {
                rdsys::RD_KAFKA_EVENT_LOG => {
                    self.handle_log_event(ev.ptr());
                    return EventPollResult::EventConsumed;
                }
                rdsys::RD_KAFKA_EVENT_STATS => {
                    self.handle_stats_event(ev.ptr());
                    return EventPollResult::EventConsumed;
                }
                rdsys::RD_KAFKA_EVENT_ERROR => {
                    // Errors are notified to the context, but the event is
                    // still surfaced so the caller can react to it as well.
                    self.handle_error_event(ev.ptr());
                    return EventPollResult::Event(ev);
                }
                rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH => {
                    if C::ENABLE_REFRESH_OAUTH_TOKEN {
                        self.handle_oauth_refresh_event(ev.ptr());
                    }
                    return EventPollResult::EventConsumed;
                }
                _ => {
                    return EventPollResult::Event(ev);
                }
            }
        }
        EventPollResult::None
    }

    /// Extracts facility, message, and level from a log event and forwards
    /// them (trimmed) to the context.
    fn handle_log_event(&self, event: *mut RDKafkaEvent) {
        let mut fac: *const c_char = std::ptr::null();
        let mut str_: *const c_char = std::ptr::null();
        let mut level: i32 = 0;
        let result = unsafe { rdsys::rd_kafka_event_log(event, &mut fac, &mut str_, &mut level) };
        // The out-pointers are only valid when rd_kafka_event_log returns 0.
        if result == 0 {
            let fac = unsafe { CStr::from_ptr(fac).to_string_lossy() };
            let log_message = unsafe { CStr::from_ptr(str_).to_string_lossy() };
            self.context().log(
                RDKafkaLogLevel::from_int(level),
                fac.trim(),
                log_message.trim(),
            );
        }
    }

    /// Forwards the raw JSON statistics payload of a stats event to the
    /// context.
    fn handle_stats_event(&self, event: *mut RDKafkaEvent) {
        let json = unsafe { CStr::from_ptr(rdsys::rd_kafka_event_stats(event)) };
        self.context().stats_raw(json.to_bytes());
    }

    /// Notifies the context of the error carried by an error event.
    fn handle_error_event(&self, event: *mut RDKafkaEvent) {
        let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event) };
        let error = KafkaError::Global(rdkafka_err.into());
        let reason =
            unsafe { CStr::from_ptr(rdsys::rd_kafka_event_error_string(event)).to_string_lossy() };
        self.context().error(error, reason.trim());
    }

    /// Asks the context for a fresh OAuth token and installs it on the native
    /// client; any failure is reported to librdkafka via
    /// `rd_kafka_oauthbearer_set_token_failure` so it can retry.
    fn handle_oauth_refresh_event(&self, event: *mut RDKafkaEvent) {
        let oauthbearer_config = unsafe { rdsys::rd_kafka_event_config_string(event) };
        let res: Result<_, Box<dyn Error>> = (|| {
            let oauthbearer_config = match oauthbearer_config.is_null() {
                true => None,
                false => unsafe { Some(util::cstr_to_owned(oauthbearer_config)) },
            };
            let token_info = self
                .context()
                .generate_oauth_token(oauthbearer_config.as_deref())?;
            // CString::new fails if the token or principal contains an
            // embedded NUL byte.
            let token = CString::new(token_info.token)?;
            let principal_name = CString::new(token_info.principal_name)?;
            Ok((token, principal_name, token_info.lifetime_ms))
        })();
        match res {
            Ok((token, principal_name, lifetime_ms)) => {
                let mut err_buf = ErrBuf::new();
                let code = unsafe {
                    rdkafka_sys::rd_kafka_oauthbearer_set_token(
                        self.native_ptr(),
                        token.as_ptr(),
                        lifetime_ms,
                        principal_name.as_ptr(),
                        ptr::null_mut(), // no SASL extensions
                        0,
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
                    debug!("successfully set refreshed OAuth token");
                } else {
                    debug!(
                        "failed to set refreshed OAuth token (code {:?}): {}",
                        code, err_buf
                    );
                    unsafe {
                        rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(
                            self.native_ptr(),
                            err_buf.as_mut_ptr(),
                        )
                    };
                }
            }
            Err(e) => {
                debug!("failed to refresh OAuth token: {}", e);
                // The failure description must itself be a valid C string;
                // fall back to a fixed message if it contains a NUL byte.
                let message = match CString::new(e.to_string()) {
                    Ok(message) => message,
                    Err(e) => {
                        error!(
                            "error message generated while refreshing OAuth token has embedded null character: {}",
                            e
                        );
                        CString::new(
                            "error while refreshing OAuth token has embedded null character",
                        )
                        .expect("known to be a valid CString")
                    }
                };
                unsafe {
                    rdkafka_sys::rd_kafka_oauthbearer_set_token_failure(
                        self.native_ptr(),
                        message.as_ptr(),
                    )
                };
            }
        }
    }

    /// Fetches metadata for the given topic, or for all topics in the cluster
    /// when `topic` is `None`, waiting at most `timeout`.
    pub fn fetch_metadata<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        let mut metadata_ptr: *const RDKafkaMetadata = ptr::null_mut();
        // flag == 1 requests metadata for all topics.
        let (flag, native_topic) = if let Some(topic_name) = topic {
            (0, Some(self.native_topic(topic_name)?))
        } else {
            (1, None)
        };
        trace!("Starting metadata fetch");
        let ret = unsafe {
            rdsys::rd_kafka_metadata(
                self.native_ptr(),
                flag,
                // Borrow the topic handle instead of consuming it, so that it
                // is not destroyed (via its Drop impl calling
                // rd_kafka_topic_destroy) before rd_kafka_metadata runs;
                // `native_topic` stays alive until the end of this function.
                native_topic
                    .as_ref()
                    .map(|t| t.ptr())
                    .unwrap_or_else(ptr::null_mut),
                &mut metadata_ptr as *mut *const RDKafkaMetadata,
                timeout.into().as_millis(),
            )
        };
        trace!("Metadata fetch completed");
        if ret.is_error() {
            return Err(KafkaError::MetadataFetch(ret.into()));
        }
        Ok(unsafe { Metadata::from_ptr(metadata_ptr) })
    }

    /// Queries the broker for the low and high watermark offsets of the given
    /// topic partition, waiting at most `timeout`.
    pub fn fetch_watermarks<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        let mut low = -1;
        let mut high = -1;
        let topic_c = CString::new(topic)?;
        let ret = unsafe {
            rdsys::rd_kafka_query_watermark_offsets(
                self.native_ptr(),
                topic_c.as_ptr(),
                partition,
                &mut low as *mut i64,
                &mut high as *mut i64,
                timeout.into().as_millis(),
            )
        };
        if ret.is_error() {
            return Err(KafkaError::MetadataFetch(ret.into()));
        }
        Ok((low, high))
    }

    /// Returns the cluster identifier, or `None` if it is not known within
    /// `timeout` or is not valid UTF-8.
    pub fn fetch_cluster_id<T: Into<Timeout>>(&self, timeout: T) -> Option<String> {
        let cluster_id =
            unsafe { rdsys::rd_kafka_clusterid(self.native_ptr(), timeout.into().as_millis()) };
        if cluster_id.is_null() {
            return None;
        }
        // NOTE(review): librdkafka documents that the string returned by
        // rd_kafka_clusterid must be freed by the caller — confirm whether
        // this copy-without-free leaks.
        let buf = unsafe { CStr::from_ptr(cluster_id).to_bytes() };
        String::from_utf8(buf.to_vec()).ok()
    }

    /// Fetches information about the given consumer group, or about all
    /// groups when `group` is `None`, waiting at most `timeout`.
    pub fn fetch_group_list<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        // `group_c` must outlive the FFI call below even when no group is
        // requested, so it is always constructed.
        let group_c = CString::new(group.map_or("".to_string(), ToString::to_string))?;
        let group_c_ptr = if group.is_some() {
            group_c.as_ptr()
        } else {
            ptr::null_mut()
        };
        let mut group_list_ptr: *const RDKafkaGroupList = ptr::null_mut();
        trace!("Starting group list fetch");
        let ret = unsafe {
            rdsys::rd_kafka_list_groups(
                self.native_ptr(),
                group_c_ptr,
                &mut group_list_ptr as *mut *const RDKafkaGroupList,
                timeout.into().as_millis(),
            )
        };
        trace!("Group list fetch completed");
        if ret.is_error() {
            return Err(KafkaError::GroupListFetch(ret.into()));
        }
        Ok(unsafe { GroupList::from_ptr(group_list_ptr) })
    }

    /// Returns the fatal error recorded on this client, if any, together with
    /// its textual description.
    pub fn fatal_error(&self) -> Option<(RDKafkaErrorCode, String)> {
        let mut err_buf = ErrBuf::new();
        let code = unsafe {
            rdsys::rd_kafka_fatal_error(self.native_ptr(), err_buf.as_mut_ptr(), err_buf.capacity())
        };
        if code == RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR {
            None
        } else {
            Some((code.into(), err_buf.to_string()))
        }
    }

    /// Returns the mock cluster associated with this client, if any.
    pub fn mock_cluster(&self) -> Option<MockCluster<'_, C>> {
        MockCluster::from_client(self)
    }

    /// Creates a native topic handle for `topic` with the default topic
    /// configuration. Fails if `topic` contains an embedded NUL byte.
    pub(crate) fn native_topic(&self, topic: &str) -> KafkaResult<NativeTopic> {
        let topic_c = CString::new(topic)?;
        Ok(unsafe {
            NativeTopic::from_ptr(rdsys::rd_kafka_topic_new(
                self.native_ptr(),
                topic_c.as_ptr(),
                ptr::null_mut(),
            ))
            .unwrap()
        })
    }

    /// Creates a new, empty event queue owned by this client.
    pub(crate) fn new_native_queue(&self) -> NativeQueue {
        unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_new(self.native_ptr())).unwrap() }
    }

    /// Returns this client's consumer queue, if librdkafka provides one
    /// (the call yields null otherwise).
    pub(crate) fn consumer_queue(&self) -> Option<NativeQueue> {
        unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_consumer(self.native_ptr())) }
    }

    /// Returns this client's main event queue.
    pub(crate) fn main_queue(&self) -> NativeQueue {
        unsafe { NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_main(self.native_ptr())).unwrap() }
    }
}
/// Owning handle to a native `rd_kafka_topic_t`.
pub(crate) type NativeTopic = NativePtr<RDKafkaTopic>;
// Destroyed with `rd_kafka_topic_destroy` when the `NativePtr` drops.
unsafe impl KafkaDrop for RDKafkaTopic {
    const TYPE: &'static str = "native topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_destroy;
}
// SAFETY: relies on librdkafka topic handles being usable from multiple
// threads. NOTE(review): confirm against librdkafka's threading docs.
unsafe impl Send for NativeTopic {}
unsafe impl Sync for NativeTopic {}
/// Owning handle to a native `rd_kafka_queue_t`.
pub(crate) type NativeQueue = NativePtr<RDKafkaQueue>;
// Destroyed with `rd_kafka_queue_destroy` when the `NativePtr` drops.
unsafe impl KafkaDrop for RDKafkaQueue {
    const TYPE: &'static str = "queue";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_queue_destroy;
}
// SAFETY: relies on librdkafka queue handles being usable from multiple
// threads. NOTE(review): confirm against librdkafka's threading docs.
unsafe impl Sync for NativeQueue {}
unsafe impl Send for NativeQueue {}
impl NativeQueue {
    /// Polls this queue for a single event, blocking for at most `t`.
    /// Returns a raw event pointer, which is null when the timeout expires
    /// with no event available.
    pub fn poll<T: Into<Timeout>>(&self, t: T) -> *mut RDKafkaEvent {
        let timeout_ms = t.into().as_millis();
        unsafe { rdsys::rd_kafka_queue_poll(self.ptr(), timeout_ms) }
    }
}
/// An OAuth token and its metadata, as produced by
/// [`ClientContext::generate_oauth_token`] and installed on the client via
/// `rd_kafka_oauthbearer_set_token`.
pub struct OAuthToken {
    /// The token value itself.
    pub token: String,
    /// The Kafka principal name associated with the token.
    pub principal_name: String,
    /// Token lifetime in milliseconds, passed straight through to
    /// `rd_kafka_oauthbearer_set_token` (presumably absolute expiry in
    /// epoch milliseconds — TODO confirm against librdkafka docs).
    pub lifetime_ms: i64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ClientConfig;
    // Smoke test: a producer client can be created from an empty (default)
    // configuration and yields a non-null native handle.
    #[test]
    fn test_client() {
        let config = ClientConfig::new();
        let native_config = config.create_native_config().unwrap();
        let client = Client::new(
            &config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            DefaultClientContext,
        )
        .unwrap();
        assert!(!client.native_ptr().is_null());
    }
}