Skip to main content

varpulis_runtime/engine/
sink_factory.rs

1//! Sink construction and management for the Varpulis engine
2//!
3//! This module provides functionality to create sinks from connector configurations
4//! and manage a registry of active sinks.
5
6use std::sync::Arc;
7
8use indexmap::IndexMap;
9use rustc_hash::{FxHashMap, FxHashSet};
10use tracing::{debug, warn};
11use varpulis_core::ast::ConnectorParam;
12
13use crate::connector;
14#[cfg(feature = "kafka")]
15use crate::connector::SinkConnector;
16
17/// Convert AST ConnectorParams to a runtime ConnectorConfig
18pub fn connector_params_to_config(
19    connector_type: &str,
20    params: &[ConnectorParam],
21) -> connector::ConnectorConfig {
22    let mut url = String::new();
23    let mut topic = None;
24    let mut properties = IndexMap::new();
25
26    for param in params {
27        let value_str = match &param.value {
28            varpulis_core::ast::ConfigValue::Str(s) => s.clone(),
29            varpulis_core::ast::ConfigValue::Ident(s) => s.clone(),
30            varpulis_core::ast::ConfigValue::Int(i) => i.to_string(),
31            varpulis_core::ast::ConfigValue::Float(f) => f.to_string(),
32            varpulis_core::ast::ConfigValue::Bool(b) => b.to_string(),
33            varpulis_core::ast::ConfigValue::Duration(d) => format!("{d}ns"),
34            varpulis_core::ast::ConfigValue::Array(_) => continue,
35            varpulis_core::ast::ConfigValue::Map(_) => continue,
36        };
37        match param.name.as_str() {
38            "url" | "host" | "brokers" | "servers" => url = value_str,
39            "topic" => topic = Some(value_str),
40            other => {
41                properties.insert(other.to_string(), value_str);
42            }
43        }
44    }
45
46    let mut config = connector::ConnectorConfig::new(connector_type, &url);
47    if let Some(t) = topic {
48        config = config.with_topic(&t);
49    }
50    config.properties = properties;
51    config
52}
53
54/// Adapter: wraps a SinkConnector as a Sink for use in the sink registry
55pub struct SinkConnectorAdapter {
56    name: String,
57    inner: tokio::sync::Mutex<Box<dyn connector::SinkConnector>>,
58}
59
60impl std::fmt::Debug for SinkConnectorAdapter {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("SinkConnectorAdapter")
63            .field("name", &self.name)
64            .finish_non_exhaustive()
65    }
66}
67
68impl SinkConnectorAdapter {
69    /// Create a new adapter wrapping a SinkConnector.
70    pub fn new(name: &str, connector: Box<dyn connector::SinkConnector>) -> Self {
71        Self {
72            name: name.to_string(),
73            inner: tokio::sync::Mutex::new(connector),
74        }
75    }
76}
77
78#[async_trait::async_trait]
79impl crate::sink::Sink for SinkConnectorAdapter {
80    fn name(&self) -> &str {
81        &self.name
82    }
83    async fn connect(&self) -> Result<(), crate::sink::SinkError> {
84        let mut inner = self.inner.lock().await;
85        inner.connect().await.map_err(crate::sink::SinkError::from)
86    }
87    async fn send(&self, event: &crate::event::Event) -> Result<(), crate::sink::SinkError> {
88        let inner = self.inner.lock().await;
89        inner
90            .send(event)
91            .await
92            .map_err(crate::sink::SinkError::from)
93    }
94    async fn send_batch(
95        &self,
96        events: &[std::sync::Arc<crate::event::Event>],
97    ) -> Result<(), crate::sink::SinkError> {
98        // Acquire lock once for the entire batch
99        let inner = self.inner.lock().await;
100        for event in events {
101            inner
102                .send(event)
103                .await
104                .map_err(crate::sink::SinkError::from)?;
105        }
106        Ok(())
107    }
108    async fn flush(&self) -> Result<(), crate::sink::SinkError> {
109        let inner = self.inner.lock().await;
110        inner.flush().await.map_err(crate::sink::SinkError::from)
111    }
112    async fn close(&self) -> Result<(), crate::sink::SinkError> {
113        let inner = self.inner.lock().await;
114        inner.close().await.map_err(crate::sink::SinkError::from)
115    }
116}
117
118/// Adapter for Kafka sinks using transactional (exactly-once) delivery.
119///
120/// Wraps `KafkaSinkFull` directly (not via the `SinkConnector` trait) to access
121/// the transactional batch API. Single sends and batches are both wrapped in
122/// Kafka transactions so that consumers with `isolation.level=read_committed`
123/// see atomic, exactly-once delivery.
124#[cfg(feature = "kafka")]
125pub struct TransactionalKafkaSinkAdapter {
126    name: String,
127    inner: tokio::sync::Mutex<connector::KafkaSinkFull>,
128}
129
130#[cfg(feature = "kafka")]
131impl TransactionalKafkaSinkAdapter {
132    pub fn new(name: &str, sink: connector::KafkaSinkFull) -> Self {
133        Self {
134            name: name.to_string(),
135            inner: tokio::sync::Mutex::new(sink),
136        }
137    }
138}
139
140#[cfg(feature = "kafka")]
141#[async_trait::async_trait]
142impl crate::sink::Sink for TransactionalKafkaSinkAdapter {
143    fn name(&self) -> &str {
144        &self.name
145    }
146    async fn connect(&self) -> Result<(), crate::sink::SinkError> {
147        Ok(()) // Producer is connected at construction time
148    }
149    async fn send(&self, event: &crate::event::Event) -> Result<(), crate::sink::SinkError> {
150        let inner = self.inner.lock().await;
151        inner
152            .send(event)
153            .await
154            .map_err(crate::sink::SinkError::from)
155    }
156    async fn send_batch(
157        &self,
158        events: &[std::sync::Arc<crate::event::Event>],
159    ) -> Result<(), crate::sink::SinkError> {
160        let inner = self.inner.lock().await;
161        inner
162            .send_batch_transactional(events)
163            .await
164            .map_err(crate::sink::SinkError::from)
165    }
166    async fn flush(&self) -> Result<(), crate::sink::SinkError> {
167        let inner = self.inner.lock().await;
168        inner.flush().await.map_err(crate::sink::SinkError::from)
169    }
170    async fn close(&self) -> Result<(), crate::sink::SinkError> {
171        let inner = self.inner.lock().await;
172        inner.close().await.map_err(crate::sink::SinkError::from)
173    }
174}
175
176/// Adapter for Kafka sinks using concurrent batch delivery (non-transactional).
177///
178/// Wraps `KafkaSinkFull` directly (not via the `SinkConnector` trait) to access
179/// the concurrent `send_batch()` API. This enqueues all events into librdkafka's
180/// internal buffer, then awaits all delivery futures at once — avoiding the
181/// per-event `.await` bottleneck that limits throughput to ~166 events/s.
182#[cfg(feature = "kafka")]
183pub struct BatchKafkaSinkAdapter {
184    name: String,
185    inner: tokio::sync::Mutex<connector::KafkaSinkFull>,
186}
187
188#[cfg(feature = "kafka")]
189impl BatchKafkaSinkAdapter {
190    pub fn new(name: &str, sink: connector::KafkaSinkFull) -> Self {
191        Self {
192            name: name.to_string(),
193            inner: tokio::sync::Mutex::new(sink),
194        }
195    }
196}
197
198#[cfg(feature = "kafka")]
199#[async_trait::async_trait]
200impl crate::sink::Sink for BatchKafkaSinkAdapter {
201    fn name(&self) -> &str {
202        &self.name
203    }
204    async fn connect(&self) -> Result<(), crate::sink::SinkError> {
205        Ok(()) // Producer is connected at construction time
206    }
207    async fn send(&self, event: &crate::event::Event) -> Result<(), crate::sink::SinkError> {
208        let inner = self.inner.lock().await;
209        inner
210            .send(event)
211            .await
212            .map_err(crate::sink::SinkError::from)
213    }
214    async fn send_batch(
215        &self,
216        events: &[std::sync::Arc<crate::event::Event>],
217    ) -> Result<(), crate::sink::SinkError> {
218        let inner = self.inner.lock().await;
219        inner
220            .send_batch(events)
221            .await
222            .map_err(crate::sink::SinkError::from)
223    }
224    async fn flush(&self) -> Result<(), crate::sink::SinkError> {
225        let inner = self.inner.lock().await;
226        inner.flush().await.map_err(crate::sink::SinkError::from)
227    }
228    async fn close(&self) -> Result<(), crate::sink::SinkError> {
229        let inner = self.inner.lock().await;
230        inner.close().await.map_err(crate::sink::SinkError::from)
231    }
232}
233
234/// Create a sink from a ConnectorConfig, with an optional topic override from .to() params.
235///
236/// Tries the inventory-based `find_factory()` first, then falls back to the
237/// match-arm dispatch for connectors that haven't been migrated yet.
238#[allow(unused_variables)]
239pub fn create_sink_from_config(
240    name: &str,
241    config: &connector::ConnectorConfig,
242    topic_override: Option<&str>,
243    context_name: Option<&str>,
244    topic_prefix: Option<&str>,
245) -> Option<Arc<dyn crate::sink::Sink>> {
246    // Try inventory-based factory first
247    if let Some(factory) = connector::component::find_factory(&config.connector_type) {
248        if factory.info().supports_sink {
249            match factory.create_engine_sink(name, config, topic_override, context_name) {
250                Ok(sink) => return Some(sink),
251                Err(connector::ConnectorError::NotAvailable(_)) => {} // fall through
252                Err(e) => {
253                    warn!("Factory error creating sink '{}': {}", name, e);
254                    return None;
255                }
256            }
257        }
258    }
259
260    // Fallback to match-arm dispatch
261    match config.connector_type.as_str() {
262        "console" => Some(Arc::new(crate::sink::ConsoleSink::new(name))),
263        "file" => {
264            let path = if config.url.is_empty() {
265                config
266                    .properties
267                    .get("path")
268                    .cloned()
269                    .unwrap_or_else(|| format!("{name}.jsonl"))
270            } else {
271                config.url.clone()
272            };
273            match crate::sink::FileSink::new(name, &path) {
274                Ok(sink) => Some(Arc::new(sink)),
275                Err(e) => {
276                    warn!("Failed to create file sink '{}': {}", name, e);
277                    None
278                }
279            }
280        }
281        "http" => {
282            let url = config.url.clone();
283            if url.is_empty() {
284                warn!("HTTP connector '{}' has no URL configured", name);
285                None
286            } else {
287                Some(Arc::new(crate::sink::HttpSink::new(name, &url)))
288            }
289        }
290        "kafka" => {
291            #[cfg(feature = "kafka")]
292            {
293                let brokers = config
294                    .properties
295                    .get("brokers")
296                    .cloned()
297                    .unwrap_or_else(|| config.url.clone());
298                let base_topic = topic_override
299                    .map(|s| s.to_string())
300                    .or_else(|| config.topic.clone())
301                    .unwrap_or_else(|| format!("{}-output", name));
302                // Apply tenant topic prefix for multi-tenant isolation
303                let topic = match topic_prefix {
304                    Some(prefix) => format!("{prefix}.{base_topic}"),
305                    None => base_topic,
306                };
307
308                // Extract transactional_id from properties or auto-generate when
309                // exactly_once is requested
310                let transactional_id =
311                    config
312                        .properties
313                        .get("transactional_id")
314                        .cloned()
315                        .or_else(|| {
316                            config
317                                .properties
318                                .get("exactly_once")
319                                .filter(|v| v == &"true")
320                                .map(|_| format!("varpulis-{}", name))
321                        });
322
323                // Translate VPL-friendly underscore names to rdkafka dot-notation
324                let mut properties = config.properties.clone();
325                let param_mapping: &[(&str, &str)] = &[
326                    ("batch_size", "batch.size"),
327                    ("linger_ms", "linger.ms"),
328                    ("compression_type", "compression.type"),
329                    ("message_timeout_ms", "message.timeout.ms"),
330                    // SASL/SCRAM authentication
331                    ("security_protocol", "security.protocol"),
332                    ("sasl_mechanism", "sasl.mechanism"),
333                    ("sasl_username", "sasl.username"),
334                    ("sasl_password", "sasl.password"),
335                    // TLS
336                    ("ssl_ca_location", "ssl.ca.location"),
337                    ("ssl_certificate_location", "ssl.certificate.location"),
338                    ("ssl_key_location", "ssl.key.location"),
339                ];
340                for &(vpl_name, rdkafka_name) in param_mapping {
341                    if let Some(value) = properties.swap_remove(vpl_name) {
342                        properties.insert(rdkafka_name.to_string(), value);
343                    }
344                }
345
346                let mut kafka_config =
347                    connector::KafkaConfig::new(&brokers, &topic).with_properties(properties);
348                if let Some(tid) = transactional_id {
349                    kafka_config = kafka_config.with_transactional_id(&tid);
350                }
351
352                match connector::KafkaSinkFull::new(name, kafka_config) {
353                    Ok(sink) => {
354                        if sink.is_transactional() {
355                            Some(Arc::new(TransactionalKafkaSinkAdapter::new(name, sink)))
356                        } else {
357                            Some(Arc::new(BatchKafkaSinkAdapter::new(name, sink)))
358                        }
359                    }
360                    Err(e) => {
361                        warn!("Failed to create Kafka sink '{}': {}", name, e);
362                        None
363                    }
364                }
365            }
366            #[cfg(not(feature = "kafka"))]
367            {
368                warn!("Kafka connector '{}' requires 'kafka' feature flag", name);
369                None
370            }
371        }
372        "mqtt" => {
373            #[cfg(feature = "mqtt")]
374            {
375                let broker = config.url.clone();
376                let port: u16 = config
377                    .properties
378                    .get("port")
379                    .and_then(|v| v.parse().ok())
380                    .unwrap_or(1883);
381                let base_topic = topic_override
382                    .map(|s| s.to_string())
383                    .or_else(|| config.topic.clone())
384                    .unwrap_or_else(|| format!("{name}-output"));
385                // Apply tenant topic prefix for multi-tenant isolation
386                let topic = match topic_prefix {
387                    Some(prefix) => format!("{prefix}.{base_topic}"),
388                    None => base_topic,
389                };
390                let base_id = config
391                    .properties
392                    .get("client_id")
393                    .cloned()
394                    .unwrap_or_else(|| name.to_string());
395                let client_id = match context_name {
396                    Some(ctx) => format!("{base_id}-{ctx}"),
397                    None => base_id,
398                };
399                let mqtt_config = connector::MqttConfig::new(&broker, &topic)
400                    .with_port(port)
401                    .with_client_id(&client_id);
402                let sink = connector::MqttSink::new(name, mqtt_config);
403                Some(Arc::new(SinkConnectorAdapter {
404                    name: name.to_string(),
405                    inner: tokio::sync::Mutex::new(Box::new(sink)),
406                }))
407            }
408            #[cfg(not(feature = "mqtt"))]
409            {
410                warn!("MQTT connector '{}' requires 'mqtt' feature flag", name);
411                None
412            }
413        }
414        "nats" => {
415            #[cfg(feature = "nats")]
416            {
417                let servers = if config.url.is_empty() {
418                    config
419                        .properties
420                        .get("servers")
421                        .cloned()
422                        .unwrap_or_else(|| "nats://localhost:4222".to_string())
423                } else {
424                    config.url.clone()
425                };
426                let base_subject = topic_override
427                    .map(|s| s.to_string())
428                    .or_else(|| config.topic.clone())
429                    .unwrap_or_else(|| format!("{}-output", name));
430                let subject = match topic_prefix {
431                    Some(prefix) => format!("{prefix}.{base_subject}"),
432                    None => base_subject,
433                };
434                let nats_config = connector::NatsConfig::new(&servers, &subject);
435                let sink = connector::NatsSink::new(name, nats_config);
436                Some(Arc::new(SinkConnectorAdapter {
437                    name: name.to_string(),
438                    inner: tokio::sync::Mutex::new(Box::new(sink)),
439                }))
440            }
441            #[cfg(not(feature = "nats"))]
442            {
443                warn!("NATS connector '{}' requires 'nats' feature flag", name);
444                None
445            }
446        }
447        "pulsar" => {
448            #[cfg(feature = "pulsar")]
449            {
450                let service_url = if config.url.is_empty() {
451                    config
452                        .properties
453                        .get("url")
454                        .cloned()
455                        .unwrap_or_else(|| "pulsar://localhost:6650".to_string())
456                } else {
457                    config.url.clone()
458                };
459                let base_topic = topic_override
460                    .map(|s| s.to_string())
461                    .or_else(|| config.topic.clone())
462                    .unwrap_or_else(|| format!("{}-output", name));
463                let topic = match topic_prefix {
464                    Some(prefix) => format!("{prefix}.{base_topic}"),
465                    None => base_topic,
466                };
467
468                let mut pulsar_config = connector::PulsarConfig::new(&service_url, &topic);
469                if let Some(token) = config.properties.get("token") {
470                    pulsar_config = pulsar_config.with_token(token);
471                }
472                let sink = connector::PulsarSink::new(name, pulsar_config);
473                Some(Arc::new(SinkConnectorAdapter {
474                    name: name.to_string(),
475                    inner: tokio::sync::Mutex::new(Box::new(sink)),
476                }))
477            }
478            #[cfg(not(feature = "pulsar"))]
479            {
480                warn!("Pulsar connector '{}' requires 'pulsar' feature flag", name);
481                None
482            }
483        }
484        "redis_stream" => {
485            #[cfg(feature = "redis")]
486            {
487                let url = if config.url.is_empty() {
488                    config
489                        .properties
490                        .get("url")
491                        .cloned()
492                        .unwrap_or_else(|| "redis://localhost:6379".to_string())
493                } else {
494                    config.url.clone()
495                };
496                let stream_key = topic_override
497                    .map(|s| s.to_string())
498                    .or_else(|| config.topic.clone())
499                    .or_else(|| config.properties.get("stream_key").cloned())
500                    .unwrap_or_else(|| format!("{}-output", name));
501
502                let mut rs_config = connector::RedisStreamConfig::new(&url, &stream_key);
503                if let Some(max_len) = config
504                    .properties
505                    .get("max_len")
506                    .and_then(|v| v.parse().ok())
507                {
508                    rs_config = rs_config.with_max_len(max_len);
509                }
510                // Use deferred-connect stub: actual connection happens on first send
511                let sink = connector::RedisStreamSinkStub::new(name, rs_config);
512                Some(Arc::new(SinkConnectorAdapter {
513                    name: name.to_string(),
514                    inner: tokio::sync::Mutex::new(Box::new(sink)),
515                }))
516            }
517            #[cfg(not(feature = "redis"))]
518            {
519                warn!(
520                    "Redis Streams connector '{}' requires 'redis' feature flag",
521                    name
522                );
523                None
524            }
525        }
526        other => {
527            debug!(
528                "Connector '{}' (type '{}') does not support sink output",
529                name, other
530            );
531            None
532        }
533    }
534}
535
536/// Registry for managing sink instances.
537///
538/// Handles:
539/// - Building sinks from connector configurations
540/// - Caching created sinks by their keys
541/// - Connecting all registered sinks
542pub struct SinkRegistry {
543    cache: FxHashMap<String, Arc<dyn crate::sink::Sink>>,
544}
545
546impl SinkRegistry {
547    /// Create a new empty sink registry
548    pub fn new() -> Self {
549        Self {
550            cache: FxHashMap::default(),
551        }
552    }
553
554    /// Insert a pre-built sink into the registry.
555    pub fn insert(&mut self, key: String, sink: Arc<dyn crate::sink::Sink>) {
556        self.cache.insert(key, sink);
557    }
558
559    /// Get the internal cache (for compatibility with existing code)
560    pub fn cache(&self) -> &FxHashMap<String, Arc<dyn crate::sink::Sink>> {
561        &self.cache
562    }
563
564    /// Get the internal cache mutably (for hot reload)
565    pub fn cache_mut(&mut self) -> &mut FxHashMap<String, Arc<dyn crate::sink::Sink>> {
566        &mut self.cache
567    }
568
569    /// Build sinks from connector declarations, only for referenced sink keys.
570    ///
571    /// Creating unreferenced sinks (e.g. a base connector entry when only
572    /// topic-override entries are used) wastes resources and — for MQTT —
573    /// causes duplicate client_id conflicts that disconnect the useful sink.
574    pub fn build_from_connectors(
575        &mut self,
576        connectors: &FxHashMap<String, connector::ConnectorConfig>,
577        referenced_keys: &FxHashSet<String>,
578        topic_overrides: &[(String, String, String)],
579        context_name: Option<&str>,
580        topic_prefix: Option<&str>,
581    ) {
582        // Create sinks for directly referenced connectors
583        for (name, config) in connectors {
584            if referenced_keys.contains(name) {
585                if let Some(sink) =
586                    create_sink_from_config(name, config, None, context_name, topic_prefix)
587                {
588                    self.cache.insert(name.clone(), sink);
589                }
590            }
591        }
592
593        // Create sinks for topic-override keys
594        for (sink_key, connector_name, topic) in topic_overrides {
595            if !self.cache.contains_key(sink_key) {
596                if let Some(config) = connectors.get(connector_name) {
597                    if let Some(sink) = create_sink_from_config(
598                        connector_name,
599                        config,
600                        Some(topic),
601                        context_name,
602                        topic_prefix,
603                    ) {
604                        self.cache.insert(sink_key.clone(), sink);
605                    }
606                }
607            }
608        }
609    }
610
611    /// Get a sink by its key
612    #[allow(dead_code)]
613    pub fn get(&self, key: &str) -> Option<&Arc<dyn crate::sink::Sink>> {
614        self.cache.get(key)
615    }
616
617    /// Connect all sinks that require explicit connection.
618    ///
619    /// Call this after building sinks to establish connections to external systems
620    /// like MQTT brokers, databases, etc.
621    pub async fn connect_all(&self) -> Result<(), String> {
622        for (name, sink) in &self.cache {
623            if let Err(e) = sink.connect().await {
624                return Err(format!("Failed to connect sink '{name}': {e}"));
625            }
626        }
627        Ok(())
628    }
629
630    /// Wrap all registered sinks with circuit breaker + DLQ protection.
631    ///
632    /// Call after `build_from_connectors()` to add resilience to every sink.
633    /// Events that fail delivery (or are rejected by the circuit breaker)
634    /// are routed to the DLQ file instead of being silently dropped.
635    pub fn wrap_with_resilience(
636        &mut self,
637        cb_config: crate::circuit_breaker::CircuitBreakerConfig,
638        dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
639        metrics: Option<crate::metrics::Metrics>,
640    ) {
641        let old_cache = std::mem::take(&mut self.cache);
642        for (key, sink) in old_cache {
643            let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
644                cb_config.clone(),
645            ));
646            let resilient = Arc::new(crate::sink::ResilientSink::new(
647                sink,
648                cb,
649                dlq.clone(),
650                metrics.clone(),
651            ));
652            self.cache.insert(key, resilient);
653        }
654    }
655}
656
657impl Default for SinkRegistry {
658    fn default() -> Self {
659        Self::new()
660    }
661}
662
663#[cfg(test)]
664mod tests {
665    use super::*;
666
667    #[test]
668    fn test_registry_new() {
669        let registry = SinkRegistry::new();
670        assert!(registry.cache.is_empty());
671    }
672
673    #[test]
674    fn test_console_sink_creation() {
675        let config = connector::ConnectorConfig::new("console", "");
676        let sink = create_sink_from_config("test_console", &config, None, None, None);
677        assert!(sink.is_some());
678    }
679
680    #[test]
681    fn test_unknown_connector_returns_none() {
682        let config = connector::ConnectorConfig::new("unknown_type", "");
683        let sink = create_sink_from_config("test", &config, None, None, None);
684        assert!(sink.is_none());
685    }
686
687    #[test]
688    fn test_connector_params_extracts_exactly_once() {
689        use varpulis_core::ast::{ConfigValue, ConnectorParam};
690
691        let params = vec![
692            ConnectorParam {
693                name: "brokers".to_string(),
694                value: ConfigValue::Str("localhost:9092".to_string()),
695            },
696            ConnectorParam {
697                name: "topic".to_string(),
698                value: ConfigValue::Str("my-topic".to_string()),
699            },
700            ConnectorParam {
701                name: "exactly_once".to_string(),
702                value: ConfigValue::Bool(true),
703            },
704        ];
705
706        let config = connector_params_to_config("kafka", &params);
707        assert_eq!(config.url, "localhost:9092");
708        assert_eq!(config.topic, Some("my-topic".to_string()));
709        // exactly_once=true should be stored in properties
710        assert_eq!(
711            config.properties.get("exactly_once"),
712            Some(&"true".to_string())
713        );
714    }
715
716    #[test]
717    fn test_connector_params_extracts_transactional_id() {
718        use varpulis_core::ast::{ConfigValue, ConnectorParam};
719
720        let params = vec![
721            ConnectorParam {
722                name: "brokers".to_string(),
723                value: ConfigValue::Str("localhost:9092".to_string()),
724            },
725            ConnectorParam {
726                name: "transactional_id".to_string(),
727                value: ConfigValue::Str("my-app-txn".to_string()),
728            },
729        ];
730
731        let config = connector_params_to_config("kafka", &params);
732        assert_eq!(
733            config.properties.get("transactional_id"),
734            Some(&"my-app-txn".to_string())
735        );
736    }
737}