Skip to main content

mq_bridge/
models.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6use serde::{
7    de::{MapAccess, Visitor},
8    Deserialize, Deserializer, Serialize,
9};
10use std::{collections::HashMap, sync::Arc};
11
12use crate::traits::Handler;
13use tracing::trace;
14
15/// The top-level configuration is a map of named routes.
16/// The key is the route name (e.g., "kafka_to_nats").
17///
18/// # Examples
19///
20/// Deserializing a complex configuration from YAML:
21///
22/// ```
23/// use mq_bridge::models::{Config, EndpointType, Middleware};
24///
25/// let yaml = r#"
26/// kafka_to_nats:
27///   concurrency: 10
28///   input:
29///     middlewares:
30///       - deduplication:
31///           sled_path: "/tmp/mq-bridge/dedup_db"
32///           ttl_seconds: 3600
33///       - metrics: {}
34///     kafka:
35///       topic: "input-topic"
36///       url: "localhost:9092"
37///       group_id: "my-consumer-group"
38///   output:
39///     nats:
40///       subject: "output-subject"
41///       url: "nats://localhost:4222"
42/// "#;
43///
44/// let config: Config = serde_yaml_ng::from_str(yaml).unwrap();
45/// let route = config.get("kafka_to_nats").unwrap();
46///
47/// assert_eq!(route.options.concurrency, 10);
48/// // Check input middleware
49/// assert!(route.input.middlewares.iter().any(|m| matches!(m, Middleware::Deduplication(_))));
50/// // Check output endpoint
51/// assert!(matches!(route.output.endpoint_type, EndpointType::Nats(_)));
52/// ```
53pub type Config = HashMap<String, Route>;
54
55/// A configuration map for named publishers (endpoints).
56/// The key is the publisher name.
57pub type PublisherConfig = HashMap<String, Endpoint>;
58
59/// Defines a single message processing route from an input to an output.
60#[derive(Debug, Deserialize, Serialize, Clone)]
61#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
62#[serde(deny_unknown_fields)]
63pub struct Route {
64    /// The input/source endpoint for the route.
65    pub input: Endpoint,
66    /// The output/sink endpoint for the route.
67    #[serde(default = "default_output_endpoint")]
68    pub output: Endpoint,
69    /// (Optional) Fine-tuning options for the route's execution.
70    #[serde(flatten, default)]
71    pub options: RouteOptions,
72}
73
74impl Default for Route {
75    fn default() -> Self {
76        Self {
77            input: Endpoint::null(),
78            output: Endpoint::null(),
79            options: RouteOptions::default(),
80        }
81    }
82}
83
84/// Fine-tuning options for a route's execution.
85#[derive(Debug, Deserialize, Serialize, Clone)]
86#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
87#[serde(deny_unknown_fields)]
88pub struct RouteOptions {
89    /// (Optional) Number of concurrent processing tasks for this route. Defaults to 1.
90    #[serde(default = "default_concurrency")]
91    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
92    pub concurrency: usize,
93    /// (Optional) Number of messages to process in a single batch. Defaults to 1.
94    #[serde(default = "default_batch_size")]
95    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
96    pub batch_size: usize,
97    /// (Optional) The maximum number of concurrent commit tasks allowed. Defaults to 4096.
98    #[serde(default = "default_commit_concurrency_limit")]
99    pub commit_concurrency_limit: usize,
100}
101
102impl Default for RouteOptions {
103    fn default() -> Self {
104        Self {
105            concurrency: default_concurrency(),
106            batch_size: default_batch_size(),
107            commit_concurrency_limit: default_commit_concurrency_limit(),
108        }
109    }
110}
111
112pub(crate) fn default_concurrency() -> usize {
113    1
114}
115
116pub(crate) fn default_batch_size() -> usize {
117    1
118}
119
120pub(crate) fn default_commit_concurrency_limit() -> usize {
121    4096
122}
123
124fn default_output_endpoint() -> Endpoint {
125    Endpoint::new(EndpointType::Null)
126}
127
128fn default_retry_attempts() -> usize {
129    3
130}
131fn default_initial_interval_ms() -> u64 {
132    100
133}
134fn default_max_interval_ms() -> u64 {
135    5000
136}
137fn default_multiplier() -> f64 {
138    2.0
139}
140fn default_clean_session() -> bool {
141    false
142}
143
144fn is_known_endpoint_name(name: &str) -> bool {
145    matches!(
146        name,
147        "aws"
148            | "kafka"
149            | "nats"
150            | "file"
151            | "static"
152            | "memory"
153            | "amqp"
154            | "mongodb"
155            | "mqtt"
156            | "http"
157            | "ibm-mq"
158            | "ibmmq"
159            | "zeromq"
160            | "fanout"
161            | "switch"
162            | "response"
163    )
164}
165
166/// Represents a connection point for messages, which can be a source (input) or a sink (output).
167#[derive(Serialize, Clone, Default)]
168#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
169#[serde(deny_unknown_fields)]
170pub struct Endpoint {
171    /// (Optional) A list of middlewares to apply to the endpoint.
172    #[serde(default)]
173    pub middlewares: Vec<Middleware>,
174
175    /// The specific endpoint implementation, determined by the configuration key (e.g., "kafka", "nats").
176    #[serde(flatten)]
177    pub endpoint_type: EndpointType,
178
179    #[serde(skip_serializing)]
180    #[cfg_attr(feature = "schema", schemars(skip))]
181    /// Internal handler for processing messages (not serialized).
182    pub handler: Option<Arc<dyn Handler>>,
183}
184
185impl std::fmt::Debug for Endpoint {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        f.debug_struct("Endpoint")
188            .field("middlewares", &self.middlewares)
189            .field("endpoint_type", &self.endpoint_type)
190            .field(
191                "handler",
192                &if self.handler.is_some() {
193                    "Some(<Handler>)"
194                } else {
195                    "None"
196                },
197            )
198            .finish()
199    }
200}
201
202impl<'de> Deserialize<'de> for Endpoint {
203    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
204    where
205        D: Deserializer<'de>,
206    {
207        struct EndpointVisitor;
208
209        impl<'de> Visitor<'de> for EndpointVisitor {
210            type Value = Endpoint;
211
212            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
213                formatter.write_str("a map representing an endpoint or null")
214            }
215
216            fn visit_unit<E>(self) -> Result<Self::Value, E>
217            where
218                E: serde::de::Error,
219            {
220                Ok(Endpoint::new(EndpointType::Null))
221            }
222
223            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
224            where
225                A: MapAccess<'de>,
226            {
227                // Buffer the map into a temporary serde_json::Map.
228                // This allows us to separate the `middlewares` field from the rest.
229                let mut temp_map = serde_json::Map::new();
230                let mut middlewares_val = None;
231
232                while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
233                    if key == "middlewares" {
234                        middlewares_val = Some(value);
235                    } else {
236                        temp_map.insert(key, value);
237                    }
238                }
239
240                // Deserialize the rest of the map into the flattened EndpointType.
241                let temp_val = serde_json::Value::Object(temp_map);
242                let endpoint_type: EndpointType = match serde_json::from_value(temp_val.clone()) {
243                    Ok(et) => et,
244                    Err(original_err) => {
245                        if let serde_json::Value::Object(map) = &temp_val {
246                            if map.len() == 1 {
247                                let (name, config) = map.iter().next().unwrap();
248                                if is_known_endpoint_name(name) {
249                                    return Err(serde::de::Error::custom(original_err));
250                                }
251                                trace!("Falling back to Custom endpoint for key: {}", name);
252                                EndpointType::Custom {
253                                    name: name.clone(),
254                                    config: config.clone(),
255                                }
256                            } else if map.is_empty() {
257                                EndpointType::Null
258                            } else {
259                                return Err(serde::de::Error::custom(
260                                    "Invalid endpoint configuration: multiple keys found or unknown endpoint type",
261                                ));
262                            }
263                        } else {
264                            return Err(serde::de::Error::custom("Invalid endpoint configuration"));
265                        }
266                    }
267                };
268
269                // Deserialize the extracted middlewares value using the existing helper logic.
270                let middlewares = match middlewares_val {
271                    Some(val) => {
272                        deserialize_middlewares_from_value(val).map_err(serde::de::Error::custom)?
273                    }
274                    None => Vec::new(),
275                };
276
277                Ok(Endpoint {
278                    middlewares,
279                    endpoint_type,
280                    handler: None,
281                })
282            }
283        }
284
285        deserializer.deserialize_any(EndpointVisitor)
286    }
287}
288
289fn is_known_middleware_name(name: &str) -> bool {
290    matches!(
291        name,
292        "deduplication"
293            | "metrics"
294            | "dlq"
295            | "retry"
296            | "random_panic"
297            | "delay"
298            | "weak_join"
299            | "custom"
300    )
301}
302
303/// Deserialize middlewares from a generic serde_json::Value.
304///
305/// This logic was extracted from `deserialize_middlewares_from_map_or_seq` to be reused by the custom `Endpoint` deserializer.
306fn deserialize_middlewares_from_value(value: serde_json::Value) -> anyhow::Result<Vec<Middleware>> {
307    let arr = match value {
308        serde_json::Value::Array(arr) => arr,
309        serde_json::Value::Object(map) => {
310            let mut middlewares: Vec<_> = map
311                .into_iter()
312                // The config crate can produce maps with numeric string keys ("0", "1", ...)
313                // from environment variables. We need to sort by these keys to maintain order.
314                .filter_map(|(key, value)| key.parse::<usize>().ok().map(|index| (index, value)))
315                .collect();
316            middlewares.sort_by_key(|(index, _)| *index);
317
318            middlewares.into_iter().map(|(_, value)| value).collect()
319        }
320        _ => return Err(anyhow::anyhow!("Expected an array or object")),
321    };
322
323    let mut middlewares = Vec::new();
324    for item in arr {
325        // Check if it is a map with a single key that matches a known middleware
326        let known_name = if let serde_json::Value::Object(map) = &item {
327            if map.len() == 1 {
328                let (name, _) = map.iter().next().unwrap();
329                if is_known_middleware_name(name) {
330                    Some(name.clone())
331                } else {
332                    None
333                }
334            } else {
335                None
336            }
337        } else {
338            None
339        };
340
341        if let Some(name) = known_name {
342            match serde_json::from_value::<Middleware>(item.clone()) {
343                Ok(m) => middlewares.push(m),
344                Err(e) => {
345                    return Err(anyhow::anyhow!(
346                        "Failed to deserialize known middleware '{}': {}",
347                        name,
348                        e
349                    ))
350                }
351            }
352        } else if let Ok(m) = serde_json::from_value::<Middleware>(item.clone()) {
353            middlewares.push(m);
354        } else if let serde_json::Value::Object(map) = &item {
355            if map.len() == 1 {
356                let (name, config) = map.iter().next().unwrap();
357                middlewares.push(Middleware::Custom {
358                    name: name.clone(),
359                    config: config.clone(),
360                });
361            } else {
362                return Err(anyhow::anyhow!(
363                    "Invalid middleware configuration: {:?}",
364                    item
365                ));
366            }
367        } else {
368            return Err(anyhow::anyhow!(
369                "Invalid middleware configuration: {:?}",
370                item
371            ));
372        }
373    }
374    Ok(middlewares)
375}
376
377/// An enumeration of all supported endpoint types.
378/// `#[serde(rename_all = "lowercase")]` ensures that the keys in the config (e.g., "kafka")
379/// match the enum variants.
380///
381/// # Examples
382///
383/// Configuring a Fanout endpoint in YAML:
384/// ```
385/// use mq_bridge::models::{Endpoint, EndpointType};
386///
387/// let yaml = r#"
388/// fanout:
389///   - memory: { topic: "out1" }
390///   - memory: { topic: "out2" }
391/// "#;
392///
393/// let endpoint: Endpoint = serde_yaml_ng::from_str(yaml).unwrap();
394/// if let EndpointType::Fanout(targets) = endpoint.endpoint_type {
395///     assert_eq!(targets.len(), 2);
396/// }
397/// ```
398#[derive(Debug, Deserialize, Serialize, Clone, Default)]
399#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
400#[serde(rename_all = "lowercase")]
401pub enum EndpointType {
402    Aws(AwsConfig),
403    Kafka(KafkaConfig),
404    Nats(NatsConfig),
405    File(FileConfig),
406    Static(String),
407    Memory(MemoryConfig),
408    Amqp(AmqpConfig),
409    MongoDb(MongoDbConfig),
410    Mqtt(MqttConfig),
411    Http(HttpConfig),
412    IbmMq(IbmMqConfig),
413    ZeroMq(ZeroMqConfig),
414    Fanout(Vec<Endpoint>),
415    Switch(SwitchConfig),
416    Response(ResponseConfig),
417    Custom {
418        name: String,
419        config: serde_json::Value,
420    },
421    #[default]
422    Null,
423}
424
425impl EndpointType {
426    pub fn name(&self) -> &'static str {
427        match self {
428            EndpointType::Aws(_) => "aws",
429            EndpointType::Kafka(_) => "kafka",
430            EndpointType::Nats(_) => "nats",
431            EndpointType::File(_) => "file",
432            EndpointType::Static(_) => "static",
433            EndpointType::Memory(_) => "memory",
434            EndpointType::Amqp(_) => "amqp",
435            EndpointType::MongoDb(_) => "mongodb",
436            EndpointType::Mqtt(_) => "mqtt",
437            EndpointType::Http(_) => "http",
438            EndpointType::IbmMq(_) => "ibmmq",
439            EndpointType::ZeroMq(_) => "zeromq",
440            EndpointType::Fanout(_) => "fanout",
441            EndpointType::Switch(_) => "switch",
442            EndpointType::Response(_) => "response",
443            EndpointType::Custom { .. } => "custom",
444            EndpointType::Null => "null",
445        }
446    }
447
448    pub fn is_core(&self) -> bool {
449        matches!(
450            self,
451            EndpointType::File(_)
452                | EndpointType::Static(_)
453                | EndpointType::Memory(_)
454                | EndpointType::Fanout(_)
455                | EndpointType::Switch(_)
456                | EndpointType::Response(_)
457                | EndpointType::Custom { .. }
458                | EndpointType::Null
459        )
460    }
461}
462
463/// An enumeration of all supported middleware types.
464#[derive(Debug, Deserialize, Serialize, Clone)]
465#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
466#[serde(rename_all = "snake_case")]
467pub enum Middleware {
468    Deduplication(DeduplicationMiddleware),
469    Metrics(MetricsMiddleware),
470    Dlq(Box<DeadLetterQueueMiddleware>),
471    Retry(RetryMiddleware),
472    RandomPanic(RandomPanicMiddleware),
473    Delay(DelayMiddleware),
474    WeakJoin(WeakJoinMiddleware),
475    Custom {
476        name: String,
477        config: serde_json::Value,
478    },
479}
480
481/// Deduplication middleware configuration.
482#[derive(Debug, Deserialize, Serialize, Clone)]
483#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
484#[serde(deny_unknown_fields)]
485pub struct DeduplicationMiddleware {
486    /// Path to the Sled database directory.
487    pub sled_path: String,
488    /// Time-to-live for deduplication entries in seconds.
489    pub ttl_seconds: u64,
490}
491
492/// Metrics middleware configuration. It's currently a struct without fields
493/// but can be extended later. Its presence in the config enables the middleware.
494#[derive(Debug, Deserialize, Serialize, Clone)]
495#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
496#[serde(deny_unknown_fields)]
497pub struct MetricsMiddleware {}
498
499/// Dead-Letter Queue (DLQ) middleware configuration. It is recommended that the
500/// endpoint is also using a retry to avoid message loss
501#[derive(Debug, Deserialize, Serialize, Clone, Default)]
502#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
503#[serde(deny_unknown_fields)]
504pub struct DeadLetterQueueMiddleware {
505    /// The endpoint to send failed messages to.
506    pub endpoint: Endpoint,
507}
508
509#[derive(Debug, Deserialize, Serialize, Clone, Default)]
510#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
511#[serde(deny_unknown_fields)]
512pub struct RetryMiddleware {
513    /// Maximum number of retry attempts. Defaults to 3.
514    #[serde(default = "default_retry_attempts")]
515    pub max_attempts: usize,
516    /// Initial retry interval in milliseconds. Defaults to 100ms.
517    #[serde(default = "default_initial_interval_ms")]
518    pub initial_interval_ms: u64,
519    /// Maximum retry interval in milliseconds. Defaults to 5000ms.
520    #[serde(default = "default_max_interval_ms")]
521    pub max_interval_ms: u64,
522    /// Multiplier for exponential backoff. Defaults to 2.0.
523    #[serde(default = "default_multiplier")]
524    pub multiplier: f64,
525}
526
527#[derive(Debug, Deserialize, Serialize, Clone)]
528#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
529#[serde(deny_unknown_fields)]
530pub struct DelayMiddleware {
531    /// Delay duration in milliseconds.
532    pub delay_ms: u64,
533}
534
535#[derive(Debug, Deserialize, Serialize, Clone)]
536#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
537#[serde(deny_unknown_fields)]
538pub struct WeakJoinMiddleware {
539    /// The metadata key to group messages by (e.g., "correlation_id").
540    pub group_by: String,
541    /// The number of messages to wait for.
542    pub expected_count: usize,
543    /// Timeout in milliseconds.
544    pub timeout_ms: u64,
545}
546
547#[derive(Debug, Deserialize, Serialize, Clone)]
548#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
549#[serde(deny_unknown_fields)]
550pub struct RandomPanicMiddleware {
551    /// Probability of panic (0.0 to 1.0).
552    #[serde(deserialize_with = "deserialize_probability")]
553    pub probability: f64,
554}
555
556fn deserialize_probability<'de, D>(deserializer: D) -> Result<f64, D::Error>
557where
558    D: Deserializer<'de>,
559{
560    let value = f64::deserialize(deserializer)?;
561    if !(0.0..=1.0).contains(&value) {
562        return Err(serde::de::Error::custom(
563            "probability must be between 0.0 and 1.0",
564        ));
565    }
566    Ok(value)
567}
568
569fn deserialize_null_as_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
570where
571    D: Deserializer<'de>,
572{
573    let opt = Option::<bool>::deserialize(deserializer)?;
574    Ok(opt.unwrap_or(false))
575}
576
577// --- AWS Specific Configuration ---
578#[derive(Debug, Deserialize, Serialize, Clone, Default)]
579#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
580#[serde(deny_unknown_fields)]
581pub struct AwsConfig {
582    /// The SQS queue URL. Required for Consumer. Optional for Publisher if `topic_arn` is set.
583    pub queue_url: Option<String>,
584    /// (Publisher only) The SNS topic ARN.
585    pub topic_arn: Option<String>,
586    /// AWS Region (e.g., "us-east-1").
587    pub region: Option<String>,
588    /// Custom endpoint URL (e.g., for LocalStack).
589    pub endpoint_url: Option<String>,
590    /// AWS Access Key ID.
591    pub access_key: Option<String>,
592    /// AWS Secret Access Key.
593    pub secret_key: Option<String>,
594    /// AWS Session Token.
595    pub session_token: Option<String>,
596    /// (Consumer only) Maximum number of messages to receive in a batch (1-10).
597    #[cfg_attr(feature = "schema", schemars(range(min = 1, max = 10)))]
598    pub max_messages: Option<i32>,
599    /// (Consumer only) Wait time for long polling in seconds (0-20).
600    #[cfg_attr(feature = "schema", schemars(range(min = 0, max = 20)))]
601    pub wait_time_seconds: Option<i32>,
602}
603
604// --- Kafka Specific Configuration ---
605
606/// General Kafka connection configuration.
607#[derive(Debug, Deserialize, Serialize, Clone, Default)]
608#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
609#[serde(deny_unknown_fields)]
610pub struct KafkaConfig {
611    /// Comma-separated list of Kafka broker URLs.
612    #[serde(alias = "brokers")]
613    pub url: String,
614    /// The Kafka topic to produce to or consume from.
615    pub topic: Option<String>,
616    /// Optional username for SASL authentication.
617    pub username: Option<String>,
618    /// Optional password for SASL authentication.
619    pub password: Option<String>,
620    /// TLS configuration.
621    #[serde(default)]
622    pub tls: TlsConfig,
623    /// (Consumer only) Consumer group ID.
624    /// If not provided, the consumer acts in **Subscriber mode**: it generates a unique, ephemeral group ID and starts consuming from the latest offset.
625    pub group_id: Option<String>,
626    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
627    #[serde(default)]
628    pub delayed_ack: bool,
629    /// (Publisher only) Additional librdkafka producer configuration options (key-value pairs).
630    #[serde(default)]
631    pub producer_options: Option<Vec<(String, String)>>,
632    /// (Consumer only) Additional librdkafka consumer configuration options (key-value pairs).
633    #[serde(default)]
634    pub consumer_options: Option<Vec<(String, String)>>,
635}
636
637// --- File Specific Configuration ---
638
639#[derive(Debug, Serialize, Clone)]
640#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
641pub struct FileConfig {
642    /// Path to the file.
643    pub path: String,
644    /// (Consumer only) If true, acts in **Subscriber mode** (like `tail -f`), reading new lines as they are written.
645    /// If false (default), acts in Consumer mode, reading lines and removing them from the file (queue behavior).
646    #[serde(default)]
647    pub subscribe_mode: bool,
648    /// (Consumer only) If true, lines are removed from the file after being processed by all subscribers.
649    /// Defaults to true if subscribe_mode is false, and false if subscribe_mode is true.
650    pub delete: Option<bool>,
651}
652
653impl<'de> Deserialize<'de> for FileConfig {
654    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
655    where
656        D: Deserializer<'de>,
657    {
658        struct FileConfigVisitor;
659        impl<'de> Visitor<'de> for FileConfigVisitor {
660            type Value = FileConfig;
661            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
662                formatter.write_str("string or map")
663            }
664            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
665            where
666                E: serde::de::Error,
667            {
668                Ok(FileConfig {
669                    path: value.to_string(),
670                    subscribe_mode: false,
671                    delete: None,
672                })
673            }
674            fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
675            where
676                M: MapAccess<'de>,
677            {
678                let mut path = None;
679                let mut consume = true;
680                let mut subscribe_mode = None;
681                let mut delete = None;
682                while let Some(key) = map.next_key::<String>()? {
683                    match key.as_str() {
684                        "path" => {
685                            if path.is_some() {
686                                return Err(serde::de::Error::duplicate_field("path"));
687                            }
688                            path = Some(map.next_value()?);
689                        }
690                        "consume" => {
691                            consume = map.next_value()?;
692                        }
693                        "subscribe_mode" => {
694                            if subscribe_mode.is_some() {
695                                return Err(serde::de::Error::duplicate_field("subscribe_mode"));
696                            }
697                            subscribe_mode = Some(map.next_value()?);
698                        }
699                        "delete" => {
700                            if delete.is_some() {
701                                return Err(serde::de::Error::duplicate_field("delete"));
702                            }
703                            delete = map.next_value()?;
704                        }
705                        _ => {
706                            let _ = map.next_value::<serde::de::IgnoredAny>()?;
707                        }
708                    }
709                }
710                let path = path.ok_or_else(|| serde::de::Error::missing_field("path"))?;
711                Ok(FileConfig {
712                    path,
713                    subscribe_mode: subscribe_mode.unwrap_or(!consume),
714                    delete,
715                })
716            }
717        }
718        deserializer.deserialize_any(FileConfigVisitor)
719    }
720}
721
722// --- NATS Specific Configuration ---
723
724/// General NATS connection configuration.
725#[derive(Debug, Deserialize, Serialize, Clone, Default)]
726#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
727#[serde(deny_unknown_fields)]
728pub struct NatsConfig {
729    /// Comma-separated list of NATS server URLs (e.g., "nats://localhost:4222,nats://localhost:4223").
730    pub url: String,
731    /// The NATS subject to publish to or subscribe to.
732    pub subject: Option<String>,
733    /// (Consumer only). The JetStream stream name. Required for Consumers.
734    pub stream: Option<String>,
735    /// Optional username for authentication.
736    pub username: Option<String>,
737    /// Optional password for authentication.
738    pub password: Option<String>,
739    /// TLS configuration.
740    #[serde(default)]
741    pub tls: TlsConfig,
742    /// Optional token for authentication.
743    pub token: Option<String>,
744    /// (Publisher only) If true, the publisher uses the request-reply pattern.
745    /// It sends a request and waits for a response (using `core_client.request_with_headers()`).
746    /// Defaults to false.
747    #[serde(default)]
748    pub request_reply: bool,
749    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
750    pub request_timeout_ms: Option<u64>,
751    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
752    #[serde(default)]
753    pub delayed_ack: bool,
754    /// If no_jetstream: true, use Core NATS (fire-and-forget) instead of JetStream. Defaults to false.
755    #[serde(default)]
756    pub no_jetstream: bool,
757    /// (Consumer only) If true, use ephemeral **Subscriber mode**. Defaults to false (durable consumer).
758    #[serde(default)]
759    pub subscriber_mode: bool,
760    /// (Publisher only) Maximum number of messages in the stream (if created by the bridge). Defaults to 1,000,000.
761    pub stream_max_messages: Option<i64>,
762    /// (Publisher only) Maximum total bytes in the stream (if created by the bridge). Defaults to 1GB.
763    pub stream_max_bytes: Option<i64>,
764    /// (Consumer only) Number of messages to prefetch from the consumer. Defaults to 10000.
765    pub prefetch_count: Option<usize>,
766}
767
768#[derive(Debug, Serialize, Deserialize, Clone, Default)]
769#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
770#[serde(deny_unknown_fields)]
771pub struct MemoryConfig {
772    /// The topic name for the in-memory channel.
773    pub topic: String,
774    /// The capacity of the channel. Defaults to 100.
775    pub capacity: Option<usize>,
776    /// (Publisher only) If true, send() waits for a response.
777    #[serde(default)]
778    pub request_reply: bool,
779    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
780    pub request_timeout_ms: Option<u64>,
781    /// (Consumer only) If true, act as a **Subscriber** (fan-out). Defaults to false (queue).
782    #[serde(default)]
783    pub subscribe_mode: bool,
784    /// (Consumer only) If true, enables NACK support (re-queuing), which requires cloning messages. Defaults to false.
785    #[serde(default)]
786    pub enable_nack: bool,
787}
788
789impl MemoryConfig {
790    pub fn new(topic: impl Into<String>, capacity: Option<usize>) -> Self {
791        Self {
792            topic: topic.into(),
793            capacity,
794            ..Default::default()
795        }
796    }
797    pub fn with_subscribe(self, subscribe_mode: bool) -> Self {
798        Self {
799            subscribe_mode,
800            ..self
801        }
802    }
803}
804
805// --- AMQP Specific Configuration ---
806
807/// General AMQP connection configuration.
808#[derive(Debug, Deserialize, Serialize, Clone, Default)]
809#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
810#[serde(deny_unknown_fields)]
811pub struct AmqpConfig {
812    /// AMQP connection URI. The `lapin` client connects to a single host specified in the URI.
813    /// For high availability, provide the address of a load balancer or use DNS resolution
814    /// that points to multiple brokers. Example: "amqp://localhost:5672/vhost".
815    pub url: String,
816    /// The AMQP queue name.
817    pub queue: Option<String>,
818    /// (Consumer only) If true, act as a **Subscriber** (fan-out). Defaults to false.
819    #[serde(default)]
820    pub subscribe_mode: bool,
821    /// Optional username for authentication.
822    pub username: Option<String>,
823    /// Optional password for authentication.
824    pub password: Option<String>,
825    /// TLS configuration.
826    #[serde(default)]
827    pub tls: TlsConfig,
828    /// The exchange to publish to or bind the queue to.
829    pub exchange: Option<String>,
830    /// (Consumer only) Number of messages to prefetch. Defaults to 100.
831    pub prefetch_count: Option<u16>,
832    /// If true, declare queues as non-durable (transient). Defaults to false. Affects both Consumer (queue durability) and Publisher (message persistence).
833    #[serde(default)]
834    pub no_persistence: bool,
835    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
836    #[serde(default)]
837    pub delayed_ack: bool,
838}
839
840#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
841#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
842#[serde(rename_all = "lowercase")]
843pub enum MongoDbFormat {
844    #[default]
845    Normal,
846    Json,
847    Raw,
848}
849
850// --- MongoDB Specific Configuration ---
851
852/// General MongoDB connection configuration.
853#[derive(Debug, Deserialize, Serialize, Clone, Default)]
854#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
855#[serde(deny_unknown_fields)]
856pub struct MongoDbConfig {
857    /// MongoDB connection string URI. Can contain a comma-separated list of hosts for a replica set.
858    /// Credentials provided via the separate `username` and `password` fields take precedence over any credentials embedded in the URL.
859    pub url: String,
860    /// The MongoDB collection name.
861    pub collection: Option<String>,
862    /// Optional username. Takes precedence over any credentials embedded in the `url`.
863    /// Use embedded URL credentials for simple one-off connections but prefer explicit username/password fields (or environment-sourced secrets) for clarity and secret management in production.
864    pub username: Option<String>,
865    /// Optional password. Takes precedence over any credentials embedded in the `url`.
866    /// Use embedded URL credentials for simple one-off connections but prefer explicit username/password fields (or environment-sourced secrets) for clarity and secret management in production.
867    pub password: Option<String>,
868    /// TLS configuration.
869    #[serde(default)]
870    pub tls: TlsConfig,
871    /// The database name.
872    pub database: String,
873    /// (Consumer only) Polling interval in milliseconds for the consumer (when not using Change Streams). Defaults to 100ms.
874    pub polling_interval_ms: Option<u64>,
875    /// (Publisher only) Polling interval in milliseconds for the publisher when waiting for a reply. Defaults to 50ms.
876    pub reply_polling_ms: Option<u64>,
877    /// (Publisher only) If true, the publisher will wait for a response in a dedicated collection. Defaults to false.
878    #[serde(default)]
879    pub request_reply: bool,
880    /// (Consumer only) If true, use Change Streams (**Subscriber mode**). Defaults to false (polling/consumer mode).
881    #[serde(default)]
882    pub change_stream: bool,
883    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
884    pub request_timeout_ms: Option<u64>,
885    /// (Publisher only) TTL in seconds for documents created by the publisher. If set, a TTL index is created.
886    pub ttl_seconds: Option<u64>,
887    /// (Publisher only) If set, creates a capped collection with this size in bytes.
888    pub capped_size_bytes: Option<i64>,
889    /// Format for storing messages. Defaults to Normal.
890    #[serde(default)]
891    pub format: MongoDbFormat,
892    /// The ID used for the cursor in sequenced mode. If not provided, consumption starts from the current sequence (ephemeral).
893    pub cursor_id: Option<String>,
894}
895
896// --- MQTT Specific Configuration ---
897
898/// General MQTT connection configuration.
899#[derive(Debug, Deserialize, Serialize, Clone, Default)]
900#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
901#[serde(deny_unknown_fields)]
902pub struct MqttConfig {
903    /// MQTT broker URL (e.g., "tcp://localhost:1883"). Does not support multiple hosts.
904    pub url: String,
905    /// The MQTT topic.
906    pub topic: Option<String>,
907    /// Optional username for authentication.
908    pub username: Option<String>,
909    /// Optional password for authentication.
910    pub password: Option<String>,
911    /// TLS configuration.
912    #[serde(default)]
913    pub tls: TlsConfig,
914    /// Optional client ID. If not provided, one is generated or derived from route name.
915    pub client_id: Option<String>,
916    /// Capacity of the internal channel for incoming messages. Defaults to 100.
917    pub queue_capacity: Option<usize>,
918    /// Maximum number of inflight messages.
919    pub max_inflight: Option<u16>,
920    /// Quality of Service level (0, 1, or 2). Defaults to 1.
921    pub qos: Option<u8>,
922    /// (Consumer only) If true, start with a clean session. Defaults to false (persistent session). Setting this to true effectively enables **Subscriber mode** (ephemeral).
923    #[serde(default = "default_clean_session")]
924    pub clean_session: bool,
925    /// Keep-alive interval in seconds. Defaults to 20.
926    pub keep_alive_seconds: Option<u64>,
927    /// MQTT protocol version (V3 or V5). Defaults to V5.
928    #[serde(default)]
929    pub protocol: MqttProtocol,
930    /// Session expiry interval in seconds (MQTT v5 only).
931    pub session_expiry_interval: Option<u32>,
932    /// (Publisher only) If true, messages are acknowledged immediately upon receipt (auto-ack).
933    /// If false (default), messages are acknowledged after processing (manual-ack).
934    #[serde(default)]
935    pub delayed_ack: bool,
936}
937
938#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
939#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
940#[serde(rename_all = "lowercase")]
941pub enum MqttProtocol {
942    #[default]
943    V5,
944    V3,
945}
946
947// --- ZeroMQ Specific Configuration ---
948
949#[derive(Debug, Deserialize, Serialize, Clone, Default)]
950#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
951#[serde(deny_unknown_fields)]
952pub struct ZeroMqConfig {
953    /// The ZeroMQ URL (e.g., "tcp://127.0.0.1:5555").
954    pub url: String,
955    /// The socket type (PUSH, PULL, PUB, SUB, REQ, REP).
956    #[serde(default)]
957    pub socket_type: Option<ZeroMqSocketType>,
958    /// (Consumer only) The ZeroMQ topic (for SUB sockets).
959    pub topic: Option<String>,
960    /// If true, bind to the address. If false, connect.
961    #[serde(default)]
962    pub bind: bool,
963    /// Internal buffer size for the channel. Defaults to 128.
964    #[serde(default)]
965    pub internal_buffer_size: Option<usize>,
966}
967
968#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
969#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
970#[serde(rename_all = "lowercase")]
971pub enum ZeroMqSocketType {
972    Push,
973    Pull,
974    Pub,
975    Sub,
976    Req,
977    Rep,
978}
979
980// --- HTTP Specific Configuration ---
981
982/// General HTTP connection configuration.
983#[derive(Debug, Deserialize, Serialize, Clone, Default)]
984#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
985#[serde(deny_unknown_fields)]
986pub struct HttpConfig {
987    /// For consumers, the listen address (e.g., "0.0.0.0:8080"). For publishers, the target URL.
988    pub url: String,
989    /// TLS configuration.
990    #[serde(default)]
991    pub tls: TlsConfig,
992    /// (Consumer only) Number of worker threads to use. Defaults to 0 for unlimited.
993    pub workers: Option<usize>,
994    /// (Consumer only) Header key to extract the message ID from. Defaults to "message-id".
995    pub message_id_header: Option<String>,
996    /// (Consumer only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
997    pub request_timeout_ms: Option<u64>,
998    /// (Consumer only) Internal buffer size for the channel. Defaults to 100.
999    pub internal_buffer_size: Option<usize>,
1000    /// (Consumer only) If true, respond immediately with 202 Accepted without waiting for downstream processing. Defaults to false.
1001    #[serde(default)]
1002    pub fire_and_forget: bool,
1003}
1004
1005// --- IBM MQ Specific Configuration ---
1006
1007/// Connection settings for the IBM MQ Queue Manager.
1008#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1009#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1010#[serde(deny_unknown_fields)]
1011pub struct IbmMqConfig {
1012    /// Required. Connection URL in `host(port)` format. Supports comma-separated list for failover (e.g., `host1(1414),host2(1414)`).
1013    pub url: String,
1014    /// Target Queue name for point-to-point messaging. Optional if `topic` is set; defaults to route name if omitted.
1015    pub queue: Option<String>,
1016    /// Target Topic string for Publish/Subscribe. If set, enables **Subscriber mode** (Consumer) or publishes to a topic (Publisher). Optional if `queue` is set.
1017    pub topic: Option<String>,
1018    /// Required. Name of the Queue Manager to connect to (e.g., `QM1`).
1019    pub queue_manager: String,
1020    /// Required. Server Connection (SVRCONN) Channel name defined on the QM.
1021    pub channel: String,
1022    /// Username for authentication. Optional; required if the channel enforces authentication.
1023    pub username: Option<String>,
1024    /// Password for authentication. Optional; required if the channel enforces authentication.
1025    pub password: Option<String>,
1026    /// TLS CipherSpec (e.g., `ANY_TLS12`). Optional; required for encrypted connections.
1027    pub cipher_spec: Option<String>,
1028    /// TLS configuration settings (e.g., keystore paths). Optional.
1029    #[serde(default)]
1030    pub tls: TlsConfig,
1031    /// Maximum message size in bytes (default: 4MB). Optional.
1032    #[serde(default = "default_max_message_size")]
1033    pub max_message_size: usize,
1034    /// (Consumer only) Polling timeout in milliseconds (default: 1000ms). Optional.
1035    #[serde(default = "default_wait_timeout_ms")]
1036    pub wait_timeout_ms: i32,
1037    /// Internal buffer size for the channel. Defaults to 100.
1038    #[serde(default)]
1039    pub internal_buffer_size: Option<usize>,
1040}
1041
1042fn default_max_message_size() -> usize {
1043    4 * 1024 * 1024 // 4MB default
1044}
1045
1046fn default_wait_timeout_ms() -> i32 {
1047    1000 // 1 second default
1048}
1049
1050// --- Switch/Router Configuration ---
1051
1052#[derive(Debug, Deserialize, Serialize, Clone)]
1053#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1054#[serde(deny_unknown_fields)]
1055pub struct SwitchConfig {
1056    /// The metadata key to inspect for routing decisions.
1057    pub metadata_key: String,
1058    /// A map of values to endpoints.
1059    pub cases: HashMap<String, Endpoint>,
1060    /// The default endpoint if no case matches.
1061    pub default: Option<Box<Endpoint>>,
1062}
1063
1064// --- Response Endpoint Configuration ---
1065#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1066#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1067#[serde(deny_unknown_fields)]
1068pub struct ResponseConfig {
1069    // This struct is a marker and currently has no fields.
1070}
1071
1072// --- Common Configuration ---
1073
1074/// TLS configuration for secure connections.
1075#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1076#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1077#[serde(deny_unknown_fields)]
1078pub struct TlsConfig {
1079    /// If true, enable TLS/SSL.
1080    #[serde(default, deserialize_with = "deserialize_null_as_false")]
1081    pub required: bool,
1082    /// Path to the CA certificate file.
1083    pub ca_file: Option<String>,
1084    /// Path to the client certificate file (PEM).
1085    pub cert_file: Option<String>,
1086    /// Path to the client private key file (PEM).
1087    pub key_file: Option<String>,
1088    /// Password for the private key (if encrypted).
1089    pub cert_password: Option<String>,
1090    /// If true, disable server certificate verification (insecure).
1091    #[serde(default)]
1092    pub accept_invalid_certs: bool,
1093}
1094
1095impl TlsConfig {
1096    pub fn is_mtls_client_configured(&self) -> bool {
1097        self.required && self.cert_file.is_some() && self.key_file.is_some()
1098    }
1099    pub fn is_tls_server_configured(&self) -> bool {
1100        self.required && self.cert_file.is_some() && self.key_file.is_some()
1101    }
1102}
1103
1104#[cfg(test)]
1105mod tests {
1106    use super::*;
1107    use config::{Config as ConfigBuilder, Environment};
1108
1109    const TEST_YAML: &str = r#"
1110kafka_to_nats:
1111  concurrency: 10
1112  input:
1113    middlewares:
1114      - deduplication:
1115          sled_path: "/tmp/mq-bridge/dedup_db"
1116          ttl_seconds: 3600
1117      - metrics: {}
1118      - retry:
1119          max_attempts: 5
1120          initial_interval_ms: 200
1121      - random_panic:
1122          probability: 0.1
1123      - dlq:
1124          endpoint:
1125            nats:
1126              subject: "dlq-subject"
1127              url: "nats://localhost:4222"
1128    kafka:
1129      topic: "input-topic"
1130      url: "localhost:9092"
1131      group_id: "my-consumer-group"
1132      tls:
1133        required: true
1134        ca_file: "/path_to_ca"
1135        cert_file: "/path_to_cert"
1136        key_file: "/path_to_key"
1137        cert_password: "password"
1138        accept_invalid_certs: true
1139  output:
1140    middlewares:
1141      - metrics: {}
1142    nats:
1143      subject: "output-subject"
1144      url: "nats://localhost:4222"
1145"#;
1146
1147    fn assert_config_values(config: &Config) {
1148        assert_eq!(config.len(), 1);
1149        let route = config.get("kafka_to_nats").expect("Route should exist");
1150
1151        assert_eq!(route.options.concurrency, 10);
1152
1153        // --- Assert Input ---
1154        let input = &route.input;
1155        assert_eq!(input.middlewares.len(), 5);
1156
1157        let mut has_dedup = false;
1158        let mut has_metrics = false;
1159        let mut has_dlq = false;
1160        let mut has_retry = false;
1161        let mut has_random_panic = false;
1162        for middleware in &input.middlewares {
1163            match middleware {
1164                Middleware::Deduplication(dedup) => {
1165                    assert_eq!(dedup.sled_path, "/tmp/mq-bridge/dedup_db");
1166                    assert_eq!(dedup.ttl_seconds, 3600);
1167                    has_dedup = true;
1168                }
1169                Middleware::Metrics(_) => {
1170                    has_metrics = true;
1171                }
1172                Middleware::Custom { .. } => {}
1173                Middleware::Dlq(dlq) => {
1174                    assert!(dlq.endpoint.middlewares.is_empty());
1175                    if let EndpointType::Nats(nats_cfg) = &dlq.endpoint.endpoint_type {
1176                        assert_eq!(nats_cfg.subject, Some("dlq-subject".to_string()));
1177                        assert_eq!(nats_cfg.url, "nats://localhost:4222");
1178                    }
1179                    has_dlq = true;
1180                }
1181                Middleware::Retry(retry) => {
1182                    assert_eq!(retry.max_attempts, 5);
1183                    assert_eq!(retry.initial_interval_ms, 200);
1184                    has_retry = true;
1185                }
1186                Middleware::RandomPanic(rp) => {
1187                    assert!((rp.probability - 0.1).abs() < f64::EPSILON);
1188                    has_random_panic = true;
1189                }
1190                Middleware::Delay(_) => {}
1191                Middleware::WeakJoin(_) => {}
1192            }
1193        }
1194
1195        if let EndpointType::Kafka(kafka) = &input.endpoint_type {
1196            assert_eq!(kafka.topic, Some("input-topic".to_string()));
1197            assert_eq!(kafka.url, "localhost:9092");
1198            assert_eq!(kafka.group_id, Some("my-consumer-group".to_string()));
1199            let tls = &kafka.tls;
1200            assert!(tls.required);
1201            assert_eq!(tls.ca_file.as_deref(), Some("/path_to_ca"));
1202            assert!(tls.accept_invalid_certs);
1203        } else {
1204            panic!("Input endpoint should be Kafka");
1205        }
1206        assert!(has_dedup);
1207        assert!(has_metrics);
1208        assert!(has_dlq);
1209        assert!(has_retry);
1210        assert!(has_random_panic);
1211
1212        // --- Assert Output ---
1213        let output = &route.output;
1214        assert_eq!(output.middlewares.len(), 1);
1215        assert!(matches!(output.middlewares[0], Middleware::Metrics(_)));
1216
1217        if let EndpointType::Nats(nats) = &output.endpoint_type {
1218            assert_eq!(nats.subject, Some("output-subject".to_string()));
1219            assert_eq!(nats.url, "nats://localhost:4222");
1220        } else {
1221            panic!("Output endpoint should be NATS");
1222        }
1223    }
1224
1225    #[test]
1226    fn test_deserialize_from_yaml() {
1227        // We use serde_yaml directly here because the `config` crate's processing
1228        // can interfere with complex deserialization logic.
1229        let result: Result<Config, _> = serde_yaml_ng::from_str(TEST_YAML);
1230        println!("Deserialized from YAML: {:#?}", result);
1231        let config = result.expect("Failed to deserialize TEST_YAML");
1232        assert_config_values(&config);
1233    }
1234
1235    #[test]
1236    fn test_deserialize_from_env() {
1237        // Set environment variables based on README
1238        unsafe {
1239            std::env::set_var("MQB__KAFKA_TO_NATS__CONCURRENCY", "10");
1240            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "input-topic");
1241            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__URL", "localhost:9092");
1242            std::env::set_var(
1243                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID",
1244                "my-consumer-group",
1245            );
1246            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__REQUIRED", "true");
1247            std::env::set_var(
1248                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__CA_FILE",
1249                "/path_to_ca",
1250            );
1251            std::env::set_var(
1252                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__ACCEPT_INVALID_CERTS",
1253                "true",
1254            );
1255            std::env::set_var(
1256                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT",
1257                "output-subject",
1258            );
1259            std::env::set_var(
1260                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__URL",
1261                "nats://localhost:4222",
1262            );
1263            std::env::set_var(
1264                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__SUBJECT",
1265                "dlq-subject",
1266            );
1267            std::env::set_var(
1268                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__URL",
1269                "nats://localhost:4222",
1270            );
1271        }
1272
1273        let builder = ConfigBuilder::builder()
1274            // Enable automatic type parsing for values from environment variables.
1275            .add_source(
1276                Environment::with_prefix("MQB")
1277                    .separator("__")
1278                    .try_parsing(true),
1279            );
1280
1281        let config: Config = builder
1282            .build()
1283            .expect("Failed to build config")
1284            .try_deserialize()
1285            .expect("Failed to deserialize config");
1286
1287        // We can't test all values from env, but we can check the ones we set.
1288        assert_eq!(config.get("kafka_to_nats").unwrap().options.concurrency, 10);
1289        if let EndpointType::Kafka(k) = &config.get("kafka_to_nats").unwrap().input.endpoint_type {
1290            assert_eq!(k.topic, Some("input-topic".to_string()));
1291            assert!(k.tls.required);
1292        } else {
1293            panic!("Expected Kafka endpoint");
1294        }
1295
1296        let input = &config.get("kafka_to_nats").unwrap().input;
1297        assert_eq!(input.middlewares.len(), 1);
1298        if let Middleware::Dlq(_) = &input.middlewares[0] {
1299            // Correctly parsed
1300        } else {
1301            panic!("Expected DLQ middleware");
1302        }
1303    }
1304}
1305
1306#[cfg(all(test, feature = "schema"))]
1307mod schema_tests {
1308    use super::*;
1309
1310    #[test]
1311    fn generate_json_schema() {
1312        let schema = schemars::schema_for!(Config);
1313        let schema_json = serde_json::to_string_pretty(&schema).unwrap();
1314
1315        let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1316        path.push("mq-bridge.schema.json");
1317        std::fs::write(path, schema_json).expect("Failed to write schema file");
1318    }
1319}