use crate::broker::{BrokerError, PublishResult};
use chrono::Utc;
use parking_lot::Mutex;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rskafka::BackoffConfig;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
/// Kafka publisher that exposes a blocking API on top of the async `rskafka` client.
///
/// It owns a dedicated Tokio runtime for driving the async calls and keeps
/// one connected client per broker string so connections are reused.
pub struct KafkaPublisher {
/// Tokio runtime on which `publish_blocking` runs the async publish future.
runtime: tokio::runtime::Runtime,
/// Connected clients, keyed by the exact `brokers` string they were created for.
clients: Mutex<HashMap<String, Arc<Client>>>,
}
impl Default for KafkaPublisher {
fn default() -> Self {
Self::new()
}
}
impl KafkaPublisher {
    /// Creates a publisher with an empty client cache and its own Tokio runtime.
    ///
    /// The runtime is multi-threaded with a single worker thread named
    /// `kafka-runtime` and has all Tokio drivers enabled.
    ///
    /// # Panics
    ///
    /// Panics if the Tokio runtime cannot be built.
    pub fn new() -> Self {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .thread_name("kafka-runtime")
            .enable_all()
            .build()
            .expect("failed to create Kafka runtime");
        Self {
            runtime,
            clients: Mutex::new(HashMap::new()),
        }
    }

    /// Publishes a single record and blocks the calling thread until it completes.
    ///
    /// * `brokers` - comma-separated list of broker addresses.
    /// * `topic` - topic to publish to.
    /// * `key` - optional record key.
    /// * `payload` - record value, sent as its UTF-8 bytes.
    /// * `headers` - record headers; values are sent as their UTF-8 bytes.
    ///
    /// # Errors
    ///
    /// Returns [`BrokerError::ConnectionFailed`] when the client or the
    /// partition client cannot be obtained, and [`BrokerError::PublishFailed`]
    /// when producing the record fails.
    ///
    /// # Panics
    ///
    /// This drives the future with `Runtime::block_on`, which panics when
    /// called from within an asynchronous execution context. Call it from a
    /// plain (non-async) thread only.
    pub fn publish_blocking(
        &self,
        brokers: &str,
        topic: &str,
        key: Option<String>,
        payload: &str,
        headers: BTreeMap<String, String>,
    ) -> Result<PublishResult, BrokerError> {
        self.runtime
            .block_on(self.publish(brokers, topic, key, payload, headers))
    }

    /// Async implementation behind [`KafkaPublisher::publish_blocking`].
    ///
    /// The record is always produced to partition 0 of `topic`, uncompressed,
    /// and stamped with the current UTC time.
    async fn publish(
        &self,
        brokers: &str,
        topic: &str,
        key: Option<String>,
        payload: &str,
        headers: BTreeMap<String, String>,
    ) -> Result<PublishResult, BrokerError> {
        let client = self.get_or_connect(brokers).await?;
        let partition_client = client
            .partition_client(topic.to_string(), 0, UnknownTopicHandling::Retry)
            .await
            .map_err(|e| BrokerError::ConnectionFailed(e.to_string()))?;

        let record = Record {
            key: key.map(String::into_bytes),
            value: Some(payload.as_bytes().to_vec()),
            headers: headers
                .into_iter()
                .map(|(name, value)| (name, value.into_bytes()))
                .collect(),
            timestamp: Utc::now(),
        };

        let offsets = partition_client
            .produce(vec![record], Compression::NoCompression)
            .await
            .map_err(|e| BrokerError::PublishFailed(e.to_string()))?;

        // Exactly one record was sent, so only the first returned offset matters.
        let offset = offsets.first().copied();

        Ok(PublishResult {
            success: true,
            error: None,
            topic: topic.to_string(),
            partition: Some(0),
            offset,
        })
    }

    /// Returns the cached client for `brokers`, connecting first if there is none.
    ///
    /// `brokers` is a comma-separated address list. Surrounding whitespace is
    /// trimmed and empty entries (for example from a trailing comma) are
    /// ignored. The cache is keyed by the `brokers` string exactly as given.
    ///
    /// # Errors
    ///
    /// Returns [`BrokerError::ConnectionFailed`] if `brokers` contains no
    /// address or if the client cannot be built.
    async fn get_or_connect(&self, brokers: &str) -> Result<Arc<Client>, BrokerError> {
        // Fast path: reuse an existing connection. The guard is a temporary of
        // this statement, so the lock is released before any `.await` below.
        let cached = self.clients.lock().get(brokers).cloned();
        if let Some(client) = cached {
            return Ok(client);
        }

        let broker_list: Vec<String> = brokers
            .split(',')
            .map(str::trim)
            .filter(|addr| !addr.is_empty())
            .map(String::from)
            .collect();
        if broker_list.is_empty() {
            // Fail immediately instead of letting the client retry against
            // nothing until the backoff deadline expires.
            return Err(BrokerError::ConnectionFailed(
                "no Kafka brokers specified".to_string(),
            ));
        }

        let backoff = BackoffConfig {
            deadline: Some(Duration::from_secs(5)),
            ..Default::default()
        };
        let client = ClientBuilder::new(broker_list)
            .backoff_config(backoff)
            .build()
            .await
            .map_err(|e| BrokerError::ConnectionFailed(e.to_string()))?;
        let client = Arc::new(client);
        tracing::info!(brokers = %brokers, "established Kafka connection");

        // Another caller may have connected while this one was awaiting. Keep
        // whichever client was stored first so every caller shares the same
        // one; a client that lost the race is simply dropped.
        let shared = {
            let mut clients = self.clients.lock();
            clients.entry(brokers.to_string()).or_insert(client).clone()
        };
        Ok(shared)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Local address the tests connect to, expecting the connection to fail.
    const UNREACHABLE_BROKER: &str = "127.0.0.1:19092";
    /// Topic name shared by every publish attempt in this module.
    const TOPIC: &str = "test-topic";

    /// Asserts that a publish attempt failed with `BrokerError::ConnectionFailed`.
    fn assert_connection_failed<T>(outcome: Result<T, BrokerError>) {
        assert!(matches!(outcome, Err(BrokerError::ConnectionFailed(_))));
    }

    #[test]
    fn publisher_starts_empty() {
        let publisher = KafkaPublisher::new();
        assert!(publisher.clients.lock().is_empty());
    }

    #[test]
    fn default_impl() {
        let publisher = KafkaPublisher::default();
        assert!(publisher.clients.lock().is_empty());
    }

    #[test]
    fn publish_blocking_connection_refused() {
        let publisher = KafkaPublisher::new();
        let outcome =
            publisher.publish_blocking(UNREACHABLE_BROKER, TOPIC, None, "hello", BTreeMap::new());
        assert_connection_failed(outcome);
    }

    #[test]
    fn publish_blocking_from_thread_scope() {
        let publisher = KafkaPublisher::new();
        // Run the blocking call on a scoped worker thread that borrows the publisher.
        let outcome = std::thread::scope(|scope| {
            let worker = scope.spawn(|| {
                publisher.publish_blocking(
                    UNREACHABLE_BROKER,
                    TOPIC,
                    None,
                    "hello",
                    BTreeMap::new(),
                )
            });
            worker.join().unwrap()
        });
        assert_connection_failed(outcome);
    }

    #[test]
    fn publish_blocking_with_key_and_headers() {
        let publisher = KafkaPublisher::new();
        let headers: BTreeMap<String, String> =
            BTreeMap::from([("x-request-id".to_string(), "req-123".to_string())]);
        let outcome = publisher.publish_blocking(
            UNREACHABLE_BROKER,
            TOPIC,
            Some("order-456".to_string()),
            r#"{"orderId":"456"}"#,
            headers,
        );
        assert_connection_failed(outcome);
    }

    #[test]
    fn publish_blocking_comma_separated_brokers() {
        let publisher = KafkaPublisher::new();
        let outcome = publisher.publish_blocking(
            "127.0.0.1:19092, 127.0.0.1:19093",
            TOPIC,
            None,
            "hello",
            BTreeMap::new(),
        );
        assert_connection_failed(outcome);
    }
}