use rs_broker::Broker;
use rs_broker::{KafkaConfig, new_broker};
use std::env;
#[tokio::main]
async fn main() {
unsafe {
env::set_var("RUST_LOG", "debug");
}
env_logger::init();
let brokers = env::var("KAFKA_BROKERS").unwrap_or("localhost:9092".to_string());
let kafka_config = KafkaConfig::builder(&brokers)
.with_security_protocol("PLAINTEXT")
.build();
let broker = new_broker(kafka_config).expect("Could not create Kafka broker");
let topic = "my-topic";
let group = "group-1";
println!("consume message begin...");
broker
.subscribe(topic, group, |payload| {
let message = String::from_utf8_lossy(&payload).to_string();
async move {
println!("handler msg");
let _ = handler(&message).await;
Ok(())
}
})
.await
.expect("Could not subscribe to topic");
}
async fn async_handler(msg: Vec<u8>) -> Result<(), String> {
let s = String::from_utf8_lossy(&msg).to_string();
println!("handler msg {}", s);
let _ = handler(&s).await;
Ok(())
}
async fn handler(msg: &str) -> Result<(), String> {
println!("Got msg len: {}", msg.len());
println!("consumer msg: {}", msg);
Ok(())
}