use async_trait::async_trait;
use serde::Serialize;
use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};
/// Connection and producer settings for the Kafka transport.
#[derive(Debug, Clone)]
pub struct KafkaConfig {
    /// Comma-separated broker list (`bootstrap.servers`).
    pub brokers: String,
    /// Topic used when no per-subscription routing applies.
    pub default_topic: String,
    /// Producer `client.id` reported to the brokers.
    pub client_id: String,
    /// Producer `acks` setting (e.g. "all").
    pub acks: String,
    /// Delivery timeout in milliseconds (`message.timeout.ms`).
    pub timeout_ms: u64,
    /// Optional `compression.type`; `None` leaves the producer default.
    pub compression: Option<String>,
}

impl KafkaConfig {
    /// Creates a config from the required broker list and topic; all other
    /// fields start from conservative defaults (client id "fraiseql",
    /// acks "all", 30s timeout, no compression).
    #[must_use]
    pub fn new(brokers: impl Into<String>, default_topic: impl Into<String>) -> Self {
        Self {
            brokers: brokers.into(),
            default_topic: default_topic.into(),
            client_id: "fraiseql".to_string(),
            acks: "all".to_string(),
            timeout_ms: 30_000,
            compression: None,
        }
    }

    /// Overrides the producer `client.id`.
    #[must_use]
    pub fn with_client_id(self, client_id: impl Into<String>) -> Self {
        Self { client_id: client_id.into(), ..self }
    }

    /// Overrides the `acks` setting.
    #[must_use]
    pub fn with_acks(self, acks: impl Into<String>) -> Self {
        Self { acks: acks.into(), ..self }
    }

    /// Overrides the delivery timeout (milliseconds).
    #[must_use]
    pub const fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Enables compression with the given `compression.type` value.
    #[must_use]
    pub fn with_compression(self, compression: impl Into<String>) -> Self {
        Self { compression: Some(compression.into()), ..self }
    }
}
/// JSON payload published to Kafka for each subscription event.
#[derive(Debug, Clone, Serialize)]
pub struct KafkaMessage {
    /// Unique id of the originating event.
    pub event_id: String,
    /// Name of the subscription that triggered this delivery.
    pub subscription_name: String,
    /// Type of the entity the event concerns.
    pub entity_type: String,
    /// Entity id; also used as the Kafka partition key (see `key()`).
    pub entity_id: String,
    /// Operation name (the Debug form of the event's operation value).
    pub operation: String,
    /// Event data as JSON.
    pub data: serde_json::Value,
    /// Previous data if the event carried it; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub old_data: Option<serde_json::Value>,
    /// Event time rendered as an RFC 3339 string.
    pub timestamp: String,
    /// Sequence number copied from the event.
    pub sequence_number: u64,
}
impl KafkaMessage {
    /// Builds the wire message for `event`, tagged with the subscription
    /// that triggered the delivery.
    #[must_use]
    pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
        // NOTE: the operation is rendered via its Debug representation.
        let op = &event.operation;
        Self {
            event_id: event.event_id.clone(),
            subscription_name: subscription_name.to_owned(),
            entity_type: event.entity_type.clone(),
            entity_id: event.entity_id.clone(),
            operation: format!("{op:?}"),
            data: event.data.clone(),
            old_data: event.old_data.clone(),
            timestamp: event.timestamp.to_rfc3339(),
            sequence_number: event.sequence_number,
        }
    }

    /// Partition key for this message: the entity id, so all events for a
    /// given entity land on the same partition.
    #[must_use]
    pub fn key(&self) -> &str {
        self.entity_id.as_str()
    }
}
/// Kafka transport adapter backed by an rdkafka `FutureProducer`
/// (compiled only when the `kafka` feature is enabled).
#[cfg(feature = "kafka")]
pub struct KafkaAdapter {
    /// Settings the producer was created from; retained for topic lookup and Debug output.
    config: KafkaConfig,
    /// Async rdkafka producer; it does not implement `Debug`, hence the manual impl.
    producer: rdkafka::producer::FutureProducer,
}
#[cfg(feature = "kafka")]
impl std::fmt::Debug for KafkaAdapter {
    /// Manual `Debug`: the rdkafka producer is not `Debug`, so only the
    /// identifying config fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("KafkaAdapter");
        dbg.field("brokers", &self.config.brokers);
        dbg.field("default_topic", &self.config.default_topic);
        dbg.field("client_id", &self.config.client_id);
        dbg.finish_non_exhaustive()
    }
}
#[cfg(feature = "kafka")]
impl KafkaAdapter {
    /// Creates the adapter and its underlying rdkafka `FutureProducer`
    /// from `config`.
    ///
    /// # Errors
    /// Returns `SubscriptionError::Internal` if the producer cannot be
    /// created from the supplied configuration.
    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
        use rdkafka::{config::ClientConfig, producer::FutureProducer};
        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", &config.brokers)
            .set("client.id", &config.client_id)
            .set("acks", &config.acks)
            .set("message.timeout.ms", config.timeout_ms.to_string());
        // Only set compression when configured, so the producer default applies otherwise.
        if let Some(ref compression) = config.compression {
            client_config.set("compression.type", compression);
        }
        let producer: FutureProducer = client_config.create().map_err(|e| {
            SubscriptionError::Internal(format!("Failed to create Kafka producer: {e}"))
        })?;
        tracing::info!(
            brokers = %config.brokers,
            topic = %config.default_topic,
            client_id = %config.client_id,
            "KafkaAdapter created with rdkafka producer"
        );
        Ok(Self { config, producer })
    }

    /// Resolves the destination topic for a subscription. Currently every
    /// subscription routes to the configured default topic; the parameter
    /// is kept for future per-subscription routing.
    fn get_topic(&self, _subscription_name: &str) -> &str {
        &self.config.default_topic
    }

    /// Exposes the underlying producer for advanced operations.
    #[must_use = "the producer reference should be used for Kafka operations"]
    pub const fn producer(&self) -> &rdkafka::producer::FutureProducer {
        &self.producer
    }
}
#[cfg(feature = "kafka")]
#[async_trait]
impl TransportAdapter for KafkaAdapter {
    /// Serializes the event as JSON and produces it to Kafka.
    ///
    /// The record is keyed by the entity id (`KafkaMessage::key`), so all
    /// events for one entity map to the same partition and keep ordering.
    ///
    /// # Errors
    /// `SubscriptionError::Internal` if JSON serialization fails;
    /// `SubscriptionError::DeliveryFailed` when the produce request fails
    /// or times out.
    async fn deliver(
        &self,
        event: &SubscriptionEvent,
        subscription_name: &str,
    ) -> Result<(), SubscriptionError> {
        use std::time::Duration;
        use rdkafka::producer::FutureRecord;
        let message = KafkaMessage::from_event(event, subscription_name);
        let topic = self.get_topic(subscription_name);
        let payload = serde_json::to_string(&message).map_err(|e| {
            SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
        })?;
        let record = FutureRecord::to(topic).key(message.key()).payload(&payload);
        // Bound the in-flight send by the configured delivery timeout.
        let timeout = Duration::from_millis(self.config.timeout_ms);
        match self.producer.send(record, timeout).await {
            Ok(delivery) => {
                tracing::debug!(
                    topic = topic,
                    partition = delivery.partition,
                    offset = delivery.offset,
                    key = message.key(),
                    event_id = %event.event_id,
                    "Kafka message delivered successfully"
                );
                Ok(())
            },
            Err((kafka_error, _)) => {
                // rdkafka returns the undelivered message alongside the error;
                // it is dropped here — no retry happens at this layer.
                tracing::error!(
                    topic = topic,
                    key = message.key(),
                    event_id = %event.event_id,
                    error = %kafka_error,
                    "Failed to deliver Kafka message"
                );
                Err(SubscriptionError::DeliveryFailed {
                    transport: "kafka".to_string(),
                    reason: kafka_error.to_string(),
                })
            },
        }
    }

    /// Transport identifier used in logs and delivery errors.
    fn name(&self) -> &'static str {
        "kafka"
    }

    /// Probes broker connectivity by fetching cluster metadata.
    ///
    /// NOTE(review): `fetch_metadata` is a blocking rdkafka call invoked
    /// from an async fn — it can hold the executor thread for up to the 5s
    /// timeout. Consider wrapping it in a blocking-task spawn; confirm
    /// against the runtime this adapter runs on.
    async fn health_check(&self) -> bool {
        use std::time::Duration;
        use rdkafka::producer::Producer;
        match self.producer.client().fetch_metadata(
            None, Duration::from_secs(5),
        ) {
            Ok(metadata) => {
                tracing::debug!(
                    broker_count = metadata.brokers().len(),
                    topic_count = metadata.topics().len(),
                    "Kafka health check passed"
                );
                true
            },
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "Kafka health check failed"
                );
                false
            },
        }
    }
}
/// Stub adapter compiled when the `kafka` feature is disabled; it mirrors
/// the real adapter's API but performs no network I/O.
#[cfg(not(feature = "kafka"))]
#[derive(Debug)]
pub struct KafkaAdapter {
    /// Retained so topic resolution behaves like the real adapter.
    config: KafkaConfig,
}
#[cfg(not(feature = "kafka"))]
impl KafkaAdapter {
    /// Builds the stub adapter. Never actually fails; the `Result` return
    /// keeps the signature identical to the feature-enabled constructor.
    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
        tracing::warn!(
            brokers = %config.brokers,
            topic = %config.default_topic,
            "KafkaAdapter created (STUB - enable 'kafka' feature for real Kafka support)"
        );
        Ok(Self { config })
    }

    /// Topic resolution mirrors the real adapter: always the default topic.
    fn get_topic(&self, _subscription_name: &str) -> &str {
        self.config.default_topic.as_str()
    }
}
#[cfg(not(feature = "kafka"))]
#[async_trait]
impl TransportAdapter for KafkaAdapter {
    /// No-op delivery: validates that the event serializes to JSON, logs
    /// what would have been sent, and reports success.
    async fn deliver(
        &self,
        event: &SubscriptionEvent,
        subscription_name: &str,
    ) -> Result<(), SubscriptionError> {
        let msg = KafkaMessage::from_event(event, subscription_name);
        // Exercise serialization even though nothing is sent, so failures
        // surface the same way as with the real adapter.
        if let Err(e) = serde_json::to_string(&msg) {
            return Err(SubscriptionError::Internal(format!(
                "Failed to serialize message: {e}"
            )));
        }
        let destination = self.get_topic(subscription_name);
        tracing::info!(
            topic = destination,
            key = msg.key(),
            event_id = %event.event_id,
            "Kafka delivery (STUB) - enable 'kafka' feature for actual delivery"
        );
        Ok(())
    }

    /// Transport identifier used in logs and delivery errors.
    fn name(&self) -> &'static str {
        "kafka"
    }

    /// Always healthy: there is no broker to probe in the stub.
    async fn health_check(&self) -> bool {
        tracing::debug!("Kafka health check (STUB) - always returns true");
        true
    }
}