use std::time::Duration;
use krafka::consumer::{AutoOffsetReset, Consumer, ConsumerRecord};
use krafka::error::Result;
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("krafka=debug")
.init();
let consumer: Consumer = Consumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("krafka-advanced-consumer")
.auto_offset_reset(AutoOffsetReset::Earliest)
.enable_auto_commit(false) .build()
.await?;
println!("Consumer connected!");
consumer.subscribe(&["test-topic"]).await?;
let assignments = consumer.assignment().await;
println!("Assigned partitions: {:?}", assignments);
let mut message_count = 0;
let mut batch_count = 0;
loop {
let poll_result: Result<Vec<ConsumerRecord>> =
consumer.poll(Duration::from_millis(500)).await;
match poll_result {
Ok(records) => {
if records.is_empty() {
continue;
}
batch_count += 1;
println!("\n--- Batch {} ---", batch_count);
for record in &records {
message_count += 1;
println!(
"[{}] {} partition={} offset={}: {:?}",
message_count,
record.topic,
record.partition,
record.offset,
record.value_str().unwrap_or("<binary>"),
);
if message_count == 5 {
println!(
"\n>>> Pausing partition {} for demonstration...",
record.partition
);
consumer.pause(&record.topic, &[record.partition]).await;
let paused = consumer.paused_partitions().await;
println!(">>> Paused partitions: {:?}", paused);
}
if message_count == 10 {
println!("\n>>> Resuming all paused partitions...");
let paused = consumer.paused_partitions().await;
for (topic, partition) in paused {
consumer.resume(&topic, &[partition]).await;
}
}
if message_count == 15 {
println!(
"\n>>> Seeking to beginning of partition {}...",
record.partition
);
consumer
.seek_to_beginning(&record.topic, record.partition)
.await?;
}
}
println!("Committing offsets for {} records...", records.len());
consumer.commit().await?;
for (topic, partitions) in consumer.assignment().await {
for partition in partitions {
if let Some(pos) = consumer.position(&topic, partition).await {
println!(" Position {}-{}: {}", topic, partition, pos);
}
}
}
if message_count >= 20 {
println!("\n>>> Demo complete! Processed {} messages.", message_count);
break;
}
}
Err(e) => {
eprintln!("Error polling: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
consumer.commit().await?;
consumer.close().await;
println!("Consumer closed gracefully.");
Ok(())
}