use crate::connectors::{ConnectorError, StreamConnector};
use crate::RS2Stream;
use async_stream::stream;
use async_trait::async_trait;
use futures_util::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::{Message, TopicPartitionList};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
/// Connector that holds the broker address and an optional default consumer
/// group used when building Kafka consumer and producer clients.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConnector {
/// Value passed as `bootstrap.servers` to every client this connector creates.
bootstrap_servers: String,
/// Fallback `group.id` for consumers; used only when the per-call
/// `KafkaConfig::group_id` is `None`.
consumer_group: Option<String>,
}
/// Per-call settings for consuming from or producing to a Kafka topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConfig {
/// Topic to consume from or produce to.
pub topic: String,
/// Consumer `group.id`; takes precedence over the connector-level consumer group.
pub group_id: Option<String>,
/// When set, the consumer is assigned this partition and produced records
/// are sent to it.
pub partition: Option<i32>,
/// Sets the consumer's `auto.offset.reset` to `earliest` when `true`,
/// `latest` when `false`.
pub from_beginning: bool,
/// Extra client properties, applied after the connector's own settings so
/// they override any key set earlier. Used for both consumers and producers.
pub kafka_config: Option<HashMap<String, String>>,
/// Value for the consumer's `enable.auto.commit`.
pub enable_auto_commit: bool,
/// Value for the consumer's `auto.commit.interval.ms`; the property is left
/// unset when `None`.
pub auto_commit_interval_ms: Option<u64>,
/// Value for the consumer's `session.timeout.ms`; 30000 is used when `None`.
pub session_timeout_ms: Option<u64>,
/// Value for the producer's `message.timeout.ms`; 30000 is used when `None`.
pub message_timeout_ms: Option<u64>,
}
impl Default for KafkaConfig {
fn default() -> Self {
Self {
topic: String::new(),
group_id: None,
partition: None,
from_beginning: false,
kafka_config: None,
enable_auto_commit: true,
auto_commit_interval_ms: Some(5000),
session_timeout_ms: Some(30000),
message_timeout_ms: Some(30000),
}
}
}
/// Statistics reported by the Kafka connector.
///
/// In the code visible here only `to_sink` fills in real values (topic,
/// produced message count, bytes sent, throughput); `metadata()` returns a
/// zeroed placeholder with the topic set to `"unknown"`.
#[derive(Debug, Clone)]
pub struct KafkaMetadata {
/// Topic the statistics refer to.
pub topic: String,
/// Number of partitions. NOTE(review): `to_sink` always reports 1 and
/// `metadata()` reports 0; neither queries the broker.
pub partition_count: i32,
/// Count of records whose send completed successfully.
pub messages_produced: u64,
/// Count of consumed records; always 0 in the visible code.
pub messages_consumed: u64,
/// Sum of the payload lengths of successfully sent records.
pub bytes_sent: u64,
/// Bytes received; always 0 in the visible code.
pub bytes_received: u64,
/// Last offset seen; always `None` in the visible code.
pub last_offset: Option<i64>,
/// Consumer lag; always `None` in the visible code.
pub consumer_lag: Option<i64>,
/// Successfully produced messages per second of elapsed wall-clock time,
/// as computed by `to_sink`.
pub throughput: f64,
}
impl KafkaConnector {
pub fn new(bootstrap_servers: &str) -> Self {
Self {
bootstrap_servers: bootstrap_servers.to_string(),
consumer_group: None,
}
}
pub fn with_consumer_group(mut self, group_id: &str) -> Self {
self.consumer_group = Some(group_id.to_string());
self
}
fn create_consumer_config(&self, config: &KafkaConfig) -> ClientConfig {
let mut client_config = ClientConfig::new();
client_config
.set("bootstrap.servers", &self.bootstrap_servers)
.set("enable.auto.commit", &config.enable_auto_commit.to_string())
.set(
"session.timeout.ms",
&config.session_timeout_ms.unwrap_or(30000).to_string(),
);
let group_id = config
.group_id
.clone()
.or_else(|| self.consumer_group.clone());
if let Some(group_id) = group_id {
client_config.set("group.id", group_id);
}
if let Some(interval) = config.auto_commit_interval_ms {
client_config.set("auto.commit.interval.ms", &interval.to_string());
}
if config.from_beginning {
client_config.set("auto.offset.reset", "earliest");
} else {
client_config.set("auto.offset.reset", "latest");
}
if let Some(kafka_config) = &config.kafka_config {
for (key, value) in kafka_config {
client_config.set(key, value);
}
}
client_config
}
fn create_producer_config(&self, config: &KafkaConfig) -> ClientConfig {
let mut client_config = ClientConfig::new();
client_config
.set("bootstrap.servers", &self.bootstrap_servers)
.set(
"message.timeout.ms",
&config.message_timeout_ms.unwrap_or(30000).to_string(),
);
if let Some(kafka_config) = &config.kafka_config {
for (key, value) in kafka_config {
client_config.set(key, value);
}
}
client_config
}
}
#[async_trait]
impl<T> StreamConnector<T> for KafkaConnector
where
T: for<'de> Deserialize<'de> + Serialize + Send + 'static,
{
type Config = KafkaConfig;
type Error = ConnectorError;
type Metadata = KafkaMetadata;
async fn from_source(&self, config: Self::Config) -> Result<RS2Stream<T>, Self::Error> {
let client_config = self.create_consumer_config(&config);
let consumer: StreamConsumer = client_config
.create()
.map_err(|e| ConnectorError::ConnectionFailed(e.to_string()))?;
let topics = vec![config.topic.as_str()];
consumer
.subscribe(&topics)
.map_err(|e| ConnectorError::ConnectorSpecific(e.to_string()))?;
if let Some(partition) = config.partition {
let mut tpl = TopicPartitionList::new();
tpl.add_partition(&config.topic, partition);
consumer
.assign(&tpl)
.map_err(|e| ConnectorError::ConnectorSpecific(e.to_string()))?;
}
let topic = config.topic.clone();
let stream = stream! {
loop {
match consumer.recv().await {
Ok(message) => {
if let Some(payload) = message.payload() {
match serde_json::from_slice::<T>(payload) {
Ok(item) => {
log::debug!("Received message from Kafka topic: {}", topic);
yield item;
}
Err(e) => {
log::error!("Failed to deserialize Kafka message: {}", e);
continue;
}
}
}
}
Err(e) => {
log::error!("Kafka consumer error: {}", e);
break;
}
}
}
};
Ok(stream.boxed())
}
async fn to_sink(
&self,
stream: RS2Stream<T>,
config: Self::Config,
) -> Result<Self::Metadata, Self::Error> {
if config.topic.trim().is_empty() {
return Err(ConnectorError::InvalidConfiguration(
"Topic name cannot be empty".to_string(),
));
}
let client_config = self.create_producer_config(&config);
let producer: FutureProducer = client_config
.create()
.map_err(|e| ConnectorError::ConnectionFailed(e.to_string()))?;
let start = Instant::now();
let messages_produced = Arc::new(Mutex::new(0u64));
let bytes_sent = Arc::new(Mutex::new(0u64));
stream
.for_each(|item| {
let producer = producer.clone();
let topic = config.topic.clone();
let partition = config.partition;
let messages_counter = Arc::clone(&messages_produced);
let bytes_counter = Arc::clone(&bytes_sent);
async move {
match serde_json::to_vec(&item) {
Ok(payload) => {
let key_string = if let Ok(message_str) =
serde_json::from_slice::<String>(&payload)
{
if let Some(key) = message_str.split('-').next() {
Some(key.to_string())
} else {
None
}
} else {
None
};
let mut record = FutureRecord::to(&topic).payload(&payload);
if let Some(ref key) = key_string {
record = record.key(key);
}
if let Some(p) = partition {
record = record.partition(p);
}
match producer.send(record, Duration::from_secs(30)).await {
Ok(_) => {
*messages_counter.lock().await += 1; *bytes_counter.lock().await += payload.len() as u64; log::debug!("Sent message to Kafka topic: {}", topic);
}
Err((e, _)) => {
log::error!("Failed to send message to Kafka: {}", e);
}
}
}
Err(e) => {
log::error!("Failed to serialize message for Kafka: {}", e);
}
}
}
})
.await;
let elapsed = start.elapsed();
let final_messages_produced = *messages_produced.lock().await; let final_bytes_sent = *bytes_sent.lock().await;
let throughput = if elapsed.as_secs_f64() > 0.0 {
final_messages_produced as f64 / elapsed.as_secs_f64()
} else {
0.0
};
Ok(KafkaMetadata {
topic: config.topic,
partition_count: 1,
messages_produced: final_messages_produced,
messages_consumed: 0,
bytes_sent: final_bytes_sent,
bytes_received: 0,
last_offset: None,
consumer_lag: None,
throughput,
})
}
async fn health_check(&self) -> Result<bool, Self::Error> {
let client_config = ClientConfig::new()
.set("bootstrap.servers", &self.bootstrap_servers)
.clone();
let consumer: StreamConsumer = client_config
.create()
.map_err(|e| ConnectorError::ConnectionFailed(e.to_string()))?;
match consumer.fetch_metadata(Some("__consumer_offsets"), Duration::from_secs(5)) {
Ok(_) => Ok(true),
Err(e) => Err(ConnectorError::ConnectionFailed(format!(
"Failed to fetch metadata: {}",
e
))),
}
}
async fn metadata(&self) -> Result<Self::Metadata, Self::Error> {
Ok(KafkaMetadata {
topic: "unknown".to_string(),
partition_count: 0,
messages_produced: 0,
messages_consumed: 0,
bytes_sent: 0,
bytes_received: 0,
last_offset: None,
consumer_lag: None,
throughput: 0.0,
})
}
fn name(&self) -> &'static str {
"kafka"
}
fn version(&self) -> &'static str {
"1.0.0"
}
}