kafka 0.3.2

Rust client for Apache Kafka
Documentation
extern crate kafka;
extern crate getopts;
extern crate env_logger;

use std::{env, io, fmt, process};

use kafka::consumer::{Consumer, FetchOffset};

/// This is a very simple command line application reading from a
/// specific kafka topic and dumping the messages to standard output.
fn main() {
    env_logger::init().unwrap();

    let cfg = match Config::from_cmdline() {
        Ok(cfg) => cfg,
        Err(e) => {
            println!("{}", e);
            process::exit(1);
        }
    };
    if let Err(e) = process(cfg) {
        println!("{}", e);
        process::exit(1);
    }
}

fn process(cfg: Config) -> Result<(), Error> {
    let mut c =
        try!(Consumer::from_hosts(cfg.brokers, cfg.group, cfg.topic)
             .with_fetch_max_wait_time(100)
             .with_fetch_min_bytes(1_000)
             .with_fetch_max_bytes_per_partition(100_000)
             .with_fallback_offset(FetchOffset::Earliest)
             .with_retry_max_bytes_limit(1_000_000)
             .create());

    let do_commit = !cfg.no_commit;
    loop {
        for ms in try!(c.poll()).iter() {
            for m in ms.messages() {
                let s = String::from_utf8_lossy(m.value);
                println!("{}:{}@{}: {}", ms.topic(), ms.partition(), m.offset, s.trim());
            }
            c.consume_messageset(ms);
        }
        if do_commit {
            try!(c.commit_consumed());
        }
    }
}

// --------------------------------------------------------------------

enum Error {
    Kafka(kafka::error::Error),
    Io(io::Error),
    Literal(String),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            &Error::Kafka(ref e) => write!(f, "kafka-error: {}", e),
            &Error::Io(ref e) => write!(f, "io-error: {}", e),
            &Error::Literal(ref s) => write!(f, "{}", s),
        }
    }
}

impl From<kafka::error::Error> for Error {
    fn from(e: kafka::error::Error) -> Self { Error::Kafka(e) }
}

impl From<io::Error> for Error {
    fn from(e: io::Error) -> Self { Error::Io(e) }
}

// --------------------------------------------------------------------

struct Config {
    brokers: Vec<String>,
    group: String,
    topic: String,
    no_commit: bool,
}

impl Config {
    fn from_cmdline() -> Result<Config, Error> {
        let args: Vec<_> = env::args().collect();
        let mut opts = getopts::Options::new();
        opts.optflag("h", "help", "Print this help screen");
        opts.optopt("", "brokers", "Specify kafka brokers (comma separated)", "HOSTS");
        opts.optopt("", "topic", "Specify target topic", "NAME");
        opts.optopt("", "group", "Specify the group_id file", "NAME");
        opts.optflag("", "no-commit", "Do not commit consumed messages");

        let m = match opts.parse(&args[1..]) {
            Ok(m) => m,
            Err(e) => return Err(Error::Literal(e.to_string())),
        };
        if m.opt_present("help") {
            let brief = format!("{} [options]", args[0]);
            return Err(Error::Literal(opts.usage(&brief)));
        }
        Ok(Config {
            brokers: m.opt_str("brokers")
                .unwrap_or_else(|| "localhost:9092".to_owned())
                .split(',')
                .map(|s| s.trim().to_owned())
                .collect(),
            group: m.opt_str("group")
                .unwrap_or_else(|| "my-group".to_owned()),
            topic: m.opt_str("topic")
                .unwrap_or_else(|| "my-topic".to_owned()),
            no_commit: m.opt_present("no-commit"),
        })
   }
}