pub mod base_consumer;
pub mod stream_consumer;
use rdsys;
use rdsys::types::*;
use client::{Context, NativeClient};
use error::KafkaResult;
use groups::GroupList;
use message::Message;
use metadata::Metadata;
use util::cstr_to_owned;
use std::mem;
use std::ptr;
use std::os::raw::c_void;
pub use consumer::base_consumer::BaseConsumer;
pub use topic_partition_list::TopicPartitionList;
#[derive(Clone, Debug)]
pub enum Rebalance {
Assign(TopicPartitionList),
Revoke,
Error(String)
}
pub trait ConsumerContext: Context {
fn rebalance(&self, native_client: &NativeClient, err: RDKafkaRespErr,
partitions_ptr: *mut RDKafkaTopicPartitionList) {
let rebalance = match err {
RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
let topic_partition_list = TopicPartitionList::from_rdkafka(partitions_ptr);
Rebalance::Assign(topic_partition_list)
},
RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => {
Rebalance::Revoke
},
_ => {
let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(err)) };
error!("Error rebalancing: {}", error);
Rebalance::Error(error)
}
};
self.pre_rebalance(&rebalance);
unsafe {
match err {
RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
rdsys::rd_kafka_assign(native_client.ptr(), partitions_ptr);
},
RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => {
rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
},
_ => {
rdsys::rd_kafka_assign(native_client.ptr(), ptr::null());
}
}
}
self.post_rebalance(&rebalance);
}
fn pre_rebalance(&self, _rebalance: &Rebalance) { }
fn post_rebalance(&self, _rebalance: &Rebalance) { }
}
#[derive(Clone)]
pub struct EmptyConsumerContext;
impl Context for EmptyConsumerContext { }
impl ConsumerContext for EmptyConsumerContext { }
unsafe extern "C" fn rebalance_cb<C: ConsumerContext>(rk: *mut RDKafka,
err: RDKafkaRespErr,
partitions: *mut RDKafkaTopicPartitionList,
opaque_ptr: *mut c_void) {
let context: &C = &*(opaque_ptr as *const C);
let native_client = NativeClient::from_ptr(rk);
context.rebalance(&native_client, err, partitions);
mem::forget(native_client); }
pub enum CommitMode {
Sync = 0,
Async = 1,
}
pub trait Consumer<C: ConsumerContext> {
fn get_base_consumer(&self) -> &BaseConsumer<C>;
fn subscribe(&self, topics: &Vec<&str>) -> KafkaResult<()> {
self.get_base_consumer().subscribe(topics)
}
fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
self.get_base_consumer().assign(assignment)
}
fn commit(&self, topic_partition_list: &TopicPartitionList, mode: CommitMode) -> KafkaResult<()> {
self.get_base_consumer().commit(topic_partition_list, mode)
}
fn commit_message(&self, message: &Message, mode: CommitMode) -> KafkaResult<()> {
self.get_base_consumer().commit_message(message, mode)
}
fn subscription(&self) -> KafkaResult<TopicPartitionList> {
self.get_base_consumer().subscription()
}
fn assignment(&self) -> KafkaResult<TopicPartitionList> {
self.get_base_consumer().assignment()
}
fn committed(&self, timeout_ms: i32) -> KafkaResult<TopicPartitionList> {
self.get_base_consumer().committed(timeout_ms)
}
fn position(&self) -> KafkaResult<TopicPartitionList> {
self.get_base_consumer().position()
}
fn fetch_metadata(&self, timeout_ms: i32) -> KafkaResult<Metadata> {
self.get_base_consumer().fetch_metadata(timeout_ms)
}
fn fetch_watermarks(&self, topic: &str, partition: i32, timeout_ms: i32) -> KafkaResult<(i64, i64)> {
self.get_base_consumer().fetch_watermarks(topic, partition, timeout_ms)
}
fn fetch_group_list(&self, group: Option<&str>, timeout_ms: i32) -> KafkaResult<GroupList> {
self.get_base_consumer().fetch_group_list(group, timeout_ms)
}
}