use std::time::Duration;
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;
use super::make_producer;
use super::KafkaSettings;
/// Kafka publishing client: an rdkafka [`FutureProducer`] bundled with the
/// timeout that is applied to every send issued through it.
#[derive(Clone)]
pub struct KafkaClient {
/// Producer that every send method on this type delivers records through.
pub producer: FutureProducer,
/// Timeout in whole seconds; converted to a `Duration` and passed as the
/// second (`queue_timeout`) argument of `FutureProducer::send`.
pub queue_timeout: u32,
}
impl KafkaClient {
pub fn build(settings: &KafkaSettings) -> Self {
Self {
producer: make_producer(settings),
queue_timeout: settings.queue_timeout_secs.unwrap(),
}
}
pub async fn send_message<T>(&self, topic: &str, message: &T) -> Option<String>
where
T: serde::Serialize,
{
let msg = match serde_json::to_string(message) {
Ok(val) => val,
Err(e) => {
log::error!("kafka-message-serde-failed: error={:?}", e);
return None;
}
};
let queue_timeout = Duration::from_secs(self.queue_timeout as u64);
let message_id = uuid::Uuid::now_v7().to_string();
let record = FutureRecord::to(topic).key(&message_id).payload(&msg);
match self.producer.send(record, queue_timeout).await {
Ok(_) => Some(message_id),
Err(e) => {
log::error!("kafka-message-send-failed: error={}", e.0);
None
}
}
}
pub async fn msg(&self, topic: &str, msg: &str) -> Option<String> {
let queue_timeout = Duration::from_secs(self.queue_timeout as u64);
let message_id = uuid::Uuid::now_v7().to_string();
let record = FutureRecord::to(topic).key(&message_id).payload(msg);
match self.producer.send(record, queue_timeout).await {
Ok(_) => Some(message_id),
Err(e) => {
log::error!("kafka-message-send-failed: error={:?}", e);
None
}
}
}
pub async fn publish(&self, topic: &str, buf: &[u8]) -> Option<String> {
match std::str::from_utf8(buf) {
Ok(utf8_string) => {
let queue_timeout = Duration::from_secs(self.queue_timeout as u64);
let message_id = uuid::Uuid::now_v7().to_string();
let record = FutureRecord::to(topic)
.key(&message_id)
.payload(utf8_string);
match self.producer.send(record, queue_timeout).await {
Ok(_) => Some(message_id),
Err(e) => {
log::error!("kafka-message-send-failed: error={:?}", e);
None
}
}
}
Err(e) => {
log::error!("kafka-message-send-failed: error={:?}", e);
None
}
}
}
}