Skip to main content

mq_bridge/
models.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6use serde::{
7    de::{MapAccess, Visitor},
8    Deserialize, Deserializer, Serialize,
9};
10use std::{collections::HashMap, sync::Arc};
11
12use crate::traits::Handler;
13use tracing::trace;
14
15/// The top-level configuration is a map of named routes.
16/// The key is the route name (e.g., "kafka_to_nats").
17///
18/// # Examples
19///
20/// Deserializing a complex configuration from YAML:
21///
22/// ```
23/// use mq_bridge::models::{Config, EndpointType, Middleware};
24///
25/// let yaml = r#"
26/// kafka_to_nats:
27///   concurrency: 10
28///   input:
29///     middlewares:
30///       - deduplication:
31///           sled_path: "/tmp/mq-bridge/dedup_db"
32///           ttl_seconds: 3600
33///       - metrics: {}
34///     kafka:
35///       topic: "input-topic"
36///       url: "localhost:9092"
37///       group_id: "my-consumer-group"
38///   output:
39///     nats:
40///       subject: "output-subject"
41///       url: "nats://localhost:4222"
42/// "#;
43///
44/// let config: Config = serde_yaml_ng::from_str(yaml).unwrap();
45/// let route = config.get("kafka_to_nats").unwrap();
46///
47/// assert_eq!(route.options.concurrency, 10);
48/// // Check input middleware
49/// assert!(route.input.middlewares.iter().any(|m| matches!(m, Middleware::Deduplication(_))));
50/// // Check output endpoint
51/// assert!(matches!(route.output.endpoint_type, EndpointType::Nats(_)));
52/// ```
53pub type Config = HashMap<String, Route>;
54
55/// A configuration map for named publishers (endpoints).
56/// The key is the publisher name.
57pub type PublisherConfig = HashMap<String, Endpoint>;
58
59/// Defines a single message processing route from an input to an output.
60#[derive(Debug, Deserialize, Serialize, Clone)]
61#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
62#[serde(deny_unknown_fields)]
63pub struct Route {
64    /// The input/source endpoint for the route.
65    pub input: Endpoint,
66    /// The output/sink endpoint for the route.
67    #[serde(default = "default_output_endpoint")]
68    pub output: Endpoint,
69    /// (Optional) Fine-tuning options for the route's execution.
70    #[serde(flatten, default)]
71    pub options: RouteOptions,
72}
73
74impl Default for Route {
75    fn default() -> Self {
76        Self {
77            input: Endpoint::null(),
78            output: Endpoint::null(),
79            options: RouteOptions::default(),
80        }
81    }
82}
83
84/// Fine-tuning options for a route's execution.
85#[derive(Debug, Deserialize, Serialize, Clone)]
86#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
87#[serde(deny_unknown_fields)]
88pub struct RouteOptions {
89    /// (Optional) Number of concurrent processing tasks for this route. Defaults to 1.
90    #[serde(default = "default_concurrency")]
91    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
92    pub concurrency: usize,
93    /// (Optional) Number of messages to process in a single batch. Defaults to 1.
94    #[serde(default = "default_batch_size")]
95    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
96    pub batch_size: usize,
97    /// (Optional) The maximum number of concurrent commit tasks allowed. Defaults to 4096.
98    #[serde(default = "default_commit_concurrency_limit")]
99    pub commit_concurrency_limit: usize,
100}
101
102impl Default for RouteOptions {
103    fn default() -> Self {
104        Self {
105            concurrency: default_concurrency(),
106            batch_size: default_batch_size(),
107            commit_concurrency_limit: default_commit_concurrency_limit(),
108        }
109    }
110}
111
112pub(crate) fn default_concurrency() -> usize {
113    1
114}
115
116pub(crate) fn default_batch_size() -> usize {
117    1
118}
119
120pub(crate) fn default_commit_concurrency_limit() -> usize {
121    4096
122}
123
124fn default_output_endpoint() -> Endpoint {
125    Endpoint::new(EndpointType::Null)
126}
127
128fn default_retry_attempts() -> usize {
129    3
130}
131fn default_initial_interval_ms() -> u64 {
132    100
133}
134fn default_max_interval_ms() -> u64 {
135    5000
136}
137fn default_multiplier() -> f64 {
138    2.0
139}
140fn default_clean_session() -> bool {
141    false
142}
143
144fn is_known_endpoint_name(name: &str) -> bool {
145    matches!(
146        name,
147        "aws"
148            | "kafka"
149            | "nats"
150            | "file"
151            | "static"
152            | "memory"
153            | "amqp"
154            | "mongodb"
155            | "mqtt"
156            | "http"
157            | "ibm-mq"
158            | "ibmmq"
159            | "zeromq"
160            | "fanout"
161            | "switch"
162            | "response"
163    )
164}
165
166/// Represents a connection point for messages, which can be a source (input) or a sink (output).
167#[derive(Serialize, Clone, Default)]
168#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
169#[serde(deny_unknown_fields)]
170pub struct Endpoint {
171    /// (Optional) A list of middlewares to apply to the endpoint.
172    #[serde(default)]
173    pub middlewares: Vec<Middleware>,
174
175    /// The specific endpoint implementation, determined by the configuration key (e.g., "kafka", "nats").
176    #[serde(flatten)]
177    pub endpoint_type: EndpointType,
178
179    #[serde(skip_serializing)]
180    #[cfg_attr(feature = "schema", schemars(skip))]
181    /// Internal handler for processing messages (not serialized).
182    pub handler: Option<Arc<dyn Handler>>,
183}
184
185impl std::fmt::Debug for Endpoint {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        f.debug_struct("Endpoint")
188            .field("middlewares", &self.middlewares)
189            .field("endpoint_type", &self.endpoint_type)
190            .field(
191                "handler",
192                &if self.handler.is_some() {
193                    "Some(<Handler>)"
194                } else {
195                    "None"
196                },
197            )
198            .finish()
199    }
200}
201
202impl<'de> Deserialize<'de> for Endpoint {
203    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
204    where
205        D: Deserializer<'de>,
206    {
207        struct EndpointVisitor;
208
209        impl<'de> Visitor<'de> for EndpointVisitor {
210            type Value = Endpoint;
211
212            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
213                formatter.write_str("a map representing an endpoint or null")
214            }
215
216            fn visit_unit<E>(self) -> Result<Self::Value, E>
217            where
218                E: serde::de::Error,
219            {
220                Ok(Endpoint::new(EndpointType::Null))
221            }
222
223            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
224            where
225                A: MapAccess<'de>,
226            {
227                // Buffer the map into a temporary serde_json::Map.
228                // This allows us to separate the `middlewares` field from the rest.
229                let mut temp_map = serde_json::Map::new();
230                let mut middlewares_val = None;
231
232                while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
233                    if key == "middlewares" {
234                        middlewares_val = Some(value);
235                    } else {
236                        temp_map.insert(key, value);
237                    }
238                }
239
240                // Deserialize the rest of the map into the flattened EndpointType.
241                let temp_val = serde_json::Value::Object(temp_map);
242                let endpoint_type: EndpointType = match serde_json::from_value(temp_val.clone()) {
243                    Ok(et) => et,
244                    Err(original_err) => {
245                        if let serde_json::Value::Object(map) = &temp_val {
246                            if map.len() == 1 {
247                                let (name, config) = map.iter().next().unwrap();
248                                if is_known_endpoint_name(name) {
249                                    return Err(serde::de::Error::custom(original_err));
250                                }
251                                trace!("Falling back to Custom endpoint for key: {}", name);
252                                EndpointType::Custom {
253                                    name: name.clone(),
254                                    config: config.clone(),
255                                }
256                            } else if map.is_empty() {
257                                EndpointType::Null
258                            } else {
259                                return Err(serde::de::Error::custom(
260                                    "Invalid endpoint configuration: multiple keys found or unknown endpoint type",
261                                ));
262                            }
263                        } else {
264                            return Err(serde::de::Error::custom("Invalid endpoint configuration"));
265                        }
266                    }
267                };
268
269                // Deserialize the extracted middlewares value using the existing helper logic.
270                let middlewares = match middlewares_val {
271                    Some(val) => {
272                        deserialize_middlewares_from_value(val).map_err(serde::de::Error::custom)?
273                    }
274                    None => Vec::new(),
275                };
276
277                Ok(Endpoint {
278                    middlewares,
279                    endpoint_type,
280                    handler: None,
281                })
282            }
283        }
284
285        deserializer.deserialize_any(EndpointVisitor)
286    }
287}
288
289fn is_known_middleware_name(name: &str) -> bool {
290    matches!(
291        name,
292        "deduplication"
293            | "metrics"
294            | "dlq"
295            | "retry"
296            | "random_panic"
297            | "delay"
298            | "weak_join"
299            | "custom"
300    )
301}
302
303/// Deserialize middlewares from a generic serde_json::Value.
304///
305/// This logic was extracted from `deserialize_middlewares_from_map_or_seq` to be reused by the custom `Endpoint` deserializer.
306fn deserialize_middlewares_from_value(value: serde_json::Value) -> anyhow::Result<Vec<Middleware>> {
307    let arr = match value {
308        serde_json::Value::Array(arr) => arr,
309        serde_json::Value::Object(map) => {
310            let mut middlewares: Vec<_> = map
311                .into_iter()
312                // The config crate can produce maps with numeric string keys ("0", "1", ...)
313                // from environment variables. We need to sort by these keys to maintain order.
314                .filter_map(|(key, value)| key.parse::<usize>().ok().map(|index| (index, value)))
315                .collect();
316            middlewares.sort_by_key(|(index, _)| *index);
317
318            middlewares.into_iter().map(|(_, value)| value).collect()
319        }
320        _ => return Err(anyhow::anyhow!("Expected an array or object")),
321    };
322
323    let mut middlewares = Vec::new();
324    for item in arr {
325        // Check if it is a map with a single key that matches a known middleware
326        let known_name = if let serde_json::Value::Object(map) = &item {
327            if map.len() == 1 {
328                let (name, _) = map.iter().next().unwrap();
329                if is_known_middleware_name(name) {
330                    Some(name.clone())
331                } else {
332                    None
333                }
334            } else {
335                None
336            }
337        } else {
338            None
339        };
340
341        if let Some(name) = known_name {
342            match serde_json::from_value::<Middleware>(item.clone()) {
343                Ok(m) => middlewares.push(m),
344                Err(e) => {
345                    return Err(anyhow::anyhow!(
346                        "Failed to deserialize known middleware '{}': {}",
347                        name,
348                        e
349                    ))
350                }
351            }
352        } else if let Ok(m) = serde_json::from_value::<Middleware>(item.clone()) {
353            middlewares.push(m);
354        } else if let serde_json::Value::Object(map) = &item {
355            if map.len() == 1 {
356                let (name, config) = map.iter().next().unwrap();
357                middlewares.push(Middleware::Custom {
358                    name: name.clone(),
359                    config: config.clone(),
360                });
361            } else {
362                return Err(anyhow::anyhow!(
363                    "Invalid middleware configuration: {:?}",
364                    item
365                ));
366            }
367        } else {
368            return Err(anyhow::anyhow!(
369                "Invalid middleware configuration: {:?}",
370                item
371            ));
372        }
373    }
374    Ok(middlewares)
375}
376
377/// An enumeration of all supported endpoint types.
378/// `#[serde(rename_all = "lowercase")]` ensures that the keys in the config (e.g., "kafka")
379/// match the enum variants.
380///
381/// # Examples
382///
383/// Configuring a Fanout endpoint in YAML:
384/// ```
385/// use mq_bridge::models::{Endpoint, EndpointType};
386///
387/// let yaml = r#"
388/// fanout:
389///   - memory: { topic: "out1" }
390///   - memory: { topic: "out2" }
391/// "#;
392///
393/// let endpoint: Endpoint = serde_yaml_ng::from_str(yaml).unwrap();
394/// if let EndpointType::Fanout(targets) = endpoint.endpoint_type {
395///     assert_eq!(targets.len(), 2);
396/// }
397/// ```
398#[derive(Debug, Deserialize, Serialize, Clone, Default)]
399#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
400#[serde(rename_all = "lowercase")]
401pub enum EndpointType {
402    Aws(AwsConfig),
403    Kafka(KafkaConfig),
404    Nats(NatsConfig),
405    File(FileConfig),
406    Static(String),
407    Memory(MemoryConfig),
408    Amqp(AmqpConfig),
409    MongoDb(MongoDbConfig),
410    Mqtt(MqttConfig),
411    Http(HttpConfig),
412    IbmMq(IbmMqConfig),
413    ZeroMq(ZeroMqConfig),
414    Fanout(Vec<Endpoint>),
415    Switch(SwitchConfig),
416    Response(ResponseConfig),
417    Custom {
418        name: String,
419        config: serde_json::Value,
420    },
421    #[default]
422    Null,
423}
424
425impl EndpointType {
426    pub fn name(&self) -> &'static str {
427        match self {
428            EndpointType::Aws(_) => "aws",
429            EndpointType::Kafka(_) => "kafka",
430            EndpointType::Nats(_) => "nats",
431            EndpointType::File(_) => "file",
432            EndpointType::Static(_) => "static",
433            EndpointType::Memory(_) => "memory",
434            EndpointType::Amqp(_) => "amqp",
435            EndpointType::MongoDb(_) => "mongodb",
436            EndpointType::Mqtt(_) => "mqtt",
437            EndpointType::Http(_) => "http",
438            EndpointType::IbmMq(_) => "ibmmq",
439            EndpointType::ZeroMq(_) => "zeromq",
440            EndpointType::Fanout(_) => "fanout",
441            EndpointType::Switch(_) => "switch",
442            EndpointType::Response(_) => "response",
443            EndpointType::Custom { .. } => "custom",
444            EndpointType::Null => "null",
445        }
446    }
447
448    pub fn is_core(&self) -> bool {
449        matches!(
450            self,
451            EndpointType::File(_)
452                | EndpointType::Static(_)
453                | EndpointType::Memory(_)
454                | EndpointType::Fanout(_)
455                | EndpointType::Switch(_)
456                | EndpointType::Response(_)
457                | EndpointType::Custom { .. }
458                | EndpointType::Null
459        )
460    }
461}
462
463/// An enumeration of all supported middleware types.
464#[derive(Debug, Deserialize, Serialize, Clone)]
465#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
466#[serde(rename_all = "snake_case")]
467pub enum Middleware {
468    Deduplication(DeduplicationMiddleware),
469    Metrics(MetricsMiddleware),
470    Dlq(Box<DeadLetterQueueMiddleware>),
471    Retry(RetryMiddleware),
472    RandomPanic(RandomPanicMiddleware),
473    Delay(DelayMiddleware),
474    WeakJoin(WeakJoinMiddleware),
475    Custom {
476        name: String,
477        config: serde_json::Value,
478    },
479}
480
481/// Deduplication middleware configuration.
482#[derive(Debug, Deserialize, Serialize, Clone)]
483#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
484#[serde(deny_unknown_fields)]
485pub struct DeduplicationMiddleware {
486    /// Path to the Sled database directory.
487    pub sled_path: String,
488    /// Time-to-live for deduplication entries in seconds.
489    pub ttl_seconds: u64,
490}
491
492/// Metrics middleware configuration. It's currently a struct without fields
493/// but can be extended later. Its presence in the config enables the middleware.
494#[derive(Debug, Deserialize, Serialize, Clone)]
495#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
496#[serde(deny_unknown_fields)]
497pub struct MetricsMiddleware {}
498
499/// Dead-Letter Queue (DLQ) middleware configuration. It is recommended that the
500/// endpoint is also using a retry to avoid message loss
501#[derive(Debug, Deserialize, Serialize, Clone, Default)]
502#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
503#[serde(deny_unknown_fields)]
504pub struct DeadLetterQueueMiddleware {
505    /// The endpoint to send failed messages to.
506    pub endpoint: Endpoint,
507}
508
509#[derive(Debug, Deserialize, Serialize, Clone, Default)]
510#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
511#[serde(deny_unknown_fields)]
512pub struct RetryMiddleware {
513    /// Maximum number of retry attempts. Defaults to 3.
514    #[serde(default = "default_retry_attempts")]
515    pub max_attempts: usize,
516    /// Initial retry interval in milliseconds. Defaults to 100ms.
517    #[serde(default = "default_initial_interval_ms")]
518    pub initial_interval_ms: u64,
519    /// Maximum retry interval in milliseconds. Defaults to 5000ms.
520    #[serde(default = "default_max_interval_ms")]
521    pub max_interval_ms: u64,
522    /// Multiplier for exponential backoff. Defaults to 2.0.
523    #[serde(default = "default_multiplier")]
524    pub multiplier: f64,
525}
526
527#[derive(Debug, Deserialize, Serialize, Clone)]
528#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
529#[serde(deny_unknown_fields)]
530pub struct DelayMiddleware {
531    /// Delay duration in milliseconds.
532    pub delay_ms: u64,
533}
534
535#[derive(Debug, Deserialize, Serialize, Clone)]
536#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
537#[serde(deny_unknown_fields)]
538pub struct WeakJoinMiddleware {
539    /// The metadata key to group messages by (e.g., "correlation_id").
540    pub group_by: String,
541    /// The number of messages to wait for.
542    pub expected_count: usize,
543    /// Timeout in milliseconds.
544    pub timeout_ms: u64,
545}
546
547#[derive(Debug, Deserialize, Serialize, Clone)]
548#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
549#[serde(deny_unknown_fields)]
550pub struct RandomPanicMiddleware {
551    /// Probability of panic (0.0 to 1.0).
552    #[serde(deserialize_with = "deserialize_probability")]
553    pub probability: f64,
554}
555
556fn deserialize_probability<'de, D>(deserializer: D) -> Result<f64, D::Error>
557where
558    D: Deserializer<'de>,
559{
560    let value = f64::deserialize(deserializer)?;
561    if !(0.0..=1.0).contains(&value) {
562        return Err(serde::de::Error::custom(
563            "probability must be between 0.0 and 1.0",
564        ));
565    }
566    Ok(value)
567}
568
569// --- AWS Specific Configuration ---
570#[derive(Debug, Deserialize, Serialize, Clone, Default)]
571#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
572#[serde(deny_unknown_fields)]
573pub struct AwsConfig {
574    /// The SQS queue URL. Required for Consumer. Optional for Publisher if `topic_arn` is set.
575    pub queue_url: Option<String>,
576    /// (Publisher only) The SNS topic ARN.
577    pub topic_arn: Option<String>,
578    /// AWS Region (e.g., "us-east-1").
579    pub region: Option<String>,
580    /// Custom endpoint URL (e.g., for LocalStack).
581    pub endpoint_url: Option<String>,
582    /// AWS Access Key ID.
583    pub access_key: Option<String>,
584    /// AWS Secret Access Key.
585    pub secret_key: Option<String>,
586    /// AWS Session Token.
587    pub session_token: Option<String>,
588    /// (Consumer only) Maximum number of messages to receive in a batch (1-10).
589    #[cfg_attr(feature = "schema", schemars(range(min = 1, max = 10)))]
590    pub max_messages: Option<i32>,
591    /// (Consumer only) Wait time for long polling in seconds (0-20).
592    #[cfg_attr(feature = "schema", schemars(range(min = 0, max = 20)))]
593    pub wait_time_seconds: Option<i32>,
594}
595
596// --- Kafka Specific Configuration ---
597
598/// General Kafka connection configuration.
599#[derive(Debug, Deserialize, Serialize, Clone, Default)]
600#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
601#[serde(deny_unknown_fields)]
602pub struct KafkaConfig {
603    /// Comma-separated list of Kafka broker URLs.
604    #[serde(alias = "brokers")]
605    pub url: String,
606    /// The Kafka topic to produce to or consume from.
607    pub topic: Option<String>,
608    /// Optional username for SASL authentication.
609    pub username: Option<String>,
610    /// Optional password for SASL authentication.
611    pub password: Option<String>,
612    /// TLS configuration.
613    #[serde(default)]
614    pub tls: TlsConfig,
615    /// (Consumer only) Consumer group ID.
616    /// If not provided, the consumer acts in **Subscriber mode**: it generates a unique, ephemeral group ID and starts consuming from the latest offset.
617    pub group_id: Option<String>,
618    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
619    #[serde(default)]
620    pub delayed_ack: bool,
621    /// (Publisher only) Additional librdkafka producer configuration options (key-value pairs).
622    #[serde(default)]
623    pub producer_options: Option<Vec<(String, String)>>,
624    /// (Consumer only) Additional librdkafka consumer configuration options (key-value pairs).
625    #[serde(default)]
626    pub consumer_options: Option<Vec<(String, String)>>,
627}
628
629// --- File Specific Configuration ---
630
631#[derive(Debug, Serialize, Clone)]
632#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
633pub struct FileConfig {
634    /// Path to the file.
635    pub path: String,
636    /// (Consumer only) If true, acts in **Subscriber mode** (like `tail -f`), reading new lines as they are written.
637    /// If false (default), acts in Consumer mode, reading lines and removing them from the file (queue behavior).
638    #[serde(default)]
639    pub subscribe_mode: bool,
640    /// (Consumer only) If true, lines are removed from the file after being processed by all subscribers.
641    /// Defaults to true if subscribe_mode is false, and false if subscribe_mode is true.
642    pub delete: Option<bool>,
643}
644
645impl<'de> Deserialize<'de> for FileConfig {
646    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
647    where
648        D: Deserializer<'de>,
649    {
650        struct FileConfigVisitor;
651        impl<'de> Visitor<'de> for FileConfigVisitor {
652            type Value = FileConfig;
653            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
654                formatter.write_str("string or map")
655            }
656            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
657            where
658                E: serde::de::Error,
659            {
660                Ok(FileConfig {
661                    path: value.to_string(),
662                    subscribe_mode: false,
663                    delete: None,
664                })
665            }
666            fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
667            where
668                M: MapAccess<'de>,
669            {
670                let mut path = None;
671                let mut consume = true;
672                let mut subscribe_mode = None;
673                let mut delete = None;
674                while let Some(key) = map.next_key::<String>()? {
675                    match key.as_str() {
676                        "path" => {
677                            if path.is_some() {
678                                return Err(serde::de::Error::duplicate_field("path"));
679                            }
680                            path = Some(map.next_value()?);
681                        }
682                        "consume" => {
683                            consume = map.next_value()?;
684                        }
685                        "subscribe_mode" => {
686                            if subscribe_mode.is_some() {
687                                return Err(serde::de::Error::duplicate_field("subscribe_mode"));
688                            }
689                            subscribe_mode = Some(map.next_value()?);
690                        }
691                        "delete" => {
692                            if delete.is_some() {
693                                return Err(serde::de::Error::duplicate_field("delete"));
694                            }
695                            delete = Some(map.next_value()?);
696                        }
697                        _ => {
698                            let _ = map.next_value::<serde::de::IgnoredAny>()?;
699                        }
700                    }
701                }
702                let path = path.ok_or_else(|| serde::de::Error::missing_field("path"))?;
703                Ok(FileConfig {
704                    path,
705                    subscribe_mode: subscribe_mode.unwrap_or(!consume),
706                    delete,
707                })
708            }
709        }
710        deserializer.deserialize_any(FileConfigVisitor)
711    }
712}
713
714// --- NATS Specific Configuration ---
715
716/// General NATS connection configuration.
717#[derive(Debug, Deserialize, Serialize, Clone, Default)]
718#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
719#[serde(deny_unknown_fields)]
720pub struct NatsConfig {
721    /// Comma-separated list of NATS server URLs (e.g., "nats://localhost:4222,nats://localhost:4223").
722    pub url: String,
723    /// The NATS subject to publish to or subscribe to.
724    pub subject: Option<String>,
725    /// (Consumer only). The JetStream stream name. Required for Consumers.
726    pub stream: Option<String>,
727    /// Optional username for authentication.
728    pub username: Option<String>,
729    /// Optional password for authentication.
730    pub password: Option<String>,
731    /// TLS configuration.
732    #[serde(default)]
733    pub tls: TlsConfig,
734    /// Optional token for authentication.
735    pub token: Option<String>,
736    /// (Publisher only) If true, the publisher uses the request-reply pattern.
737    /// It sends a request and waits for a response (using `core_client.request_with_headers()`).
738    /// Defaults to false.
739    #[serde(default)]
740    pub request_reply: bool,
741    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
742    pub request_timeout_ms: Option<u64>,
743    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
744    #[serde(default)]
745    pub delayed_ack: bool,
746    /// If no_jetstream: true, use Core NATS (fire-and-forget) instead of JetStream. Defaults to false.
747    #[serde(default)]
748    pub no_jetstream: bool,
749    /// (Consumer only) If true, use ephemeral **Subscriber mode**. Defaults to false (durable consumer).
750    #[serde(default)]
751    pub subscriber_mode: bool,
752    /// (Publisher only) Maximum number of messages in the stream (if created by the bridge). Defaults to 1,000,000.
753    pub stream_max_messages: Option<i64>,
754    /// (Publisher only) Maximum total bytes in the stream (if created by the bridge). Defaults to 1GB.
755    pub stream_max_bytes: Option<i64>,
756    /// (Consumer only) Number of messages to prefetch from the consumer. Defaults to 10000.
757    pub prefetch_count: Option<usize>,
758}
759
760#[derive(Debug, Serialize, Deserialize, Clone, Default)]
761#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
762#[serde(deny_unknown_fields)]
763pub struct MemoryConfig {
764    /// The topic name for the in-memory channel.
765    pub topic: String,
766    /// The capacity of the channel. Defaults to 100.
767    pub capacity: Option<usize>,
768    /// (Publisher only) If true, send() waits for a response.
769    #[serde(default)]
770    pub request_reply: bool,
771    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
772    pub request_timeout_ms: Option<u64>,
773    /// (Consumer only) If true, act as a **Subscriber** (fan-out). Defaults to false (queue).
774    #[serde(default)]
775    pub subscribe_mode: bool,
776    /// (Consumer only) If true, enables NACK support (re-queuing), which requires cloning messages. Defaults to false.
777    #[serde(default)]
778    pub enable_nack: bool,
779}
780
781impl MemoryConfig {
782    pub fn new(topic: impl Into<String>, capacity: Option<usize>) -> Self {
783        Self {
784            topic: topic.into(),
785            capacity,
786            ..Default::default()
787        }
788    }
789    pub fn with_subscribe(self, subscribe_mode: bool) -> Self {
790        Self {
791            subscribe_mode,
792            ..self
793        }
794    }
795}
796
797// --- AMQP Specific Configuration ---
798
799/// General AMQP connection configuration.
800#[derive(Debug, Deserialize, Serialize, Clone, Default)]
801#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
802#[serde(deny_unknown_fields)]
803pub struct AmqpConfig {
804    /// AMQP connection URI. The `lapin` client connects to a single host specified in the URI.
805    /// For high availability, provide the address of a load balancer or use DNS resolution
806    /// that points to multiple brokers. Example: "amqp://localhost:5672/vhost".
807    pub url: String,
808    /// The AMQP queue name.
809    pub queue: Option<String>,
810    /// (Consumer only) If true, act as a **Subscriber** (fan-out). Defaults to false.
811    #[serde(default)]
812    pub subscribe_mode: bool,
813    /// Optional username for authentication.
814    pub username: Option<String>,
815    /// Optional password for authentication.
816    pub password: Option<String>,
817    /// TLS configuration.
818    #[serde(default)]
819    pub tls: TlsConfig,
820    /// The exchange to publish to or bind the queue to.
821    pub exchange: Option<String>,
822    /// (Consumer only) Number of messages to prefetch. Defaults to 100.
823    pub prefetch_count: Option<u16>,
824    /// If true, declare queues as non-durable (transient). Defaults to false. Affects both Consumer (queue durability) and Publisher (message persistence).
825    #[serde(default)]
826    pub no_persistence: bool,
827    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
828    #[serde(default)]
829    pub delayed_ack: bool,
830}
831
832#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
833#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
834#[serde(rename_all = "lowercase")]
835pub enum MongoDbFormat {
836    #[default]
837    Normal,
838    Json,
839    Raw,
840}
841
842// --- MongoDB Specific Configuration ---
843
844/// General MongoDB connection configuration.
845#[derive(Debug, Deserialize, Serialize, Clone, Default)]
846#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
847#[serde(deny_unknown_fields)]
848pub struct MongoDbConfig {
849    /// MongoDB connection string URI. Can contain a comma-separated list of hosts for a replica set.
850    /// Credentials provided via the separate `username` and `password` fields take precedence over any credentials embedded in the URL.
851    pub url: String,
852    /// The MongoDB collection name.
853    pub collection: Option<String>,
854    /// Optional username. Takes precedence over any credentials embedded in the `url`.
855    /// Use embedded URL credentials for simple one-off connections but prefer explicit username/password fields (or environment-sourced secrets) for clarity and secret management in production.
856    pub username: Option<String>,
857    /// Optional password. Takes precedence over any credentials embedded in the `url`.
858    /// Use embedded URL credentials for simple one-off connections but prefer explicit username/password fields (or environment-sourced secrets) for clarity and secret management in production.
859    pub password: Option<String>,
860    /// TLS configuration.
861    #[serde(default)]
862    pub tls: TlsConfig,
863    /// The database name.
864    pub database: String,
865    /// (Consumer only) Polling interval in milliseconds for the consumer (when not using Change Streams). Defaults to 100ms.
866    pub polling_interval_ms: Option<u64>,
867    /// (Publisher only) Polling interval in milliseconds for the publisher when waiting for a reply. Defaults to 50ms.
868    pub reply_polling_ms: Option<u64>,
869    /// (Publisher only) If true, the publisher will wait for a response in a dedicated collection. Defaults to false.
870    #[serde(default)]
871    pub request_reply: bool,
872    /// (Consumer only) If true, use Change Streams (**Subscriber mode**). Defaults to false (polling/consumer mode).
873    #[serde(default)]
874    pub change_stream: bool,
875    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
876    pub request_timeout_ms: Option<u64>,
877    /// (Publisher only) TTL in seconds for documents created by the publisher. If set, a TTL index is created.
878    pub ttl_seconds: Option<u64>,
879    /// (Publisher only) If set, creates a capped collection with this size in bytes.
880    pub capped_size_bytes: Option<i64>,
881    /// Format for storing messages. Defaults to Normal.
882    #[serde(default)]
883    pub format: MongoDbFormat,
884    /// The ID used for the cursor in sequenced mode. If not provided, consumption starts from the current sequence (ephemeral).
885    pub cursor_id: Option<String>,
886}
887
888// --- MQTT Specific Configuration ---
889
890/// General MQTT connection configuration.
891#[derive(Debug, Deserialize, Serialize, Clone, Default)]
892#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
893#[serde(deny_unknown_fields)]
894pub struct MqttConfig {
895    /// MQTT broker URL (e.g., "tcp://localhost:1883"). Does not support multiple hosts.
896    pub url: String,
897    /// The MQTT topic.
898    pub topic: Option<String>,
899    /// Optional username for authentication.
900    pub username: Option<String>,
901    /// Optional password for authentication.
902    pub password: Option<String>,
903    /// TLS configuration.
904    #[serde(default)]
905    pub tls: TlsConfig,
906    /// Optional client ID. If not provided, one is generated or derived from route name.
907    pub client_id: Option<String>,
908    /// Capacity of the internal channel for incoming messages. Defaults to 100.
909    pub queue_capacity: Option<usize>,
910    /// Maximum number of inflight messages.
911    pub max_inflight: Option<u16>,
912    /// Quality of Service level (0, 1, or 2). Defaults to 1.
913    pub qos: Option<u8>,
914    /// (Consumer only) If true, start with a clean session. Defaults to false (persistent session). Setting this to true effectively enables **Subscriber mode** (ephemeral).
915    #[serde(default = "default_clean_session")]
916    pub clean_session: bool,
917    /// Keep-alive interval in seconds. Defaults to 20.
918    pub keep_alive_seconds: Option<u64>,
919    /// MQTT protocol version (V3 or V5). Defaults to V5.
920    #[serde(default)]
921    pub protocol: MqttProtocol,
922    /// Session expiry interval in seconds (MQTT v5 only).
923    pub session_expiry_interval: Option<u32>,
924    /// (Publisher only) If true, messages are acknowledged immediately upon receipt (auto-ack).
925    /// If false (default), messages are acknowledged after processing (manual-ack).
926    #[serde(default)]
927    pub delayed_ack: bool,
928}
929
930#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
931#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
932#[serde(rename_all = "lowercase")]
933pub enum MqttProtocol {
934    #[default]
935    V5,
936    V3,
937}
938
939// --- ZeroMQ Specific Configuration ---
940
941#[derive(Debug, Deserialize, Serialize, Clone, Default)]
942#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
943#[serde(deny_unknown_fields)]
944pub struct ZeroMqConfig {
945    /// The ZeroMQ URL (e.g., "tcp://127.0.0.1:5555").
946    pub url: String,
947    /// The socket type (PUSH, PULL, PUB, SUB, REQ, REP).
948    #[serde(default)]
949    pub socket_type: Option<ZeroMqSocketType>,
950    /// (Consumer only) The ZeroMQ topic (for SUB sockets).
951    pub topic: Option<String>,
952    /// If true, bind to the address. If false, connect.
953    #[serde(default)]
954    pub bind: bool,
955    /// Internal buffer size for the channel. Defaults to 128.
956    #[serde(default)]
957    pub internal_buffer_size: Option<usize>,
958}
959
960#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
961#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
962#[serde(rename_all = "lowercase")]
963pub enum ZeroMqSocketType {
964    Push,
965    Pull,
966    Pub,
967    Sub,
968    Req,
969    Rep,
970}
971
972// --- HTTP Specific Configuration ---
973
974/// General HTTP connection configuration.
975#[derive(Debug, Deserialize, Serialize, Clone, Default)]
976#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
977#[serde(deny_unknown_fields)]
978pub struct HttpConfig {
979    /// For consumers, the listen address (e.g., "0.0.0.0:8080"). For publishers, the target URL.
980    pub url: String,
981    /// TLS configuration.
982    #[serde(default)]
983    pub tls: TlsConfig,
984    /// (Consumer only) Number of worker threads to use. Defaults to 0 for unlimited.
985    pub workers: Option<usize>,
986    /// (Consumer only) Header key to extract the message ID from. Defaults to "message-id".
987    pub message_id_header: Option<String>,
988    /// (Consumer only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
989    pub request_timeout_ms: Option<u64>,
990}
991
992// --- IBM MQ Specific Configuration ---
993
994/// Connection settings for the IBM MQ Queue Manager.
995#[derive(Debug, Deserialize, Serialize, Clone, Default)]
996#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
997#[serde(deny_unknown_fields)]
998pub struct IbmMqConfig {
999    /// Required. Connection URL in `host(port)` format. Supports comma-separated list for failover (e.g., `host1(1414),host2(1414)`).
1000    pub url: String,
1001    /// Target Queue name for point-to-point messaging. Optional if `topic` is set; defaults to route name if omitted.
1002    pub queue: Option<String>,
1003    /// Target Topic string for Publish/Subscribe. If set, enables **Subscriber mode** (Consumer) or publishes to a topic (Publisher). Optional if `queue` is set.
1004    pub topic: Option<String>,
1005    /// Required. Name of the Queue Manager to connect to (e.g., `QM1`).
1006    pub queue_manager: String,
1007    /// Required. Server Connection (SVRCONN) Channel name defined on the QM.
1008    pub channel: String,
1009    /// Username for authentication. Optional; required if the channel enforces authentication.
1010    pub username: Option<String>,
1011    /// Password for authentication. Optional; required if the channel enforces authentication.
1012    pub password: Option<String>,
1013    /// TLS CipherSpec (e.g., `ANY_TLS12`). Optional; required for encrypted connections.
1014    pub cipher_spec: Option<String>,
1015    /// TLS configuration settings (e.g., keystore paths). Optional.
1016    #[serde(default)]
1017    pub tls: TlsConfig,
1018    /// Maximum message size in bytes (default: 4MB). Optional.
1019    #[serde(default = "default_max_message_size")]
1020    pub max_message_size: usize,
1021    /// (Consumer only) Polling timeout in milliseconds (default: 1000ms). Optional.
1022    #[serde(default = "default_wait_timeout_ms")]
1023    pub wait_timeout_ms: i32,
1024    /// Internal buffer size for the channel. Defaults to 100.
1025    #[serde(default)]
1026    pub internal_buffer_size: Option<usize>,
1027}
1028
1029fn default_max_message_size() -> usize {
1030    4 * 1024 * 1024 // 4MB default
1031}
1032
1033fn default_wait_timeout_ms() -> i32 {
1034    1000 // 1 second default
1035}
1036
1037// --- Switch/Router Configuration ---
1038
1039#[derive(Debug, Deserialize, Serialize, Clone)]
1040#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1041#[serde(deny_unknown_fields)]
1042pub struct SwitchConfig {
1043    /// The metadata key to inspect for routing decisions.
1044    pub metadata_key: String,
1045    /// A map of values to endpoints.
1046    pub cases: HashMap<String, Endpoint>,
1047    /// The default endpoint if no case matches.
1048    pub default: Option<Box<Endpoint>>,
1049}
1050
1051// --- Response Endpoint Configuration ---
1052#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1053#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1054#[serde(deny_unknown_fields)]
1055pub struct ResponseConfig {
1056    // This struct is a marker and currently has no fields.
1057}
1058
1059// --- Common Configuration ---
1060
1061/// TLS configuration for secure connections.
1062#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1063#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1064#[serde(deny_unknown_fields)]
1065pub struct TlsConfig {
1066    /// If true, enable TLS/SSL.
1067    pub required: bool,
1068    /// Path to the CA certificate file.
1069    pub ca_file: Option<String>,
1070    /// Path to the client certificate file (PEM).
1071    pub cert_file: Option<String>,
1072    /// Path to the client private key file (PEM).
1073    pub key_file: Option<String>,
1074    /// Password for the private key (if encrypted).
1075    pub cert_password: Option<String>,
1076    /// If true, disable server certificate verification (insecure).
1077    #[serde(default)]
1078    pub accept_invalid_certs: bool,
1079}
1080
1081impl TlsConfig {
1082    pub fn is_mtls_client_configured(&self) -> bool {
1083        self.required && self.cert_file.is_some() && self.key_file.is_some()
1084    }
1085    pub fn is_tls_server_configured(&self) -> bool {
1086        self.required && self.cert_file.is_some() && self.key_file.is_some()
1087    }
1088}
1089
1090#[cfg(test)]
1091mod tests {
1092    use super::*;
1093    use config::{Config as ConfigBuilder, Environment};
1094
1095    const TEST_YAML: &str = r#"
1096kafka_to_nats:
1097  concurrency: 10
1098  input:
1099    middlewares:
1100      - deduplication:
1101          sled_path: "/tmp/mq-bridge/dedup_db"
1102          ttl_seconds: 3600
1103      - metrics: {}
1104      - retry:
1105          max_attempts: 5
1106          initial_interval_ms: 200
1107      - random_panic:
1108          probability: 0.1
1109      - dlq:
1110          endpoint:
1111            nats:
1112              subject: "dlq-subject"
1113              url: "nats://localhost:4222"
1114    kafka:
1115      topic: "input-topic"
1116      url: "localhost:9092"
1117      group_id: "my-consumer-group"
1118      tls:
1119        required: true
1120        ca_file: "/path_to_ca"
1121        cert_file: "/path_to_cert"
1122        key_file: "/path_to_key"
1123        cert_password: "password"
1124        accept_invalid_certs: true
1125  output:
1126    middlewares:
1127      - metrics: {}
1128    nats:
1129      subject: "output-subject"
1130      url: "nats://localhost:4222"
1131"#;
1132
1133    fn assert_config_values(config: &Config) {
1134        assert_eq!(config.len(), 1);
1135        let route = config.get("kafka_to_nats").expect("Route should exist");
1136
1137        assert_eq!(route.options.concurrency, 10);
1138
1139        // --- Assert Input ---
1140        let input = &route.input;
1141        assert_eq!(input.middlewares.len(), 5);
1142
1143        let mut has_dedup = false;
1144        let mut has_metrics = false;
1145        let mut has_dlq = false;
1146        let mut has_retry = false;
1147        let mut has_random_panic = false;
1148        for middleware in &input.middlewares {
1149            match middleware {
1150                Middleware::Deduplication(dedup) => {
1151                    assert_eq!(dedup.sled_path, "/tmp/mq-bridge/dedup_db");
1152                    assert_eq!(dedup.ttl_seconds, 3600);
1153                    has_dedup = true;
1154                }
1155                Middleware::Metrics(_) => {
1156                    has_metrics = true;
1157                }
1158                Middleware::Custom { .. } => {}
1159                Middleware::Dlq(dlq) => {
1160                    assert!(dlq.endpoint.middlewares.is_empty());
1161                    if let EndpointType::Nats(nats_cfg) = &dlq.endpoint.endpoint_type {
1162                        assert_eq!(nats_cfg.subject, Some("dlq-subject".to_string()));
1163                        assert_eq!(nats_cfg.url, "nats://localhost:4222");
1164                    }
1165                    has_dlq = true;
1166                }
1167                Middleware::Retry(retry) => {
1168                    assert_eq!(retry.max_attempts, 5);
1169                    assert_eq!(retry.initial_interval_ms, 200);
1170                    has_retry = true;
1171                }
1172                Middleware::RandomPanic(rp) => {
1173                    assert!((rp.probability - 0.1).abs() < f64::EPSILON);
1174                    has_random_panic = true;
1175                }
1176                Middleware::Delay(_) => {}
1177                Middleware::WeakJoin(_) => {}
1178            }
1179        }
1180
1181        if let EndpointType::Kafka(kafka) = &input.endpoint_type {
1182            assert_eq!(kafka.topic, Some("input-topic".to_string()));
1183            assert_eq!(kafka.url, "localhost:9092");
1184            assert_eq!(kafka.group_id, Some("my-consumer-group".to_string()));
1185            let tls = &kafka.tls;
1186            assert!(tls.required);
1187            assert_eq!(tls.ca_file.as_deref(), Some("/path_to_ca"));
1188            assert!(tls.accept_invalid_certs);
1189        } else {
1190            panic!("Input endpoint should be Kafka");
1191        }
1192        assert!(has_dedup);
1193        assert!(has_metrics);
1194        assert!(has_dlq);
1195        assert!(has_retry);
1196        assert!(has_random_panic);
1197
1198        // --- Assert Output ---
1199        let output = &route.output;
1200        assert_eq!(output.middlewares.len(), 1);
1201        assert!(matches!(output.middlewares[0], Middleware::Metrics(_)));
1202
1203        if let EndpointType::Nats(nats) = &output.endpoint_type {
1204            assert_eq!(nats.subject, Some("output-subject".to_string()));
1205            assert_eq!(nats.url, "nats://localhost:4222");
1206        } else {
1207            panic!("Output endpoint should be NATS");
1208        }
1209    }
1210
1211    #[test]
1212    fn test_deserialize_from_yaml() {
1213        // We use serde_yaml directly here because the `config` crate's processing
1214        // can interfere with complex deserialization logic.
1215        let result: Result<Config, _> = serde_yaml_ng::from_str(TEST_YAML);
1216        println!("Deserialized from YAML: {:#?}", result);
1217        let config = result.expect("Failed to deserialize TEST_YAML");
1218        assert_config_values(&config);
1219    }
1220
1221    #[test]
1222    fn test_deserialize_from_env() {
1223        // Set environment variables based on README
1224        unsafe {
1225            std::env::set_var("MQB__KAFKA_TO_NATS__CONCURRENCY", "10");
1226            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "input-topic");
1227            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__URL", "localhost:9092");
1228            std::env::set_var(
1229                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID",
1230                "my-consumer-group",
1231            );
1232            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__REQUIRED", "true");
1233            std::env::set_var(
1234                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__CA_FILE",
1235                "/path_to_ca",
1236            );
1237            std::env::set_var(
1238                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__ACCEPT_INVALID_CERTS",
1239                "true",
1240            );
1241            std::env::set_var(
1242                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT",
1243                "output-subject",
1244            );
1245            std::env::set_var(
1246                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__URL",
1247                "nats://localhost:4222",
1248            );
1249            std::env::set_var(
1250                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__SUBJECT",
1251                "dlq-subject",
1252            );
1253            std::env::set_var(
1254                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__URL",
1255                "nats://localhost:4222",
1256            );
1257        }
1258
1259        let builder = ConfigBuilder::builder()
1260            // Enable automatic type parsing for values from environment variables.
1261            .add_source(
1262                Environment::with_prefix("MQB")
1263                    .separator("__")
1264                    .try_parsing(true),
1265            );
1266
1267        let config: Config = builder
1268            .build()
1269            .expect("Failed to build config")
1270            .try_deserialize()
1271            .expect("Failed to deserialize config");
1272
1273        // We can't test all values from env, but we can check the ones we set.
1274        assert_eq!(config.get("kafka_to_nats").unwrap().options.concurrency, 10);
1275        if let EndpointType::Kafka(k) = &config.get("kafka_to_nats").unwrap().input.endpoint_type {
1276            assert_eq!(k.topic, Some("input-topic".to_string()));
1277            assert!(k.tls.required);
1278        } else {
1279            panic!("Expected Kafka endpoint");
1280        }
1281
1282        let input = &config.get("kafka_to_nats").unwrap().input;
1283        assert_eq!(input.middlewares.len(), 1);
1284        if let Middleware::Dlq(_) = &input.middlewares[0] {
1285            // Correctly parsed
1286        } else {
1287            panic!("Expected DLQ middleware");
1288        }
1289    }
1290}
1291
1292#[cfg(all(test, feature = "schema"))]
1293mod schema_tests {
1294    use super::*;
1295
1296    #[test]
1297    fn generate_json_schema() {
1298        let schema = schemars::schema_for!(Config);
1299        let schema_json = serde_json::to_string_pretty(&schema).unwrap();
1300
1301        let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1302        path.push("mq-bridge.schema.json");
1303        std::fs::write(path, schema_json).expect("Failed to write schema file");
1304    }
1305}