use crate::config::{CommonConfig, WriteConfig};
use rdkafka::error::KafkaError;
use rdkafka::message::ToBytes;
use rdkafka::producer::FutureRecord;
use rdkafka::util::Timeout;
use std::time::Duration;
#[derive(Clone)]
pub struct Writer {
producer: rdkafka::producer::FutureProducer,
}
impl Writer {
pub fn new(cfg: &WriteConfig) -> Result<Self, KafkaError> {
let producer: rdkafka::producer::FutureProducer = cfg.inner.create()?;
Ok(Writer { producer })
}
pub fn new_with_brokers<V: AsRef<str>>(brokers: &[V]) -> Result<Self, KafkaError> {
let mut c = WriteConfig::default();
c.set_brokers(brokers);
Writer::new(&c)
}
pub async fn write<K, P>(&self, record: FutureRecord<'_, K, P>) -> Result<(), KafkaError>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
{
self.producer
.send(record, Duration::from_secs(3))
.await
.map(|_| ())
.map_err(|e| e.0)
}
pub async fn write_timeout<K, P, T>(
&self,
record: FutureRecord<'_, K, P>,
queue_timeout: T,
) -> Result<(), KafkaError>
where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
T: Into<Timeout>,
{
self.producer
.send(record, queue_timeout)
.await
.map(|_| ())
.map_err(|e| e.0)
}
}