use anyhow::Result;
use danube_client::{DanubeClient, SubType};
use std::env;
#[tokio::main]
async fn main() -> Result<()> {
let consumer_name = env::args()
.nth(1)
.unwrap_or_else(|| "ks_consumer".to_string());
let client = DanubeClient::builder()
.service_url("http://127.0.0.1:6650")
.build()
.await?;
let topic = "/default/orders_topic";
let subscription = "orders_key_shared";
let mut consumer = client
.new_consumer()
.with_topic(topic)
.with_consumer_name(&consumer_name)
.with_subscription(subscription)
.with_subscription_type(SubType::KeyShared)
.build()?;
consumer.subscribe().await?;
println!(
"✅ Consumer '{}' subscribed to '{}' (Key-Shared)",
consumer_name, topic
);
println!("🎧 Listening for messages...\n");
let mut stream = consumer.receive().await?;
let mut count = 0;
while let Some(message) = stream.recv().await {
let payload = String::from_utf8_lossy(&message.payload);
let key = message.routing_key.as_deref().unwrap_or("<none>");
println!(
"📥 [{}] key={:<10} | offset={} | '{}'",
consumer_name, key, message.msg_id.topic_offset, payload
);
consumer.ack(&message).await?;
count += 1;
}
println!(
"\n✅ Consumer '{}' received {} messages total",
consumer_name, count
);
Ok(())
}