fraiseql_core/runtime/subscription/kafka.rs

use serde::Serialize;

use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};

/// Configuration for the Kafka transport adapter.
#[derive(Debug, Clone)]
pub struct KafkaConfig {
    /// Comma-separated broker addresses (`bootstrap.servers`).
    pub brokers: String,

    /// Topic that events are published to.
    pub default_topic: String,

    /// Client identifier reported to the brokers (`client.id`).
    pub client_id: String,

    /// Acknowledgement mode, passed through as the `acks` producer setting.
    pub acks: String,

    /// Delivery timeout in milliseconds, applied as `message.timeout.ms` and
    /// as the producer send timeout.
    pub timeout_ms: u64,

    /// Optional compression codec (`compression.type`).
    pub compression: Option<String>,
}

impl KafkaConfig {
    /// Creates a configuration with the given brokers and default topic,
    /// using default values for the remaining settings.
    #[must_use]
    pub fn new(brokers: impl Into<String>, default_topic: impl Into<String>) -> Self {
        Self {
            brokers: brokers.into(),
            default_topic: default_topic.into(),
            client_id: "fraiseql".to_string(),
            acks: "all".to_string(),
            timeout_ms: 30_000,
            compression: None,
        }
    }

    /// Sets the client identifier.
    #[must_use]
    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
        self.client_id = client_id.into();
        self
    }

    /// Sets the acknowledgement mode.
    #[must_use]
    pub fn with_acks(mut self, acks: impl Into<String>) -> Self {
        self.acks = acks.into();
        self
    }

    /// Sets the delivery timeout in milliseconds.
    #[must_use]
    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Sets the compression codec.
    #[must_use]
    pub fn with_compression(mut self, compression: impl Into<String>) -> Self {
        self.compression = Some(compression.into());
        self
    }
}
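
// Usage sketch (illustrative; broker address, topic, client id, and codec are
// placeholder values, not defaults prescribed by this module):
//
//     let config = KafkaConfig::new("localhost:9092", "fraiseql.events")
//         .with_client_id("fraiseql-api")
//         .with_acks("1")
//         .with_timeout(10_000)
//         .with_compression("lz4");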

/// Event payload serialized to JSON and published to Kafka.
#[derive(Debug, Clone, Serialize)]
pub struct KafkaMessage {
    /// Unique identifier of the originating event.
    pub event_id: String,

    /// Name of the subscription that produced the event.
    pub subscription_name: String,

    /// Type of the affected entity.
    pub entity_type: String,

    /// Identifier of the affected entity; also used as the message key.
    pub entity_id: String,

    /// Operation that triggered the event.
    pub operation: String,

    /// Current entity data.
    pub data: serde_json::Value,

    /// Previous entity data, omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub old_data: Option<serde_json::Value>,

    /// Event timestamp in RFC 3339 format.
    pub timestamp: String,

    /// Sequence number of the event.
    pub sequence_number: u64,
}

impl KafkaMessage {
    /// Builds a Kafka message from a subscription event.
    #[must_use]
    pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
        Self {
            event_id: event.event_id.clone(),
            subscription_name: subscription_name.to_string(),
            entity_type: event.entity_type.clone(),
            entity_id: event.entity_id.clone(),
            operation: format!("{:?}", event.operation),
            data: event.data.clone(),
            old_data: event.old_data.clone(),
            timestamp: event.timestamp.to_rfc3339(),
            sequence_number: event.sequence_number,
        }
    }

    /// Returns the message key: the entity id, so that all events for the
    /// same entity are routed to the same partition.
    #[must_use]
    pub fn key(&self) -> &str {
        &self.entity_id
    }
}
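
// Serialized payload sketch (field values are illustrative only):
//
//     {
//       "event_id": "evt-123",
//       "subscription_name": "userUpdated",
//       "entity_type": "User",
//       "entity_id": "42",
//       "operation": "Update",
//       "data": { "id": "42", "name": "Ada" },
//       "timestamp": "2024-01-01T00:00:00+00:00",
//       "sequence_number": 7
//     }
//
// `old_data` is skipped when `None`, and the entity id doubles as the Kafka
// message key.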

/// Kafka transport adapter that publishes subscription events through an
/// `rdkafka` `FutureProducer`.
#[cfg(feature = "kafka")]
pub struct KafkaAdapter {
    config: KafkaConfig,
    producer: rdkafka::producer::FutureProducer,
}

#[cfg(feature = "kafka")]
impl std::fmt::Debug for KafkaAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KafkaAdapter")
            .field("brokers", &self.config.brokers)
            .field("default_topic", &self.config.default_topic)
            .field("client_id", &self.config.client_id)
            .finish_non_exhaustive()
    }
}

#[cfg(feature = "kafka")]
impl KafkaAdapter {
    /// Creates the adapter and the underlying `rdkafka` producer from the
    /// given configuration.
    ///
    /// # Errors
    ///
    /// Returns `SubscriptionError::Internal` if the producer cannot be created.
    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
        use rdkafka::{config::ClientConfig, producer::FutureProducer};

        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", &config.brokers)
            .set("client.id", &config.client_id)
            .set("acks", &config.acks)
            .set("message.timeout.ms", config.timeout_ms.to_string());

        if let Some(ref compression) = config.compression {
            client_config.set("compression.type", compression);
        }

        let producer: FutureProducer = client_config.create().map_err(|e| {
            SubscriptionError::Internal(format!("Failed to create Kafka producer: {e}"))
        })?;

        tracing::info!(
            brokers = %config.brokers,
            topic = %config.default_topic,
            client_id = %config.client_id,
            "KafkaAdapter created with rdkafka producer"
        );

        Ok(Self { config, producer })
    }

    /// Resolves the topic for a subscription; currently always the default topic.
    fn get_topic(&self, _subscription_name: &str) -> &str {
        &self.config.default_topic
    }

    /// Returns a reference to the underlying producer.
    #[must_use = "the producer reference should be used for Kafka operations"]
    pub fn producer(&self) -> &rdkafka::producer::FutureProducer {
        &self.producer
    }
}
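
// Construction sketch (illustrative): build a config, create the adapter, and
// deliver events through the `TransportAdapter` trait. The broker address,
// topic, subscription name, and the `event` value are application-specific
// placeholders, not values defined by this module.
//
//     let adapter = KafkaAdapter::new(
//         KafkaConfig::new("kafka:9092", "fraiseql.subscriptions"),
//     )?;
//     // adapter.deliver(&event, "userUpdated").await?;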

#[cfg(feature = "kafka")]
#[async_trait::async_trait]
impl TransportAdapter for KafkaAdapter {
    async fn deliver(
        &self,
        event: &SubscriptionEvent,
        subscription_name: &str,
    ) -> Result<(), SubscriptionError> {
        use std::time::Duration;

        use rdkafka::producer::FutureRecord;

        let message = KafkaMessage::from_event(event, subscription_name);
        let topic = self.get_topic(subscription_name);

        let payload = serde_json::to_string(&message).map_err(|e| {
            SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
        })?;

        let record = FutureRecord::to(topic).key(message.key()).payload(&payload);

        let timeout = Duration::from_millis(self.config.timeout_ms);

        match self.producer.send(record, timeout).await {
            Ok(delivery) => {
                tracing::debug!(
                    topic = topic,
                    partition = delivery.partition,
                    offset = delivery.offset,
                    key = message.key(),
                    event_id = %event.event_id,
                    "Kafka message delivered successfully"
                );
                Ok(())
            },
            Err((kafka_error, _)) => {
                tracing::error!(
                    topic = topic,
                    key = message.key(),
                    event_id = %event.event_id,
                    error = %kafka_error,
                    "Failed to deliver Kafka message"
                );
                Err(SubscriptionError::DeliveryFailed {
                    transport: "kafka".to_string(),
                    reason: kafka_error.to_string(),
                })
            },
        }
    }

    fn name(&self) -> &'static str {
        "kafka"
    }

    async fn health_check(&self) -> bool {
        use std::time::Duration;

        use rdkafka::producer::Producer;

        // Fetching cluster metadata for all topics confirms that at least one
        // broker is reachable within the timeout.
        match self.producer.client().fetch_metadata(
            None,
            Duration::from_secs(5),
        ) {
            Ok(metadata) => {
                tracing::debug!(
                    broker_count = metadata.brokers().len(),
                    topic_count = metadata.topics().len(),
                    "Kafka health check passed"
                );
                true
            },
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "Kafka health check failed"
                );
                false
            },
        }
    }
}

/// Stub adapter used when the `kafka` feature is disabled; deliveries are
/// logged but never sent to a broker.
#[cfg(not(feature = "kafka"))]
#[derive(Debug)]
pub struct KafkaAdapter {
    config: KafkaConfig,
}

#[cfg(not(feature = "kafka"))]
impl KafkaAdapter {
    /// Creates the stub adapter, logging a warning that the `kafka` feature
    /// is disabled.
    ///
    /// # Errors
    ///
    /// Never fails; the `Result` mirrors the real adapter's constructor.
    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
        tracing::warn!(
            brokers = %config.brokers,
            topic = %config.default_topic,
            "KafkaAdapter created (STUB - enable 'kafka' feature for real Kafka support)"
        );
        Ok(Self { config })
    }

    fn get_topic(&self, _subscription_name: &str) -> &str {
        &self.config.default_topic
    }
}

#[cfg(not(feature = "kafka"))]
#[async_trait::async_trait]
impl TransportAdapter for KafkaAdapter {
    async fn deliver(
        &self,
        event: &SubscriptionEvent,
        subscription_name: &str,
    ) -> Result<(), SubscriptionError> {
        let message = KafkaMessage::from_event(event, subscription_name);
        let topic = self.get_topic(subscription_name);

        let _payload = serde_json::to_string(&message).map_err(|e| {
            SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
        })?;

        tracing::info!(
            topic = topic,
            key = message.key(),
            event_id = %event.event_id,
            "Kafka delivery (STUB) - enable 'kafka' feature for actual delivery"
        );

        Ok(())
    }

    fn name(&self) -> &'static str {
        "kafka"
    }

    async fn health_check(&self) -> bool {
        tracing::debug!("Kafka health check (STUB) - always returns true");
        true
    }
}
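
// Feature wiring sketch (illustrative; the dependency declaration below is an
// assumption about how a consumer might enable the gate, not taken from this
// repository): the real adapter above compiles only with the `kafka` feature.
//
//     [dependencies]
//     fraiseql_core = { version = "*", features = ["kafka"] }
//
// Without the feature, the stub adapter logs deliveries and reports healthy
// without contacting any broker.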