use std::ffi::{CStr, CString};
use std::mem::ManuallyDrop;
use std::os::raw::c_void;
use std::ptr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use log::{error, warn};
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::client::{Client, NativeQueue};
use crate::config::{
ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
};
use crate::consumer::{
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext,
RebalanceProtocol,
};
use crate::error::{IsError, KafkaError, KafkaResult, RDKafkaError};
use crate::groups::GroupList;
use crate::log::trace;
use crate::message::{BorrowedMessage, Message};
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, NativePtr, Timeout};
/// A low-level Kafka consumer that must be polled manually for messages
/// and events.
///
/// `C` is the [`ConsumerContext`] that receives rebalance and offset-commit
/// callbacks; it defaults to the no-op [`DefaultConsumerContext`].
pub struct BaseConsumer<C = DefaultConsumerContext>
where
    C: ConsumerContext,
{
    // Underlying librdkafka client handle plus the user-supplied context.
    client: Client<C>,
    // Queue served by `poll`: the dedicated consumer queue when a `group.id`
    // is configured, the client's main queue otherwise (see `new`).
    queue: NativeQueue,
    // `group.id` from the configuration, if any; its presence selects the
    // orderly close protocol in `Drop`.
    group_id: Option<String>,
}
impl FromClientConfig for BaseConsumer {
    /// Builds a `BaseConsumer` with the default (no-op) consumer context.
    fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
        let context = DefaultConsumerContext;
        BaseConsumer::from_config_and_context(config, context)
    }
}
impl<C: ConsumerContext> FromClientConfigAndContext<C> for BaseConsumer<C> {
    /// Materializes the native librdkafka configuration, then hands it to
    /// [`BaseConsumer::new`] together with the caller-supplied context.
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseConsumer<C>> {
        let native_config = config.create_native_config()?;
        BaseConsumer::new(config, native_config, context)
    }
}
impl<C> BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Creates a new consumer from a client configuration, a pre-built
    /// native configuration, and a context.
    ///
    /// Enables on the native config exactly the event types this consumer's
    /// poll loop handles (rebalance, offset commit, stats, error, OAUTHBEARER
    /// token refresh), then creates the underlying [`Client`]. When a
    /// `group.id` is present, the main queue is redirected into the consumer
    /// queue (`rd_kafka_poll_set_consumer`) so a single poll loop serves both
    /// messages and group events; otherwise the client's main queue is used.
    pub(crate) fn new(
        config: &ClientConfig,
        native_config: NativeClientConfig,
        context: C,
    ) -> KafkaResult<BaseConsumer<C>> {
        unsafe {
            rdsys::rd_kafka_conf_set_events(
                native_config.ptr(),
                rdsys::RD_KAFKA_EVENT_REBALANCE
                    | rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT
                    | rdsys::RD_KAFKA_EVENT_STATS
                    | rdsys::RD_KAFKA_EVENT_ERROR
                    | rdsys::RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH,
            )
        };
        let client = Client::new(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_CONSUMER,
            context,
        )?;
        let group_id = config.get("group.id").map(|s| s.to_string());
        // Only consumers that participate in a consumer group get a dedicated
        // consumer queue; standalone consumers poll the main queue directly.
        let queue = if group_id.is_some() {
            unsafe { rdsys::rd_kafka_poll_set_consumer(client.native_ptr()) };
            client.consumer_queue().ok_or_else(|| {
                KafkaError::ClientCreation("rdkafka consumer queue not available".to_string())
            })?
        } else {
            client.main_queue()
        };
        Ok(BaseConsumer {
            client,
            queue,
            group_id,
        })
    }

    /// Polls the consumer for new messages.
    ///
    /// Returns `None` when `timeout` expires without a message. It may also
    /// return `None` early after servicing a rebalance or offset-commit
    /// event, so callers with a finite timeout can observe assignment
    /// changes promptly.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        self.poll_queue(self.get_queue(), timeout)
    }

    /// Polls `queue` for events until a message (or error) is available or
    /// `timeout` expires, dispatching non-message events to the handlers
    /// below.
    pub(crate) fn poll_queue<T: Into<Timeout>>(
        &self,
        queue: &NativeQueue,
        timeout: T,
    ) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        let start = Instant::now();
        let timeout = timeout.into();
        // Poll in slices no longer than the context's minimum poll interval,
        // so periodic duties still run during long (or infinite) waits.
        let min_poll_interval = self.context().main_queue_min_poll_interval();
        // Time left before the caller's overall deadline expires.
        let mut remaining = timeout;
        loop {
            let op_timeout = std::cmp::min(remaining, min_poll_interval);
            let maybe_event = self.client().poll_event(queue, op_timeout);
            if let Some(event) = maybe_event {
                let evtype = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
                match evtype {
                    rdsys::RD_KAFKA_EVENT_FETCH => {
                        if let Some(result) = self.handle_fetch_event(event) {
                            return Some(result);
                        }
                    }
                    rdsys::RD_KAFKA_EVENT_ERROR => {
                        if let Some(err) = self.handle_error_event(event) {
                            return Some(Err(err));
                        }
                    }
                    rdsys::RD_KAFKA_EVENT_REBALANCE => {
                        self.handle_rebalance_event(event);
                        // Let callers with a finite timeout react to the new
                        // assignment immediately.
                        if timeout != Timeout::Never {
                            return None;
                        }
                    }
                    rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => {
                        self.handle_offset_commit_event(event);
                        if timeout != Timeout::Never {
                            return None;
                        }
                    }
                    _ => {
                        let buf = unsafe {
                            let evname = rdsys::rd_kafka_event_name(event.ptr());
                            CStr::from_ptr(evname).to_bytes()
                        };
                        let evname = String::from_utf8(buf.to_vec()).unwrap();
                        warn!("Ignored event '{}' on consumer poll", evname);
                    }
                }
            }
            // Recompute the remaining budget from the *original* timeout and
            // the total elapsed time. The previous code decremented the
            // running timeout by the cumulative `elapsed()` on every
            // iteration, double-counting time already subtracted and making
            // finite timeouts expire too early.
            remaining = timeout.saturating_sub(start.elapsed());
            if remaining.is_zero() {
                return None;
            }
        }
    }

    /// Extracts the next message from a FETCH event, if any.
    ///
    /// The message borrows storage owned by the event, so the event is moved
    /// into an `Arc` that the returned [`BorrowedMessage`] keeps alive.
    fn handle_fetch_event(
        &self,
        event: NativePtr<RDKafkaEvent>,
    ) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_event_message_next(event.ptr()) as *mut _)
                .map(|ptr| BorrowedMessage::from_client(ptr, Arc::new(event), self.client()))
        }
    }

    /// Forwards assign/revoke rebalance events to the context's `rebalance`
    /// callback; any other error code on a rebalance event is logged and
    /// ignored.
    fn handle_rebalance_event(&self, event: NativePtr<RDKafkaEvent>) {
        let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
        match err {
            rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
            | rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => {
                let tpl = unsafe {
                    let native_tpl = rdsys::rd_kafka_event_topic_partition_list(event.ptr());
                    TopicPartitionList::from_ptr(native_tpl)
                };
                // The partition list is owned by the event, not by us:
                // wrap it in ManuallyDrop so we never free it.
                let mut tpl = ManuallyDrop::new(tpl);
                self.context()
                    .rebalance(self.client.native_client(), err, &mut tpl);
            }
            _ => {
                let buf = unsafe {
                    let err_name =
                        rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr()));
                    CStr::from_ptr(err_name).to_bytes()
                };
                let err = String::from_utf8(buf.to_vec()).unwrap();
                warn!("invalid rebalance event: {:?}", err);
            }
        }
    }

    /// Forwards an offset-commit event (result plus committed offsets, or an
    /// empty list when the event carries none) to the context's
    /// `commit_callback`.
    fn handle_offset_commit_event(&self, event: NativePtr<RDKafkaEvent>) {
        let err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
        let commit_error = if err.is_error() {
            Err(KafkaError::ConsumerCommit(err.into()))
        } else {
            Ok(())
        };
        let offsets = unsafe { rdsys::rd_kafka_event_topic_partition_list(event.ptr()) };
        if offsets.is_null() {
            let tpl = TopicPartitionList::new();
            self.context().commit_callback(commit_error, &tpl);
        } else {
            // Event-owned list: ManuallyDrop prevents a double free.
            let tpl = ManuallyDrop::new(unsafe { TopicPartitionList::from_ptr(offsets) });
            self.context().commit_callback(commit_error, &tpl);
        }
    }

    /// Maps an ERROR event to a [`KafkaError`], distinguishing partition-EOF
    /// notifications and fatal errors; returns `None` when the event carries
    /// no error.
    fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) -> Option<KafkaError> {
        let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
        if rdkafka_err.is_error() {
            if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
                // NOTE(review): this assumes EOF events always carry a topic
                // partition (non-null pointer) — confirm against librdkafka.
                // The returned partition is owned by us and must be freed.
                let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
                let partition = unsafe { (*tp_ptr).partition };
                unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
                Some(KafkaError::PartitionEOF(partition))
            } else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
                Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
            } else {
                Some(KafkaError::MessageConsumption(rdkafka_err.into()))
            }
        } else {
            None
        }
    }

    /// Returns a blocking iterator over the messages of this consumer.
    pub fn iter(&self) -> Iter<'_, C> {
        Iter(self)
    }

    /// The queue `poll` reads from.
    pub(crate) fn get_queue(&self) -> &NativeQueue {
        &self.queue
    }

    /// Splits off the queue of a single partition so it can be polled
    /// independently of the main consumer queue.
    ///
    /// Returns `None` if the topic name contains an interior NUL byte or
    /// librdkafka has no queue for the given topic/partition.
    pub fn split_partition_queue(
        self: &Arc<Self>,
        topic: &str,
        partition: i32,
    ) -> Option<PartitionQueue<C>> {
        let topic = match CString::new(topic) {
            Ok(topic) => topic,
            Err(_) => return None,
        };
        let queue = unsafe {
            NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
                self.client.native_ptr(),
                topic.as_ptr(),
                partition,
            ))
        };
        queue.map(|queue| {
            // Disable forwarding so this partition's messages stay on the
            // split queue instead of the common consumer queue.
            unsafe { rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut()) }
            PartitionQueue::new(self.clone(), queue)
        })
    }

    /// Initiates the orderly consumer shutdown, emitting the close events on
    /// this consumer's queue; callers should keep polling until [`closed`]
    /// returns true (see `Drop`).
    ///
    /// [`closed`]: BaseConsumer::closed
    pub fn close_queue(&self) -> KafkaResult<()> {
        let err = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_consumer_close_queue(
                self.client.native_ptr(),
                self.queue.ptr(),
            ))
        };
        if err.is_error() {
            Err(KafkaError::ConsumerQueueClose(err.code()))
        } else {
            Ok(())
        }
    }

    /// Whether librdkafka reports the consumer as fully closed.
    pub fn closed(&self) -> bool {
        unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
    }
}
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    fn client(&self) -> &Client<C> {
        &self.client
    }

    /// Returns the consumer group metadata handle, or `None` when librdkafka
    /// returns a null pointer (presumably when the consumer is not a group
    /// member — confirm against librdkafka docs).
    fn group_metadata(&self) -> Option<ConsumerGroupMetadata> {
        let ptr = unsafe {
            NativePtr::from_ptr(rdsys::rd_kafka_consumer_group_metadata(
                self.client.native_ptr(),
            ))
        }?;
        Some(ConsumerGroupMetadata(ptr))
    }

    /// Subscribes to the given topics; partitions are assigned later through
    /// the group rebalance protocol.
    fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
        // Build a list with topic names only (no explicit partitions).
        let mut tpl = TopicPartitionList::new();
        for topic in topics {
            tpl.add_topic_unassigned(topic);
        }
        let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tpl.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Drops the current subscription, if any.
    fn unsubscribe(&self) {
        unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
    }

    /// Manually assigns the given partitions (eager assignment), replacing
    /// any previous assignment.
    fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), assignment.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Clears the current assignment by passing a null partition list to
    /// `rd_kafka_assign`.
    fn unassign(&self) -> KafkaResult<()> {
        let ret_code = unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), ptr::null()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Adds `assignment` to the current assignment (cooperative/incremental
    /// rebalance protocol).
    fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_assign(
                self.client.native_ptr(),
                assignment.ptr(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Removes `assignment` from the current assignment (cooperative/
    /// incremental rebalance protocol).
    fn incremental_unassign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_incremental_unassign(
                self.client.native_ptr(),
                assignment.ptr(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Subscription(error));
        };
        Ok(())
    }

    /// Seeks one partition of `topic` to `offset`, waiting at most `timeout`.
    ///
    /// Fails with `KafkaError::Seek` if the offset has no raw librdkafka
    /// representation or if the native seek reports an error.
    fn seek<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        offset: Offset,
        timeout: T,
    ) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let ret_code = match offset.to_raw() {
            Some(offset) => unsafe {
                rdsys::rd_kafka_seek(topic.ptr(), partition, offset, timeout.into().as_millis())
            },
            None => return Err(KafkaError::Seek("Local: Unrepresentable offset".into())),
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::Seek(error));
        };
        Ok(())
    }

    /// Seeks several partitions at once; on success returns the same list
    /// (librdkafka may record per-partition results inside it — see the
    /// `rd_kafka_seek_partitions` docs).
    fn seek_partitions<T: Into<Timeout>>(
        &self,
        topic_partition_list: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let ret = unsafe {
            RDKafkaError::from_ptr(rdsys::rd_kafka_seek_partitions(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                timeout.into().as_millis(),
            ))
        };
        if ret.is_error() {
            let error = ret.name();
            return Err(KafkaError::Seek(error));
        }
        Ok(topic_partition_list)
    }

    /// Commits the offsets in `topic_partition_list`; `mode as i32` is passed
    /// as librdkafka's `async` flag.
    fn commit(
        &self,
        topic_partition_list: &TopicPartitionList,
        mode: CommitMode,
    ) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(
                self.client.native_ptr(),
                topic_partition_list.ptr(),
                mode as i32,
            )
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Commits the consumer's current position for all assigned partitions
    /// (null offsets list = "current state" in librdkafka).
    fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit(self.client.native_ptr(), ptr::null_mut(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Commits the offset of a single message (marking it as processed).
    fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32)
        };
        if error.is_error() {
            Err(KafkaError::ConsumerCommit(error.into()))
        } else {
            Ok(())
        }
    }

    /// Stores `offset` for later (auto or manual) commit of the given
    /// topic/partition.
    fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
        let topic = self.client.native_topic(topic)?;
        let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    /// Stores the offset of `message` for later commit.
    ///
    /// NOTE(review): this stores the message's own offset, not offset + 1 —
    /// verify this matches the intended commit semantics for callers.
    fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
        let error = unsafe {
            rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
        };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    /// Stores all offsets in `tpl` for later commit.
    fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
        let error = unsafe { rdsys::rd_kafka_offsets_store(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::StoreOffset(error.into()))
        } else {
            Ok(())
        }
    }

    /// Returns the current topic subscription; the returned list takes
    /// ownership of the native list librdkafka allocates.
    fn subscription(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tpl_ptr) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    /// Returns the current partition assignment.
    fn assignment(&self) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let error = unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(unsafe { TopicPartitionList::from_ptr(tpl_ptr) })
        }
    }

    /// Whether the current assignment has been lost (as reported by
    /// librdkafka after certain rebalance failures).
    fn assignment_lost(&self) -> bool {
        unsafe { rdsys::rd_kafka_assignment_lost(self.client.native_ptr()) == 1 }
    }

    /// Fetches the committed offsets for the consumer's *current assignment*.
    fn committed<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<TopicPartitionList> {
        // First obtain the assignment, then look up its committed offsets.
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }
        self.committed_offsets(unsafe { TopicPartitionList::from_ptr(tpl_ptr) }, timeout)
    }

    /// Fetches the committed offsets for an explicit partition list; the
    /// offsets are filled into `tpl` in place and the list is returned.
    fn committed_offsets<T: Into<Timeout>>(
        &self,
        tpl: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let committed_error = unsafe {
            rdsys::rd_kafka_committed(
                self.client.native_ptr(),
                tpl.ptr(),
                timeout.into().as_millis(),
            )
        };
        if committed_error.is_error() {
            Err(KafkaError::MetadataFetch(committed_error.into()))
        } else {
            Ok(tpl)
        }
    }

    /// Looks up the offsets corresponding to `timestamp` (ms since epoch) for
    /// every partition in the current assignment.
    fn offsets_for_timestamp<T: Into<Timeout>>(
        &self,
        timestamp: i64,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let mut tpl_ptr = ptr::null_mut();
        let assignment_error =
            unsafe { rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tpl_ptr) };
        if assignment_error.is_error() {
            return Err(KafkaError::MetadataFetch(assignment_error.into()));
        }
        let mut tpl = unsafe { TopicPartitionList::from_ptr(tpl_ptr) };
        // rd_kafka_offsets_for_times expects the query timestamp in each
        // entry's offset field.
        tpl.set_all_offsets(Offset::Offset(timestamp))?;
        self.offsets_for_times(tpl, timeout)
    }

    /// Translates per-partition timestamps (stored in the offset fields of
    /// `timestamps`) into offsets; results are written back in place.
    fn offsets_for_times<T: Into<Timeout>>(
        &self,
        timestamps: TopicPartitionList,
        timeout: T,
    ) -> KafkaResult<TopicPartitionList> {
        let offsets_for_times_error = unsafe {
            rdsys::rd_kafka_offsets_for_times(
                self.client.native_ptr(),
                timestamps.ptr(),
                timeout.into().as_millis(),
            )
        };
        if offsets_for_times_error.is_error() {
            Err(KafkaError::MetadataFetch(offsets_for_times_error.into()))
        } else {
            Ok(timestamps)
        }
    }

    /// Returns the consumer's current position (next offset to fetch) for
    /// each assigned partition; offsets are filled into the assignment list.
    fn position(&self) -> KafkaResult<TopicPartitionList> {
        let tpl = self.assignment()?;
        let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };
        if error.is_error() {
            Err(KafkaError::MetadataFetch(error.into()))
        } else {
            Ok(tpl)
        }
    }

    /// Fetches cluster metadata, optionally restricted to a single topic.
    fn fetch_metadata<T: Into<Timeout>>(
        &self,
        topic: Option<&str>,
        timeout: T,
    ) -> KafkaResult<Metadata> {
        self.client.fetch_metadata(topic, timeout)
    }

    /// Fetches the low and high watermark offsets for a partition.
    fn fetch_watermarks<T: Into<Timeout>>(
        &self,
        topic: &str,
        partition: i32,
        timeout: T,
    ) -> KafkaResult<(i64, i64)> {
        self.client.fetch_watermarks(topic, partition, timeout)
    }

    /// Fetches the consumer group list, optionally restricted to one group.
    fn fetch_group_list<T: Into<Timeout>>(
        &self,
        group: Option<&str>,
        timeout: T,
    ) -> KafkaResult<GroupList> {
        self.client.fetch_group_list(group, timeout)
    }

    /// Pauses consumption of the given partitions.
    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code =
            unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    /// Resumes consumption of the given (previously paused) partitions.
    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
        let ret_code = unsafe {
            rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
        };
        if ret_code.is_error() {
            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
            return Err(KafkaError::PauseResume(error));
        };
        Ok(())
    }

    /// Reports which rebalance protocol (eager or cooperative) is in use.
    fn rebalance_protocol(&self) -> RebalanceProtocol {
        self.client.native_client().rebalance_protocol()
    }
}
impl<C> Drop for BaseConsumer<C>
where
    C: ConsumerContext,
{
    // Performs the orderly consumer shutdown before the native client is
    // destroyed.
    fn drop(&mut self) {
        trace!("Destroying consumer: {:?}", self.client.native_ptr());
        // Only consumers that joined a group (i.e. have a `group.id`) need
        // the close protocol: close the queue, then keep polling so the
        // remaining rebalance/commit events are served until librdkafka
        // reports the consumer fully closed.
        if self.group_id.is_some() {
            if let Err(err) = self.close_queue() {
                error!("Failed to close consumer queue on drop: {}", err);
            } else {
                while !self.closed() {
                    self.poll(Duration::from_millis(100));
                }
            }
        }
        trace!("Consumer destroyed: {:?}", self.client.native_ptr());
    }
}
/// A blocking iterator over the messages of a [`BaseConsumer`], created by
/// [`BaseConsumer::iter`] or by iterating over `&BaseConsumer`.
pub struct Iter<'a, C>(&'a BaseConsumer<C>)
where
    C: ConsumerContext;
impl<'a, C> Iterator for Iter<'a, C>
where
    C: ConsumerContext,
{
    type Item = KafkaResult<BorrowedMessage<'a>>;

    /// Blocks until the consumer yields a message or an error.
    fn next(&mut self) -> Option<Self::Item> {
        // `poll(None)` waits without a deadline, but can still wake up
        // without an item; keep polling until one arrives.
        loop {
            match self.0.poll(None) {
                Some(item) => break Some(item),
                None => continue,
            }
        }
    }
}
impl<'a, C> IntoIterator for &'a BaseConsumer<C>
where
C: ConsumerContext,
{
type Item = KafkaResult<BorrowedMessage<'a>>;
type IntoIter = Iter<'a, C>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// A message queue for a single partition, obtained via
/// [`BaseConsumer::split_partition_queue`].
pub struct PartitionQueue<C>
where
    C: ConsumerContext,
{
    // Keeps the owning consumer alive for as long as this queue exists.
    consumer: Arc<BaseConsumer<C>>,
    pub(crate) queue: NativeQueue,
    // Owns the closure registered with librdkafka in
    // `set_nonempty_callback`, so the raw pointer handed to the C side
    // stays valid until `Drop` unregisters it.
    nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
}
impl<C> PartitionQueue<C>
where
    C: ConsumerContext,
{
    // Wraps a split-off native queue together with its owning consumer.
    pub(crate) fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
        PartitionQueue {
            consumer,
            queue,
            nonempty_callback: None,
        }
    }

    /// Polls this partition's queue for messages, reusing the owning
    /// consumer's event-dispatch loop.
    pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> Option<KafkaResult<BorrowedMessage<'_>>> {
        self.consumer.poll_queue(&self.queue, timeout)
    }

    /// Registers `f` to be invoked by librdkafka when this queue becomes
    /// non-empty, replacing any previously registered callback.
    pub fn set_nonempty_callback<F>(&mut self, f: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        // C trampoline: `opaque_ptr` is the address of the *inner* box (a
        // fat `dyn Fn` pointer), which is re-read and invoked here.
        unsafe extern "C" fn native_message_queue_nonempty_cb(
            _: *mut RDKafka,
            opaque_ptr: *mut c_void,
        ) {
            let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
            (**f)();
        }
        // Double boxing: a `dyn Fn` pointer is fat (data + vtable) and does
        // not fit in a thin `*mut c_void`, so we pass a thin pointer to a
        // heap slot that holds the fat pointer.
        let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
        unsafe {
            rdsys::rd_kafka_queue_cb_event_enable(
                self.queue.ptr(),
                Some(native_message_queue_nonempty_cb),
                &*f as *const _ as *mut c_void,
            )
        }
        // Keep the boxes alive while the callback is registered; `Drop`
        // unregisters it before they are freed.
        self.nonempty_callback = Some(f);
    }
}
impl<C> Drop for PartitionQueue<C>
where
    C: ConsumerContext,
{
    fn drop(&mut self) {
        // Unregister any non-empty callback before the queue and the boxed
        // closure in `nonempty_callback` are destroyed, so librdkafka cannot
        // invoke a dangling pointer afterwards.
        unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }
    }
}