use std::ptr;
use std::sync::Arc;
use std::time::Duration;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::client::{Client, ClientContext};
use crate::error::{KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::log::{error, trace};
use crate::message::BorrowedMessage;
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{KafkaDrop, NativePtr, Timeout};
pub mod base_consumer;
pub mod stream_consumer;
#[doc(inline)]
pub use self::base_consumer::BaseConsumer;
#[doc(inline)]
pub use self::stream_consumer::{MessageStream, StreamConsumer};
#[derive(Clone, Debug)]
pub enum Rebalance<'a> {
Assign(&'a TopicPartitionList),
Revoke(&'a TopicPartitionList),
Error(KafkaError),
}
pub trait ConsumerContext: ClientContext + Sized {
fn rebalance(
&self,
base_consumer: &BaseConsumer<Self>,
err: RDKafkaRespErr,
tpl: &mut TopicPartitionList,
) {
let rebalance = match err {
RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => Rebalance::Assign(tpl),
RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => Rebalance::Revoke(tpl),
_ => {
let error_code: RDKafkaErrorCode = err.into();
error!("Error rebalancing: {}", error_code);
Rebalance::Error(KafkaError::Rebalance(error_code))
}
};
trace!("Running pre-rebalance with {:?}", rebalance);
self.pre_rebalance(base_consumer, &rebalance);
trace!("Running rebalance with {:?}", rebalance);
let native_client = base_consumer.native_client();
unsafe {
match err {
RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
match native_client.rebalance_protocol() {
RebalanceProtocol::Cooperative => {
rdsys::rd_kafka_incremental_assign(native_client.ptr(), tpl.ptr());
}
_ => {
rdsys::rd_kafka_assign(native_client.ptr(), tpl.ptr());
}
}
}
_ => match native_client.rebalance_protocol() {
RebalanceProtocol::Cooperative => {
rdsys::rd_kafka_incremental_unassign(native_client.ptr(), tpl.ptr());
}
_ => {
rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
}
},
}
}
trace!("Running post-rebalance with {:?}", rebalance);
self.post_rebalance(base_consumer, &rebalance);
}
#[allow(unused_variables)]
fn pre_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}
#[allow(unused_variables)]
fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}
#[allow(unused_variables)]
fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {}
fn main_queue_min_poll_interval(&self) -> Timeout {
Timeout::After(Duration::from_secs(1))
}
}
#[derive(Clone, Debug, Default)]
pub struct DefaultConsumerContext;
impl ClientContext for DefaultConsumerContext {}
impl ConsumerContext for DefaultConsumerContext {}
#[derive(Clone, Copy, Debug)]
pub enum CommitMode {
Sync = 0,
Async = 1,
}
pub struct ConsumerGroupMetadata(NativePtr<RDKafkaConsumerGroupMetadata>);
impl ConsumerGroupMetadata {
pub(crate) fn ptr(&self) -> *const RDKafkaConsumerGroupMetadata {
self.0.ptr()
}
}
unsafe impl KafkaDrop for RDKafkaConsumerGroupMetadata {
const TYPE: &'static str = "consumer_group_metadata";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_consumer_group_metadata_destroy;
}
unsafe impl Send for ConsumerGroupMetadata {}
unsafe impl Sync for ConsumerGroupMetadata {}
pub enum RebalanceProtocol {
None,
Eager,
Cooperative,
}
pub trait Consumer<C = DefaultConsumerContext>
where
C: ConsumerContext,
{
fn client(&self) -> &Client<C>;
fn context(&self) -> &Arc<C> {
self.client().context()
}
fn group_metadata(&self) -> Option<ConsumerGroupMetadata>;
fn subscribe(&self, topics: &[&str]) -> KafkaResult<()>;
fn unsubscribe(&self);
fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;
fn unassign(&self) -> KafkaResult<()>;
fn incremental_assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;
fn incremental_unassign(&self, assignment: &TopicPartitionList) -> KafkaResult<()>;
fn seek<T: Into<Timeout>>(
&self,
topic: &str,
partition: i32,
offset: Offset,
timeout: T,
) -> KafkaResult<()>;
fn seek_partitions<T: Into<Timeout>>(
&self,
topic_partition_list: TopicPartitionList,
timeout: T,
) -> KafkaResult<TopicPartitionList>;
fn commit(
&self,
topic_partition_list: &TopicPartitionList,
mode: CommitMode,
) -> KafkaResult<()>;
fn commit_consumer_state(&self, mode: CommitMode) -> KafkaResult<()>;
fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()>;
fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()>;
fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;
fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()>;
fn subscription(&self) -> KafkaResult<TopicPartitionList>;
fn assignment(&self) -> KafkaResult<TopicPartitionList>;
fn assignment_lost(&self) -> bool;
fn committed<T>(&self, timeout: T) -> KafkaResult<TopicPartitionList>
where
T: Into<Timeout>,
Self: Sized;
fn committed_offsets<T>(
&self,
tpl: TopicPartitionList,
timeout: T,
) -> KafkaResult<TopicPartitionList>
where
T: Into<Timeout>;
fn offsets_for_timestamp<T>(
&self,
timestamp: i64,
timeout: T,
) -> KafkaResult<TopicPartitionList>
where
T: Into<Timeout>,
Self: Sized;
fn offsets_for_times<T>(
&self,
timestamps: TopicPartitionList,
timeout: T,
) -> KafkaResult<TopicPartitionList>
where
T: Into<Timeout>,
Self: Sized;
fn position(&self) -> KafkaResult<TopicPartitionList>;
fn fetch_metadata<T>(&self, topic: Option<&str>, timeout: T) -> KafkaResult<Metadata>
where
T: Into<Timeout>,
Self: Sized;
fn fetch_watermarks<T>(
&self,
topic: &str,
partition: i32,
timeout: T,
) -> KafkaResult<(i64, i64)>
where
T: Into<Timeout>,
Self: Sized;
fn fetch_group_list<T>(&self, group: Option<&str>, timeout: T) -> KafkaResult<GroupList>
where
T: Into<Timeout>,
Self: Sized;
fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()>;
fn rebalance_protocol(&self) -> RebalanceProtocol;
}