use rdsys;
use rdsys::types::*;
use client::Client;
use config::{FromClientConfig, FromClientConfigAndContext, ClientConfig};
use consumer::rebalance_cb; use consumer::{Consumer, ConsumerContext, CommitMode, EmptyConsumerContext};
use error::{KafkaError, KafkaResult, IsError};
use groups::GroupList;
use message::Message;
use metadata::Metadata;
use topic_partition_list::TopicPartitionList;
use util::cstr_to_owned;
use std::str;
/// Low-level Kafka consumer.
///
/// The consumer owns a single `Client<C>`; every operation exposed by this type is carried
/// out through that client's native handle.
pub struct BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Underlying client that holds the native handle used for all calls.
    client: Client<C>,
}
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Returns this consumer itself, as it already is the base consumer.
    fn get_base_consumer(&self) -> &Self {
        self
    }
}
impl FromClientConfig for BaseConsumer<EmptyConsumerContext> {
    /// Creates a `BaseConsumer` from `config`, using `EmptyConsumerContext` as its context.
    ///
    /// This simply delegates to `from_config_and_context`.
    fn from_config(config: &ClientConfig) -> KafkaResult<Self> {
        let context = EmptyConsumerContext;
        Self::from_config_and_context(config, context)
    }
}
impl<C> FromClientConfigAndContext<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Creates a `BaseConsumer` from `config` and the given `context`.
    ///
    /// Steps, in order:
    /// 1. build the native configuration from `config`;
    /// 2. register `rebalance_cb::<C>` as the rebalance callback on that configuration;
    /// 3. create a client of type `RD_KAFKA_CONSUMER`;
    /// 4. call `rd_kafka_poll_set_consumer` on the new client's native handle.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while creating the native configuration or the client.
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseConsumer<C>> {
        let native_config = config.create_native_config()?;

        // The callback has to be installed on the configuration before the client is built
        // from it.
        unsafe {
            rdsys::rd_kafka_conf_set_rebalance_cb(native_config.ptr(), Some(rebalance_cb::<C>));
        }

        let native_client =
            Client::new(config, native_config, RDKafkaType::RD_KAFKA_CONSUMER, context)?;

        // NOTE(review): the return code of `rd_kafka_poll_set_consumer` is intentionally left
        // unchecked here, as in the original implementation.
        unsafe {
            rdsys::rd_kafka_poll_set_consumer(native_client.native_ptr());
        }

        Ok(BaseConsumer { client: native_client })
    }
}
impl<C: ConsumerContext> BaseConsumer<C> {
pub fn subscribe(&self, topics: &[&str]) -> KafkaResult<()> {
let tp_list = TopicPartitionList::with_topics(topics).create_native_topic_partition_list();
let ret_code = unsafe { rdsys::rd_kafka_subscribe(self.client.native_ptr(), tp_list) };
if ret_code.is_error() {
let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
return Err(KafkaError::Subscription(error))
};
unsafe { rdsys::rd_kafka_topic_partition_list_destroy(tp_list) };
Ok(())
}
pub fn unsubscribe(&self) {
unsafe { rdsys::rd_kafka_unsubscribe(self.client.native_ptr()) };
}
pub fn assign(&self, assignment: &TopicPartitionList) -> KafkaResult<()> {
let tp_list = assignment.create_native_topic_partition_list();
let ret_code = unsafe { rdsys::rd_kafka_assign(self.client.native_ptr(), tp_list) };
if ret_code.is_error() {
let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
return Err(KafkaError::Subscription(error))
};
unsafe { rdsys::rd_kafka_topic_partition_list_destroy(tp_list) };
Ok(())
}
pub fn poll(&self, timeout_ms: i32) -> KafkaResult<Option<Message>> {
let message_ptr = unsafe { rdsys::rd_kafka_consumer_poll(self.client.native_ptr(), timeout_ms) };
if message_ptr.is_null() {
return Ok(None);
}
let error = unsafe { (*message_ptr).err };
if error.is_error() {
return Err(match error {
rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
KafkaError::PartitionEOF(unsafe { (*message_ptr).partition } )
},
e => KafkaError::MessageConsumption(e)
})
}
let kafka_message = Message::new(message_ptr);
Ok(Some(kafka_message))
}
pub fn commit(&self, topic_partition_list: &TopicPartitionList, mode: CommitMode) -> KafkaResult<()> {
let tp_list = topic_partition_list.create_native_topic_partition_list();
let error = unsafe {
let e = rdsys::rd_kafka_commit(self.client.native_ptr(), tp_list, mode as i32);
rdsys::rd_kafka_topic_partition_list_destroy(tp_list);
e
};
if error.is_error() {
Err(KafkaError::ConsumerCommit(error))
} else {
Ok(())
}
}
pub fn commit_message(&self, message: &Message, mode: CommitMode) -> KafkaResult<()> {
let error = unsafe { rdsys::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32) };
if error.is_error() {
Err(KafkaError::ConsumerCommit(error))
} else {
Ok(())
}
}
pub fn subscription(&self) -> KafkaResult<TopicPartitionList> {
let mut tp_list = unsafe { rdsys::rd_kafka_topic_partition_list_new(0) };
let error = unsafe {
rdsys::rd_kafka_subscription(self.client.native_ptr(), &mut tp_list)
};
let result = if error.is_error() {
Err(KafkaError::MetadataFetch(error))
} else {
Ok(TopicPartitionList::from_rdkafka(tp_list))
};
unsafe { rdsys::rd_kafka_topic_partition_list_destroy(tp_list) };
result
}
pub fn assignment(&self) -> KafkaResult<TopicPartitionList> {
let mut tp_list = unsafe { rdsys::rd_kafka_topic_partition_list_new(0) };
let error = unsafe {
rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tp_list)
};
let result = if error.is_error() {
Err(KafkaError::MetadataFetch(error))
} else {
Ok(TopicPartitionList::from_rdkafka(tp_list))
};
unsafe { rdsys::rd_kafka_topic_partition_list_destroy(tp_list) };
result
}
pub fn committed(&self, timeout_ms: i32) -> KafkaResult<TopicPartitionList> {
let mut tp_list = unsafe { rdsys::rd_kafka_topic_partition_list_new(0) };
let assignment_error = unsafe {
rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tp_list)
};
if assignment_error.is_error() {
unsafe { rdsys::rd_kafka_topic_partition_list_destroy(tp_list) };
return Err(KafkaError::MetadataFetch(assignment_error))
}
let committed_error = unsafe {
rdsys::rd_kafka_committed(self.client.native_ptr(), tp_list, timeout_ms)
};
let result = if committed_error.is_error() {
Err(KafkaError::MetadataFetch(committed_error))
} else {
Ok(TopicPartitionList::from_rdkafka(tp_list))
};
unsafe { rdsys::rd_kafka_topic_partition_list_destroy(tp_list) };
result
}
pub fn position(&self) -> KafkaResult<TopicPartitionList> {
let mut tp_list = unsafe { rdsys::rd_kafka_topic_partition_list_new(0) };
let error = unsafe {
rdsys::rd_kafka_assignment(self.client.native_ptr(), &mut tp_list);
rdsys::rd_kafka_position(self.client.native_ptr(), tp_list)
};
let result = if error.is_error() {
Err(KafkaError::MetadataFetch(error))
} else {
Ok(TopicPartitionList::from_rdkafka(tp_list))
};
unsafe { rdsys::rd_kafka_topic_partition_list_destroy(tp_list) };
result
}
pub fn fetch_metadata(&self, timeout_ms: i32) -> KafkaResult<Metadata> {
self.client.fetch_metadata(timeout_ms)
}
pub fn fetch_watermarks(&self, topic: &str, partition: i32, timeout_ms: i32) -> KafkaResult<(i64, i64)> {
self.client.fetch_watermarks(topic, partition, timeout_ms)
}
pub fn fetch_group_list(&self, group: Option<&str>, timeout_ms: i32) -> KafkaResult<GroupList> {
self.client.fetch_group_list(group, timeout_ms)
}
}
impl<C> Drop for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Logs a trace message and closes the native consumer when the value is dropped.
    fn drop(&mut self) {
        trace!("Destroying consumer");
        // The return code of `rd_kafka_consumer_close` is discarded: `drop` has no way to
        // report a failure to the caller.
        unsafe {
            rdsys::rd_kafka_consumer_close(self.client.native_ptr());
        }
    }
}