use std::time::Duration;
use kafka::error::Error as KafkaError;
use kafka::producer::{Producer, Record, RequiredAcks};
/// Program entry point.
///
/// Installs the `tracing_subscriber` fmt subscriber, then tries to publish a
/// fixed payload to a fixed topic via a single broker address. A failure is
/// reported on standard output; the process still returns normally.
fn main() {
    tracing_subscriber::fmt::init();

    let payload: &[u8] = b"hello, kafka";
    let destination = "my-topic";
    let hosts = vec![String::from("localhost:9092")];

    match produce_message(payload, destination, hosts) {
        Ok(()) => {}
        Err(err) => println!("Failed producing messages: {}", err),
    }
}
/// Publishes `data` to `topic` using a producer connected to `brokers`.
///
/// The message is sent twice on purpose: once through an explicitly
/// constructed [`Record`] and once through the `Record::from_value`
/// shorthand, so both construction styles are exercised.
///
/// # Arguments
///
/// * `data` - raw bytes used as the record value.
/// * `topic` - name of the topic the records are sent to.
/// * `brokers` - broker host strings handed to `Producer::from_hosts`.
///
/// # Errors
///
/// Returns a [`KafkaError`] if the producer cannot be created or if either
/// of the two sends fails. If the first send fails the second is not
/// attempted.
fn produce_message(data: &[u8], topic: &str, brokers: Vec<String>) -> Result<(), KafkaError> {
    println!("About to publish a message at {:?} to: {}", brokers, topic);

    // Producer configured with a one second ack timeout and
    // `RequiredAcks::One` as the acknowledgement level.
    let mut producer = Producer::from_hosts(brokers)
        .with_ack_timeout(Duration::from_secs(1))
        .with_required_acks(RequiredAcks::One)
        .create()?;

    // Fully spelled-out record: unit key, and a negative partition.
    // NOTE(review): per the kafka crate docs a negative partition means
    // "unspecified", leaving the choice to the producer's partitioner — confirm.
    producer.send(&Record {
        topic,
        partition: -1,
        key: (),
        value: data,
    })?;

    // Same payload again via the shorthand constructor.
    // NOTE(review): `Record::from_value` presumably yields an equivalent
    // key-less record with an unspecified partition — confirm against crate docs.
    producer.send(&Record::from_value(topic, data))?;

    Ok(())
}