rustfs-kafka 1.2.0

Rust client for Apache Kafka
Documentation
use std::io::{self, Write};
use std::time::Duration;
use std::{env, process};
//use std::ascii::AsciiExt;

use anyhow::{Result, bail};
use rustfs_kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};

use tracing::{error, info};

/// This is a very simple command line application reading from a
/// specific kafka topic and dumping the messages to standard output.
fn main() {
    tracing_subscriber::fmt::init();

    let cfg = match Config::from_cmdline() {
        Ok(cfg) => cfg,
        Err(e) => {
            error!("{}", e);
            process::exit(1);
        }
    };
    info!(
        "Starting consumer with the following configuration: {:?}",
        cfg
    );

    if let Err(e) = process(cfg) {
        error!("{}", e);
        process::exit(1);
    }
}

fn process(cfg: Config) -> Result<()> {
    let mut c = {
        let mut cb = Consumer::from_hosts(cfg.brokers)
            .with_group(cfg.group)
            .with_fallback_offset(cfg.fallback_offset)
            .with_fetch_max_wait_time(Duration::from_secs(1))
            .with_fetch_min_bytes(1_000)
            .with_fetch_max_bytes_per_partition(100_000)
            .with_retry_max_bytes_limit(1_000_000)
            .with_offset_storage(cfg.offset_storage)
            .with_client_id("kafka-rust-console-consumer".into());
        for topic in cfg.topics {
            cb = cb.with_topic(topic);
        }
        cb.create()?
    };

    let stdout = io::stdout();
    let mut stdout = stdout.lock();
    let mut buf = Vec::with_capacity(1024);

    loop {
        let mss = c.poll()?;
        for ms in mss.iter() {
            for m in ms.messages() {
                // ~ clear the output buffer
                buf.clear();
                // ~ format the message for output
                let _ = writeln!(buf, "{}:{}@{}:", ms.topic(), ms.partition(), m.offset);
                buf.extend_from_slice(&m.value);
                buf.push(b'\n');
                // ~ write to output channel
                stdout.write_all(&buf)?;
            }
            c.consume_messageset(&ms)?;
        }
        if cfg.commit {
            c.commit_consumed()?;
        }
        if !cfg.follow {
            return Ok(());
        }
    }
}

#[derive(Debug)]
struct Config {
    brokers: Vec<String>,
    group: String,
    topics: Vec<String>,
    commit: bool,
    offset_storage: Option<GroupOffsetStorage>,
    fallback_offset: FetchOffset,
    follow: bool,
}

impl Config {
    fn from_cmdline() -> Result<Config> {
        let args: Vec<_> = env::args().collect();
        let mut opts = getopts::Options::new();
        opts.optflag("h", "help", "Print this help screen");
        opts.optopt(
            "",
            "brokers",
            "Specify kafka brokers (comma separated)",
            "HOSTS",
        );
        opts.optopt("", "topics", "Specify topics (comma separated)", "NAMES");
        opts.optopt("", "group", "Specify the consumer group", "NAME");

        opts.optflag("", "commit", "Commit group offsets");
        opts.optopt(
            "",
            "storage",
            "Specify the offset store [zookeeper, kafka]",
            "STORE",
        );

        opts.optflag(
            "",
            "earliest",
            "Fall back to the earliest offset (when no group offset available)",
        );
        opts.optflag("", "follow", "Continue reading from the topic indefinitely");

        macro_rules! on_error {
            ($name:expr) => {{
                let brief = format!("{} [options]", args[0]);
                println!("{}", opts.usage(&brief));
                bail!($name);
            }};
        }
        if args.len() == 1 {
            on_error!("no arguments provided");
        }

        let m = match opts.parse(&args[1..]) {
            Ok(m) => m,
            Err(e) => {
                on_error!(format!(
                    "argument parsing encountered an error: {}",
                    e.to_string()
                ))
            }
        };
        if m.opt_present("help") {
            on_error!("help requested");
        }

        macro_rules! required_list {
            ($name:expr) => {{
                let opt = $name;
                let xs: Vec<_> = match m.opt_str(opt) {
                    None => Vec::new(),
                    Some(s) => s
                        .split(',')
                        .map(|s| s.trim().to_owned())
                        .filter(|s| !s.is_empty())
                        .collect(),
                };
                if xs.is_empty() {
                    on_error!(format!("missing argument --{}", opt));
                }
                xs
            }};
        }

        let brokers = required_list!("brokers");
        let topics = required_list!("topics");

        let offset_storage = if let Some(s) = m.opt_str("storage") {
            if s.eq_ignore_ascii_case("zookeeper") {
                Some(GroupOffsetStorage::Zookeeper)
            } else if s.eq_ignore_ascii_case("kafka") {
                Some(GroupOffsetStorage::Kafka)
            } else {
                on_error!(format!("unknown offset store: {}", s));
            }
        } else {
            None
        };
        Ok(Config {
            brokers,
            group: m.opt_str("group").unwrap_or_default(),
            topics,
            commit: m.opt_present("commit"),
            offset_storage,
            fallback_offset: if m.opt_present("earliest") {
                FetchOffset::Earliest
            } else {
                FetchOffset::Latest
            },
            follow: m.opt_present("follow"),
        })
    }
}