/// Binary entry point.
///
/// Delegates to `example::main`; which `example` module exists is decided at
/// compile time by the `security` / `security-ring` cargo features.
fn main() {
    example::main()
}
#[cfg(any(feature = "security", feature = "security-ring"))]
mod example {
use rustfs_kafka;
use tracing::info;
use std::env;
use std::process;
use self::rustfs_kafka::client::{FetchOffset, KafkaClient, SecurityConfig};
pub fn main() {
tracing_subscriber::fmt::init();
let cfg = match Config::from_cmdline() {
Ok(cfg) => cfg,
Err(e) => {
println!("{}", e);
process::exit(1);
}
};
let mut security_config = SecurityConfig::new();
security_config = security_config.with_hostname_verification(cfg.verify_hostname);
if let Some(ca_cert) = cfg.ca_cert {
info!("loading ca-file={}", ca_cert);
security_config = security_config.with_ca_cert(ca_cert);
}
if let (Some(client_cert), Some(client_key)) = (cfg.client_cert, cfg.client_key) {
info!("loading cert-file={}, key-file={}", client_cert, client_key);
security_config = security_config.with_client_cert(client_cert, client_key);
}
let mut client = KafkaClient::new_secure(cfg.brokers, security_config);
match client.load_metadata_all() {
Err(e) => {
println!("{:?}", e);
drop(client);
process::exit(1);
}
Ok(_) => {
if client.topics().is_empty() {
println!("No topics available!");
} else {
let topics: Vec<String> = client.topics().names().map(Into::into).collect();
match client.fetch_offsets(topics.as_slice(), FetchOffset::Latest) {
Err(e) => {
println!("{:?}", e);
drop(client);
process::exit(1);
}
Ok(toffsets) => {
println!("Topic offsets:");
for (topic, mut offs) in toffsets {
offs.sort_by_key(|x| x.partition);
println!("{}", topic);
for off in offs {
println!("\t{}: {:?}", off.partition, off.offset);
}
}
}
}
}
}
}
}
/// Settings collected from the command line by `Config::from_cmdline`.
struct Config {
    /// Broker addresses from `--brokers` (comma separated on the command
    /// line); `localhost:9093` when the option is absent.
    brokers: Vec<String>,
    /// Value of `--client-cert`, if given.
    client_cert: Option<String>,
    /// Value of `--client-key`, if given.
    client_key: Option<String>,
    /// Value of `--ca-cert`, if given.
    ca_cert: Option<String>,
    /// `false` only when `--no-hostname-verification` was passed.
    verify_hostname: bool,
}
impl Config {
    /// Builds a [`Config`] from the process command line.
    ///
    /// Recognised options: `--brokers HOSTS` (comma separated, defaults to
    /// `localhost:9093`), `--ca-cert FILE`, `--client-cert FILE`,
    /// `--client-key FILE`, `--no-hostname-verification` and `-h`/`--help`.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when:
    /// - the arguments cannot be parsed,
    /// - `--help` was requested (the message is the usage text),
    /// - `--brokers` was given but contains no non-empty host,
    /// - only one of `--client-cert` / `--client-key` was given.
    fn from_cmdline() -> Result<Config, String> {
        let mut opts = getopts::Options::new();
        opts.optflag("h", "help", "Print this help screen");
        opts.optopt(
            "",
            "brokers",
            "Specify kafka brokers (comma separated)",
            "HOSTS",
        );
        opts.optopt("", "ca-cert", "Specify the trusted CA certificates", "FILE");
        opts.optopt("", "client-cert", "Specify the client certificate", "FILE");
        opts.optopt(
            "",
            "client-key",
            "Specify key for the client certificate",
            "FILE",
        );
        opts.optflag(
            "",
            "no-hostname-verification",
            "Do not perform server hostname verification (insecure!)",
        );

        let args: Vec<String> = env::args().collect();
        // `get(1..)` rather than `[1..]`: an empty argv must not panic.
        let m = opts
            .parse(args.get(1..).unwrap_or_default())
            .map_err(|e| e.to_string())?;

        if m.opt_present("help") {
            // Fall back to the example's name if the OS supplied no argv[0].
            let program = args.first().map_or("example-rustls", String::as_str);
            let brief = format!("{} [options]", program);
            return Err(opts.usage(&brief));
        }

        let brokers: Vec<String> = m
            .opt_str("brokers")
            .map(|s| {
                s.split(',')
                    .map(str::trim)
                    .filter(|host| !host.is_empty())
                    .map(str::to_owned)
                    .collect()
            })
            .unwrap_or_else(|| vec!["localhost:9093".into()]);
        if brokers.is_empty() {
            return Err("Invalid --brokers specified!".to_owned());
        }

        // The caller only configures client authentication when both values
        // are present, so a lone certificate or key would be ignored without
        // notice. Reject the mismatch instead.
        let client_cert = m.opt_str("client-cert");
        let client_key = m.opt_str("client-key");
        if client_cert.is_some() != client_key.is_some() {
            return Err("--client-cert and --client-key must be specified together!".to_owned());
        }

        Ok(Config {
            brokers,
            client_cert,
            client_key,
            ca_cert: m.opt_str("ca-cert"),
            verify_hostname: !m.opt_present("no-hostname-verification"),
        })
    }
}
}
#[cfg(not(any(feature = "security", feature = "security-ring")))]
mod example {
    //! Stand-in used when neither the `security` nor the `security-ring`
    //! feature is enabled.

    /// Prints a hint on how to run the example with a security feature
    /// enabled, then terminates the process with exit status 1.
    pub fn main() {
        println!("example relevant only with a rustls security feature enabled!");
        println!("Try: cargo run --example example-rustls --features=security");
        std::process::exit(1)
    }
}