//! Kafka transport adapter for subscription event delivery
//! (fraiseql_core/runtime/subscription/kafka.rs).

use async_trait::async_trait;
2use serde::Serialize;
3
4use super::{SubscriptionError, transport::TransportAdapter, types::SubscriptionEvent};
5
/// Connection and delivery settings for the Kafka subscription transport.
#[derive(Debug, Clone)]
pub struct KafkaConfig {
    /// Comma-separated broker list (`bootstrap.servers`).
    pub brokers: String,

    /// Topic all subscription events are published to.
    pub default_topic: String,

    /// Kafka `client.id` reported to the brokers.
    pub client_id: String,

    /// Producer `acks` setting (defaults to `"all"`).
    pub acks: String,

    /// Message delivery timeout in milliseconds.
    pub timeout_ms: u64,

    /// Optional `compression.type`; `None` leaves the broker default.
    pub compression: Option<String>,
}

impl KafkaConfig {
    /// Creates a configuration with sensible defaults: client id
    /// `"fraiseql"`, `acks=all`, a 30s timeout, and no compression.
    #[must_use]
    pub fn new(brokers: impl Into<String>, default_topic: impl Into<String>) -> Self {
        Self {
            brokers: brokers.into(),
            default_topic: default_topic.into(),
            client_id: String::from("fraiseql"),
            acks: String::from("all"),
            timeout_ms: 30_000,
            compression: None,
        }
    }

    /// Overrides the Kafka client id.
    #[must_use]
    pub fn with_client_id(self, client_id: impl Into<String>) -> Self {
        Self { client_id: client_id.into(), ..self }
    }

    /// Overrides the producer `acks` setting.
    #[must_use]
    pub fn with_acks(self, acks: impl Into<String>) -> Self {
        Self { acks: acks.into(), ..self }
    }

    /// Overrides the delivery timeout (milliseconds).
    #[must_use]
    pub const fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Enables compression with the given `compression.type` value.
    #[must_use]
    pub fn with_compression(self, compression: impl Into<String>) -> Self {
        Self { compression: Some(compression.into()), ..self }
    }
}
70
/// JSON-serializable envelope published to Kafka for one subscription event.
#[derive(Debug, Clone, Serialize)]
pub struct KafkaMessage {
    /// Identifier copied from the originating `SubscriptionEvent`.
    pub event_id: String,

    /// Name of the subscription the event was delivered for.
    pub subscription_name: String,

    /// Entity type copied from the event.
    pub entity_type: String,

    /// Entity identifier; also used as the Kafka message key (see `key()`).
    pub entity_id: String,

    /// Operation rendered via `Debug` formatting of the event's operation.
    pub operation: String,

    /// Current entity payload as JSON.
    pub data: serde_json::Value,

    /// Previous payload; omitted from the serialized JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub old_data: Option<serde_json::Value>,

    /// Event timestamp rendered as an RFC 3339 string.
    pub timestamp: String,

    /// Sequence number copied from the event.
    pub sequence_number: u64,
}
102
103impl KafkaMessage {
104 #[must_use]
106 pub fn from_event(event: &SubscriptionEvent, subscription_name: &str) -> Self {
107 Self {
108 event_id: event.event_id.clone(),
109 subscription_name: subscription_name.to_string(),
110 entity_type: event.entity_type.clone(),
111 entity_id: event.entity_id.clone(),
112 operation: format!("{:?}", event.operation),
113 data: event.data.clone(),
114 old_data: event.old_data.clone(),
115 timestamp: event.timestamp.to_rfc3339(),
116 sequence_number: event.sequence_number,
117 }
118 }
119
120 #[must_use]
122 pub fn key(&self) -> &str {
123 &self.entity_id
124 }
125}
126
/// Kafka transport adapter backed by an `rdkafka` `FutureProducer`.
/// Compiled only when the `kafka` feature is enabled.
#[cfg(feature = "kafka")]
pub struct KafkaAdapter {
    // Retained so `get_topic`/timeouts can read the settings after creation.
    config: KafkaConfig,
    producer: rdkafka::producer::FutureProducer,
}
159
#[cfg(feature = "kafka")]
impl std::fmt::Debug for KafkaAdapter {
    /// Shows only the configuration fields; the producer handle is omitted
    /// (hence `finish_non_exhaustive`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let KafkaConfig { brokers, default_topic, client_id, .. } = &self.config;
        f.debug_struct("KafkaAdapter")
            .field("brokers", brokers)
            .field("default_topic", default_topic)
            .field("client_id", client_id)
            .finish_non_exhaustive()
    }
}
170
#[cfg(feature = "kafka")]
impl KafkaAdapter {
    /// Builds an `rdkafka` `FutureProducer` from the given configuration.
    ///
    /// # Errors
    ///
    /// Returns `SubscriptionError::Internal` when the producer cannot be
    /// created (e.g. invalid client configuration).
    pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
        use rdkafka::{config::ClientConfig, producer::FutureProducer};

        let mut settings = ClientConfig::new();
        settings.set("bootstrap.servers", &config.brokers);
        settings.set("client.id", &config.client_id);
        settings.set("acks", &config.acks);
        settings.set("message.timeout.ms", config.timeout_ms.to_string());
        // Compression is only set when explicitly configured.
        if let Some(compression) = config.compression.as_deref() {
            settings.set("compression.type", compression);
        }

        let producer: FutureProducer = settings.create().map_err(|e| {
            SubscriptionError::Internal(format!("Failed to create Kafka producer: {e}"))
        })?;

        tracing::info!(
            brokers = %config.brokers,
            topic = %config.default_topic,
            client_id = %config.client_id,
            "KafkaAdapter created with rdkafka producer"
        );

        Ok(Self { config, producer })
    }

    /// Topic routing: every subscription publishes to the default topic.
    fn get_topic(&self, _subscription_name: &str) -> &str {
        self.config.default_topic.as_str()
    }

    /// Borrows the underlying producer for direct Kafka operations.
    #[must_use = "the producer reference should be used for Kafka operations"]
    pub const fn producer(&self) -> &rdkafka::producer::FutureProducer {
        &self.producer
    }
}
218
#[cfg(feature = "kafka")]
#[async_trait]
impl TransportAdapter for KafkaAdapter {
    /// Serializes the event to JSON and publishes it to the configured topic,
    /// keyed by entity id, waiting up to `timeout_ms` for delivery confirmation.
    ///
    /// # Errors
    ///
    /// `SubscriptionError::Internal` on serialization failure,
    /// `SubscriptionError::DeliveryFailed` when the producer reports an error.
    async fn deliver(
        &self,
        event: &SubscriptionEvent,
        subscription_name: &str,
    ) -> Result<(), SubscriptionError> {
        use std::time::Duration;

        use rdkafka::producer::FutureRecord;

        let message = KafkaMessage::from_event(event, subscription_name);
        let topic = self.get_topic(subscription_name);

        let payload = serde_json::to_string(&message).map_err(|e| {
            SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
        })?;

        // Key is the entity id (see KafkaMessage::key), so a given entity's
        // events consistently hash to the same partition.
        let record = FutureRecord::to(topic).key(message.key()).payload(&payload);

        let timeout = Duration::from_millis(self.config.timeout_ms);

        match self.producer.send(record, timeout).await {
            Ok(delivery) => {
                tracing::debug!(
                    topic = topic,
                    partition = delivery.partition,
                    offset = delivery.offset,
                    key = message.key(),
                    event_id = %event.event_id,
                    "Kafka message delivered successfully"
                );
                Ok(())
            },
            // rdkafka returns the failed message alongside the error; only
            // the error itself is reported upstream.
            Err((kafka_error, _)) => {
                tracing::error!(
                    topic = topic,
                    key = message.key(),
                    event_id = %event.event_id,
                    error = %kafka_error,
                    "Failed to deliver Kafka message"
                );
                Err(SubscriptionError::DeliveryFailed {
                    transport: "kafka".to_string(),
                    reason: kafka_error.to_string(),
                })
            },
        }
    }

    fn name(&self) -> &'static str {
        "kafka"
    }

    /// Confirms broker connectivity by fetching cluster metadata (5s cap).
    ///
    /// NOTE(review): `fetch_metadata` appears to be a synchronous rdkafka call
    /// executed directly inside an async fn — it could block the executor for
    /// up to 5 seconds; consider wrapping in `spawn_blocking`. Confirm against
    /// the runtime this adapter is used with.
    async fn health_check(&self) -> bool {
        use std::time::Duration;

        use rdkafka::producer::Producer;

        match self.producer.client().fetch_metadata(
            None, Duration::from_secs(5),
        ) {
            Ok(metadata) => {
                tracing::debug!(
                    broker_count = metadata.brokers().len(),
                    topic_count = metadata.topics().len(),
                    "Kafka health check passed"
                );
                true
            },
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "Kafka health check failed"
                );
                false
            },
        }
    }
}
305
/// Stub adapter compiled when the `kafka` feature is disabled: it serializes
/// and logs events but performs no network delivery.
#[cfg(not(feature = "kafka"))]
#[derive(Debug)]
pub struct KafkaAdapter {
    config: KafkaConfig,
}
319
320#[cfg(not(feature = "kafka"))]
321impl KafkaAdapter {
322 pub fn new(config: KafkaConfig) -> Result<Self, SubscriptionError> {
332 tracing::warn!(
333 brokers = %config.brokers,
334 topic = %config.default_topic,
335 "KafkaAdapter created (STUB - enable 'kafka' feature for real Kafka support)"
336 );
337 Ok(Self { config })
338 }
339
340 fn get_topic(&self, _subscription_name: &str) -> &str {
342 &self.config.default_topic
343 }
344}
345
346#[cfg(not(feature = "kafka"))]
347#[async_trait]
351impl TransportAdapter for KafkaAdapter {
352 async fn deliver(
353 &self,
354 event: &SubscriptionEvent,
355 subscription_name: &str,
356 ) -> Result<(), SubscriptionError> {
357 let message = KafkaMessage::from_event(event, subscription_name);
358 let topic = self.get_topic(subscription_name);
359
360 let _payload = serde_json::to_string(&message).map_err(|e| {
361 SubscriptionError::Internal(format!("Failed to serialize message: {e}"))
362 })?;
363
364 tracing::info!(
366 topic = topic,
367 key = message.key(),
368 event_id = %event.event_id,
369 "Kafka delivery (STUB) - enable 'kafka' feature for actual delivery"
370 );
371
372 Ok(())
373 }
374
375 fn name(&self) -> &'static str {
376 "kafka"
377 }
378
379 async fn health_check(&self) -> bool {
380 tracing::debug!("Kafka health check (STUB) - always returns true");
382 true
383 }
384}