use crate::Config;
use crate::config::Ack;
use rdkafka::message::ToBytes;
use std::collections::HashMap;
use std::time::Duration;
/// Wrapper around an [`rdkafka::producer::FutureProducer`] that exposes a few
/// convenience helpers for sending single records.
///
/// The type derives `Clone`; cloning clones the wrapped producer.
#[derive(Clone)]
pub struct Producer {
    /// The wrapped rdkafka producer used for every send.
    inner: rdkafka::producer::FutureProducer,
}
impl Producer {
pub fn build(config: &Config) -> Result<Producer, rdkafka::error::KafkaError> {
let producer: rdkafka::producer::FutureProducer = config.inner.create()?;
Ok(Self { inner: producer })
}
pub fn build_with_reliability(config: &Config) -> Result<Producer, rdkafka::error::KafkaError> {
let mut config = config.clone();
config.set_idempotence(true).set_ack(Ack::All);
Self::build(&config)
}
pub fn producer(&self) -> &rdkafka::producer::FutureProducer {
&self.inner
}
pub async fn simple_send<'a, P: ToBytes + ?Sized>(
&self,
topic: &'a str,
payload: &'a P,
timeout: Option<Duration>,
) -> Result<i64, rdkafka::error::KafkaError> {
self.simple_send_with_key(topic, "", payload, timeout).await
}
pub async fn simple_send_with_key<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized>(
&self,
topic: &'a str,
key: &'a K,
payload: &'a P,
timeout: Option<Duration>,
) -> Result<i64, rdkafka::error::KafkaError> {
let header: Option<HashMap<&'a str, &'a str>> = None;
self.simple_send_with_header(topic, key, payload, header, timeout)
.await
}
pub async fn simple_send_with_header<
'a,
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
V: ToBytes + ?Sized,
>(
&self,
topic: &'a str,
key: &'a K,
payload: &'a P,
header: Option<HashMap<&'a str, &V>>,
timeout: Option<Duration>,
) -> Result<i64, rdkafka::error::KafkaError> {
let mut record = rdkafka::producer::FutureRecord::to(topic)
.key(key)
.payload(payload);
if let Some(h) = header {
let mut header = rdkafka::message::OwnedHeaders::new();
for (k, v) in h {
header = header.insert(rdkafka::message::Header {
key: k,
value: Some(v),
});
}
record = record.headers(header);
}
match self.inner.send(record, timeout).await {
Ok(d) => Ok(d.offset),
Err(e) => Err(e.0),
}
}
}