use std::cmp;
use std::ffi::CString;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::sync::Arc;
use log::trace;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::client::{Client, NativeClient, NativeQueue};
use crate::config::{
ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
};
use crate::consumer::{
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::message::{BorrowedMessage, Message};
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, NativePtr, Timeout};
/// Native offset-commit callback invoked by librdkafka when an offset
/// commit completes; forwards the outcome to the registered
/// `ConsumerContext::commit_callback`.
///
/// # Safety
///
/// `opaque_ptr` must point to a live `C` (the context registered when the
/// client was created) and `offsets` must be a valid topic partition list
/// owned by librdkafka.
pub(crate) unsafe extern "C" fn native_commit_cb<C: ConsumerContext>(
    _conf: *mut RDKafka,
    err: RDKafkaRespErr,
    offsets: *mut RDKafkaTopicPartitionList,
    opaque_ptr: *mut c_void,
) {
    let context = &mut *(opaque_ptr as *mut C);
    let commit_error = if err.is_error() {
        Err(KafkaError::ConsumerCommit(err.into()))
    } else {
        Ok(())
    };
    // Borrow the native list for the duration of the callback. librdkafka
    // retains ownership of `offsets`, so the wrapper's destructor must not
    // run — hence the `mem::forget` below.
    let tpl = TopicPartitionList::from_ptr(offsets);
    context.commit_callback(commit_error, &tpl);
    mem::forget(tpl);
}
/// Native rebalance callback invoked by librdkafka on partition
/// assignment/revocation; forwards the event to
/// `ConsumerContext::rebalance`.
///
/// # Safety
///
/// `opaque_ptr` must point to a live `C`, `rk` must be the live client
/// handle, and `native_tpl` must be a valid topic partition list owned by
/// librdkafka.
unsafe extern "C" fn native_rebalance_cb<C: ConsumerContext>(
    rk: *mut RDKafka,
    err: RDKafkaRespErr,
    native_tpl: *mut RDKafkaTopicPartitionList,
    opaque_ptr: *mut c_void,
) {
    let context = &mut *(opaque_ptr as *mut C);
    // Both wrappers below borrow handles that librdkafka still owns;
    // `mem::forget` prevents their destructors from freeing them.
    let native_client = NativeClient::from_ptr(rk);
    let mut tpl = TopicPartitionList::from_ptr(native_tpl);
    context.rebalance(&native_client, err, &mut tpl);
    mem::forget(native_client);
    mem::forget(tpl);
}
/// Native queue-nonempty callback invoked by librdkafka when a watched
/// queue transitions from empty to non-empty; forwards the event to
/// `ConsumerContext::message_queue_nonempty_callback`.
///
/// # Safety
///
/// `opaque_ptr` must point to a live `C` (registered via
/// `enable_nonempty_callback`).
unsafe extern "C" fn native_message_queue_nonempty_cb<C: ConsumerContext>(
    _: *mut RDKafka,
    opaque_ptr: *mut c_void,
) {
    let context = &mut *(opaque_ptr as *mut C);
    // `context` is already a `&mut C`; call the method directly instead of
    // the redundant `(*context).…` deref used elsewhere only for raw ptrs.
    context.message_queue_nonempty_callback();
}
/// Registers `native_message_queue_nonempty_cb` on `queue`, passing the
/// raw context pointer as the callback opaque.
///
/// # Safety
///
/// The `Arc<C>` must outlive the queue (the callback dereferences the raw
/// pointer without any lifetime tracking).
unsafe fn enable_nonempty_callback<C: ConsumerContext>(queue: &NativeQueue, context: &Arc<C>) {
    // The opaque handed to librdkafka is a borrowed raw pointer into the
    // Arc's allocation; no reference count is taken here.
    let opaque = Arc::as_ptr(context) as *mut c_void;
    rdsys::rd_kafka_queue_cb_event_enable(
        queue.ptr(),
        Some(native_message_queue_nonempty_cb::<C>),
        opaque,
    )
}
/// A low-level consumer that requires manual polling.
///
/// Messages are fetched by repeatedly calling [`BaseConsumer::poll`]; no
/// background thread drives the consumer.
pub struct BaseConsumer<C = DefaultConsumerContext>
where
    C: ConsumerContext,
{
    client: Client<C>,
    // Upper bound applied to each individual native poll call in
    // `poll_raw`, so the main queue keeps being serviced even during a
    // long (or infinite) user-supplied timeout.
    main_queue_min_poll_interval: Timeout,
    // Held only to keep the consumer queue (and its registered non-empty
    // callback) alive for the lifetime of the consumer; never read.
    _queue: Option<NativeQueue>,
}
impl FromClientConfig for BaseConsumer {
    /// Creates a `BaseConsumer` from the given config, using the default
    /// (no-op) consumer context.
    fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
        Self::from_config_and_context(config, DefaultConsumerContext)
    }
}
impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
    /// Creates a `BaseConsumer` from the given config and custom consumer
    /// context.
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseConsumer<C>> {
        let native_config = config.create_native_config()?;
        Self::new(config, native_config, context)
    }
}
impl<C> BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Creates a consumer from a config, a pre-built native config, and a
    /// context.
    ///
    /// The rebalance and offset-commit callbacks are installed on the
    /// native config *before* the client is created — librdkafka requires
    /// callbacks to be registered on the conf object at creation time.
    pub(crate) fn new(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        context: C,
    ) -> KafkaResult<BaseConsumer<C>> {
        unsafe {
            rdsys::rd_kafka_conf_set_rebalance_cb(
                native_config.ptr(),
                Some(native_rebalance_cb::<C>),
            );
            rdsys::rd_kafka_conf_set_offset_commit_cb(
                native_config.ptr(),
                Some(native_commit_cb::<C>),
            );
        }
        let main_queue_min_poll_interval = context.main_queue_min_poll_interval();
        let client = Client::new(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_CONSUMER,
            context,
        )?;
        // If the client exposes a consumer queue, hook up the non-empty
        // notification so the context learns when messages arrive.
        let queue = client.consumer_queue();
        if let Some(queue) = &queue {
            unsafe {
                enable_nonempty_callback(queue, client.context());
            }
        }
        Ok(BaseConsumer {
            client,
            main_queue_min_poll_interval,
            _queue: queue,
        })
    }

    /// Polls for a raw native message, servicing the main (event) queue
    /// between consumer polls.
    ///
    /// The user timeout is split into slices of at most
    /// `main_queue_min_poll_interval` so that `rd_kafka_poll` keeps running
    /// even when the caller passes a long or infinite timeout. Returns
    /// `None` if the timeout elapses with no message.
    pub(crate) fn poll_raw(&self, mut timeout: Timeout) -> Option<NativePtr<RDKafkaMessage>> {
        loop {
            // Non-blocking service of the main queue (delivery reports,
            // statistics, errors, ...).
            unsafe { rdsys::rd_kafka_poll(self.client.native_ptr(), 0) };
            let op_timeout = cmp::min(timeout, self.main_queue_min_poll_interval);
            let message_ptr = unsafe {
                NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll(
                    self.client.native_ptr(),
                    op_timeout.as_millis(),
                ))
            };
            if let Some(message_ptr) = message_ptr {
                break Some(message_ptr);
            }
            if op_timeout >= timeout {
                // The remaining budget was fully consumed by this slice.
                break None;
            }
            timeout -= op_timeout;
        }
    }

    /// Polls the consumer for new messages.
    ///
    /// Returns `None` if the timeout expires, otherwise the message or the
    /// error that occurred while fetching it.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        self.poll_raw(timeout.into())
            .map(|ptr| unsafe { BorrowedMessage::from_consumer(ptr, self) })
    }

    /// Returns an iterator over the messages of this consumer that blocks
    /// until each next message is available.
    pub fn iter(&self) -> Iter<'_, C> {
        Iter(self)
    }

    /// Splits off the queue of the given partition into a separately
    /// pollable [`PartitionQueue`].
    ///
    /// Returns `None` if the partition queue does not exist (or the topic
    /// name contains an interior NUL byte).
    pub fn split_partition_queue(
        self: &Arc<Self>,
        topic: &str,
        partition: i32,
    ) -> Option<PartitionQueue<C>> {
        let topic = match CString::new(topic) {
            Ok(topic) => topic,
            Err(_) => return None,
        };
        let queue = unsafe {
            NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
                self.client.native_ptr(),
                topic.as_ptr(),
                partition,
            ))
        };
        queue.map(|queue| {
            unsafe {
                enable_nonempty_callback(&queue, self.client.context());
                // Stop forwarding this partition's messages to the common
                // consumer queue; they are now delivered only here.
                rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut());
            }
            PartitionQueue::new(self.clone(), queue)
        })
    }
}
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Returns the underlying client.
    fn client(&self) -> &Client<C> {
        &self.client
    }

    /// Returns the consumer group metadata, or `None` if librdkafka has
    /// none (e.g. the consumer is not part of a group).
    fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
        let ptr = unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
                self.client.native_ptr(),
            ))
        }?;
        Some(ConsumerGroupMetadata(ptr))
    }

    /// Subscribes to the given list of topics (unassigned partitions).
    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
        let mut tpl = TopicPartitionList::new();
        for topic in topics {
            tpl.add_topic_unassigned(topic);
        }
        let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Unsubscribes from the current subscription.
    fn unsubscribe(&self) {
        unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
    }

    /// Manually assigns the given topic/partition list to this consumer.
    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            // NOTE(review): reuses the `Subscription` error variant for an
            // assign failure; changing it would break callers that match on
            // it, so it is kept as-is.
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Seeks the given partition to `offset` within `timeout`.
    ///
    /// Returns `KafkaError::Seek` if the offset cannot be represented
    /// natively or the native seek fails.
    fn seek<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        offset: Offset,
        timeout: T,
    ) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let ret_code = match offset.to_raw() {
            Some(offset) => unsafe {
                rdsys::rd_kafka_seek(topic.ptr(), partition, offset, timeout.into().as_millis())
            },
            None => return Err(KafkaError::Seek("Local: Unrepresentable offset".into())),
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Seek(error));
        };
        Ok(())
    }

    /// Commits the offsets in `topic_partition_list`, synchronously or
    /// asynchronously depending on `mode`.
    fn commit(
        &self,
        topic_partition_list: &TopicPartitionList,
        mode: CommitMode,
    ) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                mode as i32,
            )
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Commits the consumer's current offsets (null list = all current
    /// positions, per librdkafka semantics).
    fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(self.client.native_ptr(), ptr::null_mut(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Commits the offset of the given message.
    fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Stores the offset of the given message locally for a later commit.
    fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
        };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    /// Stores all offsets in `tpl` locally for a later commit.
    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
        let error = unsafe { rdsys::rd_kafka_offsets_store(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    /// Returns the current subscription as a topic/partition list.
    fn subscription(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tpl_ptr) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    /// Returns the current partition assignment.
    fn assignment(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    /// Fetches the committed offsets for the current assignment.
    fn committed<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }
        self.committed_offsets(unsafe { TopicPartitionList::from_ptr(tpl_ptr) }, timeout)
    }

    /// Fetches the committed offsets for the partitions in `tpl`.
    fn committed_offsets<T: Into<Timeout>>(
        &self,
        tpl: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        // `rd_kafka_committed` fills in the offsets of `tpl` in place.
        let committed_error = unsafe {
            rdsys::rd_kafka_committed(
                self.client.native_ptr(),
                tpl.ptr(),
                timeout.into().as_millis(),
            )
        };
        if committed_error.is_error() {
            Err(KafkaError::MetadataFetch(committed_error.into()))
        } else {
            Ok(tpl)
        }
    }

    /// Looks up the offsets at `timestamp` for every assigned partition.
    fn offsets_for_timestamp<T: Into<Timeout>>(
        &self,
        timestamp: i64,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }
        let mut tpl = unsafe { TopicPartitionList::from_ptr(tpl_ptr) };
        // librdkafka's offsets-for-times API encodes the query timestamp in
        // the offset field of each entry.
        tpl.set_all_offsets(Offset::Offset(timestamp))?;
        self.offsets_for_times(tpl, timeout)
    }

    /// Looks up the offsets for the timestamps encoded in `timestamps`.
    fn offsets_for_times<T: Into<Timeout>>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        // `rd_kafka_offsets_for_times` replaces each timestamp with the
        // corresponding offset in place.
        let offsets_for_times_error = unsafe {
            rdsys::rd_kafka_offsets_for_times(
                self.client.native_ptr(),
                timestamps.ptr(),
                timeout.into().as_millis(),
            )
        };
        if offsets_for_times_error.is_error() {
            Err(KafkaError::MetadataFetch(offsets_for_times_error.into()))
        } else {
            Ok(timestamps)
        }
    }

    /// Returns the current consumer position for every assigned partition.
    fn position(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        // Fix: previously the result of `rd_kafka_assignment` was
        // discarded, so a failed assignment lookup passed a null list to
        // `rd_kafka_position`; check it explicitly, matching `committed`
        // and `offsets_for_timestamp`.
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }
        // Take ownership immediately so the native list is freed even when
        // the position lookup below fails (previously it leaked on error).
        let tpl = unsafe { TopicPartitionList::from_ptr(tpl_ptr) };
        let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(tpl)
        }
    }

    /// Fetches cluster metadata, optionally restricted to a single topic.
    fn fetch_metadata<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        self.client.fetch_metadata(topic, timeout)
    }

    /// Fetches the low and high watermarks for the given partition.
    fn fetch_watermarks<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        self.client.fetch_watermarks(topic, partition, timeout)
    }

    /// Fetches the consumer group list, optionally restricted to `group`.
    fn fetch_group_list<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        self.client.fetch_group_list(group, timeout)
    }

    /// Pauses consumption of the given partitions.
    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    /// Resumes consumption of the given partitions.
    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code = unsafe {
            rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }
}
impl<C> Drop for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Closes the native consumer (leaving the group and committing final
    /// offsets, per librdkafka's close semantics) when the wrapper is
    /// dropped.
    fn drop(&mut self) {
        let native_ptr = self.client.native_ptr();
        trace!("Destroying consumer: {:?}", native_ptr);
        unsafe { rdsys::rd_kafka_consumer_close(native_ptr) };
        trace!("Consumer destroyed: {:?}", native_ptr);
    }
}
/// A blocking iterator over the messages of a [`BaseConsumer`].
///
/// Each `next` call polls with an infinite timeout, so it blocks until a
/// message (or an error) is available and never yields `None`.
pub struct Iter<'a, C>(&'a BaseConsumer<C>)
where
    C: ConsumerContext;
impl<'a, C> Iterator for Iter<'a, C>
where
    C: ConsumerContext,
{
    type Item = KafkaResult<BorrowedMessage<'a>>;

    /// Blocks until the next message (or error) is available; never
    /// returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // `poll(None)` uses an infinite timeout, but may still return
            // `None` spuriously; keep polling until a result arrives.
            match self.0.poll(None) {
                Some(result) => break Some(result),
                None => continue,
            }
        }
    }
}
impl<'a, C> IntoIterator for &'a BaseConsumer<C>
where
C: ConsumerContext,
{
type Item = KafkaResult<BorrowedMessage<'a>>;
type IntoIter = Iter<'a, C>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// A message queue for a single partition, split off from a
/// [`BaseConsumer`] via `split_partition_queue`.
pub struct PartitionQueue<C>
where
    C: ConsumerContext,
{
    // Keeps the owning consumer alive while this partition queue exists.
    consumer: Arc<BaseConsumer<C>>,
    queue: NativeQueue,
}
impl<C> PartitionQueue<C>
where
    C: ConsumerContext,
{
    /// Wraps a native partition queue together with the consumer that owns
    /// it.
    fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
        PartitionQueue { consumer, queue }
    }

    /// Polls this partition's queue for the next message.
    ///
    /// Returns `None` if the timeout expires, otherwise the message or the
    /// error that occurred while fetching it.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        let raw = unsafe {
            rdsys::rd_kafka_consume_queue(self.queue.ptr(), timeout.into().as_millis())
        };
        // A null pointer means the timeout expired without a message.
        let native_msg = unsafe { NativePtr::from_ptr(raw) }?;
        Some(unsafe { BorrowedMessage::from_consumer(native_msg, &self.consumer) })
    }
}