fn main() {
example::main();
}
#[cfg(feature = "security")]
mod example {
use kafkang;
use tracing::info;
use std::env;
use std::process;
use self::kafkang::client::{FetchOffset, KafkaClient, SecurityConfig, TlsConnector};
pub fn main() {
tracing_subscriber::fmt::init();
let cfg = match Config::from_cmdline() {
Ok(cfg) => cfg,
Err(e) => {
println!("{}", e);
process::exit(1);
}
};
let mut builder = TlsConnector::builder();
if let (Some(ccert), Some(ckey)) = (cfg.client_cert, cfg.client_key) {
info!("loading cert-file={}, key-file={}", ccert, ckey);
builder = builder.with_client_auth_pem_files(ccert, ckey).unwrap();
}
if let Some(ca_cert) = cfg.ca_cert {
info!("loading ca-file={}", ca_cert);
builder = builder.add_ca_certs_pem_file(ca_cert).unwrap();
}
let connector = builder.build().unwrap();
let mut client = KafkaClient::new_secure(
cfg.brokers,
SecurityConfig::new(connector).with_hostname_verification(cfg.verify_hostname),
);
match client.load_metadata_all() {
Err(e) => {
println!("{:?}", e);
drop(client);
process::exit(1);
}
Ok(_) => {
if client.topics().is_empty() {
println!("No topics available!");
} else {
let topics: Vec<String> = client.topics().names().map(Into::into).collect();
match client.fetch_offsets(topics.as_slice(), FetchOffset::Latest) {
Err(e) => {
println!("{:?}", e);
drop(client);
process::exit(1);
}
Ok(toffsets) => {
println!("Topic offsets:");
for (topic, mut offs) in toffsets {
offs.sort_by_key(|x| x.partition);
println!("{}", topic);
for off in offs {
println!("\t{}: {:?}", off.partition, off.offset);
}
}
}
}
}
}
}
}
struct Config {
brokers: Vec<String>,
client_cert: Option<String>,
client_key: Option<String>,
ca_cert: Option<String>,
verify_hostname: bool,
}
impl Config {
fn from_cmdline() -> Result<Config, String> {
let mut opts = getopts::Options::new();
opts.optflag("h", "help", "Print this help screen");
opts.optopt(
"",
"brokers",
"Specify kafka brokers (comma separated)",
"HOSTS",
);
opts.optopt("", "ca-cert", "Specify the trusted CA certificates", "FILE");
opts.optopt("", "client-cert", "Specify the client certificate", "FILE");
opts.optopt(
"",
"client-key",
"Specify key for the client certificate",
"FILE",
);
opts.optflag(
"",
"no-hostname-verification",
"Do not perform server hostname verification (insecure!)",
);
let args: Vec<_> = env::args().collect();
let m = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(e) => return Err(format!("{}", e)),
};
if m.opt_present("help") {
let brief = format!("{} [options]", args[0]);
return Err(opts.usage(&brief));
};
let brokers = m
.opt_str("brokers")
.map(|s| {
s.split(',')
.map(|s| s.trim().to_owned())
.filter(|s| !s.is_empty())
.collect()
})
.unwrap_or_else(|| vec!["localhost:9092".into()]);
if brokers.is_empty() {
return Err("Invalid --brokers specified!".to_owned());
}
Ok(Config {
brokers,
client_cert: m.opt_str("client-cert"),
client_key: m.opt_str("client-key"),
ca_cert: m.opt_str("ca-cert"),
verify_hostname: !m.opt_present("no-hostname-verification"),
})
}
}
}
#[cfg(not(feature = "security"))]
mod example {
use std::process;
pub fn main() {
println!("example relevant only with the \"security\" feature enabled!");
process::exit(1);
}
}