// fraiseql_core/runtime/subscription/kafka.rs

use async_trait::async_trait;
use serde::Serialize;

use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};

/// Configuration for the Kafka transport adapter.
#[derive(Debug, Clone)]
pub struct KafkaConfig {
    /// Kafka broker addresses (comma-separated).
    pub brokers: String,

    /// Default topic for events (can be overridden per subscription).
    pub default_topic: String,

    /// Client ID for Kafka producer.
    pub client_id: String,

    /// Message acknowledgment mode ("all", "1", "0").
    pub acks: String,

    /// Message timeout in milliseconds.
    pub timeout_ms: u64,

    /// Enable message compression.
    pub compression: Option<String>,
}

impl KafkaConfig {
    /// Create a configuration with defaults: client ID `"fraiseql"`,
    /// acks `"all"`, a 30-second timeout, and no compression.
    #[must_use]
    pub fn new(brokers: impl Into<String>, default_topic: impl Into<String>) -> Self {
        Self {
            brokers: brokers.into(),
            default_topic: default_topic.into(),
            client_id: "fraiseql".to_string(),
            acks: "all".to_string(),
            timeout_ms: 30_000,
            compression: None,
        }
    }

    /// Override the producer client ID (builder-style).
    #[must_use]
    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
        self.client_id = client_id.into();
        self
    }

    /// Override the acknowledgment mode (builder-style).
    #[must_use]
    pub fn with_acks(mut self, acks: impl Into<String>) -> Self {
        self.acks = acks.into();
        self
    }

    /// Override the message timeout in milliseconds (builder-style).
    #[must_use]
    pub const fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Turn on compression with the given codec (e.g., "gzip", "snappy", "lz4").
    #[must_use]
    pub fn with_compression(mut self, compression: impl Into<String>) -> Self {
        self.compression = Some(compression.into());
        self
    }
}

/// Kafka message format for event delivery.
///
/// Serialized to JSON as the Kafka message payload; `entity_id` doubles as
/// the message key (see [`KafkaMessage::key`]). Field order here fixes the
/// JSON field order, so do not reorder.
#[derive(Debug, Clone, Serialize)]
pub struct KafkaMessage {
    /// Unique event identifier.
    pub event_id: String,

    /// Subscription name the event was delivered for.
    pub subscription_name: String,

    /// Entity type.
    pub entity_type: String,

    /// Entity primary key (used as the message key for partitioning).
    pub entity_id: String,

    /// Operation type, rendered via the operation enum's `Debug` format.
    pub operation: String,

    /// Event data.
    pub data: serde_json::Value,

    /// Previous data (for UPDATE operations); omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub old_data: Option<serde_json::Value>,

    /// Event timestamp in RFC 3339 format.
    pub timestamp: String,

    /// Sequence number.
    pub sequence_number: u64,
}

103impl KafkaMessage {
104    /// Create a Kafka message from a subscription event.
105    #[must_use]
106    pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
107        Self {
108            event_id:          event.event_id.clone(),
109            subscription_name: subscription_name.to_string(),
110            entity_type:       event.entity_type.clone(),
111            entity_id:         event.entity_id.clone(),
112            operation:         format!("{:?}", event.operation),
113            data:              event.data.clone(),
114            old_data:          event.old_data.clone(),
115            timestamp:         event.timestamp.to_rfc3339(),
116            sequence_number:   event.sequence_number,
117        }
118    }
119
120    /// Get the message key (`entity_id` for partitioning).
121    #[must_use]
122    pub fn key(&self) -> &str {
123        &self.entity_id
124    }
125}
126
// =============================================================================
// Kafka Adapter - Full Implementation (with `kafka` feature)
// =============================================================================

/// Kafka transport adapter for event streaming.
///
/// Delivers subscription events to Apache Kafka topics.
/// Uses the `entity_id` as the message key for consistent partitioning.
///
/// # Feature Flag
///
/// This adapter has two implementations:
/// - **With `kafka` feature**: Full rdkafka-based producer with actual Kafka delivery
/// - **Without `kafka` feature**: Stub that logs events (for development/testing)
///
/// # Example
///
/// ```ignore
/// use fraiseql_core::runtime::subscription::{KafkaAdapter, KafkaConfig};
///
/// let config = KafkaConfig::new("localhost:9092", "fraiseql-events")
///     .with_client_id("my-service")
///     .with_compression("lz4");
///
/// let adapter = KafkaAdapter::new(config)?;
/// adapter.deliver(&event, "orderCreated").await?;
/// ```
#[cfg(feature = "kafka")]
pub struct KafkaAdapter {
    /// Adapter configuration (brokers, default topic, acks, timeout).
    config:   KafkaConfig,
    /// Asynchronous rdkafka producer used for all deliveries.
    producer: rdkafka::producer::FutureProducer,
}

#[cfg(feature = "kafka")]
// Manual Debug impl — presumably because the rdkafka producer does not
// implement Debug; only config fields are shown.
impl std::fmt::Debug for KafkaAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("KafkaAdapter");
        dbg.field("brokers", &self.config.brokers);
        dbg.field("default_topic", &self.config.default_topic);
        dbg.field("client_id", &self.config.client_id);
        dbg.finish_non_exhaustive()
    }
}

#[cfg(feature = "kafka")]
impl KafkaAdapter {
    /// Create a new Kafka adapter backed by an rdkafka producer.
    ///
    /// # Errors
    ///
    /// Returns error if the Kafka producer cannot be created (e.g., invalid config).
    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
        use rdkafka::{config::ClientConfig, producer::FutureProducer};

        // Map our config onto librdkafka properties.
        let mut producer_cfg = ClientConfig::new();
        producer_cfg.set("bootstrap.servers", &config.brokers);
        producer_cfg.set("client.id", &config.client_id);
        producer_cfg.set("acks", &config.acks);
        producer_cfg.set("message.timeout.ms", config.timeout_ms.to_string());
        if let Some(ref codec) = config.compression {
            producer_cfg.set("compression.type", codec);
        }

        let producer: FutureProducer = producer_cfg.create().map_err(|e| {
            SubscriptionError::Internal(format!("Failed to create Kafka producer: {e}"))
        })?;

        tracing::info!(
            brokers = %config.brokers,
            topic = %config.default_topic,
            client_id = %config.client_id,
            "KafkaAdapter created with rdkafka producer"
        );

        Ok(Self { config, producer })
    }

    /// Resolve the topic for a subscription; currently always the default.
    /// Could be extended to support per-subscription topic mapping.
    fn get_topic(&self, _subscription_name: &str) -> &str {
        &self.config.default_topic
    }

    /// Borrow the underlying producer for direct Kafka operations.
    #[must_use = "the producer reference should be used for Kafka operations"]
    pub const fn producer(&self) -> &rdkafka::producer::FutureProducer {
        &self.producer
    }
}

#[cfg(feature = "kafka")]
// Reason: TransportAdapter is defined with #[async_trait]; all implementations must match
// its transformed method signatures to satisfy the trait contract
// async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
#[async_trait]
impl TransportAdapter for KafkaAdapter {
    /// Serialize the event as a JSON [`KafkaMessage`] and publish it to the
    /// subscription's topic, keyed by `entity_id` for consistent partitioning.
    async fn deliver(
        &self,
        event: &SubscriptionEvent,
        subscription_name: &str,
    ) -> Result<(), SubscriptionError> {
        use std::time::Duration;

        use rdkafka::producer::FutureRecord;

        let message = KafkaMessage::from_event(event, subscription_name);
        let topic = self.get_topic(subscription_name);

        // Serialization failure is a local error, reported before any send.
        let payload = serde_json::to_string(&message).map_err(|e| {
            SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
        })?;

        // Keying by entity_id routes all events for one entity to one partition.
        let record = FutureRecord::to(topic).key(message.key()).payload(&payload);

        // The configured message timeout also bounds how long we wait for the ack.
        let timeout = Duration::from_millis(self.config.timeout_ms);

        match self.producer.send(record, timeout).await {
            Ok(delivery) => {
                tracing::debug!(
                    topic = topic,
                    partition = delivery.partition,
                    offset = delivery.offset,
                    key = message.key(),
                    event_id = %event.event_id,
                    "Kafka message delivered successfully"
                );
                Ok(())
            },
            // rdkafka returns the failed message alongside the error; we only
            // need the error itself for reporting.
            Err((kafka_error, _)) => {
                tracing::error!(
                    topic = topic,
                    key = message.key(),
                    event_id = %event.event_id,
                    error = %kafka_error,
                    "Failed to deliver Kafka message"
                );
                Err(SubscriptionError::DeliveryFailed {
                    transport: "kafka".to_string(),
                    reason:    kafka_error.to_string(),
                })
            },
        }
    }

    /// Stable transport identifier used in error reporting.
    fn name(&self) -> &'static str {
        "kafka"
    }

    /// Health is probed by fetching cluster metadata with a 5-second timeout;
    /// any metadata response counts as healthy.
    async fn health_check(&self) -> bool {
        // Check if we can fetch cluster metadata as a health check
        use std::time::Duration;

        use rdkafka::producer::Producer;

        match self.producer.client().fetch_metadata(
            None, // All topics
            Duration::from_secs(5),
        ) {
            Ok(metadata) => {
                tracing::debug!(
                    broker_count = metadata.brokers().len(),
                    topic_count = metadata.topics().len(),
                    "Kafka health check passed"
                );
                true
            },
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "Kafka health check failed"
                );
                false
            },
        }
    }
}

// =============================================================================
// Kafka Adapter - Stub Implementation (without `kafka` feature)
// =============================================================================

/// Kafka transport adapter stub (without `kafka` feature).
///
/// This is a stub implementation for development and testing.
/// Enable the `kafka` feature for actual Kafka delivery.
#[cfg(not(feature = "kafka"))]
#[derive(Debug)]
pub struct KafkaAdapter {
    /// Adapter configuration, retained only for logging and topic lookup.
    config: KafkaConfig,
}

320#[cfg(not(feature = "kafka"))]
321impl KafkaAdapter {
322    /// Create a new Kafka adapter stub.
323    ///
324    /// # Note
325    ///
326    /// This is a stub implementation. Enable the `kafka` feature for actual delivery.
327    ///
328    /// # Errors
329    ///
330    /// This stub implementation never fails, but returns `Result` for API compatibility.
331    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
332        tracing::warn!(
333            brokers = %config.brokers,
334            topic = %config.default_topic,
335            "KafkaAdapter created (STUB - enable 'kafka' feature for real Kafka support)"
336        );
337        Ok(Self { config })
338    }
339
340    /// Get the topic for a subscription (uses default if not specified).
341    fn get_topic(&self, _subscription_name: &str) -> &str {
342        &self.config.default_topic
343    }
344}
345
346#[cfg(not(feature = "kafka"))]
347// Reason: TransportAdapter is defined with #[async_trait]; all implementations must match
348// its transformed method signatures to satisfy the trait contract
349// async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
350#[async_trait]
351impl TransportAdapter for KafkaAdapter {
352    async fn deliver(
353        &self,
354        event: &SubscriptionEvent,
355        subscription_name: &str,
356    ) -> Result<(), SubscriptionError> {
357        let message = KafkaMessage::from_event(event, subscription_name);
358        let topic = self.get_topic(subscription_name);
359
360        let _payload = serde_json::to_string(&message).map_err(|e| {
361            SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
362        })?;
363
364        // Stub implementation - log the event
365        tracing::info!(
366            topic = topic,
367            key = message.key(),
368            event_id = %event.event_id,
369            "Kafka delivery (STUB) - enable 'kafka' feature for actual delivery"
370        );
371
372        Ok(())
373    }
374
375    fn name(&self) -> &'static str {
376        "kafka"
377    }
378
379    async fn health_check(&self) -> bool {
380        // Stub always returns true
381        tracing::debug!("Kafka health check (STUB) - always returns true");
382        true
383    }
384}