1use std::sync::Arc;
7
8use indexmap::IndexMap;
9use rustc_hash::{FxHashMap, FxHashSet};
10use tracing::{debug, warn};
11use varpulis_core::ast::ConnectorParam;
12
13use crate::connector;
14#[cfg(feature = "kafka")]
15use crate::connector::SinkConnector;
16
17pub fn connector_params_to_config(
19 connector_type: &str,
20 params: &[ConnectorParam],
21) -> connector::ConnectorConfig {
22 let mut url = String::new();
23 let mut topic = None;
24 let mut properties = IndexMap::new();
25
26 for param in params {
27 let value_str = match ¶m.value {
28 varpulis_core::ast::ConfigValue::Str(s) => s.clone(),
29 varpulis_core::ast::ConfigValue::Ident(s) => s.clone(),
30 varpulis_core::ast::ConfigValue::Int(i) => i.to_string(),
31 varpulis_core::ast::ConfigValue::Float(f) => f.to_string(),
32 varpulis_core::ast::ConfigValue::Bool(b) => b.to_string(),
33 varpulis_core::ast::ConfigValue::Duration(d) => format!("{d}ns"),
34 varpulis_core::ast::ConfigValue::Array(_) => continue,
35 varpulis_core::ast::ConfigValue::Map(_) => continue,
36 };
37 match param.name.as_str() {
38 "url" | "host" | "brokers" | "servers" => url = value_str,
39 "topic" => topic = Some(value_str),
40 other => {
41 properties.insert(other.to_string(), value_str);
42 }
43 }
44 }
45
46 let mut config = connector::ConnectorConfig::new(connector_type, &url);
47 if let Some(t) = topic {
48 config = config.with_topic(&t);
49 }
50 config.properties = properties;
51 config
52}
53
/// Adapts a boxed [`connector::SinkConnector`] to the engine's
/// [`crate::sink::Sink`] trait.
///
/// The connector is wrapped in a `tokio::sync::Mutex` so the `&self`-based
/// trait methods can serialize access (and obtain mutable access where the
/// connector requires it, e.g. `connect`).
pub struct SinkConnectorAdapter {
    // Name reported through `Sink::name`.
    name: String,
    // Async mutex: the guard may be held across `.await` points.
    inner: tokio::sync::Mutex<Box<dyn connector::SinkConnector>>,
}
59
60impl std::fmt::Debug for SinkConnectorAdapter {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("SinkConnectorAdapter")
63 .field("name", &self.name)
64 .finish_non_exhaustive()
65 }
66}
67
68impl SinkConnectorAdapter {
69 pub fn new(name: &str, connector: Box<dyn connector::SinkConnector>) -> Self {
71 Self {
72 name: name.to_string(),
73 inner: tokio::sync::Mutex::new(connector),
74 }
75 }
76}
77
78#[async_trait::async_trait]
79impl crate::sink::Sink for SinkConnectorAdapter {
80 fn name(&self) -> &str {
81 &self.name
82 }
83 async fn connect(&self) -> Result<(), crate::sink::SinkError> {
84 let mut inner = self.inner.lock().await;
85 inner.connect().await.map_err(crate::sink::SinkError::from)
86 }
87 async fn send(&self, event: &crate::event::Event) -> Result<(), crate::sink::SinkError> {
88 let inner = self.inner.lock().await;
89 inner
90 .send(event)
91 .await
92 .map_err(crate::sink::SinkError::from)
93 }
94 async fn send_batch(
95 &self,
96 events: &[std::sync::Arc<crate::event::Event>],
97 ) -> Result<(), crate::sink::SinkError> {
98 let inner = self.inner.lock().await;
100 for event in events {
101 inner
102 .send(event)
103 .await
104 .map_err(crate::sink::SinkError::from)?;
105 }
106 Ok(())
107 }
108 async fn flush(&self) -> Result<(), crate::sink::SinkError> {
109 let inner = self.inner.lock().await;
110 inner.flush().await.map_err(crate::sink::SinkError::from)
111 }
112 async fn close(&self) -> Result<(), crate::sink::SinkError> {
113 let inner = self.inner.lock().await;
114 inner.close().await.map_err(crate::sink::SinkError::from)
115 }
116}
117
/// Engine sink adapter for a [`connector::KafkaSinkFull`] configured with a
/// transactional producer: batches go through
/// `send_batch_transactional` for atomic delivery.
#[cfg(feature = "kafka")]
pub struct TransactionalKafkaSinkAdapter {
    // Name reported through `Sink::name`.
    name: String,
    // Async mutex serializing access to the producer across `.await` points.
    inner: tokio::sync::Mutex<connector::KafkaSinkFull>,
}
129
#[cfg(feature = "kafka")]
impl TransactionalKafkaSinkAdapter {
    /// Wrap an already-constructed transactional Kafka sink under `name`.
    pub fn new(name: &str, sink: connector::KafkaSinkFull) -> Self {
        let name = name.to_string();
        let inner = tokio::sync::Mutex::new(sink);
        Self { name, inner }
    }
}
139
/// `Sink` implementation delegating to the transactional Kafka producer.
#[cfg(feature = "kafka")]
#[async_trait::async_trait]
impl crate::sink::Sink for TransactionalKafkaSinkAdapter {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// No-op: the producer was fully constructed before this adapter was
    /// created, so there is nothing further to establish here.
    async fn connect(&self) -> Result<(), crate::sink::SinkError> {
        Ok(())
    }

    /// Send a single event through the producer.
    async fn send(&self, event: &crate::event::Event) -> Result<(), crate::sink::SinkError> {
        self.inner
            .lock()
            .await
            .send(event)
            .await
            .map_err(crate::sink::SinkError::from)
    }

    /// Deliver a batch atomically (all-or-nothing) via Kafka transactions.
    async fn send_batch(
        &self,
        events: &[std::sync::Arc<crate::event::Event>],
    ) -> Result<(), crate::sink::SinkError> {
        self.inner
            .lock()
            .await
            .send_batch_transactional(events)
            .await
            .map_err(crate::sink::SinkError::from)
    }

    /// Flush buffered messages in the producer.
    async fn flush(&self) -> Result<(), crate::sink::SinkError> {
        self.inner
            .lock()
            .await
            .flush()
            .await
            .map_err(crate::sink::SinkError::from)
    }

    /// Close the producer.
    async fn close(&self) -> Result<(), crate::sink::SinkError> {
        self.inner
            .lock()
            .await
            .close()
            .await
            .map_err(crate::sink::SinkError::from)
    }
}
175
/// Engine sink adapter for a non-transactional [`connector::KafkaSinkFull`]:
/// batches go through the plain `send_batch` (no atomicity guarantee).
#[cfg(feature = "kafka")]
pub struct BatchKafkaSinkAdapter {
    // Name reported through `Sink::name`.
    name: String,
    // Async mutex serializing access to the producer across `.await` points.
    inner: tokio::sync::Mutex<connector::KafkaSinkFull>,
}
187
#[cfg(feature = "kafka")]
impl BatchKafkaSinkAdapter {
    /// Wrap an already-constructed (non-transactional) Kafka sink under `name`.
    pub fn new(name: &str, sink: connector::KafkaSinkFull) -> Self {
        let name = name.to_string();
        let inner = tokio::sync::Mutex::new(sink);
        Self { name, inner }
    }
}
197
/// `Sink` implementation delegating to the batching Kafka producer.
#[cfg(feature = "kafka")]
#[async_trait::async_trait]
impl crate::sink::Sink for BatchKafkaSinkAdapter {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// No-op: the producer was fully constructed before this adapter was
    /// created, so there is nothing further to establish here.
    async fn connect(&self) -> Result<(), crate::sink::SinkError> {
        Ok(())
    }

    /// Send a single event through the producer.
    async fn send(&self, event: &crate::event::Event) -> Result<(), crate::sink::SinkError> {
        self.inner
            .lock()
            .await
            .send(event)
            .await
            .map_err(crate::sink::SinkError::from)
    }

    /// Deliver a batch using the producer's (non-transactional) batch path.
    async fn send_batch(
        &self,
        events: &[std::sync::Arc<crate::event::Event>],
    ) -> Result<(), crate::sink::SinkError> {
        self.inner
            .lock()
            .await
            .send_batch(events)
            .await
            .map_err(crate::sink::SinkError::from)
    }

    /// Flush buffered messages in the producer.
    async fn flush(&self) -> Result<(), crate::sink::SinkError> {
        self.inner
            .lock()
            .await
            .flush()
            .await
            .map_err(crate::sink::SinkError::from)
    }

    /// Close the producer.
    async fn close(&self) -> Result<(), crate::sink::SinkError> {
        self.inner
            .lock()
            .await
            .close()
            .await
            .map_err(crate::sink::SinkError::from)
    }
}
233
/// Build a runtime sink from a connector configuration.
///
/// Resolution order:
/// 1. a registered component factory for `config.connector_type` that
///    advertises sink support (a `NotAvailable` error falls through);
/// 2. the built-in types: `console`, `file`, `http`, `kafka`, `mqtt`,
///    `nats`, `pulsar`, `redis_stream` (most are feature-gated).
///
/// * `topic_override` — wins over `config.topic` for the output
///   topic/subject/stream key.
/// * `context_name` — appended to the MQTT client id so multiple contexts
///   don't collide on the broker; also forwarded to factories.
/// * `topic_prefix` — when set, the final topic is `"{prefix}.{topic}"`.
///
/// Returns `None` (after logging) when the type is unknown, its feature is
/// disabled, or construction fails.
#[allow(unused_variables)]
pub fn create_sink_from_config(
    name: &str,
    config: &connector::ConnectorConfig,
    topic_override: Option<&str>,
    context_name: Option<&str>,
    topic_prefix: Option<&str>,
) -> Option<Arc<dyn crate::sink::Sink>> {
    // Prefer a pluggable factory when one is registered for this type.
    if let Some(factory) = connector::component::find_factory(&config.connector_type) {
        if factory.info().supports_sink {
            match factory.create_engine_sink(name, config, topic_override, context_name) {
                Ok(sink) => return Some(sink),
                // NotAvailable: fall through to the built-in handlers below.
                Err(connector::ConnectorError::NotAvailable(_)) => {}
                Err(e) => {
                    warn!("Factory error creating sink '{}': {}", name, e);
                    return None;
                }
            }
        }
    }

    match config.connector_type.as_str() {
        "console" => Some(Arc::new(crate::sink::ConsoleSink::new(name))),
        "file" => {
            // Path precedence: url, then the `path` property, then "<name>.jsonl".
            let path = if config.url.is_empty() {
                config
                    .properties
                    .get("path")
                    .cloned()
                    .unwrap_or_else(|| format!("{name}.jsonl"))
            } else {
                config.url.clone()
            };
            match crate::sink::FileSink::new(name, &path) {
                Ok(sink) => Some(Arc::new(sink)),
                Err(e) => {
                    warn!("Failed to create file sink '{}': {}", name, e);
                    None
                }
            }
        }
        "http" => {
            // HTTP has no sensible default endpoint, so an empty URL is an error.
            let url = config.url.clone();
            if url.is_empty() {
                warn!("HTTP connector '{}' has no URL configured", name);
                None
            } else {
                Some(Arc::new(crate::sink::HttpSink::new(name, &url)))
            }
        }
        "kafka" => {
            #[cfg(feature = "kafka")]
            {
                // Brokers: explicit `brokers` property wins over the URL field.
                let brokers = config
                    .properties
                    .get("brokers")
                    .cloned()
                    .unwrap_or_else(|| config.url.clone());
                // Topic precedence: override > config.topic > "<name>-output".
                let base_topic = topic_override
                    .map(|s| s.to_string())
                    .or_else(|| config.topic.clone())
                    .unwrap_or_else(|| format!("{}-output", name));
                let topic = match topic_prefix {
                    Some(prefix) => format!("{prefix}.{base_topic}"),
                    None => base_topic,
                };

                // Transactions: an explicit `transactional_id` wins; otherwise
                // `exactly_once = true` derives an id of "varpulis-<name>".
                let transactional_id =
                    config
                        .properties
                        .get("transactional_id")
                        .cloned()
                        .or_else(|| {
                            config
                                .properties
                                .get("exactly_once")
                                .filter(|v| v == &"true")
                                .map(|_| format!("varpulis-{}", name))
                        });

                // Translate VPL snake_case parameter names to rdkafka's
                // dotted property names; unmapped keys pass through as-is.
                let mut properties = config.properties.clone();
                let param_mapping: &[(&str, &str)] = &[
                    ("batch_size", "batch.size"),
                    ("linger_ms", "linger.ms"),
                    ("compression_type", "compression.type"),
                    ("message_timeout_ms", "message.timeout.ms"),
                    ("security_protocol", "security.protocol"),
                    ("sasl_mechanism", "sasl.mechanism"),
                    ("sasl_username", "sasl.username"),
                    ("sasl_password", "sasl.password"),
                    ("ssl_ca_location", "ssl.ca.location"),
                    ("ssl_certificate_location", "ssl.certificate.location"),
                    ("ssl_key_location", "ssl.key.location"),
                ];
                for &(vpl_name, rdkafka_name) in param_mapping {
                    // swap_remove keeps this O(1) per key on the IndexMap.
                    if let Some(value) = properties.swap_remove(vpl_name) {
                        properties.insert(rdkafka_name.to_string(), value);
                    }
                }

                let mut kafka_config =
                    connector::KafkaConfig::new(&brokers, &topic).with_properties(properties);
                if let Some(tid) = transactional_id {
                    kafka_config = kafka_config.with_transactional_id(&tid);
                }

                // Pick the adapter matching the producer's transaction mode.
                match connector::KafkaSinkFull::new(name, kafka_config) {
                    Ok(sink) => {
                        if sink.is_transactional() {
                            Some(Arc::new(TransactionalKafkaSinkAdapter::new(name, sink)))
                        } else {
                            Some(Arc::new(BatchKafkaSinkAdapter::new(name, sink)))
                        }
                    }
                    Err(e) => {
                        warn!("Failed to create Kafka sink '{}': {}", name, e);
                        None
                    }
                }
            }
            #[cfg(not(feature = "kafka"))]
            {
                warn!("Kafka connector '{}' requires 'kafka' feature flag", name);
                None
            }
        }
        "mqtt" => {
            #[cfg(feature = "mqtt")]
            {
                let broker = config.url.clone();
                // Default MQTT port when the `port` property is absent/unparsable.
                let port: u16 = config
                    .properties
                    .get("port")
                    .and_then(|v| v.parse().ok())
                    .unwrap_or(1883);
                let base_topic = topic_override
                    .map(|s| s.to_string())
                    .or_else(|| config.topic.clone())
                    .unwrap_or_else(|| format!("{name}-output"));
                let topic = match topic_prefix {
                    Some(prefix) => format!("{prefix}.{base_topic}"),
                    None => base_topic,
                };
                // Client id: `client_id` property or the sink name, suffixed
                // with the context so multiple contexts don't clash.
                let base_id = config
                    .properties
                    .get("client_id")
                    .cloned()
                    .unwrap_or_else(|| name.to_string());
                let client_id = match context_name {
                    Some(ctx) => format!("{base_id}-{ctx}"),
                    None => base_id,
                };
                let mqtt_config = connector::MqttConfig::new(&broker, &topic)
                    .with_port(port)
                    .with_client_id(&client_id);
                let sink = connector::MqttSink::new(name, mqtt_config);
                Some(Arc::new(SinkConnectorAdapter {
                    name: name.to_string(),
                    inner: tokio::sync::Mutex::new(Box::new(sink)),
                }))
            }
            #[cfg(not(feature = "mqtt"))]
            {
                warn!("MQTT connector '{}' requires 'mqtt' feature flag", name);
                None
            }
        }
        "nats" => {
            #[cfg(feature = "nats")]
            {
                // Server list: url, then `servers` property, then localhost default.
                let servers = if config.url.is_empty() {
                    config
                        .properties
                        .get("servers")
                        .cloned()
                        .unwrap_or_else(|| "nats://localhost:4222".to_string())
                } else {
                    config.url.clone()
                };
                let base_subject = topic_override
                    .map(|s| s.to_string())
                    .or_else(|| config.topic.clone())
                    .unwrap_or_else(|| format!("{}-output", name));
                let subject = match topic_prefix {
                    Some(prefix) => format!("{prefix}.{base_subject}"),
                    None => base_subject,
                };
                let nats_config = connector::NatsConfig::new(&servers, &subject);
                let sink = connector::NatsSink::new(name, nats_config);
                Some(Arc::new(SinkConnectorAdapter {
                    name: name.to_string(),
                    inner: tokio::sync::Mutex::new(Box::new(sink)),
                }))
            }
            #[cfg(not(feature = "nats"))]
            {
                warn!("NATS connector '{}' requires 'nats' feature flag", name);
                None
            }
        }
        "pulsar" => {
            #[cfg(feature = "pulsar")]
            {
                // Service URL: url, then `url` property, then localhost default.
                let service_url = if config.url.is_empty() {
                    config
                        .properties
                        .get("url")
                        .cloned()
                        .unwrap_or_else(|| "pulsar://localhost:6650".to_string())
                } else {
                    config.url.clone()
                };
                let base_topic = topic_override
                    .map(|s| s.to_string())
                    .or_else(|| config.topic.clone())
                    .unwrap_or_else(|| format!("{}-output", name));
                let topic = match topic_prefix {
                    Some(prefix) => format!("{prefix}.{base_topic}"),
                    None => base_topic,
                };

                let mut pulsar_config = connector::PulsarConfig::new(&service_url, &topic);
                // Optional auth token.
                if let Some(token) = config.properties.get("token") {
                    pulsar_config = pulsar_config.with_token(token);
                }
                let sink = connector::PulsarSink::new(name, pulsar_config);
                Some(Arc::new(SinkConnectorAdapter {
                    name: name.to_string(),
                    inner: tokio::sync::Mutex::new(Box::new(sink)),
                }))
            }
            #[cfg(not(feature = "pulsar"))]
            {
                warn!("Pulsar connector '{}' requires 'pulsar' feature flag", name);
                None
            }
        }
        "redis_stream" => {
            #[cfg(feature = "redis")]
            {
                let url = if config.url.is_empty() {
                    config
                        .properties
                        .get("url")
                        .cloned()
                        .unwrap_or_else(|| "redis://localhost:6379".to_string())
                } else {
                    config.url.clone()
                };
                // Stream key precedence: override > topic > `stream_key`
                // property > "<name>-output".
                let stream_key = topic_override
                    .map(|s| s.to_string())
                    .or_else(|| config.topic.clone())
                    .or_else(|| config.properties.get("stream_key").cloned())
                    .unwrap_or_else(|| format!("{}-output", name));

                let mut rs_config = connector::RedisStreamConfig::new(&url, &stream_key);
                // Optional stream trimming.
                if let Some(max_len) = config
                    .properties
                    .get("max_len")
                    .and_then(|v| v.parse().ok())
                {
                    rs_config = rs_config.with_max_len(max_len);
                }
                let sink = connector::RedisStreamSinkStub::new(name, rs_config);
                Some(Arc::new(SinkConnectorAdapter {
                    name: name.to_string(),
                    inner: tokio::sync::Mutex::new(Box::new(sink)),
                }))
            }
            #[cfg(not(feature = "redis"))]
            {
                warn!(
                    "Redis Streams connector '{}' requires 'redis' feature flag",
                    name
                );
                None
            }
        }
        other => {
            // Unknown type: quietly skip (debug level) rather than warn —
            // source-only connectors legitimately reach this arm.
            debug!(
                "Connector '{}' (type '{}') does not support sink output",
                name, other
            );
            None
        }
    }
}
535
/// Cache of constructed sinks, keyed by connector name (or by a
/// topic-override key), shared as `Arc<dyn Sink>` handles.
pub struct SinkRegistry {
    cache: FxHashMap<String, Arc<dyn crate::sink::Sink>>,
}
545
impl SinkRegistry {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self {
            cache: FxHashMap::default(),
        }
    }

    /// Register (or replace) a sink under `key`.
    pub fn insert(&mut self, key: String, sink: Arc<dyn crate::sink::Sink>) {
        self.cache.insert(key, sink);
    }

    /// Read-only view of the cached sinks.
    pub fn cache(&self) -> &FxHashMap<String, Arc<dyn crate::sink::Sink>> {
        &self.cache
    }

    /// Mutable view of the cached sinks.
    pub fn cache_mut(&mut self) -> &mut FxHashMap<String, Arc<dyn crate::sink::Sink>> {
        &mut self.cache
    }

    /// Populate the registry from declared connectors.
    ///
    /// First builds one sink per connector in `referenced_keys` (keyed by
    /// connector name), then builds extra sinks for each
    /// `(sink_key, connector_name, topic)` override not already cached,
    /// reusing the connector's config with the override topic.
    pub fn build_from_connectors(
        &mut self,
        connectors: &FxHashMap<String, connector::ConnectorConfig>,
        referenced_keys: &FxHashSet<String>,
        topic_overrides: &[(String, String, String)],
        context_name: Option<&str>,
        topic_prefix: Option<&str>,
    ) {
        // Only materialize sinks that something actually references.
        for (name, config) in connectors {
            if referenced_keys.contains(name) {
                if let Some(sink) =
                    create_sink_from_config(name, config, None, context_name, topic_prefix)
                {
                    self.cache.insert(name.clone(), sink);
                }
            }
        }

        // Topic-specific variants are cached under their own sink_key,
        // distinct from the base connector's entry.
        for (sink_key, connector_name, topic) in topic_overrides {
            if !self.cache.contains_key(sink_key) {
                if let Some(config) = connectors.get(connector_name) {
                    if let Some(sink) = create_sink_from_config(
                        connector_name,
                        config,
                        Some(topic),
                        context_name,
                        topic_prefix,
                    ) {
                        self.cache.insert(sink_key.clone(), sink);
                    }
                }
            }
        }
    }

    /// Look up a cached sink by key.
    #[allow(dead_code)]
    pub fn get(&self, key: &str) -> Option<&Arc<dyn crate::sink::Sink>> {
        self.cache.get(key)
    }

    /// Connect every cached sink sequentially, failing fast with a
    /// formatted message on the first error.
    pub async fn connect_all(&self) -> Result<(), String> {
        for (name, sink) in &self.cache {
            if let Err(e) = sink.connect().await {
                return Err(format!("Failed to connect sink '{name}': {e}"));
            }
        }
        Ok(())
    }

    /// Wrap every cached sink in a `ResilientSink` (circuit breaker plus
    /// optional dead-letter queue and metrics). Each sink gets its own
    /// breaker instance built from `cb_config`.
    pub fn wrap_with_resilience(
        &mut self,
        cb_config: crate::circuit_breaker::CircuitBreakerConfig,
        dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
        metrics: Option<crate::metrics::Metrics>,
    ) {
        // Take the map so the wrapped entries can be re-inserted in place.
        let old_cache = std::mem::take(&mut self.cache);
        for (key, sink) in old_cache {
            let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
                cb_config.clone(),
            ));
            let resilient = Arc::new(crate::sink::ResilientSink::new(
                sink,
                cb,
                dlq.clone(),
                metrics.clone(),
            ));
            self.cache.insert(key, resilient);
        }
    }
}
656
657impl Default for SinkRegistry {
658 fn default() -> Self {
659 Self::new()
660 }
661}
662
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh registry starts with an empty cache.
    #[test]
    fn test_registry_new() {
        let registry = SinkRegistry::new();
        assert!(registry.cache.is_empty());
    }

    /// The `console` built-in needs no URL and always constructs.
    #[test]
    fn test_console_sink_creation() {
        let config = connector::ConnectorConfig::new("console", "");
        let sink = create_sink_from_config("test_console", &config, None, None, None);
        assert!(sink.is_some());
    }

    /// Unrecognized connector types produce no sink.
    #[test]
    fn test_unknown_connector_returns_none() {
        let config = connector::ConnectorConfig::new("unknown_type", "");
        let sink = create_sink_from_config("test", &config, None, None, None);
        assert!(sink.is_none());
    }

    /// `exactly_once` is an unrecognized name, so it lands in `properties`
    /// (stringified), while `brokers` and `topic` map to dedicated fields.
    #[test]
    fn test_connector_params_extracts_exactly_once() {
        use varpulis_core::ast::{ConfigValue, ConnectorParam};

        let params = vec![
            ConnectorParam {
                name: "brokers".to_string(),
                value: ConfigValue::Str("localhost:9092".to_string()),
            },
            ConnectorParam {
                name: "topic".to_string(),
                value: ConfigValue::Str("my-topic".to_string()),
            },
            ConnectorParam {
                name: "exactly_once".to_string(),
                value: ConfigValue::Bool(true),
            },
        ];

        // Fixed mojibake: pass `&params`, not `¶ms`.
        let config = connector_params_to_config("kafka", &params);
        assert_eq!(config.url, "localhost:9092");
        assert_eq!(config.topic, Some("my-topic".to_string()));
        assert_eq!(
            config.properties.get("exactly_once"),
            Some(&"true".to_string())
        );
    }

    /// An explicit `transactional_id` is preserved verbatim in `properties`.
    #[test]
    fn test_connector_params_extracts_transactional_id() {
        use varpulis_core::ast::{ConfigValue, ConnectorParam};

        let params = vec![
            ConnectorParam {
                name: "brokers".to_string(),
                value: ConfigValue::Str("localhost:9092".to_string()),
            },
            ConnectorParam {
                name: "transactional_id".to_string(),
                value: ConfigValue::Str("my-app-txn".to_string()),
            },
        ];

        // Fixed mojibake: pass `&params`, not `¶ms`.
        let config = connector_params_to_config("kafka", &params);
        assert_eq!(
            config.properties.get("transactional_id"),
            Some(&"my-app-txn".to_string())
        );
    }
}