use kafkang::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use kafkang::error::Error as KafkaError;
/// Entry point: installs the `tracing_subscriber` formatter, then consumes
/// messages from the hard-coded broker, topic and group.
///
/// If consumption fails, the error is reported on stderr and the process exits
/// with status 1 so that callers (shells, supervisors) can detect the failure.
fn main() {
    tracing_subscriber::fmt::init();

    let broker = "localhost:9092".to_owned();
    let topic = "my-topic".to_owned();
    let group = "my-group".to_owned();

    if let Err(e) = consume_messages(group, topic, vec![broker]) {
        // Diagnostics belong on stderr; stdout carries the consumed messages.
        eprintln!("Failed consuming messages: {}", e);
        std::process::exit(1);
    }
}
/// Polls `topic` as a member of consumer group `group` and prints every
/// received message to stdout as `topic:partition@offset: value`.
///
/// The consumer is built from `brokers` with `FetchOffset::Earliest` as the
/// fallback offset and `GroupOffsetStorage::Kafka` as the offset storage.
/// After each polled batch has been printed, its message sets are marked as
/// consumed and the consumed offsets are committed.
///
/// Returns `Ok(())` as soon as a poll yields no message sets.
///
/// # Errors
///
/// Returns the underlying `KafkaError` if creating the consumer, polling,
/// marking a message set as consumed, or committing fails.
fn consume_messages(group: String, topic: String, brokers: Vec<String>) -> Result<(), KafkaError> {
    let mut con = Consumer::from_hosts(brokers)
        .with_topic(topic)
        .with_group(group)
        .with_fallback_offset(FetchOffset::Earliest)
        .with_offset_storage(Some(GroupOffsetStorage::Kafka))
        .create()?;

    loop {
        let mss = con.poll()?;
        if mss.is_empty() {
            println!("No messages available right now.");
            return Ok(());
        }

        for ms in mss.into_iter() {
            for m in ms.messages() {
                println!(
                    "{}:{}@{}: {:?}",
                    ms.topic(),
                    ms.partition(),
                    m.offset,
                    m.value
                );
            }
            // Propagate the failure instead of discarding it: committing after
            // a failed "mark consumed" step would hide the problem from the caller.
            con.consume_messageset(&ms)?;
        }

        con.commit_consumed()?;
    }
}