Skip to main content

mq_bridge/
models.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6use serde::{
7    de::{MapAccess, Visitor},
8    Deserialize, Deserializer, Serialize,
9};
10use std::{
11    collections::HashMap,
12    sync::{atomic::AtomicUsize, Arc},
13};
14
15use crate::traits::Handler;
16use tracing::trace;
17
18/// The top-level configuration is a map of named routes.
19/// The key is the route name (e.g., "kafka_to_nats").
20///
21/// # Examples
22///
23/// Deserializing a complex configuration from YAML:
24///
25/// ```
26/// use mq_bridge::models::{Config, EndpointType, Middleware};
27///
28/// let yaml = r#"
29/// kafka_to_nats:
30///   concurrency: 10
31///   input:
32///     middlewares:
33///       - deduplication:
34///           sled_path: "/tmp/mq-bridge/dedup_db"
35///           ttl_seconds: 3600
36///       - metrics: {}
37///       - retry:
38///           max_attempts: 5
39///           initial_interval_ms: 200
40///       - random_panic:
41///           mode: nack
42///       - dlq:
43///           endpoint:
44///             nats:
45///               subject: "dlq-subject"
46///               url: "nats://localhost:4222"
47///     kafka:
48///       topic: "input-topic"
49///       url: "localhost:9092"
50///       group_id: "my-consumer-group"
51///       tls:
52///         required: true
53///         ca_file: "/path_to_ca"
54///         cert_file: "/path_to_cert"
55///         key_file: "/path_to_key"
56///         cert_password: "password"
57///         accept_invalid_certs: true
58///   output:
59///     middlewares:
60///       - metrics: {}
61///       - dlq:
62///           endpoint:
63///             file:
64///               path: "error.out"
65///     nats:
66///       subject: "output-subject"
67///       url: "nats://localhost:4222"
68/// "#;
69///
70/// let config: Config = serde_yaml_ng::from_str(yaml).unwrap();
71/// let route = config.get("kafka_to_nats").unwrap();
72///
73/// assert_eq!(route.options.concurrency, 10);
74/// // Check input middleware
75/// assert!(route.input.middlewares.iter().any(|m| matches!(m, Middleware::Deduplication(_))));
76/// // Check output endpoint
77/// assert!(matches!(route.output.endpoint_type, EndpointType::Nats(_)));
78/// ```
79pub type Config = HashMap<String, Route>;
80
81/// A configuration map for named publishers (endpoints).
82/// The key is the publisher name.
83pub type PublisherConfig = HashMap<String, Endpoint>;
84
85/// Defines a single message processing route from an input to an output.
86#[derive(Debug, Deserialize, Serialize, Clone)]
87#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
88#[serde(deny_unknown_fields)]
89pub struct Route {
90    /// The input/source endpoint for the route.
91    pub input: Endpoint,
92    /// The output/sink endpoint for the route.
93    #[serde(default = "default_output_endpoint")]
94    pub output: Endpoint,
95    /// (Optional) Fine-tuning options for the route's execution.
96    #[serde(flatten, default)]
97    pub options: RouteOptions,
98}
99
100impl Default for Route {
101    fn default() -> Self {
102        Self {
103            input: Endpoint::null(),
104            output: Endpoint::null(),
105            options: RouteOptions::default(),
106        }
107    }
108}
109
110/// Fine-tuning options for a route's execution.
111///
112/// These options control concurrency, batching, and commit behavior for message processing.
113///
114/// # Examples
115///
116/// ```
117/// use mq_bridge::models::RouteOptions;
118///
119/// let options = RouteOptions {
120///     description: "My Route".to_string(),
121///     concurrency: 10,
122///     batch_size: 5,
123///     commit_concurrency_limit: 1024,
124/// };
125/// ```
126#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
127#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
128#[serde(deny_unknown_fields)]
129pub struct RouteOptions {
130    /// A human-readable description of the route's purpose. Defaults to an empty string.
131    #[serde(default, skip_serializing_if = "String::is_empty")]
132    pub description: String,
133    /// (Optional) Number of concurrent processing tasks for this route. Defaults to 1.
134    #[serde(default = "default_concurrency")]
135    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
136    pub concurrency: usize,
137    /// (Optional) Number of messages to process in a single batch. Defaults to 1.
138    #[serde(default = "default_batch_size")]
139    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
140    pub batch_size: usize,
141    /// (Optional) The maximum number of in-flight commit requests queued for ordered sequencing.
142    /// Lower values apply backpressure earlier; higher values allow larger commit backlogs.
143    /// Defaults to 4096.
144    #[serde(default = "default_commit_concurrency_limit")]
145    pub commit_concurrency_limit: usize,
146}
147
148impl Default for RouteOptions {
149    fn default() -> Self {
150        Self {
151            description: String::new(),
152            concurrency: default_concurrency(),
153            batch_size: default_batch_size(),
154            commit_concurrency_limit: default_commit_concurrency_limit(),
155        }
156    }
157}
158
159pub(crate) fn default_concurrency() -> usize {
160    1
161}
162
163pub(crate) fn default_batch_size() -> usize {
164    1
165}
166
167pub(crate) fn default_commit_concurrency_limit() -> usize {
168    4096
169}
170
171fn default_output_endpoint() -> Endpoint {
172    Endpoint::new(EndpointType::Null)
173}
174
175fn default_retry_attempts() -> usize {
176    3
177}
178fn default_initial_interval_ms() -> u64 {
179    100
180}
181fn default_max_interval_ms() -> u64 {
182    5000
183}
184fn default_multiplier() -> f64 {
185    2.0
186}
187fn default_clean_session() -> bool {
188    false
189}
190
191fn is_known_endpoint_name(name: &str) -> bool {
192    matches!(
193        name,
194        "aws"
195            | "kafka"
196            | "nats"
197            | "file"
198            | "static"
199            | "memory"
200            | "sled"
201            | "amqp"
202            | "mongodb"
203            | "mqtt"
204            | "http"
205            | "ibmmq"
206            | "zeromq"
207            | "grpc"
208            | "fanout"
209            | "ref"
210            | "switch"
211            | "response"
212            | "sqlx"
213    )
214}
215
216/// Represents a connection point for messages, which can be a source (input) or a sink (output).
217#[derive(Serialize, Clone, Default)]
218#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
219#[serde(deny_unknown_fields)]
220pub struct Endpoint {
221    /// (Optional) A list of middlewares to apply to the endpoint.
222    #[serde(default)]
223    pub middlewares: Vec<Middleware>,
224
225    /// The specific endpoint implementation, determined by the configuration key (e.g., "kafka", "nats").
226    #[serde(flatten)]
227    pub endpoint_type: EndpointType,
228
229    #[serde(skip_serializing)]
230    #[cfg_attr(feature = "schema", schemars(skip))]
231    /// Internal handler for processing messages (not serialized).
232    pub handler: Option<Arc<dyn Handler>>,
233}
234
235impl std::fmt::Debug for Endpoint {
236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        f.debug_struct("Endpoint")
238            .field("middlewares", &self.middlewares)
239            .field("endpoint_type", &self.endpoint_type)
240            .field(
241                "handler",
242                &if self.handler.is_some() {
243                    "Some(<Handler>)"
244                } else {
245                    "None"
246                },
247            )
248            .finish()
249    }
250}
251
252impl<'de> Deserialize<'de> for Endpoint {
253    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
254    where
255        D: Deserializer<'de>,
256    {
257        struct EndpointVisitor;
258
259        impl<'de> Visitor<'de> for EndpointVisitor {
260            type Value = Endpoint;
261
262            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
263                formatter.write_str("a map representing an endpoint or null")
264            }
265
266            fn visit_unit<E>(self) -> Result<Self::Value, E>
267            where
268                E: serde::de::Error,
269            {
270                Ok(Endpoint::new(EndpointType::Null))
271            }
272
273            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
274            where
275                A: MapAccess<'de>,
276            {
277                // Buffer the map into a temporary serde_json::Map.
278                // This allows us to separate the `middlewares` field from the rest.
279                let mut temp_map = serde_json::Map::new();
280                let mut middlewares_val = None;
281
282                while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
283                    if key == "middlewares" {
284                        middlewares_val = Some(value);
285                    } else {
286                        temp_map.insert(key, value);
287                    }
288                }
289
290                // Deserialize the rest of the map into the flattened EndpointType.
291                let temp_val = serde_json::Value::Object(temp_map);
292                let endpoint_type: EndpointType = match serde_json::from_value(temp_val.clone()) {
293                    Ok(et) => et,
294                    Err(original_err) => {
295                        if let serde_json::Value::Object(map) = &temp_val {
296                            if map.len() == 1 {
297                                let (name, config) = map.iter().next().unwrap();
298                                if is_known_endpoint_name(name) {
299                                    return Err(serde::de::Error::custom(original_err));
300                                }
301                                trace!("Falling back to Custom endpoint for key: {}", name);
302                                EndpointType::Custom {
303                                    name: name.clone(),
304                                    config: config.clone(),
305                                }
306                            } else if map.is_empty() {
307                                EndpointType::Null
308                            } else {
309                                return Err(serde::de::Error::custom(
310                                    "Invalid endpoint configuration: multiple keys found or unknown endpoint type",
311                                ));
312                            }
313                        } else {
314                            return Err(serde::de::Error::custom("Invalid endpoint configuration"));
315                        }
316                    }
317                };
318
319                // Deserialize the extracted middlewares value using the existing helper logic.
320                let middlewares = match middlewares_val {
321                    Some(val) => {
322                        deserialize_middlewares_from_value(val).map_err(serde::de::Error::custom)?
323                    }
324                    None => Vec::new(),
325                };
326
327                Ok(Endpoint {
328                    middlewares,
329                    endpoint_type,
330                    handler: None,
331                })
332            }
333        }
334
335        deserializer.deserialize_any(EndpointVisitor)
336    }
337}
338
339fn is_known_middleware_name(name: &str) -> bool {
340    matches!(
341        name,
342        "deduplication"
343            | "metrics"
344            | "dlq"
345            | "retry"
346            | "random_panic"
347            | "delay"
348            | "weak_join"
349            | "custom"
350    )
351}
352
353/// Deserialize middlewares from a generic serde_json::Value.
354///
355/// This logic was extracted from `deserialize_middlewares_from_map_or_seq` to be reused by the custom `Endpoint` deserializer.
356fn deserialize_middlewares_from_value(value: serde_json::Value) -> anyhow::Result<Vec<Middleware>> {
357    let arr = match value {
358        serde_json::Value::Array(arr) => arr,
359        serde_json::Value::Object(map) => {
360            let mut middlewares: Vec<_> = map
361                .into_iter()
362                // The config crate can produce maps with numeric string keys ("0", "1", ...)
363                // from environment variables. We need to sort by these keys to maintain order.
364                .filter_map(|(key, value)| key.parse::<usize>().ok().map(|index| (index, value)))
365                .collect();
366            middlewares.sort_by_key(|(index, _)| *index);
367
368            middlewares.into_iter().map(|(_, value)| value).collect()
369        }
370        _ => return Err(anyhow::anyhow!("Expected an array or object")),
371    };
372
373    let mut middlewares = Vec::new();
374    for item in arr {
375        // Check if it is a map with a single key that matches a known middleware
376        let known_name = if let serde_json::Value::Object(map) = &item {
377            if map.len() == 1 {
378                let (name, _) = map.iter().next().unwrap();
379                if is_known_middleware_name(name) {
380                    Some(name.clone())
381                } else {
382                    None
383                }
384            } else {
385                None
386            }
387        } else {
388            None
389        };
390
391        if let Some(name) = known_name {
392            match serde_json::from_value::<Middleware>(item.clone()) {
393                Ok(m) => middlewares.push(m),
394                Err(e) => {
395                    return Err(anyhow::anyhow!(
396                        "Failed to deserialize known middleware '{}': {}",
397                        name,
398                        e
399                    ))
400                }
401            }
402        } else if let Ok(m) = serde_json::from_value::<Middleware>(item.clone()) {
403            middlewares.push(m);
404        } else if let serde_json::Value::Object(map) = &item {
405            if map.len() == 1 {
406                let (name, config) = map.iter().next().unwrap();
407                middlewares.push(Middleware::Custom {
408                    name: name.clone(),
409                    config: config.clone(),
410                });
411            } else {
412                return Err(anyhow::anyhow!(
413                    "Invalid middleware configuration: {:?}",
414                    item
415                ));
416            }
417        } else {
418            return Err(anyhow::anyhow!(
419                "Invalid middleware configuration: {:?}",
420                item
421            ));
422        }
423    }
424    Ok(middlewares)
425}
426
427/// An enumeration of all supported endpoint types.
428/// `#[serde(rename_all = "lowercase")]` ensures that the keys in the config (e.g., "kafka")
429/// match the enum variants.
430///
431/// # Examples
432///
433/// Configuring a Fanout endpoint in YAML:
434/// ```
435/// use mq_bridge::models::{Endpoint, EndpointType};
436///
437/// let yaml = r#"
438/// fanout:
439///   - memory: { topic: "out1" }
440///   - memory: { topic: "out2" }
441/// "#;
442///
443/// let endpoint: Endpoint = serde_yaml_ng::from_str(yaml).unwrap();
444/// if let EndpointType::Fanout(targets) = endpoint.endpoint_type {
445///     assert_eq!(targets.len(), 2);
446/// }
447/// ```
448#[derive(Debug, Deserialize, Serialize, Clone, Default)]
449#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
450#[serde(rename_all = "lowercase")]
451pub enum EndpointType {
452    Aws(AwsConfig),
453    Kafka(KafkaConfig),
454    Nats(NatsConfig),
455    File(FileConfig),
456    Static(String),
457    Ref(String),
458    Memory(MemoryConfig),
459    Sled(SledConfig),
460    Amqp(AmqpConfig),
461    MongoDb(MongoDbConfig),
462    Mqtt(MqttConfig),
463    Http(HttpConfig),
464    IbmMq(IbmMqConfig),
465    ZeroMq(ZeroMqConfig),
466    Grpc(GrpcConfig),
467    Sqlx(SqlxConfig),
468    Fanout(Vec<Endpoint>),
469    Switch(SwitchConfig),
470    Response(ResponseConfig),
471    Reader(Box<Endpoint>),
472    Custom {
473        name: String,
474        config: serde_json::Value,
475    },
476    #[default]
477    Null,
478}
479
480impl EndpointType {
481    pub fn name(&self) -> &'static str {
482        match self {
483            EndpointType::Aws(_) => "aws",
484            EndpointType::Kafka(_) => "kafka",
485            EndpointType::Nats(_) => "nats",
486            EndpointType::File(_) => "file",
487            EndpointType::Static(_) => "static",
488            EndpointType::Ref(_) => "ref",
489            EndpointType::Memory(_) => "memory",
490            EndpointType::Sled(_) => "sled",
491            EndpointType::Amqp(_) => "amqp",
492            EndpointType::MongoDb(_) => "mongodb",
493            EndpointType::Mqtt(_) => "mqtt",
494            EndpointType::Http(_) => "http",
495            EndpointType::IbmMq(_) => "ibmmq",
496            EndpointType::ZeroMq(_) => "zeromq",
497            EndpointType::Grpc(_) => "grpc",
498            EndpointType::Sqlx(_) => "sqlx",
499            EndpointType::Fanout(_) => "fanout",
500            EndpointType::Switch(_) => "switch",
501            EndpointType::Response(_) => "response",
502            EndpointType::Reader(_) => "reader",
503            EndpointType::Custom { .. } => "custom",
504            EndpointType::Null => "null",
505        }
506    }
507
508    pub fn is_core(&self) -> bool {
509        matches!(
510            self,
511            EndpointType::File(_)
512                | EndpointType::Static(_)
513                | EndpointType::Ref(_)
514                | EndpointType::Memory(_)
515                | EndpointType::Fanout(_)
516                | EndpointType::Switch(_)
517                | EndpointType::Response(_)
518                | EndpointType::Reader(_)
519                | EndpointType::Custom { .. }
520                | EndpointType::Null
521        )
522    }
523}
524
525/// An enumeration of all supported middleware types.
526#[derive(Debug, Deserialize, Serialize, Clone)]
527#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
528#[serde(rename_all = "snake_case")]
529pub enum Middleware {
530    Deduplication(DeduplicationMiddleware),
531    Metrics(MetricsMiddleware),
532    Dlq(Box<DeadLetterQueueMiddleware>),
533    Retry(RetryMiddleware),
534    RandomPanic(RandomPanicMiddleware),
535    Delay(DelayMiddleware),
536    WeakJoin(WeakJoinMiddleware),
537    Custom {
538        name: String,
539        config: serde_json::Value,
540    },
541}
542
543/// Deduplication middleware configuration.
544///
545/// Prevents duplicate messages from being processed using a Sled-backed database.
546/// Messages are identified by their deduplication key and removed after the TTL expires.
547#[derive(Debug, Deserialize, Serialize, Clone)]
548#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
549#[serde(deny_unknown_fields)]
550pub struct DeduplicationMiddleware {
551    /// Path to the Sled database directory.
552    pub sled_path: String,
553    /// Time-to-live for deduplication entries in seconds.
554    pub ttl_seconds: u64,
555}
556
557/// Metrics middleware configuration.
558///
559/// Enables collection and reporting of message processing metrics such as throughput,
560/// latency, and error rates. The presence of this middleware in the configuration
561/// enables metrics collection for the endpoint.
562///
563/// Metrics are typically exported via Prometheus or similar monitoring systems.
564#[derive(Debug, Deserialize, Serialize, Clone)]
565#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
566#[serde(deny_unknown_fields)]
567pub struct MetricsMiddleware {}
568
569/// Dead-Letter Queue (DLQ) middleware configuration.
570///
571/// Routes failed messages to a designated endpoint for later analysis and recovery.
572/// It is recommended to pair this with the Retry middleware to avoid message loss.
573///
574/// Failed messages are sent to the configured endpoint when they are exhausted after retry attempts.
575#[derive(Debug, Deserialize, Serialize, Clone, Default)]
576#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
577#[serde(deny_unknown_fields)]
578pub struct DeadLetterQueueMiddleware {
579    /// The endpoint to send failed messages to.
580    pub endpoint: Endpoint,
581}
582
583/// Retry middleware configuration.
584///
585/// Implements exponential backoff retry logic for failed message processing.
586/// Failed messages are automatically retried with increasing delays between attempts.
587#[derive(Debug, Deserialize, Serialize, Clone, Default)]
588#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
589#[serde(deny_unknown_fields)]
590pub struct RetryMiddleware {
591    /// Maximum number of retry attempts. Defaults to 3.
592    #[serde(default = "default_retry_attempts")]
593    pub max_attempts: usize,
594    /// Initial retry interval in milliseconds. Defaults to 100ms.
595    #[serde(default = "default_initial_interval_ms")]
596    pub initial_interval_ms: u64,
597    /// Maximum retry interval in milliseconds. Defaults to 5000ms.
598    #[serde(default = "default_max_interval_ms")]
599    pub max_interval_ms: u64,
600    /// Multiplier for exponential backoff. Defaults to 2.0.
601    #[serde(default = "default_multiplier")]
602    pub multiplier: f64,
603}
604
605/// Delay middleware configuration.
606///
607/// Introduces a fixed delay before processing each message.
608/// Useful for rate limiting, testing, or allowing time for dependent systems to become ready.
609#[derive(Debug, Deserialize, Serialize, Clone)]
610#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
611#[serde(deny_unknown_fields)]
612pub struct DelayMiddleware {
613    /// Delay duration in milliseconds.
614    pub delay_ms: u64,
615}
616
617/// Weak Join middleware configuration.
618///
619/// Groups and correlates messages based on a metadata key, waiting for a specified number
620/// of messages within a timeout window before processing them as a batch.
621/// Messages that exceed the timeout are processed individually.
622#[derive(Debug, Deserialize, Serialize, Clone)]
623#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
624#[serde(deny_unknown_fields)]
625pub struct WeakJoinMiddleware {
626    /// The metadata key to group messages by (e.g., "correlation_id").
627    pub group_by: String,
628    /// The number of messages to wait for.
629    pub expected_count: usize,
630    /// Timeout in milliseconds.
631    pub timeout_ms: u64,
632}
633
634/// Fault injection modes for testing error handling and recovery mechanisms.
635#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
636#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
637#[serde(rename_all = "snake_case")]
638pub enum FaultMode {
639    /// Trigger a thread panic.
640    #[default]
641    Panic,
642    /// Simulate a connection/network error (retryable).
643    Disconnect,
644    /// Simulate a timeout error (retryable).
645    Timeout,
646    /// Simulate a JSON format error (non-retryable).
647    JsonFormatError,
648    /// Return a negative acknowledgement (for handlers).
649    Nack,
650}
651
652impl std::fmt::Display for FaultMode {
653    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654        match self {
655            FaultMode::Panic => write!(f, "panic"),
656            FaultMode::Disconnect => write!(f, "disconnect"),
657            FaultMode::Timeout => write!(f, "timeout"),
658            FaultMode::JsonFormatError => write!(f, "json_format_error"),
659            FaultMode::Nack => write!(f, "nack"),
660        }
661    }
662}
663
664/// Middleware for fault injection testing.
665///
666/// Allows testing error handling and recovery mechanisms by injecting faults
667/// at specific points in the message processing pipeline.
668///
669/// # Examples
670///
671/// ```yaml
672/// random_panic:
673///   mode: panic
674///   trigger_on_message: 3  # Trigger on the 3rd message
675/// ```
676#[derive(Debug, Clone, Serialize, Deserialize, Default)]
677#[serde(deny_unknown_fields)]
678#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
679pub struct RandomPanicMiddleware {
680    /// The type of fault to inject.
681    #[serde(default)]
682    pub mode: FaultMode,
683    /// Trigger the fault on the Nth message (1-indexed). None = trigger on every message.
684    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
685    #[serde(default)]
686    pub trigger_on_message: Option<usize>,
687    /// Enable/disable the fault injection without removing the configuration.
688    #[serde(default = "default_true")]
689    pub enabled: bool,
690    #[serde(skip, default = "default_atomic_usize_arc")]
691    #[cfg_attr(feature = "schema", schemars(skip))]
692    pub message_count: Arc<AtomicUsize>,
693}
694
695fn default_true() -> bool {
696    true
697}
698
699fn default_atomic_usize_arc() -> Arc<AtomicUsize> {
700    Arc::new(AtomicUsize::new(0))
701}
702
703fn deserialize_null_as_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
704where
705    D: Deserializer<'de>,
706{
707    let opt = Option::<bool>::deserialize(deserializer)?;
708    Ok(opt.unwrap_or(false))
709}
710
711// --- AWS Specific Configuration ---
712#[derive(Debug, Deserialize, Serialize, Clone, Default)]
713#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
714#[serde(deny_unknown_fields)]
715pub struct AwsConfig {
716    /// The SQS queue URL. Required for Consumer. Optional for Publisher if `topic_arn` is set.
717    pub queue_url: Option<String>,
718    /// (Publisher only) The SNS topic ARN.
719    pub topic_arn: Option<String>,
720    /// AWS Region (e.g., "us-east-1").
721    pub region: Option<String>,
722    /// Custom endpoint URL (e.g., for LocalStack).
723    pub endpoint_url: Option<String>,
724    /// AWS Access Key ID.
725    pub access_key: Option<String>,
726    /// AWS Secret Access Key.
727    pub secret_key: Option<String>,
728    /// AWS Session Token.
729    pub session_token: Option<String>,
730    /// (Consumer only) Maximum number of messages to receive in a batch (1-10).
731    #[cfg_attr(feature = "schema", schemars(range(min = 1, max = 10)))]
732    pub max_messages: Option<i32>,
733    /// (Consumer only) Wait time for long polling in seconds (0-20).
734    #[cfg_attr(feature = "schema", schemars(range(min = 0, max = 20)))]
735    pub wait_time_seconds: Option<i32>,
736}
737
738impl AwsConfig {
739    /// Creates a new AWS configuration with default settings.
740    pub fn new() -> Self {
741        Self::default()
742    }
743
744    pub fn with_queue_url(mut self, queue_url: impl Into<String>) -> Self {
745        self.queue_url = Some(queue_url.into());
746        self
747    }
748
749    pub fn with_topic_arn(mut self, topic_arn: impl Into<String>) -> Self {
750        self.topic_arn = Some(topic_arn.into());
751        self
752    }
753
754    pub fn with_region(mut self, region: impl Into<String>) -> Self {
755        self.region = Some(region.into());
756        self
757    }
758
759    pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
760        self.endpoint_url = Some(endpoint_url.into());
761        self
762    }
763
764    pub fn with_credentials(
765        mut self,
766        access_key: impl Into<String>,
767        secret_key: impl Into<String>,
768    ) -> Self {
769        self.access_key = Some(access_key.into());
770        self.secret_key = Some(secret_key.into());
771        self
772    }
773}
774
775// --- Kafka Specific Configuration ---
776
777/// General Kafka connection configuration.
778#[derive(Debug, Deserialize, Serialize, Clone, Default)]
779#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
780#[serde(deny_unknown_fields)]
781pub struct KafkaConfig {
782    /// Comma-separated list of Kafka broker URLs.
783    #[serde(alias = "brokers")]
784    pub url: String,
785    /// The Kafka topic to produce to or consume from.
786    pub topic: Option<String>,
787    /// Optional username for SASL authentication.
788    pub username: Option<String>,
789    /// Optional password for SASL authentication.
790    pub password: Option<String>,
791    /// TLS configuration.
792    #[serde(default)]
793    pub tls: TlsConfig,
794    /// (Consumer only) Consumer group ID.
795    /// If not provided, the consumer acts in **Subscriber mode**: it generates a unique, ephemeral group ID and starts consuming from the latest offset.
796    pub group_id: Option<String>,
797    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
798    #[serde(default)]
799    pub delayed_ack: bool,
800    /// (Publisher only) Additional librdkafka producer configuration options (key-value pairs).
801    #[serde(default)]
802    pub producer_options: Option<Vec<(String, String)>>,
803    /// (Consumer only) Additional librdkafka consumer configuration options (key-value pairs).
804    #[serde(default)]
805    pub consumer_options: Option<Vec<(String, String)>>,
806}
807
808impl KafkaConfig {
809    /// Creates a new Kafka configuration with the specified broker URL.
810    pub fn new(url: impl Into<String>) -> Self {
811        Self {
812            url: url.into(),
813            ..Default::default()
814        }
815    }
816
817    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
818        self.topic = Some(topic.into());
819        self
820    }
821
822    pub fn with_group_id(mut self, group_id: impl Into<String>) -> Self {
823        self.group_id = Some(group_id.into());
824        self
825    }
826
827    pub fn with_credentials(
828        mut self,
829        username: impl Into<String>,
830        password: impl Into<String>,
831    ) -> Self {
832        self.username = Some(username.into());
833        self.password = Some(password.into());
834        self
835    }
836
837    pub fn with_producer_option(
838        mut self,
839        key: impl Into<String>,
840        value: impl Into<String>,
841    ) -> Self {
842        let options = self.producer_options.get_or_insert_with(Vec::new);
843        options.push((key.into(), value.into()));
844        self
845    }
846
847    pub fn with_consumer_option(
848        mut self,
849        key: impl Into<String>,
850        value: impl Into<String>,
851    ) -> Self {
852        let options = self.consumer_options.get_or_insert_with(Vec::new);
853        options.push((key.into(), value.into()));
854        self
855    }
856}
857
858// --- Sled Specific Configuration ---
859
860/// General Sled database configuration
861#[derive(Debug, Deserialize, Serialize, Clone, Default)]
862#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
863#[serde(deny_unknown_fields)]
864pub struct SledConfig {
865    /// Path to the Sled database directory.
866    pub path: String,
867    /// The tree name to use as a queue. Defaults to "default".
868    pub tree: Option<String>,
869    /// (Consumer only) If true, start reading from the beginning of the tree.
870    #[serde(default)]
871    pub read_from_start: bool,
872    /// (Consumer only) If true, delete messages after processing (Queue mode).
873    #[serde(default)]
874    pub delete_after_read: bool,
875}
876
877impl SledConfig {
878    /// Creates a new Sled configuration with the specified database path.
879    pub fn new(path: impl Into<String>) -> Self {
880        Self {
881            path: path.into(),
882            ..Default::default()
883        }
884    }
885
886    pub fn with_tree(mut self, tree: impl Into<String>) -> Self {
887        self.tree = Some(tree.into());
888        self
889    }
890
891    pub fn with_read_from_start(mut self, read_from_start: bool) -> Self {
892        self.read_from_start = read_from_start;
893        self
894    }
895}
896
897/// Format for messages written to or read from a file.
898#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
899#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
900#[serde(rename_all = "snake_case")]
901pub enum FileFormat {
902    /// The full `CanonicalMessage` is serialized to JSON. Payload is a byte array.
903    #[default]
904    Normal,
905    /// The full `CanonicalMessage` is serialized to JSON. Payload is rendered as a JSON value if possible.
906    Json,
907    /// The full `CanonicalMessage` is serialized to JSON. Payload is rendered as a string if possible.
908    Text,
909    /// The raw payload of the message is written. For consumers, the line is read as raw bytes.
910    Raw,
911}
912
913// --- File Specific Configuration ---
914
915#[derive(Debug, Clone, Serialize, Deserialize, Default)]
916#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
917pub struct FileConfig {
918    /// Path to the file.
919    pub path: String,
920    /// Optional delimiter for messages. Defaults to newline ("\n").
921    /// Can be a string or a hex sequence (e.g. "0x00").
922    /// Currently only single-byte delimiters are supported.
923    pub delimiter: Option<String>,
924    /// The consumption mode. If not specified, defaults to `consume`.
925    /// For publishers, this setting is ignored.
926    #[serde(flatten, default)]
927    pub mode: Option<FileConsumerMode>,
928    /// The format for writing messages to the file (Publisher) or interpreting them (Consumer). Defaults to `normal`.
929    #[serde(default)]
930    pub format: FileFormat,
931}
932
933#[derive(Debug, Clone, Deserialize, Serialize)]
934#[serde(tag = "mode", rename_all = "snake_case")]
935#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
936pub enum FileConsumerMode {
937    /// **Queue Mode**: Standard point-to-point consumption. Reads from the start
938    /// of the file. If `delete` is true, processed lines are physically removed
939    /// from the file once they are successfully acknowledged.
940    Consume {
941        #[serde(default)]
942        delete: bool,
943    },
944    /// **Broadcast Mode**: Pub-sub style consumption. Tails the file by starting
945    /// at the current end. If `delete` is true, lines are removed only after
946    /// all local application subscribers for this specific file have acknowledged them.
947    Subscribe {
948        #[serde(default)]
949        delete: bool,
950    },
951    /// **Persistent Mode**: Consumption with external offset tracking.
952    /// Saves the last read byte position to a `.offset` file identified by the `group_id`.
953    /// This allows the consumer to resume exactly where it left off after a restart
954    /// without deleting data or requiring the bridge to stay running.
955    GroupSubscribe {
956        /// The consumer group ID that is used for offset tracking. Should be unique.
957        group_id: String,
958        /// If true, starts reading from the end of the file if no offset is stored.
959        /// If false, starts reading from the beginning.
960        #[serde(default)]
961        read_from_tail: bool,
962    },
963}
964
965impl Default for FileConsumerMode {
966    fn default() -> Self {
967        Self::Consume { delete: false }
968    }
969}
970
971impl FileConfig {
972    /// Creates a new File configuration with the specified path.
973    pub fn new(path: impl Into<String>) -> Self {
974        Self {
975            path: path.into(),
976            mode: Some(FileConsumerMode::default()),
977            delimiter: None,
978            format: FileFormat::default(),
979        }
980    }
981
982    pub fn with_mode(mut self, mode: FileConsumerMode) -> Self {
983        self.mode = Some(mode);
984        self
985    }
986
987    /// Returns the effective consumer mode, defaulting to `Consume` if not set.
988    pub fn effective_mode(&self) -> FileConsumerMode {
989        self.mode.clone().unwrap_or_default()
990    }
991}
992
993// --- NATS Specific Configuration ---
994
995/// General NATS connection configuration.
996#[derive(Debug, Deserialize, Serialize, Clone, Default)]
997#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
998#[serde(deny_unknown_fields)]
999pub struct NatsConfig {
1000    /// Comma-separated list of NATS server URLs (e.g., "nats://localhost:4222,nats://localhost:4223").
1001    pub url: String,
1002    /// The NATS subject to publish to or subscribe to.
1003    pub subject: Option<String>,
1004    /// (Consumer only). The JetStream stream name. Required for Consumers.
1005    pub stream: Option<String>,
1006    /// Optional username for authentication.
1007    pub username: Option<String>,
1008    /// Optional password for authentication.
1009    pub password: Option<String>,
1010    /// TLS configuration.
1011    #[serde(default)]
1012    pub tls: TlsConfig,
1013    /// Optional token for authentication.
1014    pub token: Option<String>,
1015    /// (Publisher only) If true, the publisher uses the request-reply pattern.
1016    /// It sends a request and waits for a response (using `core_client.request_with_headers()`).
1017    /// Defaults to false.
1018    #[serde(default)]
1019    pub request_reply: bool,
1020    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
1021    pub request_timeout_ms: Option<u64>,
1022    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
1023    #[serde(default)]
1024    pub delayed_ack: bool,
1025    /// If no_jetstream: true, use Core NATS (fire-and-forget) instead of JetStream. Defaults to false.
1026    #[serde(default)]
1027    pub no_jetstream: bool,
1028    /// (Consumer only) If true, use ephemeral **Subscriber mode**. Defaults to false (durable consumer).
1029    #[serde(default)]
1030    pub subscriber_mode: bool,
1031    /// (Publisher only) Maximum number of messages in the stream (if created by the bridge). Defaults to 1,000,000.
1032    pub stream_max_messages: Option<i64>,
1033    /// (Consumer only) The delivery policy for the consumer. Defaults to "all".
1034    pub deliver_policy: Option<NatsDeliverPolicy>,
1035    /// (Publisher only) Maximum total bytes in the stream (if created by the bridge). Defaults to 1GB.
1036    pub stream_max_bytes: Option<i64>,
1037    /// (Consumer only) Number of messages to prefetch from the consumer. Defaults to 10000.
1038    pub prefetch_count: Option<usize>,
1039}
1040
1041#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
1042#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1043#[serde(rename_all = "snake_case")]
1044pub enum NatsDeliverPolicy {
1045    #[default]
1046    All,
1047    Last,
1048    New,
1049    LastPerSubject,
1050}
1051
1052impl NatsConfig {
1053    /// Creates a new NATS configuration with the specified server URL.
1054    pub fn new(url: impl Into<String>) -> Self {
1055        Self {
1056            url: url.into(),
1057            ..Default::default()
1058        }
1059    }
1060
1061    pub fn with_subject(mut self, subject: impl Into<String>) -> Self {
1062        self.subject = Some(subject.into());
1063        self
1064    }
1065
1066    pub fn with_stream(mut self, stream: impl Into<String>) -> Self {
1067        self.stream = Some(stream.into());
1068        self
1069    }
1070
1071    pub fn with_deliver_policy(mut self, policy: NatsDeliverPolicy) -> Self {
1072        self.deliver_policy = Some(policy);
1073        self
1074    }
1075
1076    pub fn with_credentials(
1077        mut self,
1078        username: impl Into<String>,
1079        password: impl Into<String>,
1080    ) -> Self {
1081        self.username = Some(username.into());
1082        self.password = Some(password.into());
1083        self
1084    }
1085}
1086
1087#[derive(Debug, Serialize, Deserialize, Clone, Default)]
1088#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1089#[serde(deny_unknown_fields)]
1090pub struct MemoryConfig {
1091    /// The topic name for the in-memory channel.
1092    pub topic: String,
1093    /// The capacity of the channel. Defaults to 100.
1094    pub capacity: Option<usize>,
1095    /// (Publisher only) If true, send() waits for a response.
1096    #[serde(default)]
1097    pub request_reply: bool,
1098    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
1099    pub request_timeout_ms: Option<u64>,
1100    /// (Consumer only) If true, act as a **Subscriber** (fan-out). Defaults to false (queue).
1101    #[serde(default)]
1102    pub subscribe_mode: bool,
1103    /// (Consumer only) If true, enables NACK support (re-queuing), which requires cloning messages. Defaults to false.
1104    #[serde(default)]
1105    pub enable_nack: bool,
1106}
1107
1108impl MemoryConfig {
1109    pub fn new(topic: impl Into<String>, capacity: Option<usize>) -> Self {
1110        Self {
1111            topic: topic.into(),
1112            capacity,
1113            ..Default::default()
1114        }
1115    }
1116    pub fn with_subscribe(self, subscribe_mode: bool) -> Self {
1117        Self {
1118            subscribe_mode,
1119            ..self
1120        }
1121    }
1122
1123    pub fn with_request_reply(mut self, request_reply: bool) -> Self {
1124        self.request_reply = request_reply;
1125        self
1126    }
1127}
1128
1129// --- AMQP Specific Configuration ---
1130
1131/// General AMQP connection configuration.
1132#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1133#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1134#[serde(deny_unknown_fields)]
1135pub struct AmqpConfig {
1136    /// AMQP connection URI. The `lapin` client connects to a single host specified in the URI.
1137    /// For high availability, provide the address of a load balancer or use DNS resolution
1138    /// that points to multiple brokers. Example: "amqp://localhost:5672/vhost".
1139    pub url: String,
1140    /// The AMQP queue name.
1141    pub queue: Option<String>,
1142    /// (Consumer only) If true, act as a **Subscriber** (fan-out). Defaults to false.
1143    #[serde(default)]
1144    pub subscribe_mode: bool,
1145    /// Optional username for authentication.
1146    pub username: Option<String>,
1147    /// Optional password for authentication.
1148    pub password: Option<String>,
1149    /// TLS configuration.
1150    #[serde(default)]
1151    pub tls: TlsConfig,
1152    /// The exchange to publish to or bind the queue to.
1153    pub exchange: Option<String>,
1154    /// (Consumer only) Number of messages to prefetch. Defaults to 100.
1155    pub prefetch_count: Option<u16>,
1156    /// If true, declare queues as non-durable (transient). Defaults to false. Affects both Consumer (queue durability) and Publisher (message persistence).
1157    #[serde(default)]
1158    pub no_persistence: bool,
1159    /// (Publisher only) If true, do not attempt to declare the queue. Assumes the queue already exists. Defaults to false.
1160    #[serde(default)]
1161    pub no_declare_queue: bool,
1162    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
1163    #[serde(default)]
1164    pub delayed_ack: bool,
1165}
1166
1167impl AmqpConfig {
1168    /// Creates a new AMQP configuration with the specified connection URL.
1169    pub fn new(url: impl Into<String>) -> Self {
1170        Self {
1171            url: url.into(),
1172            ..Default::default()
1173        }
1174    }
1175
1176    pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
1177        self.queue = Some(queue.into());
1178        self
1179    }
1180
1181    pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
1182        self.exchange = Some(exchange.into());
1183        self
1184    }
1185
1186    pub fn with_credentials(
1187        mut self,
1188        username: impl Into<String>,
1189        password: impl Into<String>,
1190    ) -> Self {
1191        self.username = Some(username.into());
1192        self.password = Some(password.into());
1193        self
1194    }
1195}
1196
1197/// MongoDB message storage format.
1198///
1199/// Determines how messages are stored and retrieved from MongoDB collections.
1200#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
1201#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1202#[serde(rename_all = "lowercase")]
1203pub enum MongoDbFormat {
1204    #[default]
1205    Normal,
1206    Json,
1207    Text,
1208    Raw,
1209}
1210
1211// --- MongoDB Specific Configuration ---
1212
1213/// General MongoDB connection configuration.
1214#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1215#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1216#[serde(deny_unknown_fields)]
1217pub struct MongoDbConfig {
1218    /// MongoDB connection string URI. Can contain a comma-separated list of hosts for a replica set.
1219    /// Credentials provided via the separate `username` and `password` fields take precedence over any credentials embedded in the URL.
1220    pub url: String,
1221    /// The MongoDB collection name.
1222    pub collection: Option<String>,
1223    /// Optional username. Takes precedence over any credentials embedded in the `url`.
1224    /// Use embedded URL credentials for simple one-off connections but prefer explicit username/password fields (or environment-sourced secrets) for clarity and secret management in production.
1225    pub username: Option<String>,
1226    /// Optional password. Takes precedence over any credentials embedded in the `url`.
1227    /// Use embedded URL credentials for simple one-off connections but prefer explicit username/password fields (or environment-sourced secrets) for clarity and secret management in production.
1228    pub password: Option<String>,
1229    /// TLS configuration.
1230    #[serde(default)]
1231    pub tls: TlsConfig,
1232    /// The database name.
1233    pub database: String,
1234    /// (Consumer only) Polling interval in milliseconds for the consumer (when not using Change Streams). Defaults to 100ms.
1235    pub polling_interval_ms: Option<u64>,
1236    /// (Publisher only) Polling interval in milliseconds for the publisher when waiting for a reply. Defaults to 50ms.
1237    pub reply_polling_ms: Option<u64>,
1238    /// (Publisher only) If true, the publisher will wait for a response in a dedicated collection. Defaults to false.
1239    #[serde(default)]
1240    pub request_reply: bool,
1241    /// (Consumer only) If true, use Change Streams (**Subscriber mode**). Defaults to false (polling/consumer mode).
1242    #[serde(default)]
1243    pub change_stream: bool,
1244    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
1245    pub request_timeout_ms: Option<u64>,
1246    /// (Publisher only) TTL in seconds for documents created by the publisher. If set, a TTL index is created.
1247    pub ttl_seconds: Option<u64>,
1248    /// (Publisher only) If set, creates a capped collection with this size in bytes.
1249    pub capped_size_bytes: Option<i64>,
1250    /// Format for storing messages. Defaults to Normal.
1251    #[serde(default)]
1252    pub format: MongoDbFormat,
1253    /// The ID used for the cursor in sequenced mode. If not provided, consumption starts from the current sequence (ephemeral).
1254    pub cursor_id: Option<String>,
1255}
1256
1257impl MongoDbConfig {
1258    /// Creates a new MongoDB configuration with the specified URL and database name.
1259    pub fn new(url: impl Into<String>, database: impl Into<String>) -> Self {
1260        Self {
1261            url: url.into(),
1262            database: database.into(),
1263            ..Default::default()
1264        }
1265    }
1266
1267    pub fn with_collection(mut self, collection: impl Into<String>) -> Self {
1268        self.collection = Some(collection.into());
1269        self
1270    }
1271
1272    pub fn with_credentials(
1273        mut self,
1274        username: impl Into<String>,
1275        password: impl Into<String>,
1276    ) -> Self {
1277        self.username = Some(username.into());
1278        self.password = Some(password.into());
1279        self
1280    }
1281
1282    pub fn with_change_stream(mut self, change_stream: bool) -> Self {
1283        self.change_stream = change_stream;
1284        self
1285    }
1286}
1287
1288// --- MQTT Specific Configuration ---
1289
1290/// General MQTT connection configuration.
1291#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1292#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1293#[serde(deny_unknown_fields)]
1294pub struct MqttConfig {
1295    /// MQTT broker URL (e.g., "tcp://localhost:1883"). Does not support multiple hosts.
1296    pub url: String,
1297    /// The MQTT topic.
1298    pub topic: Option<String>,
1299    /// Optional username for authentication.
1300    pub username: Option<String>,
1301    /// Optional password for authentication.
1302    pub password: Option<String>,
1303    /// TLS configuration.
1304    #[serde(default)]
1305    pub tls: TlsConfig,
1306    /// Optional client ID. If not provided, one is generated or derived from route name.
1307    pub client_id: Option<String>,
1308    /// Capacity of the internal channel for incoming messages. Defaults to 100.
1309    pub queue_capacity: Option<usize>,
1310    /// Maximum number of inflight messages.
1311    pub max_inflight: Option<u16>,
1312    /// Quality of Service level (0, 1, or 2). Defaults to 1.
1313    pub qos: Option<u8>,
1314    /// (Consumer only) If true, start with a clean session. Defaults to false (persistent session). Setting this to true effectively enables **Subscriber mode** (ephemeral).
1315    #[serde(default = "default_clean_session")]
1316    pub clean_session: bool,
1317    /// Keep-alive interval in seconds. Defaults to 20.
1318    pub keep_alive_seconds: Option<u64>,
1319    /// MQTT protocol version (V3 or V5). Defaults to V5.
1320    #[serde(default)]
1321    pub protocol: MqttProtocol,
1322    /// Session expiry interval in seconds (MQTT v5 only).
1323    pub session_expiry_interval: Option<u32>,
1324    /// (Consumer only) If true, messages are acknowledged immediately upon receipt (auto-ack).
1325    /// If false (default), messages are acknowledged after processing (manual-ack).
1326    /// Note: This setting does not currently enable synchronous publishing (waiting for PubAck) for the MQTT publisher.
1327    #[serde(default)]
1328    pub delayed_ack: bool,
1329}
1330
1331impl MqttConfig {
1332    /// Creates a new MQTT configuration with the specified broker URL.
1333    pub fn new(url: impl Into<String>) -> Self {
1334        Self {
1335            url: url.into(),
1336            ..Default::default()
1337        }
1338    }
1339
1340    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1341        self.topic = Some(topic.into());
1342        self
1343    }
1344
1345    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
1346        self.client_id = Some(client_id.into());
1347        self
1348    }
1349
1350    pub fn with_credentials(
1351        mut self,
1352        username: impl Into<String>,
1353        password: impl Into<String>,
1354    ) -> Self {
1355        self.username = Some(username.into());
1356        self.password = Some(password.into());
1357        self
1358    }
1359}
1360
1361/// MQTT protocol version.
1362///
1363/// Specifies which version of the MQTT protocol to use for connections.
1364#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
1365#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1366#[serde(rename_all = "lowercase")]
1367pub enum MqttProtocol {
1368    #[default]
1369    V5,
1370    V3,
1371}
1372
1373// --- ZeroMQ Specific Configuration ---
1374
1375#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1376#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1377#[serde(deny_unknown_fields)]
1378pub struct ZeroMqConfig {
1379    /// The ZeroMQ URL (e.g., "tcp://127.0.0.1:5555").
1380    pub url: String,
1381    /// The socket type (PUSH, PULL, PUB, SUB, REQ, REP).
1382    #[serde(default)]
1383    pub socket_type: Option<ZeroMqSocketType>,
1384    /// (Consumer only) The ZeroMQ topic (for SUB sockets).
1385    pub topic: Option<String>,
1386    /// If true, bind to the address. If false, connect.
1387    #[serde(default)]
1388    pub bind: bool,
1389    /// Internal buffer size for the channel. Defaults to 128.
1390    #[serde(default)]
1391    pub internal_buffer_size: Option<usize>,
1392}
1393
1394impl ZeroMqConfig {
1395    /// Creates a new ZeroMQ configuration with the specified URL.
1396    pub fn new(url: impl Into<String>) -> Self {
1397        Self {
1398            url: url.into(),
1399            ..Default::default()
1400        }
1401    }
1402
1403    pub fn with_socket_type(mut self, socket_type: ZeroMqSocketType) -> Self {
1404        self.socket_type = Some(socket_type);
1405        self
1406    }
1407
1408    pub fn with_bind(mut self, bind: bool) -> Self {
1409        self.bind = bind;
1410        self
1411    }
1412}
1413
1414/// ZeroMQ socket type.
1415///
1416/// Defines the messaging pattern for ZeroMQ connections.
1417/// Different patterns support different communication paradigms (request-reply, publish-subscribe, etc.).
1418#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
1419#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1420#[serde(rename_all = "lowercase")]
1421pub enum ZeroMqSocketType {
1422    Push,
1423    Pull,
1424    Pub,
1425    Sub,
1426    Req,
1427    Rep,
1428}
1429
1430// --- gRPC Specific Configuration ---
1431
1432#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1433#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1434#[serde(deny_unknown_fields)]
1435pub struct GrpcConfig {
1436    /// The gRPC server URL (e.g., "http://localhost:50051" for client or "0.0.0.0:50051" for server mode).
1437    pub url: String,
1438    /// Topic / subject used for both subscribe and publish paths.
1439    pub topic: Option<String>,
1440    /// Timeout in milliseconds.
1441    /// - Client mode: used as the connection timeout and per-request deadline.
1442    /// - Server mode: applied as the per-request deadline on the embedded server.
1443    pub timeout_ms: Option<u64>,
1444    /// TLS configuration.
1445    #[serde(default)]
1446    pub tls: TlsConfig,
1447    /// If `true`, start an embedded tonic gRPC server that accepts incoming `Publish` /
1448    /// `PublishBatch` RPCs. If `false` (the default), connect to a remote server as a client.
1449    #[serde(default)]
1450    pub server_mode: bool,
1451    /// HTTP/2 stream-level initial window size in bytes. **Server-mode only.**
1452    #[serde(default)]
1453    pub initial_stream_window_size: Option<u32>,
1454    /// HTTP/2 connection-level initial window size in bytes. **Server-mode only.**
1455    #[serde(default)]
1456    pub initial_connection_window_size: Option<u32>,
1457    /// Maximum number of concurrent requests handled per connection. **Server-mode only.**
1458    #[serde(default)]
1459    pub concurrency_limit_per_connection: Option<usize>,
1460    /// HTTP/2 keepalive ping interval in milliseconds. **Server-mode only.** Default disabled
1461    #[serde(default)]
1462    pub http2_keepalive_interval_ms: Option<u64>,
1463    /// Timeout for a keepalive ping acknowledgement in milliseconds. **Server-mode only.**
1464    #[serde(default)]
1465    pub http2_keepalive_timeout_ms: Option<u64>,
1466    /// Maximum size of a decoded incoming message in bytes. **Server-mode only.** Default 4 MiB.
1467    #[serde(default)]
1468    pub max_decoding_message_size: Option<usize>,
1469}
1470
1471impl GrpcConfig {
1472    /// Creates a new gRPC configuration with the specified server URL.
1473    pub fn new(url: impl Into<String>) -> Self {
1474        Self {
1475            url: url.into(),
1476            ..Default::default()
1477        }
1478    }
1479
1480    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1481        self.topic = Some(topic.into());
1482        self
1483    }
1484
1485    /// Enable or disable server mode for this gRPC endpoint.
1486    pub fn with_server_mode(mut self, server_mode: bool) -> Self {
1487        self.server_mode = server_mode;
1488        self
1489    }
1490}
1491
1492// --- HTTP Specific Configuration ---
1493
1494/// General HTTP connection configuration.
1495#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1496#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1497#[serde(deny_unknown_fields)]
1498pub struct HttpConfig {
1499    /// For consumers, the listen address (e.g., "0.0.0.0:8080"). For publishers, the target URL.
1500    pub url: String,
1501    /// (Consumer only) Optional request path filter. If set, only requests whose URI path matches exactly are delivered to this consumer.
1502    pub path: Option<String>,
1503    /// (Optional) HTTP method. For publishers: the method to use (defaults to POST). For consumers: restrict to this method (others return 405).
1504    pub method: Option<String>,
1505    /// TLS configuration.
1506    #[serde(default)]
1507    pub tls: TlsConfig,
1508    /// (Consumer only) Number of worker threads to use. Defaults to 0 for unlimited.
1509    pub workers: Option<usize>,
1510    /// (Consumer only) Header key to extract the message ID from. Defaults to "message-id".
1511    pub message_id_header: Option<String>,
1512    /// Timeout for HTTP requests in milliseconds. For consumers, it's the request-reply timeout. For publishers, it's the timeout for each individual request. Defaults to 30000ms.
1513    pub request_timeout_ms: Option<u64>,
1514    /// (Consumer only) Internal buffer size for the channel. Defaults to 100.
1515    pub internal_buffer_size: Option<usize>,
1516    /// (Consumer only) If true, respond immediately with 202 Accepted without waiting for downstream processing. Defaults to false.
1517    #[serde(default)]
1518    pub fire_and_forget: bool,
1519    /// (Publisher only) The number of concurrent HTTP requests to send in a batch. Defaults to 20.
1520    #[serde(default, skip_serializing_if = "Option::is_none")]
1521    pub batch_concurrency: Option<usize>,
1522    /// (Publisher only) TCP keepalive timeout for the underlying connection pool in milliseconds. Defaults to 60000ms.
1523    #[serde(default, skip_serializing_if = "Option::is_none")]
1524    pub tcp_keepalive_ms: Option<u64>,
1525    /// (Publisher only) Timeout for idle connections in the connection pool in milliseconds. Defaults to 90000ms.
1526    #[serde(default, skip_serializing_if = "Option::is_none")]
1527    pub pool_idle_timeout_ms: Option<u64>,
1528    /// Enable gzip compression for request/response bodies exceeding the threshold. Defaults to false.
1529    #[serde(default)]
1530    pub compression_enabled: bool,
1531    /// Minimum message size in bytes to compress. Messages smaller than this are sent uncompressed. Defaults to 1024 bytes.
1532    #[serde(default)]
1533    pub compression_threshold_bytes: Option<usize>,
1534    /// HTTP Basic Authentication credentials (username, password). For consumers: validates incoming requests. For publishers: adds Authorization header.
1535    /// (Consumer only) Maximum number of concurrent requests to handle. Defaults to 100.
1536    pub concurrency_limit: Option<usize>,
1537    #[serde(
1538        default,
1539        skip_serializing_if = "Option::is_none",
1540        deserialize_with = "deserialize_basic_auth"
1541    )]
1542    pub basic_auth: Option<(String, String)>,
1543    /// Custom headers as key-value pairs (e.g., {"X-API-Key": "token123"}). Added to outgoing HTTP headers for both consumers and publishers.
1544    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
1545    pub custom_headers: HashMap<String, String>,
1546}
1547
1548fn deserialize_basic_auth<'de, D>(deserializer: D) -> Result<Option<(String, String)>, D::Error>
1549where
1550    D: Deserializer<'de>,
1551{
1552    let val = serde_json::Value::deserialize(deserializer)?;
1553    match val {
1554        serde_json::Value::Null => Ok(None),
1555        serde_json::Value::Array(arr) => {
1556            if arr.len() != 2 {
1557                return Err(serde::de::Error::custom("basic_auth must have 2 elements"));
1558            }
1559            let u = arr[0]
1560                .as_str()
1561                .ok_or_else(|| serde::de::Error::custom("basic_auth[0] must be string"))?
1562                .to_string();
1563            let p = arr[1]
1564                .as_str()
1565                .ok_or_else(|| serde::de::Error::custom("basic_auth[1] must be string"))?
1566                .to_string();
1567            Ok(Some((u, p)))
1568        }
1569        serde_json::Value::Object(map) => {
1570            let u = map
1571                .get("0")
1572                .and_then(|v| v.as_str())
1573                .ok_or_else(|| serde::de::Error::custom("basic_auth map missing '0'"))?
1574                .to_string();
1575            let p = map
1576                .get("1")
1577                .and_then(|v| v.as_str())
1578                .ok_or_else(|| serde::de::Error::custom("basic_auth map missing '1'"))?
1579                .to_string();
1580            Ok(Some((u, p)))
1581        }
1582        _ => Err(serde::de::Error::custom("invalid type for basic_auth")),
1583    }
1584}
1585
1586impl HttpConfig {
1587    /// Creates a new HTTP configuration with the specified URL.
1588    pub fn new(url: impl Into<String>) -> Self {
1589        Self {
1590            url: url.into(),
1591            ..Default::default()
1592        }
1593    }
1594
1595    pub fn with_workers(mut self, workers: usize) -> Self {
1596        self.workers = Some(workers);
1597        self
1598    }
1599
1600    pub fn with_method(mut self, method: impl Into<String>) -> Self {
1601        self.method = Some(method.into());
1602        self
1603    }
1604
1605    pub fn with_path(mut self, path: impl Into<String>) -> Self {
1606        self.path = Some(path.into());
1607        self
1608    }
1609}
1610
1611// --- IBM MQ Specific Configuration ---
1612
1613/// Connection settings for the IBM MQ Queue Manager.
1614#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1615#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1616#[serde(deny_unknown_fields)]
1617pub struct IbmMqConfig {
1618    /// Required. Connection URL in `host(port)` format. Supports comma-separated list for failover (e.g., `host1(1414),host2(1414)`).
1619    pub url: String,
1620    /// Target Queue name for point-to-point messaging. Optional if `topic` is set; defaults to route name if omitted.
1621    pub queue: Option<String>,
1622    /// Target Topic string for Publish/Subscribe. If set, enables **Subscriber mode** (Consumer) or publishes to a topic (Publisher). Optional if `queue` is set.
1623    pub topic: Option<String>,
1624    /// Required. Name of the Queue Manager to connect to (e.g., `QM1`).
1625    pub queue_manager: String,
1626    /// Required. Server Connection (SVRCONN) Channel name defined on the QM.
1627    pub channel: String,
1628    /// Username for authentication. Optional; required if the channel enforces authentication.
1629    pub username: Option<String>,
1630    /// Password for authentication. Optional; required if the channel enforces authentication.
1631    pub password: Option<String>,
1632    /// TLS CipherSpec (e.g., `ANY_TLS12`). Optional; required for encrypted connections.
1633    pub cipher_spec: Option<String>,
1634    /// TLS configuration settings (e.g., keystore paths). Optional.
1635    #[serde(default)]
1636    pub tls: TlsConfig,
1637    /// Maximum message size in bytes (default: 4MB). Optional.
1638    #[serde(default = "default_max_message_size")]
1639    pub max_message_size: usize,
1640    /// (Consumer only) Polling timeout in milliseconds (default: 1000ms). Optional.
1641    #[serde(default = "default_wait_timeout_ms")]
1642    pub wait_timeout_ms: i32,
1643    /// Internal buffer size for the channel. Defaults to 100.
1644    #[serde(default)]
1645    pub internal_buffer_size: Option<usize>,
1646    /// If false, attempt to open the queue with INQUIRE permissions to fetch queue depth for status checks. Defaults to false.
1647    #[serde(default)]
1648    pub disable_status_inq: bool,
1649}
1650
1651impl IbmMqConfig {
1652    /// Creates a new IBM MQ configuration with the specified connection URL, queue manager, and channel.
1653    pub fn new(
1654        url: impl Into<String>,
1655        queue_manager: impl Into<String>,
1656        channel: impl Into<String>,
1657    ) -> Self {
1658        Self {
1659            url: url.into(),
1660            queue_manager: queue_manager.into(),
1661            channel: channel.into(),
1662            disable_status_inq: false,
1663            ..Default::default()
1664        }
1665    }
1666
1667    pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
1668        self.queue = Some(queue.into());
1669        self
1670    }
1671
1672    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1673        self.topic = Some(topic.into());
1674        self
1675    }
1676
1677    pub fn with_credentials(
1678        mut self,
1679        username: impl Into<String>,
1680        password: impl Into<String>,
1681    ) -> Self {
1682        self.username = Some(username.into());
1683        self.password = Some(password.into());
1684        self
1685    }
1686}
1687
1688fn default_max_message_size() -> usize {
1689    4 * 1024 * 1024 // 4MB default
1690}
1691
1692fn default_wait_timeout_ms() -> i32 {
1693    1000 // 1 second default
1694}
1695
1696// --- Switch/Router Configuration ---
1697
1698#[derive(Debug, Deserialize, Serialize, Clone)]
1699#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1700#[serde(deny_unknown_fields)]
1701pub struct SwitchConfig {
1702    /// The metadata key to inspect for routing decisions.
1703    pub metadata_key: String,
1704    /// A map of values to endpoints.
1705    pub cases: HashMap<String, Endpoint>,
1706    /// The default endpoint if no case matches.
1707    pub default: Option<Box<Endpoint>>,
1708}
1709
1710// --- Response Endpoint Configuration ---
1711#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1712#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1713#[serde(deny_unknown_fields)]
1714pub struct ResponseConfig {
1715    // This struct is a marker and currently has no fields.
1716}
1717
1718// --- SQLx Specific Configuration ---
1719
1720/// General SQLx connection configuration.
1721#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1722#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1723#[serde(deny_unknown_fields)]
1724pub struct SqlxConfig {
1725    /// Database connection URL.
1726    pub url: String,
1727    /// Optional username. Takes precedence over any credentials embedded in the `url`.
1728    #[serde(default)]
1729    pub username: Option<String>,
1730    /// Optional password. Takes precedence over any credentials embedded in the `url`.
1731    #[serde(default)]
1732    pub password: Option<String>,
1733    /// The table to interact with.
1734    pub table: String,
1735    /// (Publisher only) Optional. A custom SQL INSERT query. Use `?` as a placeholder for the payload.
1736    /// If not provided, a default `INSERT INTO {table} (payload) VALUES (?)` is used.
1737    pub insert_query: Option<String>,
1738    /// (Consumer only) Optional. A custom SQL SELECT query to fetch messages. This is only supported for PostgreSQL and Microsoft SQL Server.
1739    /// The query must include a placeholder for the batch size (`$1` for PostgreSQL, `@p1` for SQL Server).
1740    /// The bridge will bind the route's `batch_size` to this placeholder.
1741    pub select_query: Option<String>,
1742    /// (Consumer only) If true, delete messages after processing.
1743    #[serde(default)]
1744    pub delete_after_read: bool,
1745    /// (Publisher only) If true, automatically create the table and indexes if they don't exist. Defaults to false.
1746    #[serde(default)]
1747    pub auto_create_table: bool,
1748    /// (Consumer only) Polling interval in milliseconds. Defaults to 100ms.
1749    pub polling_interval_ms: Option<u64>,
1750    /// TLS configuration for the database connection.
1751    #[serde(default)]
1752    pub tls: TlsConfig,
1753    /// Maximum number of connections in the pool. Defaults to 10.
1754    pub max_connections: Option<u32>,
1755    /// Minimum number of connections to keep in the pool. Defaults to 0.
1756    pub min_connections: Option<u32>,
1757    /// Timeout for acquiring a connection from the pool in milliseconds. Defaults to 30000ms.
1758    pub acquire_timeout_ms: Option<u64>,
1759    /// Maximum idle time for a connection in milliseconds. Defaults to 600000ms (10 minutes).
1760    pub idle_timeout_ms: Option<u64>,
1761    /// Maximum lifetime of a connection in milliseconds. Defaults to 1800000ms (30 minutes).
1762    pub max_lifetime_ms: Option<u64>,
1763}
1764
1765// --- Common Configuration ---
1766
1767/// TLS configuration for secure connections.
1768///
1769/// Configures Transport Layer Security (TLS/SSL) for encrypted communication.
1770/// Supports both client certificate (mutual TLS) and server certificate validation.
1771///
1772/// # Examples
1773///
1774/// ```
1775/// use mq_bridge::models::TlsConfig;
1776///
1777/// let tls = TlsConfig {
1778///     required: true,
1779///     ca_file: Some("/path/to/ca.pem".to_string()),
1780///     cert_file: Some("/path/to/cert.pem".to_string()),
1781///     key_file: Some("/path/to/key.pem".to_string()),
1782///     ..Default::default()
1783/// };
1784/// ```
1785#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq, Hash)]
1786#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1787#[serde(deny_unknown_fields)]
1788pub struct TlsConfig {
1789    /// If true, enable TLS/SSL.
1790    #[serde(default, deserialize_with = "deserialize_null_as_false")]
1791    pub required: bool,
1792    /// Path to the CA certificate file.
1793    pub ca_file: Option<String>,
1794    /// Path to the client certificate file (PEM).
1795    pub cert_file: Option<String>,
1796    /// Path to the client private key file (PEM).
1797    pub key_file: Option<String>,
1798    /// Password for the private key (if encrypted).
1799    pub cert_password: Option<String>,
1800    /// If true, disable server certificate verification (insecure).
1801    #[serde(default)]
1802    pub accept_invalid_certs: bool,
1803}
1804
1805impl TlsConfig {
1806    /// Creates a new TLS configuration with default settings (TLS not required).
1807    pub fn new() -> Self {
1808        Self::default()
1809    }
1810
1811    pub fn with_ca_file(mut self, ca_file: impl Into<String>) -> Self {
1812        self.ca_file = Some(ca_file.into());
1813        self.required = true;
1814        self
1815    }
1816
1817    pub fn with_client_cert(
1818        mut self,
1819        cert_file: impl Into<String>,
1820        key_file: impl Into<String>,
1821    ) -> Self {
1822        self.cert_file = Some(cert_file.into());
1823        self.key_file = Some(key_file.into());
1824        self.required = true;
1825        self
1826    }
1827
1828    pub fn with_insecure(mut self, accept_invalid_certs: bool) -> Self {
1829        self.accept_invalid_certs = accept_invalid_certs;
1830        self
1831    }
1832
1833    /// Checks if mutual TLS (mTLS) client authentication is configured.
1834    pub fn is_mtls_client_configured(&self) -> bool {
1835        self.required && self.cert_file.is_some() && self.key_file.is_some()
1836    }
1837
1838    /// Checks if TLS server certificate authentication is configured.
1839    pub fn is_tls_server_configured(&self) -> bool {
1840        self.required && self.cert_file.is_some() && self.key_file.is_some()
1841    }
1842
1843    /// Checks if the TLS configuration is sufficient to make a TLS client connection.
1844    pub fn is_tls_client_configured(&self) -> bool {
1845        self.required
1846            || self.ca_file.is_some()
1847            || (self.cert_file.is_some() && self.key_file.is_some())
1848    }
1849
1850    /// Helper to normalize a URL by adding the appropriate scheme prefix (http:// or https://) if missing.
1851    pub fn normalize_url(&self, url: &str) -> String {
1852        if url
1853            .get(..7)
1854            .is_some_and(|prefix| prefix.eq_ignore_ascii_case("http://"))
1855            || url
1856                .get(..8)
1857                .is_some_and(|prefix| prefix.eq_ignore_ascii_case("https://"))
1858        {
1859            url.to_string()
1860        } else {
1861            let is_tls = self.required;
1862            let scheme = if is_tls { "https" } else { "http" };
1863            format!("{}://{}", scheme, url)
1864        }
1865    }
1866}
1867
1868/// Trait for extracting secrets from configuration structures.
1869pub trait SecretExtractor {
1870    /// Extracts secrets into the provided map using the given prefix, and clears them from self.
1871    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>);
1872}
1873
1874fn extract_sensitive_string_map_entries(
1875    values: &mut HashMap<String, String>,
1876    prefix: &str,
1877    field_name: &str,
1878    secrets: &mut HashMap<String, String>,
1879) {
1880    let secret_keys = values
1881        .keys()
1882        .filter(|key| {
1883            let key = key.to_ascii_lowercase();
1884            key.contains("key") || key.contains("token") || key.contains("auth")
1885        })
1886        .cloned()
1887        .collect::<Vec<_>>();
1888
1889    for key in secret_keys {
1890        if let Some(value) = values.remove(&key) {
1891            secrets.insert(
1892                sanitize_secret_key(&format!("{}__{}__{}", prefix, field_name, key)),
1893                value,
1894            );
1895        }
1896    }
1897}
1898
1899fn url_has_userinfo(url: &str) -> bool {
1900    let Some(authority_start) = url.find("://").map(|idx| idx + 3) else {
1901        return false;
1902    };
1903    let authority_end = url[authority_start..]
1904        .find(['/', '?', '#'])
1905        .map(|idx| authority_start + idx)
1906        .unwrap_or(url.len());
1907    url[authority_start..authority_end].contains('@')
1908}
1909
1910fn sanitize_secret_key(key: &str) -> String {
1911    key.chars()
1912        .map(|ch| {
1913            let ch = ch.to_ascii_uppercase();
1914            if ch.is_ascii_alphanumeric() || ch == '_' {
1915                ch
1916            } else {
1917                '_'
1918            }
1919        })
1920        .collect()
1921}
1922
1923fn extract_sensitive_url(
1924    url: &mut String,
1925    prefix: &str,
1926    field_name: &str,
1927    secrets: &mut HashMap<String, String>,
1928) {
1929    if !url.is_empty() && url_has_userinfo(url) {
1930        secrets.insert(
1931            sanitize_secret_key(&format!("{}__{}", prefix, field_name)),
1932            std::mem::take(url),
1933        );
1934    }
1935}
1936
1937fn extract_sensitive_optional_url(
1938    url: &mut Option<String>,
1939    prefix: &str,
1940    field_name: &str,
1941    secrets: &mut HashMap<String, String>,
1942) {
1943    if url.as_ref().is_some_and(|url| url_has_userinfo(url)) {
1944        if let Some(url) = url.take() {
1945            secrets.insert(
1946                sanitize_secret_key(&format!("{}__{}", prefix, field_name)),
1947                url,
1948            );
1949        }
1950    }
1951}
1952
1953impl SecretExtractor for Route {
1954    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
1955        self.input
1956            .extract_secrets(&format!("{}__{}", prefix, "INPUT"), secrets);
1957        self.output
1958            .extract_secrets(&format!("{}__{}", prefix, "OUTPUT"), secrets);
1959    }
1960}
1961
1962impl SecretExtractor for Endpoint {
1963    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
1964        for (i, middleware) in self.middlewares.iter_mut().enumerate() {
1965            middleware.extract_secrets(&format!("{}__{}__{}", prefix, "MIDDLEWARES", i), secrets);
1966        }
1967        self.endpoint_type.extract_secrets(prefix, secrets);
1968    }
1969}
1970
1971impl SecretExtractor for EndpointType {
1972    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
1973        match self {
1974            EndpointType::Aws(cfg) => {
1975                cfg.extract_secrets(&format!("{}__{}", prefix, "AWS"), secrets)
1976            }
1977            EndpointType::Kafka(cfg) => {
1978                cfg.extract_secrets(&format!("{}__{}", prefix, "KAFKA"), secrets)
1979            }
1980            EndpointType::Nats(cfg) => {
1981                cfg.extract_secrets(&format!("{}__{}", prefix, "NATS"), secrets)
1982            }
1983            EndpointType::Amqp(cfg) => {
1984                cfg.extract_secrets(&format!("{}__{}", prefix, "AMQP"), secrets)
1985            }
1986            EndpointType::MongoDb(cfg) => {
1987                cfg.extract_secrets(&format!("{}__{}", prefix, "MONGODB"), secrets)
1988            }
1989            EndpointType::Mqtt(cfg) => {
1990                cfg.extract_secrets(&format!("{}__{}", prefix, "MQTT"), secrets)
1991            }
1992            EndpointType::Http(cfg) => {
1993                cfg.extract_secrets(&format!("{}__{}", prefix, "HTTP"), secrets)
1994            }
1995            EndpointType::IbmMq(cfg) => {
1996                cfg.extract_secrets(&format!("{}__{}", prefix, "IBMMQ"), secrets)
1997            }
1998            EndpointType::ZeroMq(cfg) => {
1999                cfg.extract_secrets(&format!("{}__{}", prefix, "ZEROMQ"), secrets)
2000            }
2001            EndpointType::Sqlx(cfg) => {
2002                cfg.extract_secrets(&format!("{}__{}", prefix, "SQLX"), secrets)
2003            }
2004            EndpointType::Grpc(cfg) => {
2005                cfg.extract_secrets(&format!("{}__{}", prefix, "GRPC"), secrets)
2006            }
2007            EndpointType::Fanout(endpoints) => {
2008                for (i, ep) in endpoints.iter_mut().enumerate() {
2009                    ep.extract_secrets(&format!("{}__{}__{}", prefix, "FANOUT", i), secrets);
2010                }
2011            }
2012            EndpointType::Switch(cfg) => {
2013                for (key, ep) in cfg.cases.iter_mut() {
2014                    ep.extract_secrets(
2015                        &format!(
2016                            "{}__{}__{}",
2017                            prefix,
2018                            "SWITCH__CASES",
2019                            sanitize_secret_key(key)
2020                        ),
2021                        secrets,
2022                    );
2023                }
2024                if let Some(default) = &mut cfg.default {
2025                    default.extract_secrets(&format!("{}__{}", prefix, "SWITCH__DEFAULT"), secrets);
2026                }
2027            }
2028            EndpointType::Reader(ep) => {
2029                ep.extract_secrets(&format!("{}__{}", prefix, "READER"), secrets)
2030            }
2031            _ => {}
2032        }
2033    }
2034}
2035
2036impl SecretExtractor for Middleware {
2037    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2038        if let Middleware::Dlq(cfg) = self {
2039            cfg.endpoint
2040                .extract_secrets(&format!("{}__{}__{}", prefix, "DLQ", "ENDPOINT"), secrets);
2041        }
2042    }
2043}
2044
2045impl SecretExtractor for AwsConfig {
2046    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2047        if let Some(val) = self.access_key.take() {
2048            secrets.insert(format!("{}__{}", prefix, "ACCESS_KEY"), val);
2049        }
2050        if let Some(val) = self.secret_key.take() {
2051            secrets.insert(format!("{}__{}", prefix, "SECRET_KEY"), val);
2052        }
2053        if let Some(val) = self.session_token.take() {
2054            secrets.insert(format!("{}__{}", prefix, "SESSION_TOKEN"), val);
2055        }
2056        extract_sensitive_optional_url(&mut self.queue_url, prefix, "QUEUE_URL", secrets);
2057        extract_sensitive_optional_url(&mut self.endpoint_url, prefix, "ENDPOINT_URL", secrets);
2058    }
2059}
2060
2061impl SecretExtractor for KafkaConfig {
2062    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2063        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2064        if let Some(val) = self.username.take() {
2065            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2066        }
2067        if let Some(val) = self.password.take() {
2068            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2069        }
2070        self.tls
2071            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2072    }
2073}
2074
2075impl SecretExtractor for NatsConfig {
2076    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2077        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2078        if let Some(val) = self.username.take() {
2079            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2080        }
2081        if let Some(val) = self.password.take() {
2082            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2083        }
2084        if let Some(val) = self.token.take() {
2085            secrets.insert(format!("{}__{}", prefix, "TOKEN"), val);
2086        }
2087        self.tls
2088            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2089    }
2090}
2091
2092impl SecretExtractor for AmqpConfig {
2093    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2094        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2095        if let Some(val) = self.username.take() {
2096            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2097        }
2098        if let Some(val) = self.password.take() {
2099            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2100        }
2101        self.tls
2102            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2103    }
2104}
2105
2106impl SecretExtractor for MongoDbConfig {
2107    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2108        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2109        if let Some(val) = self.username.take() {
2110            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2111        }
2112        if let Some(val) = self.password.take() {
2113            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2114        }
2115        self.tls
2116            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2117    }
2118}
2119
2120impl SecretExtractor for MqttConfig {
2121    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2122        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2123        if let Some(val) = self.username.take() {
2124            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2125        }
2126        if let Some(val) = self.password.take() {
2127            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2128        }
2129        self.tls
2130            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2131    }
2132}
2133
2134impl SecretExtractor for HttpConfig {
2135    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2136        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2137        if let Some((u, p)) = self.basic_auth.take() {
2138            secrets.insert(format!("{}__{}__{}", prefix, "BASIC_AUTH", 0), u);
2139            secrets.insert(format!("{}__{}__{}", prefix, "BASIC_AUTH", 1), p);
2140        }
2141        extract_sensitive_string_map_entries(
2142            &mut self.custom_headers,
2143            prefix,
2144            "CUSTOM_HEADERS",
2145            secrets,
2146        );
2147        self.tls
2148            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2149    }
2150}
2151
2152impl SecretExtractor for IbmMqConfig {
2153    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2154        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2155        if let Some(val) = self.username.take() {
2156            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2157        }
2158        if let Some(val) = self.password.take() {
2159            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2160        }
2161        self.tls
2162            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2163    }
2164}
2165
2166impl SecretExtractor for ZeroMqConfig {
2167    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2168        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2169    }
2170}
2171
2172impl SecretExtractor for SqlxConfig {
2173    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2174        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2175        if let Some(val) = self.username.take() {
2176            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2177        }
2178        if let Some(val) = self.password.take() {
2179            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2180        }
2181        self.tls
2182            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2183    }
2184}
2185
2186impl SecretExtractor for GrpcConfig {
2187    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2188        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2189        self.tls
2190            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2191    }
2192}
2193
2194impl SecretExtractor for TlsConfig {
2195    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2196        if let Some(val) = self.cert_password.take() {
2197            secrets.insert(format!("{}__{}", prefix, "CERT_PASSWORD"), val);
2198        }
2199    }
2200}
2201
2202/// Extracts sensitive values (passwords, keys, tokens) from the configuration
2203/// and returns them as a map of environment variables (key-value pairs).
2204/// The extracted fields in the configuration are set to `None`.
2205///
2206/// The keys in the returned map follow the `MQB__{ROUTE}__{ENDPOINT}__{FIELD}` pattern
2207/// compatible with the `config` crate's environment variable override mechanism.
2208pub fn extract_config_secrets(config: &mut Config) -> HashMap<String, String> {
2209    let mut secrets = HashMap::new();
2210    for (route_name, route) in config.iter_mut() {
2211        let prefix = sanitize_secret_key(&format!("MQB__{}", route_name));
2212        route.extract_secrets(&prefix, &mut secrets);
2213    }
2214    secrets
2215}
2216
2217#[cfg(test)]
2218mod tests {
2219    use super::*;
2220    use config::{Config as ConfigBuilder, Environment};
2221
2222    const TEST_YAML: &str = r#"
2223kafka_to_nats:
2224  concurrency: 10
2225  input:
2226    middlewares:
2227      - deduplication:
2228          sled_path: "/tmp/mq-bridge/dedup_db"
2229          ttl_seconds: 3600
2230      - metrics: {}
2231      - retry:
2232          max_attempts: 5
2233          initial_interval_ms: 200
2234      - random_panic:
2235          mode: nack
2236      - dlq:
2237          endpoint:
2238            nats:
2239              subject: "dlq-subject"
2240              url: "nats://localhost:4222"
2241    kafka:
2242      topic: "input-topic"
2243      url: "localhost:9092"
2244      group_id: "my-consumer-group"
2245      tls:
2246        required: true
2247        ca_file: "/path_to_ca"
2248        cert_file: "/path_to_cert"
2249        key_file: "/path_to_key"
2250        cert_password: "password"
2251        accept_invalid_certs: true
2252  output:
2253    middlewares:
2254      - metrics: {}
2255      - dlq:
2256          endpoint:
2257            file:
2258              path: "error.out"
2259    nats:
2260      subject: "output-subject"
2261      url: "nats://localhost:4222"
2262"#;
2263
2264    fn assert_config_values(config: &Config) {
2265        assert_eq!(config.len(), 1);
2266        let route = config.get("kafka_to_nats").expect("Route should exist");
2267
2268        assert_eq!(route.options.concurrency, 10);
2269
2270        // --- Assert Input ---
2271        let input = &route.input;
2272        assert_eq!(input.middlewares.len(), 5);
2273
2274        let mut has_dedup = false;
2275        let mut has_metrics = false;
2276        let mut has_dlq = false;
2277        let mut has_retry = false;
2278        let mut has_random_panic = false;
2279        for middleware in &input.middlewares {
2280            match middleware {
2281                Middleware::Deduplication(dedup) => {
2282                    assert_eq!(dedup.sled_path, "/tmp/mq-bridge/dedup_db");
2283                    assert_eq!(dedup.ttl_seconds, 3600);
2284                    has_dedup = true;
2285                }
2286                Middleware::Metrics(_) => {
2287                    has_metrics = true;
2288                }
2289                Middleware::Custom { .. } => {}
2290                Middleware::Dlq(dlq) => {
2291                    assert!(dlq.endpoint.middlewares.is_empty());
2292                    if let EndpointType::Nats(nats_cfg) = &dlq.endpoint.endpoint_type {
2293                        assert_eq!(nats_cfg.subject, Some("dlq-subject".to_string()));
2294                        assert_eq!(nats_cfg.url, "nats://localhost:4222");
2295                    }
2296                    has_dlq = true;
2297                }
2298                Middleware::Retry(retry) => {
2299                    assert_eq!(retry.max_attempts, 5);
2300                    assert_eq!(retry.initial_interval_ms, 200);
2301                    has_retry = true;
2302                }
2303                Middleware::RandomPanic(rp) => {
2304                    assert!(rp.mode == FaultMode::Nack);
2305                    has_random_panic = true;
2306                }
2307                Middleware::Delay(_) => {}
2308                Middleware::WeakJoin(_) => {}
2309            }
2310        }
2311
2312        if let EndpointType::Kafka(kafka) = &input.endpoint_type {
2313            assert_eq!(kafka.topic, Some("input-topic".to_string()));
2314            assert_eq!(kafka.url, "localhost:9092");
2315            assert_eq!(kafka.group_id, Some("my-consumer-group".to_string()));
2316            let tls = &kafka.tls;
2317            assert!(tls.required);
2318            assert_eq!(tls.ca_file.as_deref(), Some("/path_to_ca"));
2319            assert!(tls.accept_invalid_certs);
2320        } else {
2321            panic!("Input endpoint should be Kafka");
2322        }
2323        assert!(has_dedup);
2324        assert!(has_metrics);
2325        assert!(has_dlq);
2326        assert!(has_retry);
2327        assert!(has_random_panic);
2328
2329        // --- Assert Output ---
2330        let output = &route.output;
2331        assert_eq!(output.middlewares.len(), 2);
2332        assert!(matches!(output.middlewares[0], Middleware::Metrics(_)));
2333
2334        if let EndpointType::Nats(nats) = &output.endpoint_type {
2335            assert_eq!(nats.subject, Some("output-subject".to_string()));
2336            assert_eq!(nats.url, "nats://localhost:4222");
2337        } else {
2338            panic!("Output endpoint should be NATS");
2339        }
2340    }
2341
2342    #[test]
2343    fn test_deserialize_from_yaml() {
2344        // We use serde_yaml directly here because the `config` crate's processing
2345        // can interfere with complex deserialization logic.
2346        let result: Result<Config, _> = serde_yaml_ng::from_str(TEST_YAML);
2347        println!("Deserialized from YAML: {:#?}", result);
2348        let config = result.expect("Failed to deserialize TEST_YAML");
2349        assert_config_values(&config);
2350    }
2351
2352    #[test]
2353    fn test_deserialize_from_env() {
2354        // Set environment variables based on README
2355        unsafe {
2356            std::env::set_var("MQB__KAFKA_TO_NATS__CONCURRENCY", "10");
2357            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "input-topic");
2358            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__URL", "localhost:9092");
2359            std::env::set_var(
2360                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID",
2361                "my-consumer-group",
2362            );
2363            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__REQUIRED", "true");
2364            std::env::set_var(
2365                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__CA_FILE",
2366                "/path_to_ca",
2367            );
2368            std::env::set_var(
2369                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__ACCEPT_INVALID_CERTS",
2370                "true",
2371            );
2372            std::env::set_var(
2373                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT",
2374                "output-subject",
2375            );
2376            std::env::set_var(
2377                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__URL",
2378                "nats://localhost:4222",
2379            );
2380            std::env::set_var(
2381                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__SUBJECT",
2382                "dlq-subject",
2383            );
2384            std::env::set_var(
2385                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__URL",
2386                "nats://localhost:4222",
2387            );
2388        }
2389
2390        let builder = ConfigBuilder::builder()
2391            // Enable automatic type parsing for values from environment variables.
2392            .add_source(
2393                Environment::with_prefix("MQB")
2394                    .separator("__")
2395                    .try_parsing(true),
2396            );
2397
2398        let config: Config = builder
2399            .build()
2400            .expect("Failed to build config")
2401            .try_deserialize()
2402            .expect("Failed to deserialize config");
2403
2404        // We can't test all values from env, but we can check the ones we set.
2405        assert_eq!(config.get("kafka_to_nats").unwrap().options.concurrency, 10);
2406        if let EndpointType::Kafka(k) = &config.get("kafka_to_nats").unwrap().input.endpoint_type {
2407            assert_eq!(k.topic, Some("input-topic".to_string()));
2408            assert!(k.tls.required);
2409        } else {
2410            panic!("Expected Kafka endpoint");
2411        }
2412
2413        let input = &config.get("kafka_to_nats").unwrap().input;
2414        assert_eq!(input.middlewares.len(), 1);
2415        if let Middleware::Dlq(_) = &input.middlewares[0] {
2416            // Correctly parsed
2417        } else {
2418            panic!("Expected DLQ middleware");
2419        }
2420    }
2421
2422    #[test]
2423    fn test_extract_secrets() {
2424        let mut config = Config::new();
2425        let mut route = Route::default();
2426
2427        // Setup Kafka with secrets
2428        let mut kafka_config = KafkaConfig::new("kafka://user:pass@localhost:9092");
2429        kafka_config.username = Some("user".to_string());
2430        kafka_config.password = Some("pass".to_string());
2431        kafka_config.tls.cert_password = Some("certpass".to_string());
2432
2433        route.input = Endpoint {
2434            endpoint_type: EndpointType::Kafka(kafka_config),
2435            middlewares: vec![],
2436            handler: None,
2437        };
2438
2439        // Setup HTTP with basic auth
2440        let mut http_config = HttpConfig::new("http://httpuser:httppass@localhost");
2441        http_config.basic_auth = Some(("httpuser".to_string(), "httppass".to_string()));
2442        http_config
2443            .custom_headers
2444            .insert("X-API-Key".to_string(), "http-api-key".to_string());
2445        http_config.custom_headers.insert(
2446            "X-Access-Token".to_string(),
2447            "http-access-token".to_string(),
2448        );
2449        http_config.custom_headers.insert(
2450            "X-Authentication".to_string(),
2451            "http-authentication".to_string(),
2452        );
2453        http_config.custom_headers.insert(
2454            "Authorization".to_string(),
2455            "Bearer secret-token".to_string(),
2456        );
2457        http_config
2458            .custom_headers
2459            .insert("X-Trace-Id".to_string(), "trace-value".to_string());
2460
2461        route.output = Endpoint {
2462            endpoint_type: EndpointType::Http(http_config),
2463            middlewares: vec![],
2464            handler: None,
2465        };
2466
2467        config.insert("test_route".to_string(), route);
2468
2469        let secrets = extract_config_secrets(&mut config);
2470
2471        // Verify secrets extracted
2472        assert_eq!(
2473            secrets
2474                .get("MQB__TEST_ROUTE__INPUT__KAFKA__URL")
2475                .map(|s| s.as_str()),
2476            Some("kafka://user:pass@localhost:9092")
2477        );
2478        assert_eq!(
2479            secrets
2480                .get("MQB__TEST_ROUTE__INPUT__KAFKA__USERNAME")
2481                .map(|s| s.as_str()),
2482            Some("user")
2483        );
2484        assert_eq!(
2485            secrets
2486                .get("MQB__TEST_ROUTE__INPUT__KAFKA__PASSWORD")
2487                .map(|s| s.as_str()),
2488            Some("pass")
2489        );
2490        assert_eq!(
2491            secrets
2492                .get("MQB__TEST_ROUTE__INPUT__KAFKA__TLS__CERT_PASSWORD")
2493                .map(|s| s.as_str()),
2494            Some("certpass")
2495        );
2496        assert_eq!(
2497            secrets
2498                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__URL")
2499                .map(|s| s.as_str()),
2500            Some("http://httpuser:httppass@localhost")
2501        );
2502        assert_eq!(
2503            secrets
2504                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__BASIC_AUTH__0")
2505                .map(|s| s.as_str()),
2506            Some("httpuser")
2507        );
2508        assert_eq!(
2509            secrets
2510                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__BASIC_AUTH__1")
2511                .map(|s| s.as_str()),
2512            Some("httppass")
2513        );
2514        assert_eq!(
2515            secrets
2516                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_API_KEY")
2517                .map(|s| s.as_str()),
2518            Some("http-api-key")
2519        );
2520        assert_eq!(
2521            secrets
2522                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_ACCESS_TOKEN")
2523                .map(|s| s.as_str()),
2524            Some("http-access-token")
2525        );
2526        assert_eq!(
2527            secrets
2528                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_AUTHENTICATION")
2529                .map(|s| s.as_str()),
2530            Some("http-authentication")
2531        );
2532        assert_eq!(
2533            secrets
2534                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__AUTHORIZATION")
2535                .map(|s| s.as_str()),
2536            Some("Bearer secret-token")
2537        );
2538
2539        // Verify config cleared
2540        let route = config.get("test_route").unwrap();
2541        if let EndpointType::Kafka(k) = &route.input.endpoint_type {
2542            assert!(k.url.is_empty());
2543            assert!(k.username.is_none());
2544            assert!(k.password.is_none());
2545            assert!(k.tls.cert_password.is_none());
2546        }
2547        if let EndpointType::Http(h) = &route.output.endpoint_type {
2548            assert!(h.url.is_empty());
2549            assert!(h.basic_auth.is_none());
2550            assert!(!h.custom_headers.contains_key("X-API-Key"));
2551            assert!(!h.custom_headers.contains_key("X-Access-Token"));
2552            assert!(!h.custom_headers.contains_key("X-Authentication"));
2553            assert!(!h.custom_headers.contains_key("Authorization"));
2554            assert_eq!(
2555                h.custom_headers.get("X-Trace-Id").map(|s| s.as_str()),
2556                Some("trace-value")
2557            );
2558        }
2559    }
2560
2561    #[test]
2562    fn test_extract_sensitive_url_only_strips_authority_credentials() {
2563        let mut config = Config::new();
2564        let path_at_route = Route {
2565            output: Endpoint {
2566                endpoint_type: EndpointType::Http(HttpConfig::new(
2567                    "https://example.com/path/user@example.com?email=a@b.test",
2568                )),
2569                middlewares: vec![],
2570                handler: None,
2571            },
2572            ..Default::default()
2573        };
2574        config.insert("path_at_route".to_string(), path_at_route);
2575
2576        let credential_route = Route {
2577            output: Endpoint {
2578                endpoint_type: EndpointType::Http(HttpConfig::new(
2579                    "https://user:pass@example.com/path",
2580                )),
2581                middlewares: vec![],
2582                handler: None,
2583            },
2584            ..Default::default()
2585        };
2586        config.insert("credential_route".to_string(), credential_route);
2587
2588        let query_at_route = Route {
2589            output: Endpoint {
2590                endpoint_type: EndpointType::Http(HttpConfig::new(
2591                    "https://example.com?next=a@b.test",
2592                )),
2593                middlewares: vec![],
2594                handler: None,
2595            },
2596            ..Default::default()
2597        };
2598        config.insert("query_at_route".to_string(), query_at_route);
2599
2600        let fragment_at_route = Route {
2601            output: Endpoint {
2602                endpoint_type: EndpointType::Http(HttpConfig::new(
2603                    "https://example.com#user@example.com",
2604                )),
2605                middlewares: vec![],
2606                handler: None,
2607            },
2608            ..Default::default()
2609        };
2610        config.insert("fragment_at_route".to_string(), fragment_at_route);
2611
2612        let secrets = extract_config_secrets(&mut config);
2613
2614        if let EndpointType::Http(http) = &config.get("path_at_route").unwrap().output.endpoint_type
2615        {
2616            assert_eq!(
2617                http.url,
2618                "https://example.com/path/user@example.com?email=a@b.test"
2619            );
2620        }
2621        if let EndpointType::Http(http) =
2622            &config.get("query_at_route").unwrap().output.endpoint_type
2623        {
2624            assert_eq!(http.url, "https://example.com?next=a@b.test");
2625        }
2626        if let EndpointType::Http(http) = &config
2627            .get("fragment_at_route")
2628            .unwrap()
2629            .output
2630            .endpoint_type
2631        {
2632            assert_eq!(http.url, "https://example.com#user@example.com");
2633        }
2634        if let EndpointType::Http(http) =
2635            &config.get("credential_route").unwrap().output.endpoint_type
2636        {
2637            assert!(http.url.is_empty());
2638        }
2639        assert_eq!(
2640            secrets
2641                .get("MQB__CREDENTIAL_ROUTE__OUTPUT__HTTP__URL")
2642                .map(String::as_str),
2643            Some("https://user:pass@example.com/path")
2644        );
2645        assert!(!secrets.contains_key("MQB__PATH_AT_ROUTE__OUTPUT__HTTP__URL"));
2646        assert!(!secrets.contains_key("MQB__QUERY_AT_ROUTE__OUTPUT__HTTP__URL"));
2647        assert!(!secrets.contains_key("MQB__FRAGMENT_AT_ROUTE__OUTPUT__HTTP__URL"));
2648    }
2649
2650    #[test]
2651    fn test_file_config_inference() {
2652        let yaml = r#"
2653mode: group_subscribe
2654path: "/tmp/test"
2655group_id: "my_group"
2656"#;
2657        let config: FileConfig = serde_yaml_ng::from_str(yaml).unwrap();
2658        match config.mode {
2659            Some(FileConsumerMode::GroupSubscribe { group_id, .. }) => {
2660                assert_eq!(group_id, "my_group")
2661            }
2662            _ => panic!("Expected GroupSubscribe"),
2663        }
2664
2665        let yaml_queue = r#"
2666mode: consume
2667path: "/tmp/test"
2668"#;
2669        let config_queue: FileConfig = serde_yaml_ng::from_str(yaml_queue).unwrap();
2670        match config_queue.mode {
2671            Some(FileConsumerMode::Consume { delete }) => assert!(!delete),
2672            _ => panic!("Expected Consume"),
2673        }
2674    }
2675}
2676
2677#[cfg(all(test, feature = "schema"))]
2678mod schema_tests {
2679    use super::*;
2680
2681    #[test]
2682    fn generate_json_schema() {
2683        let schema = schemars::schema_for!(Config);
2684        let schema_json = serde_json::to_string_pretty(&schema).unwrap();
2685
2686        let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2687        path.push("mq-bridge.schema.json");
2688        std::fs::write(path, schema_json).expect("Failed to write schema file");
2689    }
2690}