1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
extern crate rdkafka_sys as rdkafka;
extern crate futures;
use self::rdkafka::types::*;
use std::str;
use client::Client;
use config::{FromClientConfig, FromClientConfigAndContext, ClientConfig};
use consumer::{Consumer, ConsumerContext, CommitMode, EmptyConsumerContext};
use consumer::rebalance_cb;
use error::{KafkaError, KafkaResult, IsError};
use metadata::Metadata;
use message::Message;
use util::cstr_to_owned;
use topic_partition_list::TopicPartitionList;
/// Consumer type built directly on top of a `Client`.
///
/// Every operation offered by this type is forwarded to the rdkafka
/// bindings using the pointer obtained from `client.native_ptr()`.
pub struct BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// The client whose native pointer is passed to the rdkafka calls.
    client: Client<C>,
}
/// `Consumer` implementation for `BaseConsumer`: since this type *is* the
/// base consumer, both accessors simply hand back `self`.
impl<C> Consumer<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Returns a shared reference to this consumer.
    fn get_base_consumer(&self) -> &Self {
        self
    }

    /// Returns an exclusive reference to this consumer.
    fn get_base_consumer_mut(&mut self) -> &mut Self {
        self
    }
}
/// Builds a `BaseConsumer` that uses the `EmptyConsumerContext`.
impl FromClientConfig for BaseConsumer<EmptyConsumerContext> {
    /// Creates the consumer from `config` by delegating to
    /// `from_config_and_context` with an `EmptyConsumerContext` value.
    fn from_config(config: &ClientConfig) -> KafkaResult<Self> {
        let context = EmptyConsumerContext;
        Self::from_config_and_context(config, context)
    }
}
/// Builds a `BaseConsumer` from a configuration and a caller-supplied context.
impl<C> FromClientConfigAndContext<C> for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Creates the consumer.
    ///
    /// Steps, in order:
    /// 1. create the native configuration from `config`;
    /// 2. register `rebalance_cb::<C>` as the rebalance callback on it;
    /// 3. create a `Client` of type `RD_KAFKA_CONSUMER` with that
    ///    configuration and `context`;
    /// 4. call `rd_kafka_poll_set_consumer` on the new client.
    ///
    /// # Errors
    /// Propagates the error from `create_native_config` or `Client::new`.
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseConsumer<C>> {
        let native_config = try!(config.create_native_config());
        unsafe {
            rdkafka::rd_kafka_conf_set_rebalance_cb(native_config, Some(rebalance_cb::<C>));
        }

        let consumer_client = try!(Client::new(
            native_config,
            RDKafkaType::RD_KAFKA_CONSUMER,
            context
        ));
        unsafe {
            rdkafka::rd_kafka_poll_set_consumer(consumer_client.native_ptr());
        }

        Ok(BaseConsumer { client: consumer_client })
    }
}
impl<C: ConsumerContext> BaseConsumer<C> {
pub fn subscribe(&mut self, topics: &Vec<&str>) -> KafkaResult<()> {
let tp_list = TopicPartitionList::with_topics(topics).create_native_topic_partition_list();
let ret_code = unsafe { rdkafka::rd_kafka_subscribe(self.client.native_ptr(), tp_list) };
if ret_code.is_error() {
let error = unsafe { cstr_to_owned(rdkafka::rd_kafka_err2str(ret_code)) };
return Err(KafkaError::Subscription(error))
};
unsafe { rdkafka::rd_kafka_topic_partition_list_destroy(tp_list) };
Ok(())
}
pub fn unsubscribe(&mut self) {
unsafe { rdkafka::rd_kafka_unsubscribe(self.client.native_ptr()) };
}
pub fn assign(&mut self, assignment: &TopicPartitionList) -> KafkaResult<()> {
let tp_list = assignment.create_native_topic_partition_list();
let ret_code = unsafe { rdkafka::rd_kafka_assign(self.client.native_ptr(), tp_list) };
if ret_code.is_error() {
let error = unsafe { cstr_to_owned(rdkafka::rd_kafka_err2str(ret_code)) };
return Err(KafkaError::Subscription(error))
};
unsafe { rdkafka::rd_kafka_topic_partition_list_destroy(tp_list) };
Ok(())
}
pub fn get_subscriptions(&self) -> TopicPartitionList {
let mut tp_list = unsafe { rdkafka::rd_kafka_topic_partition_list_new(0) };
unsafe { rdkafka::rd_kafka_subscription(self.client.native_ptr(), &mut tp_list as *mut *mut RDKafkaTopicPartitionList) };
TopicPartitionList::from_rdkafka(tp_list)
}
pub fn poll(&self, timeout_ms: i32) -> KafkaResult<Option<Message>> {
let message_ptr = unsafe { rdkafka::rd_kafka_consumer_poll(self.client.native_ptr(), timeout_ms) };
if message_ptr.is_null() {
return Ok(None);
}
let error = unsafe { (*message_ptr).err };
if error.is_error() {
return Err(match error {
rdkafka::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
KafkaError::PartitionEOF(unsafe { (*message_ptr).partition } )
},
e => KafkaError::MessageConsumption(e)
})
}
let kafka_message = Message::new(message_ptr);
Ok(Some(kafka_message))
}
pub fn commit_message(&self, message: &Message, mode: CommitMode) {
unsafe { rdkafka::rd_kafka_commit_message(self.client.native_ptr(), message.ptr(), mode as i32) };
}
pub fn fetch_metadata(&self, timeout_ms: i32) -> KafkaResult<Metadata> {
self.client.fetch_metadata(timeout_ms)
}
}
/// Closes the native consumer when the `BaseConsumer` goes out of scope.
impl<C> Drop for BaseConsumer<C>
where
    C: ConsumerContext,
{
    /// Emits a trace log line, then calls `rd_kafka_consumer_close` on the
    /// native client pointer. The return value of the native call is
    /// discarded.
    fn drop(&mut self) {
        trace!("Destroying consumer");
        unsafe {
            let native_handle = self.client.native_ptr();
            rdkafka::rd_kafka_consumer_close(native_handle);
        }
    }
}