kafka 0.3.2

Rust client for Apache Kafka
//! Kafka Producer - A higher-level API for sending messages to Kafka
//! topics.
//! This module hosts a multi-topic capable producer for a Kafka
//! cluster providing a convenient API for sending messages
//! synchronously.
//! In Kafka, each message is a key/value pair where one or the other
//! is optional.  A `Record` represents all the data necessary to
//! produce such a message to Kafka using the `Producer`.  It
//! specifies the target topic and the target partition the message is
//! supposed to be delivered to as well as the key and the value.
//! # Example
//! ```no_run
//! use std::fmt::Write;
//! use kafka::producer::{Producer, Record};
//! let mut producer =
//!     Producer::from_hosts(vec!("localhost:9092".to_owned()))
//!         .with_ack_timeout(1000)
//!         .with_required_acks(1)
//!         .create()
//!         .unwrap();
//! let mut buf = String::with_capacity(2);
//! for i in 0..10 {
//!   let _ = write!(&mut buf, "{}", i); // some computation of the message data to be sent
//!   producer.send(&Record::from_value("my-topic", buf.as_bytes())).unwrap();
//!   buf.clear();
//! }
//! ```
//! In this example, when the `producer.send(..)` returns
//! successfully, we are guaranteed the message is delivered to Kafka
//! and persisted by at least one Kafka broker.  However, when sending
//! multiple messages just like in this example, it is more efficient
//! to send them in batches using `Producer::send_all`.
//! Since some of the `Record`s attributes are optional, convenience
//! methods exist to ease their creation.  In this example, the call
//! to `Record::from_value` creates a key-less, value-only record with
//! an unspecified partition.  The `Record` struct, however, is
//! intended to provide full control over its lifecycle to client
//! code, and, hence, is fully open.  Its current constructor methods
//! are provided for convenience only.
//! Beside the target topic, key, and the value of a `Record`, client
//! code is allowed to specify the topic partition the message is
//! supposed to be delivered to.  If the partition of a `Record` is
//! not specified - more precisely speaking if it's negative -
//! `Producer` will rely on its underlying `Partitioner` to find a
//! suitable one.  A `Partitioner` implementation can be supplied by
//! client code at the `Producer`'s construction time and defaults to
//! `DefaultPartitioner`.  See its documentation for details on the
//! strategy it uses to pick a partition.

// XXX 1) rethink return values for the send_all() method
// XXX 2) Handle recoverable errors behind the scenes through retry attempts

use std::collections::HashMap;
use std::fmt;
use std::hash::{Hasher, SipHasher, BuildHasher, BuildHasherDefault};

use client::{self, KafkaClient};
// public re-exports
pub use client::Compression;
use error::Result;
use utils::TopicPartitionOffset;

use ref_slice::ref_slice;

#[cfg(feature = "security")]
use client::SecurityConfig;

#[cfg(not(feature = "security"))]
type SecurityConfig = ();

/// The default value for `Builder::with_ack_timeout`: 30 seconds,
/// expressed in milliseconds.
pub const DEFAULT_ACK_TIMEOUT: i32 = 30 * 1000;

/// The default value for `Builder::with_required_acks`: a single
/// acknowledgement.
pub const DEFAULT_REQUIRED_ACKS: i16 = 1;

// --------------------------------------------------------------------

/// A trait used by `Producer` to obtain the bytes `Record::key` and
/// `Record::value` represent.  This leaves the choice of the types
/// for `key` and `value` with the client.
pub trait AsBytes {
    /// Returns the raw bytes this value stands for.
    fn as_bytes(&self) -> &[u8];
}

/// The unit type carries no data and therefore yields an empty slice.
/// It is the key type of records created through `Record::from_value`.
impl AsBytes for () {
    fn as_bytes(&self) -> &[u8] {
        &[]
    }
}

// There seems to be some compiler issue with this:
// impl<T: AsRef<[u8]>> AsBytes for T {
//     fn as_bytes(&self) -> &[u8] { self.as_ref() }
// }

// for now we provide the impl for some standard library types
impl AsBytes for String {
    fn as_bytes(&self) -> &[u8] {
        // ~ spelled out through `AsRef` to avoid any ambiguity with
        // this very trait method
        AsRef::<[u8]>::as_ref(self)
    }
}

impl AsBytes for Vec<u8> {
    fn as_bytes(&self) -> &[u8] {
        self.as_slice()
    }
}

impl<'a> AsBytes for &'a [u8] {
    fn as_bytes(&self) -> &[u8] {
        *self
    }
}

impl<'a> AsBytes for &'a str {
    fn as_bytes(&self) -> &[u8] {
        // ~ explicitly the inherent `str` method, not this trait's
        str::as_bytes(*self)
    }
}

// --------------------------------------------------------------------

/// A structure representing a message to be sent to Kafka through the
/// `Producer` API.  Such a message is basically a key/value pair
/// specifying the target topic and optionally the topic's partition.
///
/// All fields are public; the constructor functions below exist for
/// convenience only.
pub struct Record<'a, K, V> {
    /// Key data of this (message) record.
    pub key: K,

    /// Value data of this (message) record.
    pub value: V,

    /// Name of the topic this message is supposed to be delivered to.
    pub topic: &'a str,

    /// The partition id of the topic to deliver this message to.
    /// This partition may be `< 0` in which case it is considered
    /// "unspecified".  A `Producer` will then typically try to derive
    /// a partition on its own.
    pub partition: i32,
}

impl<'a, K, V> Record<'a, K, V> {
    /// Convenience function to create a new key/value record with an
    /// "unspecified" partition - that is, a partition set to a negative
    /// value.
    pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
        Record {
            key: key,
            value: value,
            topic: topic,
            partition: -1,
        }
    }

    /// Convenience method to set the partition.  Consumes the record
    /// and returns it with the new partition to allow call chaining.
    pub fn with_partition(mut self, partition: i32) -> Self {
        self.partition = partition;
        self
    }
}

impl<'a, V> Record<'a, (), V> {
    /// Convenience function to create a new value only record with an
    /// "unspecified" partition - that is, a partition set to a negative
    /// value.  The key of such a record is the unit value `()`.
    pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
        Record {
            key: (),
            value: value,
            topic: topic,
            partition: -1,
        }
    }
}

impl<'a, K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'a, K, V> {
    /// Renders topic and partition verbatim and the key and value
    /// through their own `Debug` implementations.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f,
               "Record {{ topic: {}, partition: {}, key: {:?}, value: {:?} }}",
               self.topic,
               self.partition,
               self.key,
               self.value)
    }
}
// --------------------------------------------------------------------

/// The Kafka Producer
/// See module level documentation.
pub struct Producer<P = DefaultPartitioner> {
    // ~ the underlying Kafka client
    client: KafkaClient,
    // ~ known topic partitions plus the partitioner (see `State`)
    state: State<P>,
    // ~ acknowledgement related settings (see `Config`)
    config: Config,

// ~ the producer's partitioning state: what is known about the
// topics' partitions and the partitioner consulted when sending
struct State<P> {
    /// A list of available partition IDs for each topic.
    partitions: HashMap<String, Partitions>,
    /// The partitioner to decide how to distribute messages
    partitioner: P,

// ~ acknowledgement settings of a producer; populated from the
// corresponding `Builder` fields in `Builder::create`
struct Config {
    /// The maximum time in millis to wait for acknowledgements. See
    /// `KafkaClient::produce_messages`.
    ack_timeout: i32,
    /// The number of acks to request. See
    /// `KafkaClient::produce_messages`.
    required_acks: i16,

impl Producer {
    /// Starts building a new producer using the given Kafka client.
    /// The returned builder starts out with an empty host list and
    /// adopts the compression setting of the given client.
    pub fn from_client(client: KafkaClient) -> Builder<DefaultPartitioner> {
        Builder::new(Some(client), Vec::new())

    /// Starts building a producer which internally bootstraps a new
    /// kafka client from the given kafka hosts.  The client itself is
    /// not created before `Builder::create` is called.
    pub fn from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner> {
        Builder::new(None, hosts)

    /// Destroys this producer returning the underlying kafka client.
    pub fn client(self) -> KafkaClient {

impl<P: Partitioner> Producer<P> {
    /// Synchronously send the specified message to Kafka.
    ///
    /// This delegates to `send_all` with a one-element slice and
    /// expects exactly one result back (asserted).  Any offset
    /// carried by that result is discarded; only success or failure
    /// is reported.
    pub fn send<'a, K, V>(&mut self, rec: &Record<'a, K, V>) -> Result<()>
        where K: AsBytes,
              V: AsBytes
        // ~ view the single record as a slice of length one
        let mut rs = try!(self.send_all(ref_slice(rec)));
        assert_eq!(1, rs.len());
        rs.pop().unwrap().offset.map(|_| ())

    /// Synchronously send all of the specified messages to Kafka.
    ///
    /// Every record is turned into a `client::ProduceMessage` and
    /// handed to the producer's partitioner, which may (re-)assign
    /// the message's partition.
    pub fn send_all<'a, K, V>(&mut self,
                              recs: &[Record<'a, K, V>])
                              -> Result<Vec<TopicPartitionOffset>>
        where K: AsBytes,
              V: AsBytes
        // ~ borrow the individual fields separately so they can be
        // used side by side
        let partitioner = &mut self.state.partitioner;
        let partitions = &self.state.partitions;
        let client = &mut self.client;
        let config = &self.config;

                                recs.into_iter().map(|r| {
                                    // ~ key and value pass through
                                    // `to_option` before being sent
                                    let mut m = client::ProduceMessage {
                                        key: to_option(r.key.as_bytes()),
                                        value: to_option(r.value.as_bytes()),
                                        topic: r.topic,
                                        partition: r.partition,
                                    partitioner.partition(Topics::new(partitions), &mut m);

/// Maps an empty byte slice to `None` and any non-empty slice to
/// `Some` of that very slice.  Used to translate the key and value
/// of a `Record` into the optional key/value of a message to send.
fn to_option(data: &[u8]) -> Option<&[u8]> {
    if data.is_empty() {
        None
    } else {
        Some(data)
    }
}

// --------------------------------------------------------------------

impl<P> State<P> {
    // ~ builds the state by walking over all topics known to the
    // given client, recording for each topic the ids of its
    // available partitions and its total number of partitions
    fn new(client: &mut KafkaClient, partitioner: P) -> Result<State<P>> {
        let ts = client.topics();
        // ~ one entry per topic
        let mut ids = HashMap::with_capacity(ts.len());
        for t in ts {
            let ps = t.partitions();
                       Partitions {
                           available_ids: ps.available_ids(),
                           num_all_partitions: ps.len() as u32,
        Ok(State {
            partitions: ids,
            partitioner: partitioner,

// --------------------------------------------------------------------

/// A Kafka Producer builder easing the process of setting up various
/// configuration settings.
pub struct Builder<P = DefaultPartitioner> {
    // ~ a ready-made client to use, if any (see `Producer::from_client`)
    client: Option<KafkaClient>,
    // ~ hosts to create a new client from when `client` is `None`
    hosts: Vec<String>,
    // ~ see `Builder::with_compression`
    compression: Compression,
    // ~ see `Builder::with_ack_timeout`
    ack_timeout: i32,
    // ~ see `Builder::with_required_acks`
    required_acks: i16,
    // ~ see `Builder::with_partitioner`
    partitioner: P,
    // ~ see `Builder::with_security`; the config's type is `()` when
    // the "security" feature is disabled
    security_config: Option<SecurityConfig>,

impl Builder {
    // ~ creates a builder populated with the default settings; if a
    // client is given, its compression setting takes precedence over
    // the default compression
    fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner> {
        let mut b = Builder {
            client: client,
            hosts: hosts,
            compression: client::DEFAULT_COMPRESSION,
            ack_timeout: DEFAULT_ACK_TIMEOUT,
            required_acks: DEFAULT_REQUIRED_ACKS,
            partitioner: DefaultPartitioner::default(),
            security_config: None,
        if let Some(ref c) = b.client {
            b.compression = c.compression();

    /// Specifies the security config to use.
    /// See `KafkaClient::new_secure` for more info.
    #[cfg(feature = "security")]
    pub fn with_security(mut self, security: SecurityConfig) -> Self {
        self.security_config = Some(security);

    /// Sets the compression algorithm to use when sending out data.
    /// See `KafkaClient::set_compression`.
    pub fn with_compression(mut self, compression: Compression) -> Self {
        self.compression = compression;

    /// Sets the maximum time in milliseconds the kafka brokers can
    /// await the receipt of required acknowledgements (which is
    /// specified through `Builder::with_required_acks`.)  Note that
    /// Kafka explicitly documents this not to be a hard limit.
    pub fn with_ack_timeout(mut self, timeout: i32) -> Self {
        self.ack_timeout = timeout;

    /// Sets how many acknowledgements the kafka brokers should
    /// receive before responding to sent messages.  If it is 0 the
    /// servers will not send any response.  If it is 1, the server
    /// will wait until the data is written to the local server log
    /// before sending a reply.  If it is -1 the servers will block
    /// until the messages are committed by all in-sync replicas
    /// before replying.  For any number `> 1` the servers will block
    /// waiting for this number of acknowledgements to occur (but the
    /// servers will never wait for more acknowledgements than there
    /// are in-sync replicas).
    pub fn with_required_acks(mut self, required_acks: i16) -> Self {
        self.required_acks = required_acks;

impl<P> Builder<P> {
    /// Sets the partitioner to dispatch when sending messages without
    /// an explicit partition assignment.
    pub fn with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q> {
        Builder {
            client: self.client,
            hosts: self.hosts,
            compression: self.compression,
            ack_timeout: self.ack_timeout,
            required_acks: self.required_acks,
            partitioner: partitioner,
            security_config: None,

    // ~ variant for builds without the "security" feature; the
    // security argument is ignored
    #[cfg(not(feature = "security"))]
    fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {

    // ~ variant for builds with the "security" feature; a supplied
    // security config results in a client created through
    // `KafkaClient::new_secure`
    #[cfg(feature = "security")]
    fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
        if let Some(security) = security {
            KafkaClient::new_secure(hosts, security)
        } else {

    /// Finally creates/builds a new producer based on the so far
    /// supplied settings.
    ///
    /// A client handed to the builder is used as is; otherwise a new
    /// client is created from the configured hosts and security
    /// config.
    ///
    /// # Errors
    ///
    /// Fails if the producer's internal state cannot be set up from
    /// the client (see `State::new`).
    pub fn create(self) -> Result<Producer<P>> {
        let mut client = match self.client {
            Some(client) => client,
            None => {
                let mut client = Self::new_kafka_client(self.hosts, self.security_config);
        let state = try!(State::new(&mut client, self.partitioner));
        let config = Config {
            ack_timeout: self.ack_timeout,
            required_acks: self.required_acks,
        Ok(Producer {
            client: client,
            state: state,
            config: config,

// --------------------------------------------------------------------

/// A description of available topics and their available partitions.
/// Intended for use by `Partitioner`s.
pub struct Topics<'a> {
    partitions: &'a HashMap<String, Partitions>,
}

/// Producer relevant partition information of a particular topic.
/// Intended for use by `Partitioner`s.
pub struct Partitions {
    available_ids: Vec<i32>,
    num_all_partitions: u32,
}

impl Partitions {
    /// Retrieves the list of the identifiers of currently "available"
    /// partitions for the given topic.  This list excludes partitions
    /// which do not have a leader broker assigned.
    pub fn available_ids(&self) -> &[i32] {
        &self.available_ids
    }

    /// Retrieves the number of "available" partitions. This is
    /// merely a convenience method. See `Partitions::available_ids`.
    pub fn num_available(&self) -> u32 {
        self.available_ids.len() as u32
    }

    /// The total number of partitions of the underlying topic.  This
    /// number also includes partitions without a current leader
    /// assignment.
    pub fn num_all(&self) -> u32 {
        self.num_all_partitions
    }
}

impl<'a> Topics<'a> {
    // ~ wraps the given per-topic partition information
    fn new(partitions: &'a HashMap<String, Partitions>) -> Topics<'a> {
        Topics { partitions: partitions }
    }

    /// Retrieves information about a topic's partitions.  Returns
    /// `None` if the topic is not known.
    pub fn partitions(&'a self, topic: &str) -> Option<&'a Partitions> {
        self.partitions.get(topic)
    }
}

/// A partitioner is given a chance to choose/redefine a partition for
/// a message to be sent to Kafka.  See also
/// `Record::with_partition`.
///
/// Implementations can be stateful.
pub trait Partitioner {
    /// Supposed to inspect the given message and if desired re-assign
    /// the message's target partition.
    ///
    /// - `topics`: a description of the currently known topics and
    ///   their currently available partitions.
    /// - `msg`: the message whose partition assignment is to be
    ///   potentially changed.
    fn partition(&mut self, topics: Topics, msg: &mut client::ProduceMessage);

/// As its name implies `DefaultPartitioner` is the default
/// partitioner for `Producer`.
///
/// For every message it proceeds as follows:
///
/// - If the message contains a non-negative partition value it
/// leaves the message untouched.  This will cause `Producer` to try
/// to send the message to exactly that partition.
///
/// - Otherwise, if the message has an "unspecified" `partition` -
/// that is, it has a negative partition value - and a specified key,
/// `DefaultPartitioner` will compute a hash from the key using the
/// underlying hasher and take `hash % num_all_partitions` to derive
/// the partition to send the message to.  This will consistently
/// cause messages with the same key to be sent to the same partition.
///
/// - Otherwise - a message with an "unspecified" `partition` and no
/// key - `DefaultPartitioner` will "randomly" pick one from the
/// "available" partitions trying to distribute the messages across
/// the multiple partitions.  In particular, it tries to distribute
/// such messages across the "available" partitions in a round robin
/// fashion.  "Available" in this context means partitions with a
/// known leader.
///
/// This behavior may not suit every workload.  If your application
/// is dependent on a particular distribution scheme different from
/// the one outlined above, you want to provide your own partitioner
/// to the `Producer` at its initialization time.
///
/// See `Builder::with_partitioner`.
#[derive(Default)]
pub struct DefaultPartitioner<H = BuildHasherDefault<SipHasher>> {
    // ~ a hasher builder; used to consistently hash keys
    hash_builder: H,
    // ~ a counter incremented with each partitioned message to
    // achieve a different partition assignment for each message
    cntr: u32,
}

impl DefaultPartitioner {
    /// Creates a new partitioner which will use the given hash
    /// builder to hash message keys.
    pub fn with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B> {
        DefaultPartitioner {
            hash_builder: hash_builder,
            cntr: 0,
        }
    }

    /// Creates a new partitioner which will hash message keys using
    /// hashers of type `B`, each created through `B`'s `Default`
    /// implementation.
    pub fn with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>>
        where B: Hasher + Default
    {
        DefaultPartitioner {
            hash_builder: BuildHasherDefault::<B>::default(),
            cntr: 0,
        }
    }
}

impl<H: BuildHasher> Partitioner for DefaultPartitioner<H> {
    // ~ assigns a partition to messages which do not specify one:
    // keyed messages by the hash of their key over all partitions of
    // the topic, key-less messages by rotating over the topic's
    // available partitions
    fn partition(&mut self, topics: Topics, rec: &mut client::ProduceMessage) {
        if rec.partition >= 0 {
            // ~ partition explicitly defined, trust the user
        let partitions = match topics.partitions(rec.topic) {
            None => return, // ~ unknown topic, this is not the place to deal with it.
            Some(partitions) => partitions,
        match rec.key {
            Some(key) => {
                let num_partitions = partitions.num_all();
                if num_partitions == 0 {
                    // ~ no partitions at all ... a rather strange
                    // topic. again, this is not the right place to
                    // deal with it.
                let mut h = self.hash_builder.build_hasher();
                // ~ unconditionally dispatch to partitions no matter
                // whether they are currently available or not.  this
                // guarantees consistency which is the point of
                // partitioning by key.  other behaviour - if desired
                // - can be implemented in custom, user provided
                // partitioners.
                let hash = h.finish() as u32;
                // if `num_partitions` exceeds `i32::MAX` the cast
                // below can lead to a negative partition ... such a
                // partition count is very unlikely though
                rec.partition = (hash % num_partitions) as i32;
            None => {
                // ~ no key available, determine a partition from the
                // available ones.
                let avail = partitions.available_ids();
                if avail.len() > 0 {
                    rec.partition = avail[self.cntr as usize % avail.len()];
                    // ~ update internal state so that the next time we choose
                    // a different partition; wrapping keeps the counter
                    // from overflowing
                    self.cntr = self.cntr.wrapping_add(1);

// --------------------------------------------------------------------

// ~ tests of `DefaultPartitioner`'s key based partitioning
mod default_partitioner_tests {
    use std::hash::{Hasher, SipHasher, BuildHasherDefault};
    use std::collections::HashMap;

    use client;
    use super::{DefaultPartitioner, Partitioner, Partitions, Topics};

    /// Collects the given `(topic name, partitions)` pairs into a map
    /// keyed by the topic name.
    fn topics_map(topics: Vec<(&str, Partitions)>) -> HashMap<String, Partitions> {
        let mut h = HashMap::new();
        for topic in topics {
            h.insert(topic.0.into(), topic.1);

    /// Lets `p` partition a value-less message with the given key for
    /// the given topic and asserts the resulting partition to lie
    /// within `0..num_all_partitions` of that topic.  Panics if the
    /// topic is not contained in `topics`.
    fn assert_partitioning<P: Partitioner>(topics: &HashMap<String, Partitions>,
                                           p: &mut P,
                                           topic: &str,
                                           key: &str)
                                           -> i32 {
        // ~ start out with an "unspecified" partition
        let mut msg = client::ProduceMessage {
            key: Some(key.as_bytes()),
            value: None,
            topic: topic,
            partition: -1,
        p.partition(Topics::new(topics), &mut msg);
        let num_partitions = topics.get(topic).unwrap().num_all_partitions as i32;
        assert!(msg.partition >= 0 && msg.partition < num_partitions);

    /// Validate consistent partitioning on a message's key
    fn test_key_partitioning() {
        let h = topics_map(vec![("foo",
                                 Partitions {
                                    available_ids: vec![0, 1, 4],
                                    num_all_partitions: 5,
                                 Partitions {
                                    available_ids: vec![0, 1],
                                    num_all_partitions: 2,

        let mut p: DefaultPartitioner<BuildHasherDefault<SipHasher>> = Default::default();

        // ~ validate that partitioning by the same key leads to the same
        // partition
        let h1 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        let h2 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        assert_eq!(h1, h2);

        // ~ validate that partitioning by different keys leads to
        // different partitions (the keys are chosen such that they lead
        // to different partitions)
        let h3 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        let h4 = assert_partitioning(&h, &mut p, "foo", "bar-key");
        assert!(h3 != h4);

    /// A trivial hasher which remembers the first byte of the data
    /// most recently written to it.
    struct MyCustomHasher(u64);

    impl Hasher for MyCustomHasher {
        fn finish(&self) -> u64 {
        // ~ panics on empty input; the tests below only hash
        // non-empty keys
        fn write(&mut self, bytes: &[u8]) {
            self.0 = bytes[0] as u64;

    /// Validate it is possible to register a custom hasher with the
    /// default partitioner
    fn default_partitioner_with_custom_hasher_default() {
        // this must compile
        let mut p = DefaultPartitioner::with_default_hasher::<MyCustomHasher>();

        let h = topics_map(vec![("confirms",
                                 Partitions {
                                    available_ids: vec![0, 1],
                                    num_all_partitions: 2,
                                 Partitions {
                                    available_ids: vec![0, 1, 9],
                                    num_all_partitions: 10,

        // verify also the partitioner derives the correct partition
        // ... this is hash modulo num_all_partitions. here it is a
        // topic with a total of 2 partitions.
        // (presumably the hash is the key's first byte: 65 % 2 == 1)
        let p1 = assert_partitioning(&h, &mut p, "confirms", "A" /* ascii: 65 */);
        assert_eq!(1, p1);

        // here it is a topic with a total of 10 partitions
        // (presumably the hash is the key's first byte: 66 % 10 == 6)
        let p2 = assert_partitioning(&h, &mut p, "contents", "B" /* ascii: 66 */);
        assert_eq!(6, p2);