use dsh_sdk::DshKafkaConfig;
use rdkafka::ClientConfig;
use rdkafka::Message;
use rdkafka::consumer::CommitMode;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
const TOTAL_MESSAGES: usize = 10;
async fn produce(producer: FutureProducer, topic: &str) {
println!("Producing messages to topic: {}", topic);
for key in 0..TOTAL_MESSAGES {
let payload = format!("hello world {}", key);
let msg = producer
.send(
FutureRecord::to(topic)
.payload(payload.as_bytes())
.key(&key.to_be_bytes()),
std::time::Duration::from_secs(1),
)
.await;
match msg {
Ok(_) => println!("Message {} sent to {}", key, topic),
Err(e) => println!("Error sending message: {}", e.0),
}
}
}
async fn consume(consumer: StreamConsumer, topic: &str) {
println!("Consuming messages from topic: {}", topic);
consumer.subscribe(&[topic]).unwrap();
let mut i = 0;
while i < TOTAL_MESSAGES {
let msg = consumer.recv().await.unwrap();
let payload = String::from_utf8_lossy(msg.payload().unwrap());
let key = usize::from_be_bytes(msg.key().unwrap().try_into().unwrap());
println!("Received message: key: {}, payload: {}", key, payload);
consumer.commit_message(&msg, CommitMode::Async).unwrap();
i += 1;
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::builder()
.filter(Some("dsh_sdk"), log::LevelFilter::Debug)
.target(env_logger::Target::Stdout)
.init();
println!("Enter topic to write and read from:");
let mut topic = String::new();
std::io::stdin()
.read_line(&mut topic)
.expect("Failed to read line");
let topic = topic.trim();
let producer: FutureProducer = ClientConfig::new().set_dsh_producer_config().create()?;
produce(producer, topic).await;
let consumer: StreamConsumer = ClientConfig::new().set_dsh_consumer_config().create()?;
consume(consumer, topic).await;
Ok(())
}