//! Kafka transport adapter for subscription event delivery.
//!
//! Path: `fraiseql_core/runtime/subscription/kafka.rs`
use serde::Serialize;

use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};
/// Configuration for the Kafka transport adapter.
#[derive(Debug, Clone)]
pub struct KafkaConfig {
    /// Comma-separated list of Kafka broker addresses.
    pub brokers: String,

    /// Topic used when a subscription does not specify its own.
    pub default_topic: String,

    /// Client identifier reported to the Kafka cluster.
    pub client_id: String,

    /// Producer acknowledgment mode ("all", "1", "0").
    pub acks: String,

    /// Delivery timeout, in milliseconds.
    pub timeout_ms: u64,

    /// Optional compression codec (e.g. "gzip", "snappy", "lz4").
    pub compression: Option<String>,
}

impl KafkaConfig {
    /// Build a configuration with defaults for everything except the
    /// broker list and the default topic.
    #[must_use]
    pub fn new(brokers: impl Into<String>, default_topic: impl Into<String>) -> Self {
        Self {
            brokers: brokers.into(),
            default_topic: default_topic.into(),
            client_id: String::from("fraiseql"),
            acks: String::from("all"),
            timeout_ms: 30_000,
            compression: None,
        }
    }

    /// Override the client ID reported to Kafka.
    #[must_use]
    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
        self.client_id = client_id.into();
        self
    }

    /// Override the acknowledgment mode.
    #[must_use]
    pub fn with_acks(mut self, acks: impl Into<String>) -> Self {
        self.acks = acks.into();
        self
    }

    /// Override the delivery timeout (milliseconds).
    #[must_use]
    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Enable a compression codec (e.g. "gzip", "snappy", "lz4").
    #[must_use]
    pub fn with_compression(mut self, compression: impl Into<String>) -> Self {
        self.compression = Some(compression.into());
        self
    }
}
69
70/// Kafka message format for event delivery.
71#[derive(Debug, Clone, Serialize)]
72pub struct KafkaMessage {
73    /// Unique event identifier.
74    pub event_id: String,
75
76    /// Subscription name.
77    pub subscription_name: String,
78
79    /// Entity type.
80    pub entity_type: String,
81
82    /// Entity primary key (used as message key).
83    pub entity_id: String,
84
85    /// Operation type.
86    pub operation: String,
87
88    /// Event data.
89    pub data: serde_json::Value,
90
91    /// Previous data (for UPDATE operations).
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub old_data: Option<serde_json::Value>,
94
95    /// Event timestamp.
96    pub timestamp: String,
97
98    /// Sequence number.
99    pub sequence_number: u64,
100}
101
102impl KafkaMessage {
103    /// Create a Kafka message from a subscription event.
104    #[must_use]
105    pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
106        Self {
107            event_id:          event.event_id.clone(),
108            subscription_name: subscription_name.to_string(),
109            entity_type:       event.entity_type.clone(),
110            entity_id:         event.entity_id.clone(),
111            operation:         format!("{:?}", event.operation),
112            data:              event.data.clone(),
113            old_data:          event.old_data.clone(),
114            timestamp:         event.timestamp.to_rfc3339(),
115            sequence_number:   event.sequence_number,
116        }
117    }
118
119    /// Get the message key (entity_id for partitioning).
120    #[must_use]
121    pub fn key(&self) -> &str {
122        &self.entity_id
123    }
124}
125
126// =============================================================================
127// Kafka Adapter - Full Implementation (with `kafka` feature)
128// =============================================================================
129
/// Kafka transport adapter for event streaming.
///
/// Publishes subscription events to Apache Kafka topics. The event's
/// `entity_id` serves as the message key so events for the same entity
/// are routed to the same partition.
///
/// # Feature Flag
///
/// Two implementations exist behind the `kafka` feature:
/// - **Enabled**: a real rdkafka-based producer with actual Kafka delivery
/// - **Disabled**: a stub that only logs events (for development/testing)
///
/// # Example
///
/// ```ignore
/// use fraiseql_core::runtime::subscription::{KafkaAdapter, KafkaConfig};
///
/// let config = KafkaConfig::new("localhost:9092", "fraiseql-events")
///     .with_client_id("my-service")
///     .with_compression("lz4");
///
/// let adapter = KafkaAdapter::new(config)?;
/// adapter.deliver(&event, "orderCreated").await?;
/// ```
#[cfg(feature = "kafka")]
pub struct KafkaAdapter {
    config:   KafkaConfig,
    producer: rdkafka::producer::FutureProducer,
}
158
#[cfg(feature = "kafka")]
impl std::fmt::Debug for KafkaAdapter {
    // Manual impl: `FutureProducer` is not `Debug`, so only the
    // configuration summary is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("KafkaAdapter");
        dbg.field("brokers", &self.config.brokers);
        dbg.field("default_topic", &self.config.default_topic);
        dbg.field("client_id", &self.config.client_id);
        dbg.finish_non_exhaustive()
    }
}
169
#[cfg(feature = "kafka")]
impl KafkaAdapter {
    /// Build an adapter backed by an rdkafka `FutureProducer`.
    ///
    /// # Errors
    ///
    /// Returns an error when the producer cannot be constructed from the
    /// supplied configuration (e.g. invalid settings).
    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
        use rdkafka::{config::ClientConfig, producer::FutureProducer};

        let mut producer_config = ClientConfig::new();
        producer_config.set("bootstrap.servers", &config.brokers);
        producer_config.set("client.id", &config.client_id);
        producer_config.set("acks", &config.acks);
        producer_config.set("message.timeout.ms", config.timeout_ms.to_string());
        if let Some(ref codec) = config.compression {
            producer_config.set("compression.type", codec);
        }

        let producer: FutureProducer = producer_config.create().map_err(|e| {
            SubscriptionError::Internal(format!("Failed to create Kafka producer: {e}"))
        })?;

        tracing::info!(
            brokers = %config.brokers,
            topic = %config.default_topic,
            client_id = %config.client_id,
            "KafkaAdapter created with rdkafka producer"
        );

        Ok(Self { config, producer })
    }

    /// Resolve the destination topic for a subscription.
    ///
    /// Currently every subscription shares the configured default topic;
    /// per-subscription topic mapping could be added here later.
    fn get_topic(&self, _subscription_name: &str) -> &str {
        &self.config.default_topic
    }

    /// Borrow the underlying producer for direct Kafka operations.
    #[must_use = "the producer reference should be used for Kafka operations"]
    pub fn producer(&self) -> &rdkafka::producer::FutureProducer {
        &self.producer
    }
}
217
#[cfg(feature = "kafka")]
#[async_trait::async_trait]
impl TransportAdapter for KafkaAdapter {
    /// Serialize the event to JSON and publish it to the subscription's
    /// topic, keyed by `entity_id` for stable partitioning.
    async fn deliver(
        &self,
        event: &SubscriptionEvent,
        subscription_name: &str,
    ) -> Result<(), SubscriptionError> {
        use std::time::Duration;

        use rdkafka::producer::FutureRecord;

        let msg = KafkaMessage::from_event(event, subscription_name);
        let topic = self.get_topic(subscription_name);

        let payload = serde_json::to_string(&msg).map_err(|e| {
            SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
        })?;

        let record = FutureRecord::to(topic).key(msg.key()).payload(&payload);
        let send_timeout = Duration::from_millis(self.config.timeout_ms);

        match self.producer.send(record, send_timeout).await {
            Ok(delivery) => {
                tracing::debug!(
                    topic = topic,
                    partition = delivery.partition,
                    offset = delivery.offset,
                    key = msg.key(),
                    event_id = %event.event_id,
                    "Kafka message delivered successfully"
                );
                Ok(())
            },
            Err((kafka_error, _)) => {
                tracing::error!(
                    topic = topic,
                    key = msg.key(),
                    event_id = %event.event_id,
                    error = %kafka_error,
                    "Failed to deliver Kafka message"
                );
                Err(SubscriptionError::DeliveryFailed {
                    transport: "kafka".to_string(),
                    reason:    kafka_error.to_string(),
                })
            },
        }
    }

    fn name(&self) -> &'static str {
        "kafka"
    }

    /// Probe broker connectivity by fetching cluster metadata.
    ///
    /// NOTE(review): `fetch_metadata` is a blocking call inside an async
    /// fn — presumably acceptable given the 5s cap, but consider
    /// `spawn_blocking` if this runs on a latency-sensitive executor.
    async fn health_check(&self) -> bool {
        use std::time::Duration;

        use rdkafka::producer::Producer;

        // `None` requests metadata for all topics.
        let result = self
            .producer
            .client()
            .fetch_metadata(None, Duration::from_secs(5));

        match result {
            Ok(metadata) => {
                tracing::debug!(
                    broker_count = metadata.brokers().len(),
                    topic_count = metadata.topics().len(),
                    "Kafka health check passed"
                );
                true
            },
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "Kafka health check failed"
                );
                false
            },
        }
    }
}
301
302// =============================================================================
303// Kafka Adapter - Stub Implementation (without `kafka` feature)
304// =============================================================================
305
306/// Kafka transport adapter stub (without `kafka` feature).
307///
308/// This is a stub implementation for development and testing.
309/// Enable the `kafka` feature for actual Kafka delivery.
310#[cfg(not(feature = "kafka"))]
311#[derive(Debug)]
312pub struct KafkaAdapter {
313    config: KafkaConfig,
314}
315
316#[cfg(not(feature = "kafka"))]
317impl KafkaAdapter {
318    /// Create a new Kafka adapter stub.
319    ///
320    /// # Note
321    ///
322    /// This is a stub implementation. Enable the `kafka` feature for actual delivery.
323    ///
324    /// # Errors
325    ///
326    /// This stub implementation never fails, but returns `Result` for API compatibility.
327    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
328        tracing::warn!(
329            brokers = %config.brokers,
330            topic = %config.default_topic,
331            "KafkaAdapter created (STUB - enable 'kafka' feature for real Kafka support)"
332        );
333        Ok(Self { config })
334    }
335
336    /// Get the topic for a subscription (uses default if not specified).
337    fn get_topic(&self, _subscription_name: &str) -> &str {
338        &self.config.default_topic
339    }
340}
341
342#[cfg(not(feature = "kafka"))]
343#[async_trait::async_trait]
344impl TransportAdapter for KafkaAdapter {
345    async fn deliver(
346        &self,
347        event: &SubscriptionEvent,
348        subscription_name: &str,
349    ) -> Result<(), SubscriptionError> {
350        let message = KafkaMessage::from_event(event, subscription_name);
351        let topic = self.get_topic(subscription_name);
352
353        let _payload = serde_json::to_string(&message).map_err(|e| {
354            SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
355        })?;
356
357        // Stub implementation - log the event
358        tracing::info!(
359            topic = topic,
360            key = message.key(),
361            event_id = %event.event_id,
362            "Kafka delivery (STUB) - enable 'kafka' feature for actual delivery"
363        );
364
365        Ok(())
366    }
367
368    fn name(&self) -> &'static str {
369        "kafka"
370    }
371
372    async fn health_check(&self) -> bool {
373        // Stub always returns true
374        tracing::debug!("Kafka health check (STUB) - always returns true");
375        true
376    }
377}