use std::collections::hash_map::HashMap;
use std::io::Cursor;
use std::iter::Iterator;
use std::mem;
#[cfg(feature = "security")]
use openssl::ssl::SslContext;
pub use compression::Compression;
pub use utils::PartitionOffset;
pub use utils::TopicPartitionOffset;
use codecs::{ToByte, FromByte};
use connection::KafkaConnection;
use error::{Result, Error, KafkaCode};
use protocol::{self, ResponseParser};
pub mod metadata;
mod state;
/// Re-exports of the fetch-response data types from the wire-protocol
/// module, so callers can name them without depending on `protocol`.
pub mod fetch {
    pub use protocol::fetch::{Data, Message, Partition, Response, Topic};
}
// Client identifier sent to brokers in every request.
const CLIENTID: &'static str = "kafka-rust";
// Socket timeout, in seconds, applied to newly created broker connections.
const DEFAULT_SO_TIMEOUT_SECS: i32 = 120;
/// Default compression applied to produced messages (none).
pub const DEFAULT_COMPRESSION: Compression = Compression::NONE;
/// Default maximum time the broker may wait before answering a fetch
/// request (presumably milliseconds, per the Kafka protocol — TODO confirm).
pub const DEFAULT_FETCH_MAX_WAIT_TIME: i32 = 100;
/// Default minimum number of bytes a fetch response should carry.
pub const DEFAULT_FETCH_MIN_BYTES: i32 = 4096;
/// Default upper bound on bytes fetched from a single partition.
pub const DEFAULT_FETCH_MAX_BYTES_PER_PARTITION: i32 = 32 * 1024;
/// Whether fetched message CRCs are validated by default.
pub const DEFAULT_FETCH_CRC_VALIDATION: bool = true;
/// The Kafka client: owns the user-visible configuration, the pool of
/// broker connections, and the cluster metadata / correlation state.
#[derive(Debug)]
pub struct KafkaClient {
    // user-tunable settings (hosts, compression, fetch limits, ...)
    config: ClientConfig,
    // cached TCP connections to brokers, keyed by "host:port"
    conn_pool: ConnectionPool,
    // topic/partition metadata and correlation-id counter
    state: state::ClientState,
}
/// User-tunable client settings; see the DEFAULT_* constants for the
/// initial values and the corresponding `KafkaClient` accessors.
#[derive(Debug)]
struct ClientConfig {
    // identifier sent with every request
    client_id: String,
    // bootstrap broker addresses ("host:port")
    hosts: Vec<String>,
    compression: Compression,
    fetch_max_wait_time: i32,
    fetch_min_bytes: i32,
    fetch_max_bytes_per_partition: i32,
    fetch_crc_validation: bool,
}
/// Pool of open broker connections, keyed by "host:port".
#[derive(Debug)]
struct ConnectionPool {
    conns: HashMap<String, KafkaConnection>,
    // socket timeout in seconds for newly opened connections
    timeout: i32,
    // SSL configuration applied to new connections; only present when
    // built with the "security" feature
    #[cfg(feature = "security")]
    security_config: Option<SecurityConfig>
}
impl ConnectionPool {
    /// Creates an empty pool; `timeout` is the socket timeout in seconds
    /// for connections opened later.
    #[cfg(not(feature = "security"))]
    fn new(timeout: i32) -> ConnectionPool {
        ConnectionPool {
            conns: HashMap::new(),
            timeout: timeout,
        }
    }

    /// Creates an empty pool without SSL configuration.
    #[cfg(feature = "security")]
    fn new(timeout: i32) -> ConnectionPool {
        Self::new_with_security(timeout, None)
    }

    /// Creates an empty pool; connections opened later will use the given
    /// SSL configuration, if any.
    #[cfg(feature = "security")]
    fn new_with_security(timeout: i32, security: Option<SecurityConfig>) -> ConnectionPool {
        ConnectionPool {
            conns: HashMap::new(),
            timeout: timeout,
            security_config: security,
        }
    }

    /// Returns the pooled connection to `host`, establishing and caching a
    /// new one if none exists yet. Fails if a new connection cannot be
    /// opened; in that case nothing is inserted into the pool.
    fn get_conn<'a>(&'a mut self, host: &str) -> Result<&'a mut KafkaConnection> {
        // Insert-if-absent first, borrow second. This pays one extra hash
        // lookup on the hit path, but removes the `unsafe mem::transmute`
        // lifetime-extension hack the previous implementation used to work
        // around the (pre-NLL) borrow checker.
        if !self.conns.contains_key(host) {
            let conn = try!(self.new_conn(host));
            self.conns.insert(host.to_owned(), conn);
        }
        // unwrap is safe: the key is guaranteed present at this point.
        Ok(self.conns.get_mut(host).unwrap())
    }

    // Opens a fresh (plaintext) connection to `host`.
    #[cfg(not(feature = "security"))]
    fn new_conn(&self, host: &str) -> Result<KafkaConnection> {
        KafkaConnection::new(host, self.timeout)
    }

    // Opens a fresh connection to `host`, passing along the SSL context
    // if one was configured.
    #[cfg(feature = "security")]
    fn new_conn(&self, host: &str) -> Result<KafkaConnection> {
        KafkaConnection::new(host, self.timeout, self.security_config.as_ref().map(|c| &c.0))
    }
}
/// Where offset fetching should start from.
#[derive(Debug, Copy, Clone)]
pub enum FetchOffset {
    /// The earliest offset still available on the broker.
    Earliest,
    /// The offset of the next message to be produced.
    Latest,
    /// Offsets of messages produced before the given time
    /// (presumably milliseconds since the epoch — TODO confirm).
    ByTime(i64),
}

impl FetchOffset {
    /// Translates the variant into the sentinel value the Kafka offset
    /// API expects: -2 for earliest, -1 for latest, otherwise the
    /// timestamp itself.
    fn to_kafka_value(&self) -> i64 {
        match self {
            &FetchOffset::Earliest => -2,
            &FetchOffset::Latest => -1,
            &FetchOffset::ByTime(ts) => ts,
        }
    }
}
/// Names a topic partition for which a consumer-group offset is to be
/// fetched.
#[derive(Debug)]
pub struct FetchGroupOffset<'a> {
    /// The topic to fetch the group offset for.
    pub topic: &'a str,
    /// The partition to fetch the group offset for.
    pub partition: i32,
}

impl<'a> FetchGroupOffset<'a> {
    /// Convenience constructor.
    #[inline]
    pub fn new(topic: &'a str, partition: i32) -> Self {
        FetchGroupOffset {
            partition: partition,
            topic: topic,
        }
    }
}

// Allows passing `FetchGroupOffset` values directly where the client
// APIs accept `AsRef<FetchGroupOffset>`.
impl<'a> AsRef<FetchGroupOffset<'a>> for FetchGroupOffset<'a> {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// An offset to be committed for a particular topic partition on behalf
/// of a consumer group.
#[derive(Debug)]
pub struct CommitOffset<'a> {
    /// The offset value to commit.
    pub offset: i64,
    /// The topic the offset belongs to.
    pub topic: &'a str,
    /// The partition the offset belongs to.
    pub partition: i32,
}

impl<'a> CommitOffset<'a> {
    /// Convenience constructor.
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        CommitOffset {
            offset: offset,
            partition: partition,
            topic: topic,
        }
    }
}

// Allows passing `CommitOffset` values directly where the client APIs
// accept `AsRef<CommitOffset>`.
impl<'a> AsRef<CommitOffset<'a>> for CommitOffset<'a> {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// A single message to be produced to a particular topic partition.
/// `'a` is the lifetime of the topic name, `'b` that of the payload.
#[derive(Debug)]
pub struct ProduceMessage<'a, 'b> {
    /// Optional message key.
    pub key: Option<&'b [u8]>,
    /// Optional message payload.
    pub value: Option<&'b [u8]>,
    /// Destination topic.
    pub topic: &'a str,
    /// Destination partition within the topic.
    pub partition: i32,
}

impl<'a, 'b> ProduceMessage<'a, 'b> {
    /// Convenience constructor.
    pub fn new(topic: &'a str, partition: i32, key: Option<&'b [u8]>, value: Option<&'b [u8]>) -> Self {
        ProduceMessage {
            topic: topic,
            partition: partition,
            key: key,
            value: value,
        }
    }
}

// Allows passing `ProduceMessage` values directly where the client APIs
// accept `AsRef<ProduceMessage>`.
impl<'a, 'b> AsRef<ProduceMessage<'a, 'b>> for ProduceMessage<'a, 'b> {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// A fetch request against a single topic partition, starting at a
/// given offset.
#[derive(Debug)]
pub struct FetchPartition<'a> {
    /// The topic to fetch from.
    pub topic: &'a str,
    /// The offset to start fetching at.
    pub offset: i64,
    /// The partition to fetch from.
    pub partition: i32,
    /// Per-partition byte limit for this request; a non-positive value
    /// means "use the client's configured default".
    pub max_bytes: i32,
}

impl<'a> FetchPartition<'a> {
    /// Creates a request with the default (unset) byte limit.
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        FetchPartition {
            max_bytes: -1,
            offset: offset,
            partition: partition,
            topic: topic,
        }
    }

    /// Builder-style setter for the per-partition byte limit.
    pub fn with_max_bytes(self, max_bytes: i32) -> Self {
        FetchPartition { max_bytes: max_bytes, ..self }
    }
}

// Allows passing `FetchPartition` values directly where the client APIs
// accept `AsRef<FetchPartition>`.
impl<'a> AsRef<FetchPartition<'a>> for FetchPartition<'a> {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// SSL configuration used when opening broker connections; only
/// available when built with the "security" feature.
#[cfg(feature = "security")]
#[derive(Debug)]
pub struct SecurityConfig(SslContext);

#[cfg(feature = "security")]
impl SecurityConfig {
    /// Wraps an openssl `SslContext` for use by the client.
    pub fn new(ssl: SslContext) -> SecurityConfig {
        SecurityConfig(ssl)
    }
}
impl KafkaClient {
    /// Creates a new client for the given bootstrap broker addresses
    /// ("host:port" strings). All tunables start at the module's
    /// DEFAULT_* values. No network I/O happens here — load metadata
    /// (e.g. `load_metadata_all`) before fetching or producing.
    pub fn new(hosts: Vec<String>) -> KafkaClient {
        KafkaClient {
            config: ClientConfig {
                client_id: CLIENTID.to_owned(),
                hosts: hosts,
                compression: DEFAULT_COMPRESSION,
                fetch_max_wait_time: DEFAULT_FETCH_MAX_WAIT_TIME,
                fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
                fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
                fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
            },
            conn_pool: ConnectionPool::new(DEFAULT_SO_TIMEOUT_SECS),
            state: state::ClientState::new(),
        }
    }

    /// Like `new`, but broker connections will be opened with the given
    /// SSL configuration (only available with the "security" feature).
    #[cfg(feature = "security")]
    pub fn new_secure(hosts: Vec<String>, security: SecurityConfig) -> KafkaClient {
        KafkaClient {
            config: ClientConfig {
                client_id: CLIENTID.to_owned(),
                hosts: hosts,
                compression: DEFAULT_COMPRESSION,
                fetch_max_wait_time: DEFAULT_FETCH_MAX_WAIT_TIME,
                fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
                fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
                fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
            },
            conn_pool: ConnectionPool::new_with_security(DEFAULT_SO_TIMEOUT_SECS, Some(security)),
            state: state::ClientState::new(),
        }
    }

    /// The bootstrap hosts this client was created with.
    #[inline]
    pub fn hosts(&self) -> &[String] {
        &self.config.hosts
    }

    /// Sets the compression applied to subsequently produced messages.
    #[inline]
    pub fn set_compression(&mut self, compression: Compression) {
        self.config.compression = compression;
    }

    /// The currently configured compression.
    #[inline]
    pub fn compression(&self) -> Compression {
        self.config.compression
    }

    /// Sets the maximum time brokers may wait before answering a fetch
    /// (presumably milliseconds — TODO confirm against the protocol).
    #[inline]
    pub fn set_fetch_max_wait_time(&mut self, max_wait_time: i32) {
        self.config.fetch_max_wait_time = max_wait_time;
    }

    /// The currently configured fetch max-wait time.
    #[inline]
    pub fn fetch_max_wait_time(&self) -> i32 {
        self.config.fetch_max_wait_time
    }

    /// Sets the minimum number of bytes a fetch response should carry.
    #[inline]
    pub fn set_fetch_min_bytes(&mut self, min_bytes: i32) {
        self.config.fetch_min_bytes = min_bytes;
    }

    /// The currently configured fetch minimum bytes.
    #[inline]
    pub fn fetch_min_bytes(&self) -> i32 {
        self.config.fetch_min_bytes
    }

    /// Sets the default per-partition byte limit for fetch requests.
    #[inline]
    pub fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32) {
        self.config.fetch_max_bytes_per_partition = max_bytes;
    }

    /// The currently configured per-partition fetch byte limit.
    #[inline]
    pub fn fetch_max_bytes_per_partition(&self) -> i32 {
        self.config.fetch_max_bytes_per_partition
    }

    /// Enables or disables CRC validation of fetched messages.
    #[inline]
    pub fn set_fetch_crc_validation(&mut self, validate_crc: bool) {
        self.config.fetch_crc_validation = validate_crc;
    }

    /// Whether fetched message CRCs are validated.
    #[inline]
    pub fn fetch_crc_validation(&self) -> bool {
        self.config.fetch_crc_validation
    }

    /// A view over the currently known topics (from loaded metadata).
    #[inline]
    pub fn topics(&self) -> metadata::Topics {
        metadata::Topics::new(self)
    }

    /// Discards all cached metadata and re-fetches metadata for all
    /// topics known to the cluster.
    #[inline]
    pub fn load_metadata_all(&mut self) -> Result<()> {
        self.reset_metadata();
        self.load_metadata::<&str>(&[])
    }

    /// Fetches metadata for the given topics (an empty slice means all
    /// topics) and merges it into the client's state.
    #[inline]
    pub fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()> {
        let resp = try!(self.fetch_metadata(topics));
        self.state.update_metadata(resp)
    }

    /// Clears all cached cluster metadata.
    #[inline]
    pub fn reset_metadata(&mut self) {
        self.state.clear_metadata();
    }

    // Asks each configured host in turn for metadata, returning the
    // first successful response; fails with `NoHostReachable` if every
    // host is unreachable or errors out.
    fn fetch_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<protocol::MetadataResponse> {
        let correlation = self.state.next_correlation_id();
        for host in &self.config.hosts {
            debug!("Attempting to fetch metadata from {}", host);
            match self.conn_pool.get_conn(host) {
                Ok(conn) => {
                    let req = protocol::MetadataRequest::new(correlation, &self.config.client_id, topics);
                    match __send_request(conn, req) {
                        Ok(_) => return __get_response::<protocol::MetadataResponse>(conn),
                        Err(e) => debug!("Failed to request metadata from {}: {}", host, e),
                    }
                }
                Err(e) => {
                    debug!("Failed to connect to {}: {}", host, e);
                }
            }
        }
        Err(Error::NoHostReachable)
    }

    /// Fetches the offsets selected by `offset` (earliest/latest/by-time)
    /// for all partitions of the given topics, returning them keyed by
    /// topic name. Topics without loaded metadata are silently skipped.
    pub fn fetch_offsets<T: AsRef<str>>(&mut self, topics: &[T], offset: FetchOffset)
                                        -> Result<HashMap<String, Vec<PartitionOffset>>>
    {
        let time = offset.to_kafka_value();
        let n_topics = topics.len();
        // Split borrows of self up front (state mutably, config shared) so
        // both can be used below without borrow conflicts.
        let state = &mut self.state;
        let correlation = state.next_correlation_id();
        let config = &self.config;
        // Group partition requests by the broker host that leads them,
        // building one OffsetRequest per broker.
        let mut reqs: HashMap<&str, protocol::OffsetRequest> = HashMap::with_capacity(n_topics);
        for topic in topics {
            let topic = topic.as_ref();
            if let Some(ps) = state.partitions_for(topic) {
                for (id, host) in ps.iter().filter_map(|(id, p)| p.broker(&state).map(|b| (id, b.host()))) {
                    let entry = reqs.entry(host)
                        .or_insert_with(|| protocol::OffsetRequest::new(correlation, &config.client_id));
                    entry.add(topic, id, time);
                }
            }
        }
        // Issue one request per broker and merge the per-topic results.
        let mut res: HashMap<String, Vec<PartitionOffset>> = HashMap::with_capacity(n_topics);
        for (host, req) in reqs {
            let resp = try!(__send_receive::<protocol::OffsetRequest, protocol::OffsetResponse>(&mut self.conn_pool, &host, req));
            for tp in resp.topic_partitions {
                let e = res.entry(tp.topic).or_insert(vec!());
                for p in tp.partitions {
                    e.push(p.into_offset());
                }
            }
        }
        Ok(res)
    }

    /// Convenience wrapper around `fetch_offsets` for a single topic.
    /// Fails with `UnknownTopicOrPartition` if no offsets come back.
    pub fn fetch_topic_offsets<T: AsRef<str>>(&mut self, topic: T, offset: FetchOffset)
                                              -> Result<Vec<PartitionOffset>>
    {
        let topic = topic.as_ref();
        let mut m = try!(self.fetch_offsets(&[topic], offset));
        let offs = m.remove(topic).unwrap_or(vec!());
        if offs.is_empty() {
            Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))
        } else {
            Ok(offs)
        }
    }

    /// Fetches messages for the given topic partitions, grouping the
    /// requests per leading broker. Partitions without loaded metadata
    /// are silently skipped. A non-positive `max_bytes` on an input
    /// falls back to the configured per-partition default.
    pub fn fetch_messages<'a, I, J>(&mut self, input: I) -> Result<Vec<fetch::Response>>
        where J: AsRef<FetchPartition<'a>>, I: IntoIterator<Item=J>
    {
        // Borrow split: state mutably (correlation id, broker lookup),
        // config shared (fetch tunables).
        let state = &mut self.state;
        let config = &self.config;
        let correlation = state.next_correlation_id();
        let mut reqs: HashMap<&str, protocol::FetchRequest> = HashMap::new();
        for inp in input {
            let inp = inp.as_ref();
            if let Some(broker) = state.find_broker(inp.topic, inp.partition) {
                reqs.entry(broker)
                    .or_insert_with(|| {
                        protocol::FetchRequest::new(
                            correlation, &config.client_id,
                            config.fetch_max_wait_time, config.fetch_min_bytes)
                    })
                    .add(inp.topic, inp.partition, inp.offset,
                         if inp.max_bytes > 0 {
                             inp.max_bytes
                         } else {
                             config.fetch_max_bytes_per_partition
                         });
            }
        }
        __fetch_messages(&mut self.conn_pool, config, reqs)
    }

    /// Convenience wrapper around `fetch_messages` for one partition.
    pub fn fetch_messages_for_partition<'a>(&mut self, req: &FetchPartition<'a>)
                                            -> Result<Vec<fetch::Response>>
    {
        self.fetch_messages(&[req])
    }

    /// Produces the given messages, grouping them per leading broker.
    /// Unlike the fetch APIs, an unknown topic/partition is an error
    /// here. With `required_acks == 0` the brokers send no response and
    /// the returned vector is empty. `ack_timeout` is presumably in
    /// milliseconds — TODO confirm against the protocol docs.
    pub fn produce_messages<'a, 'b, I, J>(&mut self, required_acks: i16, ack_timeout: i32, messages: I)
                                          -> Result<Vec<TopicPartitionOffset>>
        where J: AsRef<ProduceMessage<'a, 'b>>, I: IntoIterator<Item=J>
    {
        let state = &mut self.state;
        let correlation = state.next_correlation_id();
        let config = &self.config;
        let mut reqs: HashMap<&str, protocol::ProduceRequest> = HashMap::new();
        for msg in messages {
            let msg = msg.as_ref();
            match state.find_broker(msg.topic, msg.partition) {
                None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
                Some(broker) => reqs.entry(broker)
                    .or_insert_with(
                        || protocol::ProduceRequest::new(
                            required_acks, ack_timeout, correlation,
                            &config.client_id, config.compression))
                    .add(msg.topic, msg.partition, msg.key, msg.value),
            }
        }
        __produce_messages(&mut self.conn_pool, reqs, required_acks == 0)
    }

    /// Commits the given offsets on behalf of `group`, grouping the
    /// commits per leading broker. Partitions without loaded metadata
    /// are silently skipped.
    pub fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
        where J: AsRef<CommitOffset<'a>>, I: IntoIterator<Item=J>
    {
        let state = &mut self.state;
        let correlation = state.next_correlation_id();
        let config = &self.config;
        let mut reqs: HashMap<&str, protocol::OffsetCommitRequest> = HashMap::new();
        for tp in offsets {
            let tp = tp.as_ref();
            if let Some(broker) = state.find_broker(&tp.topic, tp.partition) {
                // NOTE(review): `or_insert` builds the request eagerly on
                // every iteration even when the entry exists; consider
                // `or_insert_with` as done in the fetch/produce paths.
                reqs.entry(broker)
                    .or_insert(protocol::OffsetCommitRequest::new(group, correlation, &config.client_id))
                    .add(tp.topic, tp.partition, tp.offset, "");
            }
        }
        __commit_offsets(&mut self.conn_pool, reqs)
    }

    /// Convenience wrapper around `commit_offsets` for a single offset.
    pub fn commit_offset(&mut self, group: &str, topic: &str, partition: i32, offset: i64)
                         -> Result<()>
    {
        self.commit_offsets(group, &[CommitOffset::new(topic, partition, offset)])
    }

    /// Fetches the committed offsets of `group` for the given topic
    /// partitions, grouping requests per leading broker. Partitions
    /// without loaded metadata are silently skipped.
    pub fn fetch_group_offsets<'a, J, I>(&mut self, group: &str, partitions: I)
                                         -> Result<Vec<TopicPartitionOffset>>
        where J: AsRef<FetchGroupOffset<'a>>, I: IntoIterator<Item=J>
    {
        let correlation = self.state.next_correlation_id();
        let mut reqs: HashMap<&str, protocol::OffsetFetchRequest> = HashMap::new();
        for tp in partitions {
            let tp = tp.as_ref();
            if let Some(broker) = self.state.find_broker(tp.topic, tp.partition) {
                // NOTE(review): eager `or_insert` — see commit_offsets.
                reqs.entry(broker)
                    .or_insert(protocol::OffsetFetchRequest::new(group, correlation, &self.config.client_id))
                    .add(tp.topic, tp.partition);
            }
        }
        __fetch_group_offsets(&mut self.conn_pool, reqs)
    }

    /// Fetches the committed offsets of `group` for every partition of
    /// `topic`. Fails with `UnknownTopicOrPartition` if the topic has no
    /// loaded metadata.
    pub fn fetch_group_topic_offsets(&mut self, group: &str, topic: &str)
                                     -> Result<Vec<TopicPartitionOffset>>
    {
        let tps: Vec<_> =
            match self.state.partitions_for(topic) {
                None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
                Some(tp) => tp.iter().map(|(id, _)| FetchGroupOffset::new(topic, id)).collect(),
            };
        self.fetch_group_offsets(group, tps)
    }
}
// Sends each per-broker offset-commit request in turn. The responses
// carry no data we need; only success/failure matters.
fn __commit_offsets(conn_pool: &mut ConnectionPool,
                    reqs: HashMap<&str, protocol::OffsetCommitRequest>)
                    -> Result<()> {
    for (broker, commit_req) in reqs {
        // propagate the first error; the response payload is ignored
        try!(__send_receive::<protocol::OffsetCommitRequest, protocol::OffsetCommitResponse>(conn_pool, broker, commit_req));
    }
    Ok(())
}
// Sends each per-broker offset-fetch request and collects all returned
// topic/partition offsets into one vector.
fn __fetch_group_offsets(conn_pool: &mut ConnectionPool,
                         reqs: HashMap<&str, protocol::OffsetFetchRequest>)
                         -> Result<Vec<TopicPartitionOffset>> {
    let mut res = vec!();
    for (host, req) in reqs {
        let resp = try!(__send_receive::<protocol::OffsetFetchRequest, protocol::OffsetFetchResponse>(conn_pool, host, req));
        // extend() replaces the element-by-element push loop; it reserves
        // capacity up front from the iterator's size hint
        res.extend(resp.get_offsets());
    }
    Ok(res)
}
// Sends each per-broker fetch request and parses the responses. The
// parser borrows the request it belongs to (the protocol parser needs
// the original request alongside the raw response bytes).
fn __fetch_messages(conn_pool: &mut ConnectionPool,
                    config: &ClientConfig,
                    reqs: HashMap<&str, protocol::FetchRequest>)
                    -> Result<Vec<fetch::Response>>
{
    let mut responses = Vec::with_capacity(reqs.len());
    for (host, req) in reqs {
        let parser = protocol::fetch::ResponseParser {
            validate_crc: config.fetch_crc_validation,
            requests: Some(&req),
        };
        let resp = try!(__z_send_receive::<&protocol::FetchRequest, _>(conn_pool, host, &req, &parser));
        responses.push(resp);
    }
    Ok(responses)
}
// Sends each per-broker produce request. With `no_acks` (required_acks
// == 0) the brokers send no response, so nothing is awaited and an
// empty vector is returned; otherwise the per-partition offsets from
// every response are collected.
fn __produce_messages(conn_pool: &mut ConnectionPool,
                      reqs: HashMap<&str, protocol::ProduceRequest>,
                      no_acks: bool)
                      -> Result<Vec<TopicPartitionOffset>>
{
    if no_acks {
        for (host, req) in reqs {
            try!(__send_noack::<protocol::ProduceRequest, protocol::ProduceResponse>(conn_pool, host, req));
        }
        Ok(vec!())
    } else {
        let mut res: Vec<TopicPartitionOffset> = Vec::with_capacity(reqs.len());
        for (host, req) in reqs {
            let resp = try!(__send_receive::<protocol::ProduceRequest, protocol::ProduceResponse>(conn_pool, &host, req));
            // extend() replaces the element-by-element push loop
            res.extend(resp.get_response());
        }
        Ok(res)
    }
}
// Sends `req` to `host` (via the pooled connection) and decodes the
// response as `V`.
fn __send_receive<T: ToByte, V: FromByte>(conn_pool: &mut ConnectionPool, host: &str, req: T)
                                          -> Result<V::R>
{
    // `conn` is already `&mut KafkaConnection`; the previous `let mut
    // conn` + `&mut conn` produced a needless `&mut &mut _` that relied
    // on deref coercion. Plain identifiers reborrow implicitly.
    let conn = try!(conn_pool.get_conn(host));
    try!(__send_request(conn, req));
    __get_response::<V>(conn)
}
// Sends `req` to `host` without waiting for any response; returns the
// number of bytes written. `V` only documents the response type the
// broker would have sent.
fn __send_noack<T: ToByte, V: FromByte>(conn_pool: &mut ConnectionPool, host: &str, req: T)
                                        -> Result<usize>
{
    // `host` is already `&str`; the previous `&host` created a `&&str`
    // that relied on deref coercion. Same for the `&mut conn` rebinding.
    let conn = try!(conn_pool.get_conn(host));
    __send_request(conn, req)
}
// Serializes `request` and sends it over `conn`, prefixed with the
// 4-byte size of the serialized payload (excluding the prefix itself).
fn __send_request<T: ToByte>(conn: &mut KafkaConnection, request: T)
                             -> Result<usize>
{
    // Reserve a 4-byte placeholder for the size prefix, then encode the
    // request right after it.
    let mut buffer = Vec::with_capacity(4);
    buffer.extend_from_slice(&[0, 0, 0, 0]);
    try!(request.encode(&mut buffer));
    let size = buffer.len() as i32 - 4;
    // Patch the placeholder in place: encoding the i32 into `&mut
    // buffer[..]` overwrites the first bytes of the buffer (assumes the
    // codec writes an i32 as exactly 4 bytes — TODO confirm in codecs).
    try!(size.encode(&mut &mut buffer[..]));
    conn.send(&buffer)
}
// Reads one length-prefixed response from `conn` and decodes it as `T`.
fn __get_response<T: FromByte>(conn: &mut KafkaConnection)
                               -> Result<T::R>
{
    // first the 4-byte size prefix ...
    let size_buf = try!(conn.read_exact(4));
    let size = try!(i32::decode_new(&mut Cursor::new(size_buf)));
    // ... then exactly that many payload bytes
    let body = try!(conn.read_exact(size as u64));
    T::decode_new(&mut Cursor::new(body))
}
// Sends `req` to `host` and hands the raw response bytes to `parser`
// (used where decoding needs the request, e.g. fetch responses).
fn __z_send_receive<R, P>(conn_pool: &mut ConnectionPool, host: &str, req: R, parser: &P)
                          -> Result<P::T>
    where R: ToByte, P: ResponseParser
{
    // `conn` is already `&mut KafkaConnection`; no `mut` rebinding or
    // `&mut &mut _` double reference needed (identifiers reborrow).
    let conn = try!(conn_pool.get_conn(host));
    try!(__send_request(conn, req));
    __z_get_response(conn, parser)
}
// Reads one length-prefixed response from `conn` and delegates decoding
// to `parser`.
fn __z_get_response<P>(conn: &mut KafkaConnection, parser: &P) -> Result<P::T>
    where P: ResponseParser
{
    // first the 4-byte size prefix ...
    let size_buf = try!(conn.read_exact(4));
    let size = try!(i32::decode_new(&mut Cursor::new(size_buf)));
    // ... then exactly that many payload bytes, handed to the parser raw
    let body = try!(conn.read_exact(size as u64));
    parser.parse(body)
}