use std::collections::HashMap;
use std::fmt;
use std::hash::{Hasher, SipHasher, BuildHasher, BuildHasherDefault};
use client::{self, KafkaClient};
pub use client::Compression;
use error::Result;
use utils::TopicPartitionOffset;
use ref_slice::ref_slice;
// With the "security" feature, re-use the client's `SecurityConfig`; without
// it, substitute a zero-sized placeholder so `Builder` can unconditionally
// carry a `security_config` field.
#[cfg(feature = "security")]
use client::SecurityConfig;
#[cfg(not(feature = "security"))]
type SecurityConfig = ();
/// Default ack timeout handed to `KafkaClient::produce_messages`
/// (presumably milliseconds, matching Kafka's produce-request timeout —
/// confirm against the `client` module).
pub const DEFAULT_ACK_TIMEOUT: i32 = 30 * 1000;
/// Default number of acknowledgements required per produce request.
pub const DEFAULT_REQUIRED_ACKS: i16 = 1;
/// A trait the producer uses to obtain the binary representation of a
/// record's key and value before sending them to Kafka.
pub trait AsBytes {
/// Returns the byte-level representation of `self`; an empty slice is
/// later treated as "no payload" (see `to_option`).
fn as_bytes(&self) -> &[u8];
}
/// `()` keys/values serialize to the empty slice, which the producer maps
/// to an absent payload (see `to_option`).
impl AsBytes for () {
fn as_bytes(&self) -> &[u8] {
&[]
}
}
/// An owned string serializes to its UTF-8 bytes.
impl AsBytes for String {
    fn as_bytes(&self) -> &[u8] {
        // Call the inherent `str::as_bytes` explicitly; `&String`
        // deref-coerces to `&str` at the call site. (A plain
        // `self.as_bytes()` here would recurse into this trait method.)
        str::as_bytes(self)
    }
}
/// A byte vector serializes to its underlying slice.
impl AsBytes for Vec<u8> {
    fn as_bytes(&self) -> &[u8] {
        // Full-range indexing yields the backing `[u8]` slice.
        &self[..]
    }
}
/// A byte slice is already its own binary representation.
impl<'a> AsBytes for &'a [u8] {
fn as_bytes(&self) -> &[u8] {
self
}
}
/// A string slice serializes to its UTF-8 bytes; the inherent
/// `str::as_bytes` is named explicitly to avoid recursing into this
/// trait method.
impl<'a> AsBytes for &'a str {
fn as_bytes(&self) -> &[u8] {
str::as_bytes(self)
}
}
/// A message to be delivered to Kafka through a `Producer`.
pub struct Record<'a, K, V> {
/// The record's key; `()` for key-less records.
pub key: K,
/// The record's payload.
pub value: V,
/// The topic this record is destined for.
pub topic: &'a str,
/// The target partition. A negative value (the constructors' default,
/// `-1`) lets the producer's `Partitioner` pick one at send time.
pub partition: i32,
}
impl<'a, K, V> Record<'a, K, V> {
    /// Builds a record from a topic, key, and value. The partition is left
    /// unassigned (negative), so the producer's partitioner will choose one.
    #[inline]
    pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
        Record {
            topic: topic,
            partition: -1,
            key: key,
            value: value,
        }
    }
    /// Pins this record to the given partition, bypassing automatic
    /// partition assignment.
    #[inline]
    pub fn with_partition(self, partition: i32) -> Self {
        Record { partition: partition, ..self }
    }
}
impl<'a, V> Record<'a, (), V> {
    /// Builds a key-less record from a topic and value. The partition is
    /// left unassigned (negative) for the partitioner to decide.
    #[inline]
    pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
        Record {
            topic: topic,
            partition: -1,
            key: (),
            value: value,
        }
    }
}
// Hand-rolled rather than derived: the topic is rendered with `{}`
// (unquoted) while key/value use `{:?}`.
impl<'a, K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'a, K, V> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f,
"Record {{ topic: {}, partition: {}, key: {:?}, value: {:?} }}",
self.topic,
self.partition,
self.key,
self.value)
}
}
/// A Kafka producer: dispatches records to topic partitions through an
/// owned `KafkaClient`, using `P` to assign partitions.
pub struct Producer<P = DefaultPartitioner> {
client: KafkaClient,
// Cached per-topic partition metadata plus the pluggable partitioner.
state: State<P>,
// Produce-request settings (ack timeout, required acks).
config: Config,
}
// Snapshot of per-topic partition metadata (taken at producer creation;
// see `State::new`) together with the partitioner that consults it.
struct State<P> {
partitions: HashMap<String, Partitions>,
partitioner: P,
}
// Settings forwarded verbatim to `KafkaClient::produce_messages`.
struct Config {
// `DEFAULT_ACK_TIMEOUT` unless overridden through the builder.
ack_timeout: i32,
// `DEFAULT_REQUIRED_ACKS` unless overridden through the builder.
required_acks: i16,
}
impl Producer {
/// Starts building a producer on top of an already set-up `KafkaClient`.
pub fn from_client(client: KafkaClient) -> Builder<DefaultPartitioner> {
Builder::new(Some(client), Vec::new())
}
/// Starts building a producer that will bootstrap its own client from
/// the given hosts (metadata is loaded in `Builder::create`).
pub fn from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner> {
Builder::new(None, hosts)
}
/// Destroys this producer, handing back the underlying client.
pub fn client(self) -> KafkaClient {
self.client
}
}
impl<P: Partitioner> Producer<P> {
    /// Sends a single record; convenience wrapper around `send_all`.
    ///
    /// Discards the delivered offset and propagates only the per-record
    /// error, if any.
    pub fn send<'a, K, V>(&mut self, rec: &Record<'a, K, V>) -> Result<()>
        where K: AsBytes,
              V: AsBytes
    {
        let mut rs = try!(self.send_all(ref_slice(rec)));
        // One record in, exactly one per-record result out.
        assert_eq!(1, rs.len());
        rs.pop().unwrap().offset.map(|_| ())
    }
    /// Sends all given records in one produce request.
    ///
    /// Every record that does not already carry a non-negative partition is
    /// run through the configured partitioner first.
    pub fn send_all<'a, K, V>(&mut self,
                              recs: &[Record<'a, K, V>])
                              -> Result<Vec<TopicPartitionOffset>>
        where K: AsBytes,
              V: AsBytes
    {
        // Split borrows so the closure below can mutate the partitioner
        // while the client drives the iteration.
        let partitioner = &mut self.state.partitioner;
        let partitions = &self.state.partitions;
        let client = &mut self.client;
        let config = &self.config;
        client.produce_messages(config.required_acks,
                                config.ack_timeout,
                                // `iter()` instead of `into_iter()`: on a
                                // slice reference both yield `&Record`, but
                                // `iter` states the by-reference intent
                                // directly (clippy: `into_iter_on_ref`).
                                recs.iter().map(|r| {
                                    let mut m = client::ProduceMessage {
                                        key: to_option(r.key.as_bytes()),
                                        value: to_option(r.value.as_bytes()),
                                        topic: r.topic,
                                        partition: r.partition,
                                    };
                                    partitioner.partition(Topics::new(partitions), &mut m);
                                    m
                                }))
    }
}
/// Maps an empty byte slice to `None` and a non-empty one to `Some(data)`.
///
/// Empty keys/values (e.g. produced by the `()` impl of `AsBytes`) are
/// thereby sent as absent payloads rather than zero-length ones.
fn to_option(data: &[u8]) -> Option<&[u8]> {
    match data.is_empty() {
        true => None,
        false => Some(data),
    }
}
impl<P> State<P> {
    /// Snapshots the per-topic partition metadata currently known to
    /// `client` and pairs it with the given partitioner.
    fn new(client: &mut KafkaClient, partitioner: P) -> Result<State<P>> {
        let topics = client.topics();
        let mut partitions = HashMap::with_capacity(topics.len());
        for topic in topics {
            let ps = topic.partitions();
            let meta = Partitions {
                available_ids: ps.available_ids(),
                num_all_partitions: ps.len() as u32,
            };
            partitions.insert(topic.name().to_owned(), meta);
        }
        Ok(State {
            partitions: partitions,
            partitioner: partitioner,
        })
    }
}
/// A configurable builder for `Producer` instances.
pub struct Builder<P = DefaultPartitioner> {
// Pre-built client, when obtained via `Producer::from_client`.
client: Option<KafkaClient>,
// Bootstrap hosts used when no client was supplied.
hosts: Vec<String>,
compression: Compression,
ack_timeout: i32,
required_acks: i16,
partitioner: P,
// Only meaningful with the "security" feature; `SecurityConfig` is a
// unit placeholder otherwise.
security_config: Option<SecurityConfig>,
}
impl Builder {
fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner> {
let mut b = Builder {
client: client,
hosts: hosts,
compression: client::DEFAULT_COMPRESSION,
ack_timeout: DEFAULT_ACK_TIMEOUT,
required_acks: DEFAULT_REQUIRED_ACKS,
partitioner: DefaultPartitioner::default(),
security_config: None,
};
if let Some(ref c) = b.client {
b.compression = c.compression();
}
b
}
#[cfg(feature = "security")]
pub fn with_security(mut self, security: SecurityConfig) -> Self {
self.security_config = Some(security);
self
}
pub fn with_compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}
pub fn with_ack_timeout(mut self, timeout: i32) -> Self {
self.ack_timeout = timeout;
self
}
pub fn with_required_acks(mut self, required_acks: i16) -> Self {
self.required_acks = required_acks;
self
}
}
impl<P> Builder<P> {
    /// Replaces the partitioner. All other settings — including any
    /// security configuration set beforehand — are carried over unchanged.
    pub fn with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q> {
        Builder {
            client: self.client,
            hosts: self.hosts,
            compression: self.compression,
            ack_timeout: self.ack_timeout,
            required_acks: self.required_acks,
            partitioner: partitioner,
            // BUG FIX: this used to be hard-coded to `None`, silently
            // dropping a security config set via `with_security` before
            // `with_partitioner` was called.
            security_config: self.security_config,
        }
    }
    // Non-security build: the security argument is a unit placeholder.
    #[cfg(not(feature = "security"))]
    fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
        KafkaClient::new(hosts)
    }
    // Security build: open a secure client when a config was supplied.
    #[cfg(feature = "security")]
    fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
        if let Some(security) = security {
            KafkaClient::new_secure(hosts, security)
        } else {
            KafkaClient::new(hosts)
        }
    }
    /// Finalizes the builder into a `Producer`.
    ///
    /// When no pre-built client was supplied, a new one is bootstrapped
    /// from the configured hosts and its metadata loaded. Errors from
    /// metadata loading or state initialization are propagated.
    pub fn create(self) -> Result<Producer<P>> {
        let mut client = match self.client {
            Some(client) => client,
            None => {
                let mut client = Self::new_kafka_client(self.hosts, self.security_config);
                try!(client.load_metadata_all());
                client
            }
        };
        client.set_compression(self.compression);
        let state = try!(State::new(&mut client, self.partitioner));
        let config = Config {
            ack_timeout: self.ack_timeout,
            required_acks: self.required_acks,
        };
        Ok(Producer {
            client: client,
            state: state,
            config: config,
        })
    }
}
/// A read-only view on the producer's per-topic partition metadata,
/// handed to `Partitioner::partition` implementations.
pub struct Topics<'a> {
partitions: &'a HashMap<String, Partitions>,
}
/// Partition metadata for a single topic.
pub struct Partitions {
// Ids of the partitions currently available for sending (captured in
// `State::new` from the client's metadata).
available_ids: Vec<i32>,
// Total partition count of the topic, available or not.
num_all_partitions: u32,
}
impl Partitions {
/// The ids of the currently available partitions of this topic.
#[inline]
pub fn available_ids(&self) -> &[i32] {
&self.available_ids
}
/// The number of currently available partitions.
#[inline]
pub fn num_available(&self) -> u32 {
self.available_ids.len() as u32
}
/// The total number of partitions of this topic, available or not.
#[inline]
pub fn num_all(&self) -> u32 {
self.num_all_partitions
}
}
impl<'a> Topics<'a> {
// Crate-internal constructor wrapping the producer's metadata map.
fn new(partitions: &'a HashMap<String, Partitions>) -> Topics<'a> {
Topics { partitions: partitions }
}
/// Looks up the partition metadata for `topic`, if the topic is known.
#[inline]
pub fn partitions(&'a self, topic: &str) -> Option<&'a Partitions> {
self.partitions.get(topic)
}
}
/// Assigns partitions to outgoing messages.
pub trait Partitioner {
/// May rewrite `msg.partition`; invoked by the producer for every
/// record before it is handed to the underlying client.
fn partition(&mut self, topics: Topics, msg: &mut client::ProduceMessage);
}
/// The default partitioner: hashes a record's key (when present) to pick a
/// partition, and round-robins over available partitions otherwise — see
/// the `Partitioner` impl below.
// NOTE(review): `SipHasher` is deprecated in later Rust releases; consider
// a different default hasher when bumping the toolchain.
#[derive(Default)]
pub struct DefaultPartitioner<H = BuildHasherDefault<SipHasher>> {
hash_builder: H,
// Round-robin counter for key-less records.
cntr: u32,
}
impl DefaultPartitioner {
    /// Creates a partitioner using the given hash builder for keyed
    /// records.
    pub fn with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B> {
        DefaultPartitioner {
            // Store the builder directly; the previous `hash_builder.into()`
            // was a no-op identity conversion from `B` to `B`
            // (clippy: `useless_conversion`).
            hash_builder: hash_builder,
            cntr: 0,
        }
    }
    /// Creates a partitioner whose hash builder default-constructs hashers
    /// of type `B` for keyed records.
    pub fn with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>>
        where B: Hasher + Default
    {
        DefaultPartitioner {
            hash_builder: BuildHasherDefault::<B>::default(),
            cntr: 0,
        }
    }
}
impl<H: BuildHasher> Partitioner for DefaultPartitioner<H> {
    // The stale `#[allow(unused_variables)]` was dropped: every parameter,
    // including `topics`, is used below.
    fn partition(&mut self, topics: Topics, rec: &mut client::ProduceMessage) {
        // Respect an explicitly assigned partition.
        if rec.partition >= 0 {
            return;
        }
        let partitions = match topics.partitions(rec.topic) {
            // Unknown topic: leave the record untouched.
            None => return,
            Some(partitions) => partitions,
        };
        match rec.key {
            Some(key) => {
                // Keyed record: hash over ALL partitions so a given key
                // always maps to the same partition, available or not.
                let num_partitions = partitions.num_all();
                if num_partitions == 0 {
                    return;
                }
                let mut h = self.hash_builder.build_hasher();
                h.write(key);
                let hash = h.finish() as u32;
                rec.partition = (hash % num_partitions) as i32;
            }
            None => {
                // Key-less record: round-robin over the currently
                // available partitions only.
                let avail = partitions.available_ids();
                if !avail.is_empty() {
                    rec.partition = avail[self.cntr as usize % avail.len()];
                    self.cntr = self.cntr.wrapping_add(1);
                }
            }
        }
    }
}
#[cfg(test)]
mod default_partitioner_tests {
    use std::hash::{Hasher, SipHasher, BuildHasherDefault};
    use std::collections::HashMap;
    use client;
    use super::{DefaultPartitioner, Partitioner, Partitions, Topics};

    /// Builds the metadata map a `Topics` view is constructed from.
    fn topics_map(topics: Vec<(&str, Partitions)>) -> HashMap<String, Partitions> {
        topics.into_iter()
            .map(|(name, parts)| (name.to_owned(), parts))
            .collect()
    }

    /// Partitions one keyed message, asserts the assignment is in range
    /// for `topic`, and returns it for further comparisons.
    fn assert_partitioning<P: Partitioner>(topics: &HashMap<String, Partitions>,
                                           p: &mut P,
                                           topic: &str,
                                           key: &str)
                                           -> i32 {
        let mut msg = client::ProduceMessage {
            key: Some(key.as_bytes()),
            value: None,
            topic: topic,
            partition: -1,
        };
        p.partition(Topics::new(topics), &mut msg);
        let num_partitions = topics.get(topic).unwrap().num_all_partitions as i32;
        assert!(msg.partition >= 0 && msg.partition < num_partitions);
        msg.partition
    }

    /// The default (SipHasher-backed) partitioner must assign the same key
    /// to the same partition, and distinct keys may differ.
    #[test]
    fn test_key_partitioning() {
        let topics = topics_map(vec![("foo",
                                      Partitions {
                                          available_ids: vec![0, 1, 4],
                                          num_all_partitions: 5,
                                      }),
                                     ("bar",
                                      Partitions {
                                          available_ids: vec![0, 1],
                                          num_all_partitions: 2,
                                      })]);
        let mut p: DefaultPartitioner<BuildHasherDefault<SipHasher>> = Default::default();
        // Same key, same topic => stable assignment across calls.
        let h1 = assert_partitioning(&topics, &mut p, "foo", "foo-key");
        let h2 = assert_partitioning(&topics, &mut p, "foo", "foo-key");
        assert_eq!(h1, h2);
        let h3 = assert_partitioning(&topics, &mut p, "foo", "foo-key");
        let h4 = assert_partitioning(&topics, &mut p, "foo", "bar-key");
        assert!(h3 != h4);
    }

    /// A trivial hasher: "hash" is simply the first byte written.
    #[derive(Default)]
    struct MyCustomHasher(u64);

    impl Hasher for MyCustomHasher {
        fn finish(&self) -> u64 {
            self.0
        }
        fn write(&mut self, bytes: &[u8]) {
            self.0 = bytes[0] as u64;
        }
    }

    /// With `MyCustomHasher` the assignment is first-byte % num_all:
    /// 'A' (65) % 2 == 1 and 'B' (66) % 10 == 6.
    #[test]
    fn default_partitioner_with_custom_hasher_default() {
        let mut p = DefaultPartitioner::with_default_hasher::<MyCustomHasher>();
        let topics = topics_map(vec![("confirms",
                                      Partitions {
                                          available_ids: vec![0, 1],
                                          num_all_partitions: 2,
                                      }),
                                     ("contents",
                                      Partitions {
                                          available_ids: vec![0, 1, 9],
                                          num_all_partitions: 10,
                                      })]);
        let p1 = assert_partitioning(&topics, &mut p, "confirms", "A");
        assert_eq!(1, p1);
        let p2 = assert_partitioning(&topics, &mut p, "contents", "B");
        assert_eq!(6, p2);
    }
}