use crate::{builder::TracingProducerContext, metadata::RedpandaMetadata};
use rdkafka::{
error::KafkaError,
message::OwnedHeaders,
producer::FutureProducer,
util::Timeout, Timestamp,
};
use tracing::{event, instrument, Level};
/// Shorthand for an rdkafka `FutureProducer` parameterised with this crate's
/// `TracingProducerContext`.
type TracingProducer = FutureProducer<TracingProducerContext>;
pub use rdkafka::producer::FutureRecord;
pub use rdkafka::producer::Producer;
pub use rdkafka::producer::DeliveryFuture;
/// An owned message waiting to be produced.
///
/// The record owns all of its data, so it can be cloned and converted into a
/// borrowed `FutureRecord` (see the `From<&RedpandaRecord>` impl) as many
/// times as needed.
#[derive(Debug, Clone)]
pub struct RedpandaRecord {
// Name of the destination topic.
topic: String,
// Optional message key bytes; `None` produces a record without a key.
key: Option<Vec<u8>>,
// Message payload bytes.
payload: Vec<u8>,
// Optional message headers.
headers: Option<OwnedHeaders>,
// Timestamp taken when the record was constructed; forwarded as the
// record timestamp on conversion to `FutureRecord`.
created_timestamp: Timestamp,
}
impl RedpandaRecord {
    /// Creates a record addressed to `topic`.
    ///
    /// The topic name is copied into an owned `String`; `key`, `payload` and
    /// `headers` are moved in unchanged. The creation timestamp is captured
    /// with `Timestamp::now()` at the moment of the call.
    pub fn new(
        topic: &str,
        key: Option<Vec<u8>>,
        payload: Vec<u8>,
        headers: Option<OwnedHeaders>,
    ) -> Self {
        let topic = String::from(topic);
        let created_timestamp = Timestamp::now();
        Self {
            topic,
            key,
            payload,
            headers,
            created_timestamp,
        }
    }
}
impl<'a> From<&'a RedpandaRecord> for FutureRecord<'a, Vec<u8>, Vec<u8>> {
fn from(r: &'a RedpandaRecord) -> Self {
FutureRecord {
topic: &r.topic,
partition: Option::None,
payload: Some(&r.payload),
key: r.key.as_ref(),
timestamp: r.created_timestamp.to_millis(),
headers: r.headers.clone(),
}
}
}
/// Wrapper around a `TracingProducer` that is only constructed after a
/// successful metadata fetch (see `RedpandaProducer::new`).
#[derive(Clone)]
pub struct RedpandaProducer {
// The wrapped rdkafka producer; public so callers can use it directly.
pub producer: TracingProducer,
}
impl RedpandaProducer {
    /// Wraps `producer` after checking that cluster metadata can be fetched.
    ///
    /// Metadata is requested through the producer's client with no topic
    /// filter, waiting at most `request_timeout`. On success the metadata is
    /// logged at `INFO` level and then dropped; it is not stored.
    ///
    /// # Errors
    ///
    /// Returns the `KafkaError` reported by the metadata fetch.
    #[instrument(skip(producer))]
    pub fn new(producer: TracingProducer, request_timeout: Timeout) -> Result<Self, KafkaError> {
        // The borrow of `producer` ends with this statement, so it can be
        // moved into `Self` below.
        let metadata: RedpandaMetadata = producer
            .client()
            .fetch_metadata(None, request_timeout)?
            .into();
        event!(
            Level::INFO,
            "Connected producer to Redpanda cluster {:?}",
            metadata
        );
        Ok(Self { producer })
    }

    /// Hands `record` to the wrapped producer's `send_result`.
    ///
    /// The record is converted into a borrowed `FutureRecord` first, so it
    /// stays owned by the caller and can be sent again.
    ///
    /// # Errors
    ///
    /// On failure the `KafkaError` is returned together with the
    /// `FutureRecord` that was built from `record`.
    pub fn send_result<'a>(
        &self,
        record: &'a RedpandaRecord,
    ) -> Result<DeliveryFuture, (KafkaError, FutureRecord<'a, Vec<u8>, Vec<u8>>)> {
        self.producer.send_result(record.into())
    }
}