use std::env;
use dsh_sdk::DshKafkaConfig;
use rdkafka::ClientConfig;
use rdkafka::Message;
use rdkafka::consumer::{Consumer, StreamConsumer};
/// Placeholder Kafka bootstrap server list; `main` exports it as the
/// `KAFKA_BOOTSTRAP_SERVERS` environment variable.
const KAFKA_BOOTSTRAP_SERVERS: &str = "kafkaproxy urls";

/// Placeholder PKI configuration directory; `main` exports it as the
/// `PKI_CONFIG_DIR` environment variable.
const PKI_CONFIG_DIR: &str = "path/to/pki/config/dir";

/// Placeholder tenant name; `main` exports it as the `DSH_TENANT_NAME`
/// environment variable.
const DSH_TENANT_NAME: &str = "tenant";

/// Name of the topic that `consume` subscribes to.
const TOPIC: &str = "scratch.topic-name.tenant";
/// Subscribes `consumer` to [`TOPIC`] and prints every received message to
/// stdout in an endless loop.
///
/// The payload is decoded as UTF-8 with invalid sequences replaced; a message
/// without a payload is printed with an empty payload instead of aborting the
/// loop. Errors returned while receiving are reported on stderr and the loop
/// keeps polling.
///
/// # Panics
///
/// Panics if the subscription to [`TOPIC`] cannot be created, because the
/// function has no error channel to report it through.
async fn consume(consumer: StreamConsumer) {
    consumer
        .subscribe(&[TOPIC])
        .expect("failed to subscribe to the configured Kafka topic");

    loop {
        let msg = match consumer.recv().await {
            Ok(msg) => msg,
            Err(err) => {
                // A failed receive must not take the whole consumer down.
                eprintln!("Kafka error while receiving message: {err}");
                continue;
            }
        };

        // The payload is optional (e.g. key-only records), so never unwrap it.
        let payload = msg
            .payload()
            .map(String::from_utf8_lossy)
            .unwrap_or_default();

        println!(
            "Received message: key: {:?}, payload: {}",
            msg.key(),
            payload
        );
    }
}
/// Entry point: exports the connection settings as environment variables,
/// starts a multi-threaded Tokio runtime, builds a DSH-configured
/// [`StreamConsumer`] and runs [`consume`] on it.
///
/// # Errors
///
/// Returns an error if the Tokio runtime cannot be built or if the Kafka
/// consumer cannot be created from the configuration.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // SAFETY: the process is still single-threaded at this point. The Tokio
    // runtime and its worker threads are only created further down, so no
    // other thread can read or write the environment concurrently.
    unsafe {
        env::set_var("KAFKA_BOOTSTRAP_SERVERS", KAFKA_BOOTSTRAP_SERVERS);
        env::set_var("PKI_CONFIG_DIR", PKI_CONFIG_DIR);
        env::set_var("DSH_TENANT_NAME", DSH_TENANT_NAME);
    }

    // Built by hand (instead of `#[tokio::main]`) so that the environment is
    // fully set up before any runtime thread is spawned.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;

    runtime.block_on(async {
        let consumer: StreamConsumer = ClientConfig::new().set_dsh_consumer_config().create()?;
        consume(consumer).await;
        // Explicit error type: `?` inside an async block cannot infer it.
        Ok::<(), Box<dyn std::error::Error>>(())
    })
}