Skip to main content

mq_bridge/
models.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6use serde::{
7    de::{MapAccess, Visitor},
8    Deserialize, Deserializer, Serialize,
9};
10use std::{
11    collections::HashMap,
12    sync::{atomic::AtomicUsize, Arc},
13};
14
15use crate::traits::Handler;
16use tracing::trace;
17
18/// The top-level configuration is a map of named routes.
19/// The key is the route name (e.g., "kafka_to_nats").
20///
21/// # Examples
22///
23/// Deserializing a complex configuration from YAML:
24///
25/// ```
26/// use mq_bridge::models::{Config, EndpointType, Middleware};
27///
28/// let yaml = r#"
29/// kafka_to_nats:
30///   concurrency: 10
31///   input:
32///     middlewares:
33///       - deduplication:
34///           sled_path: "/tmp/mq-bridge/dedup_db"
35///           ttl_seconds: 3600
36///       - metrics: {}
37///       - retry:
38///           max_attempts: 5
39///           initial_interval_ms: 200
40///       - random_panic:
41///           mode: nack
42///       - dlq:
43///           endpoint:
44///             nats:
45///               subject: "dlq-subject"
46///               url: "nats://localhost:4222"
47///     kafka:
48///       topic: "input-topic"
49///       url: "localhost:9092"
50///       group_id: "my-consumer-group"
51///       tls:
52///         required: true
53///         ca_file: "/path_to_ca"
54///         cert_file: "/path_to_cert"
55///         key_file: "/path_to_key"
56///         cert_password: "password"
57///         accept_invalid_certs: true
58///   output:
59///     middlewares:
60///       - metrics: {}
61///       - dlq:
62///           endpoint:
63///             file:
64///               path: "error.out"
65///     nats:
66///       subject: "output-subject"
67///       url: "nats://localhost:4222"
68/// "#;
69///
70/// let config: Config = serde_yaml_ng::from_str(yaml).unwrap();
71/// let route = config.get("kafka_to_nats").unwrap();
72///
73/// assert_eq!(route.options.concurrency, 10);
74/// // Check input middleware
75/// assert!(route.input.middlewares.iter().any(|m| matches!(m, Middleware::Deduplication(_))));
76/// // Check output endpoint
77/// assert!(matches!(route.output.endpoint_type, EndpointType::Nats(_)));
78/// ```
79pub type Config = HashMap<String, Route>;
80
81/// A configuration map for named publishers (endpoints).
82/// The key is the publisher name.
83pub type PublisherConfig = HashMap<String, Endpoint>;
84
85/// Defines a single message processing route from an input to an output.
86#[derive(Debug, Deserialize, Serialize, Clone)]
87#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
88#[serde(deny_unknown_fields)]
89pub struct Route {
90    /// The input/source endpoint for the route.
91    pub input: Endpoint,
92    /// The output/sink endpoint for the route.
93    #[serde(default = "default_output_endpoint")]
94    pub output: Endpoint,
95    /// (Optional) Fine-tuning options for the route's execution.
96    #[serde(flatten, default)]
97    pub options: RouteOptions,
98}
99
100impl Default for Route {
101    fn default() -> Self {
102        Self {
103            input: Endpoint::null(),
104            output: Endpoint::null(),
105            options: RouteOptions::default(),
106        }
107    }
108}
109
110/// Fine-tuning options for a route's execution.
111///
112/// These options control concurrency, batching, and commit behavior for message processing.
113///
114/// # Examples
115///
116/// ```
117/// use mq_bridge::models::RouteOptions;
118///
119/// let options = RouteOptions {
120///     description: "My Route".to_string(),
121///     concurrency: 10,
122///     batch_size: 5,
123///     commit_concurrency_limit: 1024,
124/// };
125/// ```
126#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
127#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
128#[serde(deny_unknown_fields)]
129pub struct RouteOptions {
130    /// A human-readable description of the route's purpose. Defaults to an empty string.
131    #[serde(default, skip_serializing_if = "String::is_empty")]
132    pub description: String,
133    /// (Optional) Number of concurrent processing tasks for this route. While it improves throughput for high-latency
134    /// handlers, it adds synchronization overhead for ordered commits and may lead to out-of-order processing
135    /// in the handler. Defaults to 1.
136    #[serde(default = "default_concurrency")]
137    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
138    pub concurrency: usize,
139    /// (Optional) Maximum number of messages to process in a single batch. The consumer waits for at least one message
140    /// and then attempts to fetch more if available. Increasing this improves throughput but also increases
141    /// the potential impact of a single batch processing failure. Defaults to 1.
142    #[serde(default = "default_batch_size")]
143    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
144    pub batch_size: usize,
145    /// (Optional) The maximum number of in-flight commit requests queued for ordered sequencing.
146    /// Lower values apply backpressure earlier; higher values allow larger commit backlogs.
147    /// Defaults to 4096.
148    #[serde(default = "default_commit_concurrency_limit")]
149    pub commit_concurrency_limit: usize,
150}
151
152impl Default for RouteOptions {
153    fn default() -> Self {
154        Self {
155            description: String::new(),
156            concurrency: default_concurrency(),
157            batch_size: default_batch_size(),
158            commit_concurrency_limit: default_commit_concurrency_limit(),
159        }
160    }
161}
162
163pub(crate) fn default_concurrency() -> usize {
164    1
165}
166
167pub(crate) fn default_batch_size() -> usize {
168    1
169}
170
171pub(crate) fn default_commit_concurrency_limit() -> usize {
172    4096
173}
174
175fn default_output_endpoint() -> Endpoint {
176    Endpoint::new(EndpointType::Null)
177}
178
179fn default_retry_attempts() -> usize {
180    3
181}
182fn default_initial_interval_ms() -> u64 {
183    100
184}
185fn default_max_interval_ms() -> u64 {
186    5000
187}
188fn default_multiplier() -> f64 {
189    2.0
190}
191fn default_clean_session() -> bool {
192    false
193}
194fn default_cookie_metadata_key() -> String {
195    "cookie".to_string()
196}
197fn default_set_cookie_metadata_key() -> String {
198    "set-cookie".to_string()
199}
200
201fn is_known_endpoint_name(name: &str) -> bool {
202    matches!(
203        name,
204        "aws"
205            | "kafka"
206            | "nats"
207            | "file"
208            | "static"
209            | "memory"
210            | "sled"
211            | "amqp"
212            | "mongodb"
213            | "mqtt"
214            | "http"
215            | "websocket"
216            | "ibmmq"
217            | "zeromq"
218            | "grpc"
219            | "fanout"
220            | "ref"
221            | "switch"
222            | "response"
223            | "reader"
224            | "null"
225            | "sqlx"
226    )
227}
228
229/// Represents a connection point for messages, which can be a source (input) or a sink (output).
230#[derive(Serialize, Clone, Default)]
231#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
232#[serde(deny_unknown_fields)]
233pub struct Endpoint {
234    /// (Optional) A list of middlewares to apply to the endpoint.
235    #[serde(default)]
236    pub middlewares: Vec<Middleware>,
237
238    /// The specific endpoint implementation, determined by the configuration key (e.g., "kafka", "nats").
239    #[serde(flatten)]
240    pub endpoint_type: EndpointType,
241
242    #[serde(skip_serializing)]
243    #[cfg_attr(feature = "schema", schemars(skip))]
244    /// Internal handler for processing messages (not serialized).
245    pub handler: Option<Arc<dyn Handler>>,
246}
247
248impl std::fmt::Debug for Endpoint {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        f.debug_struct("Endpoint")
251            .field("middlewares", &self.middlewares)
252            .field("endpoint_type", &self.endpoint_type)
253            .field(
254                "handler",
255                &if self.handler.is_some() {
256                    "Some(<Handler>)"
257                } else {
258                    "None"
259                },
260            )
261            .finish()
262    }
263}
264
265impl<'de> Deserialize<'de> for Endpoint {
266    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
267    where
268        D: Deserializer<'de>,
269    {
270        struct EndpointVisitor;
271
272        impl<'de> Visitor<'de> for EndpointVisitor {
273            type Value = Endpoint;
274
275            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
276                formatter.write_str("a map representing an endpoint or null")
277            }
278
279            fn visit_unit<E>(self) -> Result<Self::Value, E>
280            where
281                E: serde::de::Error,
282            {
283                Ok(Endpoint::new(EndpointType::Null))
284            }
285
286            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
287            where
288                A: MapAccess<'de>,
289            {
290                // Buffer the map into a temporary serde_json::Map.
291                // This allows us to separate the `middlewares` field from the rest.
292                let mut temp_map = serde_json::Map::new();
293                let mut middlewares_val = None;
294
295                while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
296                    if key == "middlewares" {
297                        middlewares_val = Some(value);
298                    } else {
299                        temp_map.insert(key, value);
300                    }
301                }
302
303                // Deserialize the rest of the map into the flattened EndpointType.
304                let temp_val = serde_json::Value::Object(temp_map);
305                let endpoint_type: EndpointType = match serde_json::from_value(temp_val.clone()) {
306                    Ok(et) => et,
307                    Err(original_err) => {
308                        if let serde_json::Value::Object(map) = &temp_val {
309                            if map.len() == 1 {
310                                let (name, config) = map.iter().next().unwrap();
311                                if is_known_endpoint_name(name) {
312                                    return Err(serde::de::Error::custom(original_err));
313                                }
314                                trace!("Falling back to Custom endpoint for key: {}", name);
315                                EndpointType::Custom {
316                                    name: name.clone(),
317                                    config: config.clone(),
318                                }
319                            } else if map.is_empty() {
320                                EndpointType::Null
321                            } else {
322                                return Err(serde::de::Error::custom(
323                                    "Invalid endpoint configuration: multiple keys found or unknown endpoint type",
324                                ));
325                            }
326                        } else {
327                            return Err(serde::de::Error::custom("Invalid endpoint configuration"));
328                        }
329                    }
330                };
331
332                // Deserialize the extracted middlewares value using the existing helper logic.
333                let middlewares = match middlewares_val {
334                    Some(val) => {
335                        deserialize_middlewares_from_value(val).map_err(serde::de::Error::custom)?
336                    }
337                    None => Vec::new(),
338                };
339
340                Ok(Endpoint {
341                    middlewares,
342                    endpoint_type,
343                    handler: None,
344                })
345            }
346        }
347
348        deserializer.deserialize_any(EndpointVisitor)
349    }
350}
351
352fn is_known_middleware_name(name: &str) -> bool {
353    matches!(
354        name,
355        "deduplication"
356            | "metrics"
357            | "dlq"
358            | "retry"
359            | "random_panic"
360            | "delay"
361            | "weak_join"
362            | "limiter"
363            | "buffer"
364            | "cookie_jar"
365            | "custom"
366    )
367}
368
369/// Deserialize middlewares from a generic serde_json::Value.
370///
371/// This logic was extracted from `deserialize_middlewares_from_map_or_seq` to be reused by the custom `Endpoint` deserializer.
372fn deserialize_middlewares_from_value(value: serde_json::Value) -> anyhow::Result<Vec<Middleware>> {
373    let arr = match value {
374        serde_json::Value::Array(arr) => arr,
375        serde_json::Value::Object(map) => {
376            let mut middlewares: Vec<_> = map
377                .into_iter()
378                // The config crate can produce maps with numeric string keys ("0", "1", ...)
379                // from environment variables. We need to sort by these keys to maintain order.
380                .filter_map(|(key, value)| key.parse::<usize>().ok().map(|index| (index, value)))
381                .collect();
382            middlewares.sort_by_key(|(index, _)| *index);
383
384            middlewares.into_iter().map(|(_, value)| value).collect()
385        }
386        _ => return Err(anyhow::anyhow!("Expected an array or object")),
387    };
388
389    let mut middlewares = Vec::new();
390    for item in arr {
391        // Check if it is a map with a single key that matches a known middleware
392        let known_name = if let serde_json::Value::Object(map) = &item {
393            if map.len() == 1 {
394                let (name, _) = map.iter().next().unwrap();
395                if is_known_middleware_name(name) {
396                    Some(name.clone())
397                } else {
398                    None
399                }
400            } else {
401                None
402            }
403        } else {
404            None
405        };
406
407        if let Some(name) = known_name {
408            match serde_json::from_value::<Middleware>(item.clone()) {
409                Ok(m) => middlewares.push(m),
410                Err(e) => {
411                    return Err(anyhow::anyhow!(
412                        "Failed to deserialize known middleware '{}': {}",
413                        name,
414                        e
415                    ))
416                }
417            }
418        } else if let Ok(m) = serde_json::from_value::<Middleware>(item.clone()) {
419            middlewares.push(m);
420        } else if let serde_json::Value::Object(map) = &item {
421            if map.len() == 1 {
422                let (name, config) = map.iter().next().unwrap();
423                middlewares.push(Middleware::Custom {
424                    name: name.clone(),
425                    config: config.clone(),
426                });
427            } else {
428                return Err(anyhow::anyhow!(
429                    "Invalid middleware configuration: {:?}",
430                    item
431                ));
432            }
433        } else {
434            return Err(anyhow::anyhow!(
435                "Invalid middleware configuration: {:?}",
436                item
437            ));
438        }
439    }
440    Ok(middlewares)
441}
442
443/// An enumeration of all supported endpoint types.
444/// `#[serde(rename_all = "lowercase")]` ensures that the keys in the config (e.g., "kafka")
445/// match the enum variants.
446///
447/// # Examples
448///
449/// Configuring a Fanout endpoint in YAML:
450/// ```
451/// use mq_bridge::models::{Endpoint, EndpointType};
452///
453/// let yaml = r#"
454/// fanout:
455///   - memory: { topic: "out1" }
456///   - memory: { topic: "out2" }
457/// "#;
458///
459/// let endpoint: Endpoint = serde_yaml_ng::from_str(yaml).unwrap();
460/// if let EndpointType::Fanout(targets) = endpoint.endpoint_type {
461///     assert_eq!(targets.len(), 2);
462/// }
463/// ```
464#[derive(Debug, Deserialize, Serialize, Clone, Default)]
465#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
466#[serde(rename_all = "lowercase")]
467pub enum EndpointType {
468    Aws(AwsConfig),
469    Kafka(KafkaConfig),
470    Nats(NatsConfig),
471    File(FileConfig),
472    Static(String),
473    Ref(String),
474    Memory(MemoryConfig),
475    Sled(SledConfig),
476    Amqp(AmqpConfig),
477    MongoDb(MongoDbConfig),
478    Mqtt(MqttConfig),
479    Http(HttpConfig),
480    WebSocket(WebSocketConfig),
481    IbmMq(IbmMqConfig),
482    ZeroMq(ZeroMqConfig),
483    Grpc(GrpcConfig),
484    Sqlx(SqlxConfig),
485    Fanout(Vec<Endpoint>),
486    Switch(SwitchConfig),
487    Response(ResponseConfig),
488    Reader(Box<Endpoint>),
489    Custom {
490        name: String,
491        config: serde_json::Value,
492    },
493    #[default]
494    Null,
495}
496
497impl EndpointType {
498    pub fn name(&self) -> &'static str {
499        match self {
500            EndpointType::Aws(_) => "aws",
501            EndpointType::Kafka(_) => "kafka",
502            EndpointType::Nats(_) => "nats",
503            EndpointType::File(_) => "file",
504            EndpointType::Static(_) => "static",
505            EndpointType::Ref(_) => "ref",
506            EndpointType::Memory(_) => "memory",
507            EndpointType::Sled(_) => "sled",
508            EndpointType::Amqp(_) => "amqp",
509            EndpointType::MongoDb(_) => "mongodb",
510            EndpointType::Mqtt(_) => "mqtt",
511            EndpointType::Http(_) => "http",
512            EndpointType::WebSocket(_) => "websocket",
513            EndpointType::IbmMq(_) => "ibmmq",
514            EndpointType::ZeroMq(_) => "zeromq",
515            EndpointType::Grpc(_) => "grpc",
516            EndpointType::Sqlx(_) => "sqlx",
517            EndpointType::Fanout(_) => "fanout",
518            EndpointType::Switch(_) => "switch",
519            EndpointType::Response(_) => "response",
520            EndpointType::Reader(_) => "reader",
521            EndpointType::Custom { .. } => "custom",
522            EndpointType::Null => "null",
523        }
524    }
525
526    pub fn is_core(&self) -> bool {
527        matches!(
528            self,
529            EndpointType::File(_)
530                | EndpointType::Static(_)
531                | EndpointType::Ref(_)
532                | EndpointType::Memory(_)
533                | EndpointType::Fanout(_)
534                | EndpointType::Switch(_)
535                | EndpointType::Response(_)
536                | EndpointType::Reader(_)
537                | EndpointType::Custom { .. }
538                | EndpointType::Null
539        )
540    }
541}
542
543/// An enumeration of all supported middleware types.
544#[derive(Debug, Deserialize, Serialize, Clone)]
545#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
546#[serde(rename_all = "snake_case")]
547pub enum Middleware {
548    Deduplication(DeduplicationMiddleware),
549    Metrics(MetricsMiddleware),
550    Dlq(Box<DeadLetterQueueMiddleware>),
551    Retry(RetryMiddleware),
552    RandomPanic(RandomPanicMiddleware),
553    Delay(DelayMiddleware),
554    WeakJoin(WeakJoinMiddleware),
555    Limiter(LimiterMiddleware),
556    Buffer(BufferMiddleware),
557    CookieJar(CookieJarMiddleware),
558    Custom {
559        name: String,
560        config: serde_json::Value,
561    },
562}
563
564/// Deduplication middleware configuration.
565///
566/// Prevents duplicate messages from being processed using a Sled-backed database.
567/// Messages are identified by their deduplication key and removed after the TTL expires.
568#[derive(Debug, Deserialize, Serialize, Clone)]
569#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
570#[serde(deny_unknown_fields)]
571pub struct DeduplicationMiddleware {
572    /// Path to the Sled database directory.
573    pub sled_path: String,
574    /// Time-to-live for deduplication entries in seconds.
575    pub ttl_seconds: u64,
576}
577
578/// Metrics middleware configuration.
579///
580/// Enables collection and reporting of message processing metrics such as throughput,
581/// latency, and error rates. The presence of this middleware in the configuration
582/// enables metrics collection for the endpoint.
583///
584/// Metrics are typically exported via Prometheus or similar monitoring systems.
585#[derive(Debug, Deserialize, Serialize, Clone)]
586#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
587#[serde(deny_unknown_fields)]
588pub struct MetricsMiddleware {}
589
590/// Dead-Letter Queue (DLQ) middleware configuration.
591///
592/// Routes failed messages to a designated endpoint for later analysis and recovery.
593/// It is recommended to pair this with the Retry middleware to avoid message loss.
594///
595/// Failed messages are sent to the configured endpoint when they are exhausted after retry attempts.
596#[derive(Debug, Deserialize, Serialize, Clone, Default)]
597#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
598#[serde(deny_unknown_fields)]
599pub struct DeadLetterQueueMiddleware {
600    /// The endpoint to send failed messages to.
601    pub endpoint: Endpoint,
602}
603
604/// Retry middleware configuration.
605///
606/// Implements exponential backoff retry logic for failed message processing.
607/// Failed messages are automatically retried with increasing delays between attempts.
608#[derive(Debug, Deserialize, Serialize, Clone, Default)]
609#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
610#[serde(deny_unknown_fields)]
611pub struct RetryMiddleware {
612    /// Maximum number of retry attempts. Defaults to 3.
613    #[serde(default = "default_retry_attempts")]
614    pub max_attempts: usize,
615    /// Initial retry interval in milliseconds. Defaults to 100ms.
616    #[serde(default = "default_initial_interval_ms")]
617    pub initial_interval_ms: u64,
618    /// Maximum retry interval in milliseconds. Defaults to 5000ms.
619    #[serde(default = "default_max_interval_ms")]
620    pub max_interval_ms: u64,
621    /// Multiplier for exponential backoff. Defaults to 2.0.
622    #[serde(default = "default_multiplier")]
623    pub multiplier: f64,
624}
625
626/// Delay middleware configuration.
627///
628/// Introduces a fixed delay before processing each message.
629/// Useful for rate limiting, testing, or allowing time for dependent systems to become ready.
630#[derive(Debug, Deserialize, Serialize, Clone)]
631#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
632#[serde(deny_unknown_fields)]
633pub struct DelayMiddleware {
634    /// Delay duration in milliseconds.
635    pub delay_ms: u64,
636}
637
638/// Throughput limiter middleware configuration.
639///
640/// Applies a best-effort pacing delay so an endpoint does not exceed the configured
641/// message rate. For batch operations the limiter accounts for the number of messages
642/// in the batch, not just the batch count.
643#[derive(Debug, Deserialize, Serialize, Clone)]
644#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
645#[serde(deny_unknown_fields)]
646pub struct LimiterMiddleware {
647    /// Target throughput in messages per second. Must be greater than zero.
648    pub messages_per_second: f64,
649}
650
651/// Publisher-side buffer middleware configuration.
652///
653/// Buffers outbound messages briefly so multiple single-message sends can be
654/// forwarded as one `send_batch` call to the wrapped publisher.
655#[derive(Debug, Deserialize, Serialize, Clone)]
656#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
657#[serde(deny_unknown_fields)]
658pub struct BufferMiddleware {
659    /// Maximum number of messages to accumulate before flushing immediately.
660    pub max_messages: usize,
661    /// Maximum time to wait before flushing a non-full buffer.
662    pub max_delay_ms: u64,
663}
664
665/// Cookie/session jar middleware configuration.
666///
667/// Optimized for HTTP by default: it can read `cookie` and `set-cookie` metadata,
668/// persist session cookies, and inject them into later outgoing requests.
669///
670/// The middleware can also capture arbitrary metadata values into the same session store
671/// and optionally expose stored values back into message metadata.
672#[derive(Debug, Deserialize, Serialize, Clone)]
673#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
674#[serde(deny_unknown_fields)]
675pub struct CookieJarMiddleware {
676    /// Optional shared scope name. When set, middleware instances using the same scope
677    /// share one session store across endpoints/routes in the process.
678    #[serde(default)]
679    pub shared_scope: Option<String>,
680    /// Metadata key used to read/write HTTP Cookie headers. Defaults to `cookie`.
681    #[serde(default = "default_cookie_metadata_key")]
682    pub cookie_metadata_key: String,
683    /// Metadata key used to read HTTP Set-Cookie responses. Defaults to `set-cookie`.
684    #[serde(default = "default_set_cookie_metadata_key")]
685    pub set_cookie_metadata_key: String,
686    /// Additional metadata keys to persist into the session value store.
687    #[serde(default)]
688    pub capture_metadata_keys: Vec<String>,
689    /// Optional metadata prefix used to export stored values back onto each message.
690    ///
691    /// Exported keys use `PREFIXcookie.<name>` for cookies and `PREFIXvalue.<name>` for
692    /// captured generic values.
693    #[serde(default)]
694    pub export_metadata_prefix: Option<String>,
695    /// Optional mapping of outgoing metadata keys to stored session value names.
696    ///
697    /// Example: `{ "authorization": "access_token" }` copies the stored value
698    /// `access_token` into outgoing metadata key `authorization` when not already present.
699    #[serde(default)]
700    pub inject_metadata: HashMap<String, String>,
701}
702
703impl Default for CookieJarMiddleware {
704    fn default() -> Self {
705        Self {
706            shared_scope: None,
707            cookie_metadata_key: default_cookie_metadata_key(),
708            set_cookie_metadata_key: default_set_cookie_metadata_key(),
709            capture_metadata_keys: Vec::new(),
710            export_metadata_prefix: None,
711            inject_metadata: HashMap::new(),
712        }
713    }
714}
715
716/// Weak Join middleware configuration.
717///
718/// Groups and correlates messages based on a metadata key, waiting for a specified number
719/// of messages within a timeout window before processing them as a batch.
720/// Messages that exceed the timeout are processed individually.
721#[derive(Debug, Deserialize, Serialize, Clone)]
722#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
723#[serde(deny_unknown_fields)]
724pub struct WeakJoinMiddleware {
725    /// The metadata key to group messages by (e.g., "correlation_id").
726    pub group_by: String,
727    /// The number of messages to wait for.
728    pub expected_count: usize,
729    /// Timeout in milliseconds.
730    pub timeout_ms: u64,
731}
732
733/// Fault injection modes for testing error handling and recovery mechanisms.
734#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
735#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
736#[serde(rename_all = "snake_case")]
737pub enum FaultMode {
738    /// Trigger a thread panic.
739    #[default]
740    Panic,
741    /// Simulate a connection/network error (retryable).
742    Disconnect,
743    /// Simulate a timeout error (retryable).
744    Timeout,
745    /// Simulate a JSON format error (non-retryable).
746    JsonFormatError,
747    /// Return a negative acknowledgement (for handlers).
748    Nack,
749}
750
751impl std::fmt::Display for FaultMode {
752    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753        match self {
754            FaultMode::Panic => write!(f, "panic"),
755            FaultMode::Disconnect => write!(f, "disconnect"),
756            FaultMode::Timeout => write!(f, "timeout"),
757            FaultMode::JsonFormatError => write!(f, "json_format_error"),
758            FaultMode::Nack => write!(f, "nack"),
759        }
760    }
761}
762
763/// Middleware for fault injection testing.
764///
765/// Allows testing error handling and recovery mechanisms by injecting faults
766/// at specific points in the message processing pipeline.
767///
768/// # Examples
769///
770/// ```yaml
771/// random_panic:
772///   mode: panic
773///   trigger_on_message: 3  # Trigger on the 3rd message
774/// ```
775#[derive(Debug, Clone, Serialize, Deserialize, Default)]
776#[serde(deny_unknown_fields)]
777#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
778pub struct RandomPanicMiddleware {
779    /// The type of fault to inject.
780    #[serde(default)]
781    pub mode: FaultMode,
782    /// Trigger the fault on the Nth message (1-indexed). None = trigger on every message.
783    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
784    #[serde(default)]
785    pub trigger_on_message: Option<usize>,
786    /// Enable/disable the fault injection without removing the configuration.
787    #[serde(default = "default_true")]
788    pub enabled: bool,
789    #[serde(skip, default = "default_atomic_usize_arc")]
790    #[cfg_attr(feature = "schema", schemars(skip))]
791    pub message_count: Arc<AtomicUsize>,
792}
793
794fn default_true() -> bool {
795    true
796}
797
798fn default_atomic_usize_arc() -> Arc<AtomicUsize> {
799    Arc::new(AtomicUsize::new(0))
800}
801
802fn deserialize_null_as_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
803where
804    D: Deserializer<'de>,
805{
806    let opt = Option::<bool>::deserialize(deserializer)?;
807    Ok(opt.unwrap_or(false))
808}
809
810// --- AWS Specific Configuration ---
811#[derive(Debug, Deserialize, Serialize, Clone, Default)]
812#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
813#[serde(deny_unknown_fields)]
814pub struct AwsConfig {
815    /// The SQS queue URL. Required for Consumer. Optional for Publisher if `topic_arn` is set. If it contains userinfo, it will be treated as a secret.
816    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
817    pub queue_url: Option<String>,
818    /// (Publisher only) The SNS topic ARN.
819    pub topic_arn: Option<String>,
820    /// AWS Region (e.g., "us-east-1").
821    pub region: Option<String>,
822    /// Custom endpoint URL (e.g., for LocalStack).
823    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
824    pub endpoint_url: Option<String>,
825    /// AWS Access Key ID.
826    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
827    pub access_key: Option<String>,
828    /// AWS Secret Access Key.
829    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
830    pub secret_key: Option<String>,
831    /// AWS Session Token.
832    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
833    pub session_token: Option<String>,
834    /// (Consumer only) Maximum number of messages to receive in a batch (1-10).
835    #[cfg_attr(feature = "schema", schemars(range(min = 1, max = 10)))]
836    pub max_messages: Option<i32>,
837    /// (Consumer only) Wait time for long polling in seconds (0-20).
838    #[cfg_attr(feature = "schema", schemars(range(min = 0, max = 20)))]
839    pub wait_time_seconds: Option<i32>,
840    /// Use binary payloads in SQS/SNS messages.
841    #[serde(default)]
842    pub binary_payload_mode: bool,
843}
844
845impl AwsConfig {
846    /// Creates a new AWS configuration with default settings.
847    pub fn new() -> Self {
848        Self::default()
849    }
850
851    pub fn with_queue_url(mut self, queue_url: impl Into<String>) -> Self {
852        self.queue_url = Some(queue_url.into());
853        self
854    }
855
856    pub fn with_topic_arn(mut self, topic_arn: impl Into<String>) -> Self {
857        self.topic_arn = Some(topic_arn.into());
858        self
859    }
860
861    pub fn with_region(mut self, region: impl Into<String>) -> Self {
862        self.region = Some(region.into());
863        self
864    }
865
866    pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
867        self.endpoint_url = Some(endpoint_url.into());
868        self
869    }
870
871    pub fn with_credentials(
872        mut self,
873        access_key: impl Into<String>,
874        secret_key: impl Into<String>,
875    ) -> Self {
876        self.access_key = Some(access_key.into());
877        self.secret_key = Some(secret_key.into());
878        self
879    }
880}
881
882// --- Kafka Specific Configuration ---
883
884/// General Kafka connection configuration.
885#[derive(Debug, Deserialize, Serialize, Clone, Default)]
886#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
887#[serde(deny_unknown_fields)]
888pub struct KafkaConfig {
889    /// Comma-separated list of Kafka broker URLs. If it contains userinfo, it will be treated as a secret.
890    #[serde(alias = "brokers")]
891    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
892    pub url: String,
893    /// The Kafka topic to produce to or consume from.
894    pub topic: Option<String>,
895    /// Optional username for SASL authentication.
896    pub username: Option<String>,
897    /// Optional password for SASL authentication.
898    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
899    pub password: Option<String>,
900    /// TLS configuration.
901    #[serde(default)]
902    pub tls: TlsConfig,
903    /// (Consumer only) Consumer group ID.
904    /// If not provided, the consumer acts in **Subscriber mode**: it generates a unique, ephemeral group ID and starts consuming from the latest offset.
905    pub group_id: Option<String>,
906    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
907    #[serde(default)]
908    pub delayed_ack: bool,
909    /// (Publisher only) Additional librdkafka producer configuration options (key-value pairs).
910    #[serde(default)]
911    pub producer_options: Option<Vec<(String, String)>>,
912    /// (Consumer only) Additional librdkafka consumer configuration options (key-value pairs).
913    #[serde(default)]
914    pub consumer_options: Option<Vec<(String, String)>>,
915}
916
917impl KafkaConfig {
918    /// Creates a new Kafka configuration with the specified broker URL.
919    pub fn new(url: impl Into<String>) -> Self {
920        Self {
921            url: url.into(),
922            ..Default::default()
923        }
924    }
925
926    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
927        self.topic = Some(topic.into());
928        self
929    }
930
931    pub fn with_group_id(mut self, group_id: impl Into<String>) -> Self {
932        self.group_id = Some(group_id.into());
933        self
934    }
935
936    pub fn with_credentials(
937        mut self,
938        username: impl Into<String>,
939        password: impl Into<String>,
940    ) -> Self {
941        self.username = Some(username.into());
942        self.password = Some(password.into());
943        self
944    }
945
946    pub fn with_producer_option(
947        mut self,
948        key: impl Into<String>,
949        value: impl Into<String>,
950    ) -> Self {
951        let options = self.producer_options.get_or_insert_with(Vec::new);
952        options.push((key.into(), value.into()));
953        self
954    }
955
956    pub fn with_consumer_option(
957        mut self,
958        key: impl Into<String>,
959        value: impl Into<String>,
960    ) -> Self {
961        let options = self.consumer_options.get_or_insert_with(Vec::new);
962        options.push((key.into(), value.into()));
963        self
964    }
965}
966
967// --- Sled Specific Configuration ---
968
969/// General Sled database configuration
970#[derive(Debug, Deserialize, Serialize, Clone, Default)]
971#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
972#[serde(deny_unknown_fields)]
973pub struct SledConfig {
974    /// Path to the Sled database directory.
975    pub path: String,
976    /// The tree name to use as a queue. Defaults to "default".
977    pub tree: Option<String>,
978    /// (Consumer only) If true, start reading from the beginning of the tree.
979    #[serde(default)]
980    pub read_from_start: bool,
981    /// (Consumer only) If true, delete messages after processing (Queue mode).
982    #[serde(default)]
983    pub delete_after_read: bool,
984}
985
986impl SledConfig {
987    /// Creates a new Sled configuration with the specified database path.
988    pub fn new(path: impl Into<String>) -> Self {
989        Self {
990            path: path.into(),
991            ..Default::default()
992        }
993    }
994
995    pub fn with_tree(mut self, tree: impl Into<String>) -> Self {
996        self.tree = Some(tree.into());
997        self
998    }
999
1000    pub fn with_read_from_start(mut self, read_from_start: bool) -> Self {
1001        self.read_from_start = read_from_start;
1002        self
1003    }
1004}
1005
1006/// Format for messages written to or read from a file.
1007#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
1008#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1009#[serde(rename_all = "snake_case")]
1010pub enum FileFormat {
1011    /// The full `CanonicalMessage` is serialized to JSON. Payload is a byte array.
1012    #[default]
1013    Normal,
1014    /// The full `CanonicalMessage` is serialized to JSON. Payload is rendered as a JSON value if possible.
1015    Json,
1016    /// The full `CanonicalMessage` is serialized to JSON. Payload is rendered as a string if possible.
1017    Text,
1018    /// The raw payload of the message is written. For consumers, the line is read as raw bytes.
1019    Raw,
1020}
1021
1022// --- File Specific Configuration ---
1023
1024#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1025#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1026pub struct FileConfig {
1027    /// Path to the file.
1028    pub path: String,
1029    /// Optional delimiter for messages. Defaults to newline ("\n").
1030    /// Can be a string or a hex sequence (e.g. "0x00").
1031    /// Currently only single-byte delimiters are supported.
1032    pub delimiter: Option<String>,
1033    /// The consumption mode. If not specified, defaults to `consume`.
1034    /// For publishers, this setting is ignored.
1035    #[serde(flatten, default)]
1036    pub mode: Option<FileConsumerMode>,
1037    /// The format for writing messages to the file (Publisher) or interpreting them (Consumer). Defaults to `normal`.
1038    #[serde(default)]
1039    pub format: FileFormat,
1040}
1041
1042#[derive(Debug, Clone, Deserialize, Serialize)]
1043#[serde(tag = "mode", rename_all = "snake_case")]
1044#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1045pub enum FileConsumerMode {
1046    /// **Queue Mode**: Standard point-to-point consumption. Reads from the start
1047    /// of the file. If `delete` is true, processed lines are physically removed
1048    /// from the file once they are successfully acknowledged.
1049    Consume {
1050        #[serde(default)]
1051        delete: bool,
1052    },
1053    /// **Broadcast Mode**: Pub-sub style consumption. Tails the file by starting
1054    /// at the current end. If `delete` is true, lines are removed only after
1055    /// all local application subscribers for this specific file have acknowledged them.
1056    Subscribe {
1057        #[serde(default)]
1058        delete: bool,
1059    },
1060    /// **Persistent Mode**: Consumption with external offset tracking.
1061    /// Saves the last read byte position to a `.offset` file identified by the `group_id`.
1062    /// This allows the consumer to resume exactly where it left off after a restart
1063    /// without deleting data or requiring the bridge to stay running.
1064    GroupSubscribe {
1065        /// The consumer group ID that is used for offset tracking. Should be unique.
1066        group_id: String,
1067        /// If true, starts reading from the end of the file if no offset is stored.
1068        /// If false, starts reading from the beginning.
1069        #[serde(default)]
1070        read_from_tail: bool,
1071    },
1072}
1073
1074impl Default for FileConsumerMode {
1075    fn default() -> Self {
1076        Self::Consume { delete: false }
1077    }
1078}
1079
1080impl FileConfig {
1081    /// Creates a new File configuration with the specified path.
1082    pub fn new(path: impl Into<String>) -> Self {
1083        Self {
1084            path: path.into(),
1085            mode: Some(FileConsumerMode::default()),
1086            delimiter: None,
1087            format: FileFormat::default(),
1088        }
1089    }
1090
1091    pub fn with_mode(mut self, mode: FileConsumerMode) -> Self {
1092        self.mode = Some(mode);
1093        self
1094    }
1095
1096    /// Returns the effective consumer mode, defaulting to `Consume` if not set.
1097    pub fn effective_mode(&self) -> FileConsumerMode {
1098        self.mode.clone().unwrap_or_default()
1099    }
1100}
1101
1102// --- NATS Specific Configuration ---
1103
1104/// General NATS connection configuration.
1105#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1106#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1107#[serde(deny_unknown_fields)]
1108pub struct NatsConfig {
1109    /// Comma-separated list of NATS server URLs (e.g., "nats://localhost:4222,nats://localhost:4223"). If it contains userinfo, it will be treated as a secret.
1110    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1111    pub url: String,
1112    /// The NATS subject to publish to or subscribe to.
1113    pub subject: Option<String>,
1114    /// (Consumer only). The JetStream stream name. Required for Consumers.
1115    pub stream: Option<String>,
1116    /// Optional username for authentication.
1117    pub username: Option<String>,
1118    /// Optional password for authentication.
1119    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1120    pub password: Option<String>,
1121    /// TLS configuration.
1122    #[serde(default)]
1123    pub tls: TlsConfig,
1124    /// Optional token for authentication.
1125    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1126    pub token: Option<String>,
1127    /// (Publisher only) If true, the publisher uses the request-reply pattern.
1128    /// It sends a request and waits for a response (using `core_client.request_with_headers()`).
1129    /// Defaults to false.
1130    #[serde(default)]
1131    pub request_reply: bool,
1132    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
1133    pub request_timeout_ms: Option<u64>,
1134    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
1135    #[serde(default)]
1136    pub delayed_ack: bool,
1137    /// If no_jetstream: true, use Core NATS (fire-and-forget) instead of JetStream. Defaults to false.
1138    #[serde(default)]
1139    pub no_jetstream: bool,
1140    /// (Consumer only) If true, use ephemeral **Subscriber mode**. Defaults to false (durable consumer).
1141    #[serde(default)]
1142    pub subscriber_mode: bool,
1143    /// (Publisher only) Maximum number of messages in the stream (if created by the bridge). Defaults to 1,000,000.
1144    pub stream_max_messages: Option<i64>,
1145    /// (Consumer only) The delivery policy for the consumer. Defaults to "all".
1146    pub deliver_policy: Option<NatsDeliverPolicy>,
1147    /// (Publisher only) Maximum total bytes in the stream (if created by the bridge). Defaults to 1GB.
1148    pub stream_max_bytes: Option<i64>,
1149    /// (Consumer only) Number of messages to prefetch from the consumer. Defaults to 10000.
1150    pub prefetch_count: Option<usize>,
1151}
1152
1153#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
1154#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1155#[serde(rename_all = "snake_case")]
1156pub enum NatsDeliverPolicy {
1157    #[default]
1158    All,
1159    Last,
1160    New,
1161    LastPerSubject,
1162}
1163
1164impl NatsConfig {
1165    /// Creates a new NATS configuration with the specified server URL.
1166    pub fn new(url: impl Into<String>) -> Self {
1167        Self {
1168            url: url.into(),
1169            ..Default::default()
1170        }
1171    }
1172
1173    pub fn with_subject(mut self, subject: impl Into<String>) -> Self {
1174        self.subject = Some(subject.into());
1175        self
1176    }
1177
1178    pub fn with_stream(mut self, stream: impl Into<String>) -> Self {
1179        self.stream = Some(stream.into());
1180        self
1181    }
1182
1183    pub fn with_deliver_policy(mut self, policy: NatsDeliverPolicy) -> Self {
1184        self.deliver_policy = Some(policy);
1185        self
1186    }
1187
1188    pub fn with_credentials(
1189        mut self,
1190        username: impl Into<String>,
1191        password: impl Into<String>,
1192    ) -> Self {
1193        self.username = Some(username.into());
1194        self.password = Some(password.into());
1195        self
1196    }
1197}
1198
1199#[derive(Debug, Serialize, Deserialize, Clone, Default)]
1200#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1201#[serde(deny_unknown_fields)]
1202pub struct MemoryConfig {
1203    /// The topic name for the in-memory channel.
1204    pub topic: String,
1205    /// The capacity of the channel. Defaults to 100.
1206    pub capacity: Option<usize>,
1207    /// (Publisher only) If true, send() waits for a response.
1208    #[serde(default)]
1209    pub request_reply: bool,
1210    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
1211    pub request_timeout_ms: Option<u64>,
1212    /// (Consumer only) If true, act as a **Subscriber** (fan-out). Defaults to false (queue).
1213    #[serde(default)]
1214    pub subscribe_mode: bool,
1215    /// (Consumer only) If true, enables NACK support (re-queuing), which requires cloning messages. Defaults to false.
1216    #[serde(default)]
1217    pub enable_nack: bool,
1218}
1219
1220impl MemoryConfig {
1221    pub fn new(topic: impl Into<String>, capacity: Option<usize>) -> Self {
1222        Self {
1223            topic: topic.into(),
1224            capacity,
1225            ..Default::default()
1226        }
1227    }
1228    pub fn with_subscribe(self, subscribe_mode: bool) -> Self {
1229        Self {
1230            subscribe_mode,
1231            ..self
1232        }
1233    }
1234
1235    pub fn with_request_reply(mut self, request_reply: bool) -> Self {
1236        self.request_reply = request_reply;
1237        self
1238    }
1239}
1240
1241// --- AMQP Specific Configuration ---
1242
1243/// General AMQP connection configuration.
1244#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1245#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1246#[serde(deny_unknown_fields)]
1247pub struct AmqpConfig {
1248    /// AMQP connection URI. The `lapin` client connects to a single host specified in the URI. If it contains userinfo, it will be treated as a secret.
1249    /// For high availability, provide the address of a load balancer or use DNS resolution
1250    /// that points to multiple brokers. Example: "amqp://localhost:5672/vhost".
1251    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1252    pub url: String,
1253    /// The AMQP queue name.
1254    pub queue: Option<String>,
1255    /// (Consumer only) If true, act as a **Subscriber** (fan-out). Defaults to false.
1256    #[serde(default)]
1257    pub subscribe_mode: bool,
1258    /// Optional username for authentication.
1259    pub username: Option<String>,
1260    /// Optional password for authentication.
1261    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1262    pub password: Option<String>,
1263    /// TLS configuration.
1264    #[serde(default)]
1265    pub tls: TlsConfig,
1266    /// The exchange to publish to or bind the queue to.
1267    pub exchange: Option<String>,
1268    /// (Consumer only) Number of messages to prefetch. Defaults to 100.
1269    pub prefetch_count: Option<u16>,
1270    /// If true, declare queues as non-durable (transient). Defaults to false. Affects both Consumer (queue durability) and Publisher (message persistence).
1271    #[serde(default)]
1272    pub no_persistence: bool,
1273    /// (Publisher only) If true, do not attempt to declare the queue. Assumes the queue already exists. Defaults to false.
1274    #[serde(default)]
1275    pub no_declare_queue: bool,
1276    /// (Publisher only) If true, do not wait for an acknowledgement when sending to broker. Defaults to false.
1277    #[serde(default)]
1278    pub delayed_ack: bool,
1279}
1280
1281impl AmqpConfig {
1282    /// Creates a new AMQP configuration with the specified connection URL.
1283    pub fn new(url: impl Into<String>) -> Self {
1284        Self {
1285            url: url.into(),
1286            ..Default::default()
1287        }
1288    }
1289
1290    pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
1291        self.queue = Some(queue.into());
1292        self
1293    }
1294
1295    pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
1296        self.exchange = Some(exchange.into());
1297        self
1298    }
1299
1300    pub fn with_credentials(
1301        mut self,
1302        username: impl Into<String>,
1303        password: impl Into<String>,
1304    ) -> Self {
1305        self.username = Some(username.into());
1306        self.password = Some(password.into());
1307        self
1308    }
1309}
1310
1311/// MongoDB message storage format.
1312///
1313/// Determines how messages are stored and retrieved from MongoDB collections.
1314#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
1315#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1316#[serde(rename_all = "lowercase")]
1317pub enum MongoDbFormat {
1318    #[default]
1319    Normal,
1320    Json,
1321    Text,
1322    Raw,
1323}
1324
1325// --- MongoDB Specific Configuration ---
1326
1327/// General MongoDB connection configuration.
1328#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1329#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1330#[serde(deny_unknown_fields)]
1331pub struct MongoDbConfig {
1332    /// MongoDB connection string URI. Can contain a comma-separated list of hosts for a replica set. If it contains userinfo, it will be treated as a secret.
1333    /// Credentials provided via the separate `username` and `password` fields take precedence over any credentials embedded in the URL.
1334    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1335    pub url: String,
1336    /// The MongoDB collection name.
1337    pub collection: Option<String>,
1338    /// Optional username. Takes precedence over any credentials embedded in the `url`.
1339    /// Use embedded URL credentials for simple one-off connections but prefer explicit username/password fields (or environment-sourced secrets) for clarity and secret management in production.
1340    pub username: Option<String>,
1341    /// Optional password. Takes precedence over any credentials embedded in the `url`.
1342    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1343    /// Use embedded URL credentials for simple one-off connections but prefer explicit username/password fields (or environment-sourced secrets) for clarity and secret management in production.
1344    pub password: Option<String>,
1345    /// TLS configuration.
1346    #[serde(default)]
1347    pub tls: TlsConfig,
1348    /// The database name.
1349    pub database: String,
1350    /// (Consumer only) Polling interval in milliseconds for the consumer (when not using Change Streams). Defaults to 100ms.
1351    pub polling_interval_ms: Option<u64>,
1352    /// (Publisher only) Polling interval in milliseconds for the publisher when waiting for a reply. Defaults to 50ms.
1353    pub reply_polling_ms: Option<u64>,
1354    /// (Publisher only) If true, the publisher will wait for a response in a dedicated collection. Defaults to false.
1355    #[serde(default)]
1356    pub request_reply: bool,
1357    /// (Consumer only) If true, use Change Streams (**Subscriber mode**). Defaults to false (polling/consumer mode).
1358    #[serde(default)]
1359    pub change_stream: bool,
1360    /// (Publisher only) Timeout for request-reply operations in milliseconds. Defaults to 30000ms.
1361    pub request_timeout_ms: Option<u64>,
1362    /// (Publisher only) TTL in seconds for documents created by the publisher. If set, a TTL index is created.
1363    pub ttl_seconds: Option<u64>,
1364    /// (Publisher only) If set, creates a capped collection with this size in bytes.
1365    pub capped_size_bytes: Option<i64>,
1366    /// Format for storing messages. Defaults to Normal.
1367    #[serde(default)]
1368    pub format: MongoDbFormat,
1369    /// The ID used for the cursor in sequenced mode. If not provided, consumption starts from the current sequence (ephemeral).
1370    pub cursor_id: Option<String>,
1371    /// (Consumer only) Optional custom MongoDB query to filter messages. Provided as a JSON string (e.g., '{"type": "notification"}').
1372    pub receive_query: Option<String>,
1373    /// (Optional) Collection to store sequence counters and cursor positions. Defaults to the message collection if not set.
1374    pub meta_collection: Option<String>,
1375}
1376
1377impl MongoDbConfig {
1378    /// Creates a new MongoDB configuration with the specified URL and database name.
1379    pub fn new(url: impl Into<String>, database: impl Into<String>) -> Self {
1380        Self {
1381            url: url.into(),
1382            database: database.into(),
1383            ..Default::default()
1384        }
1385    }
1386
1387    pub fn with_collection(mut self, collection: impl Into<String>) -> Self {
1388        self.collection = Some(collection.into());
1389        self
1390    }
1391
1392    pub fn with_credentials(
1393        mut self,
1394        username: impl Into<String>,
1395        password: impl Into<String>,
1396    ) -> Self {
1397        self.username = Some(username.into());
1398        self.password = Some(password.into());
1399        self
1400    }
1401
1402    pub fn with_change_stream(mut self, change_stream: bool) -> Self {
1403        self.change_stream = change_stream;
1404        self
1405    }
1406}
1407
1408// --- MQTT Specific Configuration ---
1409
1410/// General MQTT connection configuration.
1411#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1412#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1413#[serde(deny_unknown_fields)]
1414pub struct MqttConfig {
1415    /// MQTT broker URL (e.g., "tcp://localhost:1883"). Does not support multiple hosts. If it contains userinfo, it will be treated as a secret.
1416    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1417    pub url: String,
1418    /// The MQTT topic.
1419    pub topic: Option<String>,
1420    /// Optional username for authentication.
1421    pub username: Option<String>,
1422    /// Optional password for authentication.
1423    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1424    pub password: Option<String>,
1425    /// TLS configuration.
1426    #[serde(default)]
1427    pub tls: TlsConfig,
1428    /// Optional client ID. If not provided, one is generated or derived from route name.
1429    pub client_id: Option<String>,
1430    /// Capacity of the internal channel for incoming messages. Defaults to 100.
1431    pub queue_capacity: Option<usize>,
1432    /// Maximum number of inflight messages.
1433    pub max_inflight: Option<u16>,
1434    /// Quality of Service level (0, 1, or 2). Defaults to 1.
1435    pub qos: Option<u8>,
1436    /// (Consumer only) If true, start with a clean session. Defaults to false (persistent session). Setting this to true effectively enables **Subscriber mode** (ephemeral).
1437    #[serde(default = "default_clean_session")]
1438    pub clean_session: bool,
1439    /// Keep-alive interval in seconds. Defaults to 20.
1440    pub keep_alive_seconds: Option<u64>,
1441    /// MQTT protocol version (V3 or V5). Defaults to V5.
1442    #[serde(default)]
1443    pub protocol: MqttProtocol,
1444    /// Session expiry interval in seconds (MQTT v5 only).
1445    pub session_expiry_interval: Option<u32>,
1446    /// (Consumer only) If true, messages are acknowledged immediately upon receipt (auto-ack).
1447    /// If false (default), messages are acknowledged after processing (manual-ack).
1448    /// Note: This setting does not currently enable synchronous publishing (waiting for PubAck) for the MQTT publisher.
1449    #[serde(default)]
1450    pub delayed_ack: bool,
1451}
1452
1453impl MqttConfig {
1454    /// Creates a new MQTT configuration with the specified broker URL.
1455    pub fn new(url: impl Into<String>) -> Self {
1456        Self {
1457            url: url.into(),
1458            ..Default::default()
1459        }
1460    }
1461
1462    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1463        self.topic = Some(topic.into());
1464        self
1465    }
1466
1467    pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
1468        self.client_id = Some(client_id.into());
1469        self
1470    }
1471
1472    pub fn with_credentials(
1473        mut self,
1474        username: impl Into<String>,
1475        password: impl Into<String>,
1476    ) -> Self {
1477        self.username = Some(username.into());
1478        self.password = Some(password.into());
1479        self
1480    }
1481}
1482
1483/// MQTT protocol version.
1484///
1485/// Specifies which version of the MQTT protocol to use for connections.
1486#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
1487#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1488#[serde(rename_all = "lowercase")]
1489pub enum MqttProtocol {
1490    #[default]
1491    V5,
1492    V3,
1493}
1494
1495// --- ZeroMQ Specific Configuration ---
1496
1497#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1498#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1499#[serde(deny_unknown_fields)]
1500pub struct ZeroMqConfig {
1501    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1502    /// The ZeroMQ URL (e.g., "tcp://127.0.0.1:5555").
1503    pub url: String,
1504    /// The socket type (PUSH, PULL, PUB, SUB, REQ, REP).
1505    #[serde(default)]
1506    pub socket_type: Option<ZeroMqSocketType>,
1507    /// (Consumer only) The ZeroMQ topic (for SUB sockets).
1508    pub topic: Option<String>,
1509    /// If true, bind to the address. If false, connect.
1510    #[serde(default)]
1511    pub bind: bool,
1512    /// Internal buffer size for the channel. Defaults to 128.
1513    #[serde(default)]
1514    pub internal_buffer_size: Option<usize>,
1515}
1516
1517impl ZeroMqConfig {
1518    /// Creates a new ZeroMQ configuration with the specified URL.
1519    pub fn new(url: impl Into<String>) -> Self {
1520        Self {
1521            url: url.into(),
1522            ..Default::default()
1523        }
1524    }
1525
1526    pub fn with_socket_type(mut self, socket_type: ZeroMqSocketType) -> Self {
1527        self.socket_type = Some(socket_type);
1528        self
1529    }
1530
1531    pub fn with_bind(mut self, bind: bool) -> Self {
1532        self.bind = bind;
1533        self
1534    }
1535}
1536
1537/// ZeroMQ socket type.
1538///
1539/// Defines the messaging pattern for ZeroMQ connections.
1540/// Different patterns support different communication paradigms (request-reply, publish-subscribe, etc.).
1541#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
1542#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1543#[serde(rename_all = "lowercase")]
1544pub enum ZeroMqSocketType {
1545    Push,
1546    Pull,
1547    Pub,
1548    Sub,
1549    Req,
1550    Rep,
1551}
1552
1553// --- gRPC Specific Configuration ---
1554
1555#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1556#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1557#[serde(deny_unknown_fields)]
1558pub struct GrpcConfig {
1559    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1560    /// The gRPC server URL (e.g., "http://localhost:50051" for client or "0.0.0.0:50051" for server mode).
1561    pub url: String,
1562    /// Topic / subject used for both subscribe and publish paths.
1563    pub topic: Option<String>,
1564    /// Timeout in milliseconds.
1565    /// - Client mode: used as the connection timeout and per-request deadline.
1566    /// - Server mode: applied as the per-request deadline on the embedded server.
1567    pub timeout_ms: Option<u64>,
1568    /// TLS configuration.
1569    #[serde(default)]
1570    pub tls: TlsConfig,
1571    /// If `true`, start an embedded tonic gRPC server that accepts incoming `Publish` /
1572    /// `PublishBatch` RPCs. If `false` (the default), connect to a remote server as a client.
1573    #[serde(default)]
1574    pub server_mode: bool,
1575    /// HTTP/2 stream-level initial window size in bytes. **Server-mode only.**
1576    #[serde(default)]
1577    pub initial_stream_window_size: Option<u32>,
1578    /// HTTP/2 connection-level initial window size in bytes. **Server-mode only.**
1579    #[serde(default)]
1580    pub initial_connection_window_size: Option<u32>,
1581    /// Maximum number of concurrent requests handled per connection. **Server-mode only.**
1582    #[serde(default)]
1583    pub concurrency_limit_per_connection: Option<usize>,
1584    /// HTTP/2 keepalive ping interval in milliseconds. **Server-mode only.** Default disabled
1585    #[serde(default)]
1586    pub http2_keepalive_interval_ms: Option<u64>,
1587    /// Timeout for a keepalive ping acknowledgement in milliseconds. **Server-mode only.**
1588    #[serde(default)]
1589    pub http2_keepalive_timeout_ms: Option<u64>,
1590    /// Maximum size of a decoded incoming message in bytes. **Server-mode only.** Default 4 MiB.
1591    #[serde(default)]
1592    pub max_decoding_message_size: Option<usize>,
1593}
1594
1595impl GrpcConfig {
1596    /// Creates a new gRPC configuration with the specified server URL.
1597    pub fn new(url: impl Into<String>) -> Self {
1598        Self {
1599            url: url.into(),
1600            ..Default::default()
1601        }
1602    }
1603
1604    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1605        self.topic = Some(topic.into());
1606        self
1607    }
1608
1609    /// Enable or disable server mode for this gRPC endpoint.
1610    pub fn with_server_mode(mut self, server_mode: bool) -> Self {
1611        self.server_mode = server_mode;
1612        self
1613    }
1614}
1615
1616// --- HTTP Specific Configuration ---
1617
1618/// General HTTP connection configuration.
1619#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1620#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1621#[serde(deny_unknown_fields)]
1622pub struct HttpConfig {
1623    /// For consumers, the listen address (e.g., "0.0.0.0:8080"). For publishers, the target URL.
1624    pub url: String,
1625    /// (Consumer only) Optional request path filter. If set, only requests whose URI path matches exactly are delivered to this consumer.
1626    pub path: Option<String>,
1627    /// (Optional) HTTP method. For publishers: the method to use (defaults to POST). For consumers: restrict to this method (others return 405).
1628    pub method: Option<String>,
1629    /// TLS configuration.
1630    #[serde(default)]
1631    pub tls: TlsConfig,
1632    /// (Consumer only) Number of worker threads to use. Defaults to 0 for unlimited.
1633    pub workers: Option<usize>,
1634    /// (Consumer only) Header key to extract the message ID from. Defaults to "message-id".
1635    pub message_id_header: Option<String>,
1636    /// Timeout for HTTP requests in milliseconds. For consumers, it's the request-reply timeout. For publishers, it's the timeout for each individual request. Defaults to 30000ms.
1637    pub request_timeout_ms: Option<u64>,
1638    /// (Consumer only) Internal buffer size for the channel. Defaults to 100.
1639    pub internal_buffer_size: Option<usize>,
1640    /// (Consumer only) If true, respond immediately with 202 Accepted without waiting for downstream processing. Defaults to false.
1641    #[serde(default)]
1642    pub fire_and_forget: bool,
1643    /// (Publisher only) The number of concurrent HTTP requests to send in a batch. Defaults to 20.
1644    #[serde(default, skip_serializing_if = "Option::is_none")]
1645    pub batch_concurrency: Option<usize>,
1646    /// (Publisher only) TCP keepalive timeout for the underlying connection pool in milliseconds. Defaults to 60000ms.
1647    #[serde(default, skip_serializing_if = "Option::is_none")]
1648    pub tcp_keepalive_ms: Option<u64>,
1649    /// (Publisher only) Timeout for idle connections in the connection pool in milliseconds. Defaults to 90000ms.
1650    #[serde(default, skip_serializing_if = "Option::is_none")]
1651    pub pool_idle_timeout_ms: Option<u64>,
1652    /// Enable gzip compression for request/response bodies exceeding the threshold. Defaults to false.
1653    #[serde(default)]
1654    pub compression_enabled: bool,
1655    /// Minimum message size in bytes to compress. Messages smaller than this are sent uncompressed. Defaults to 1024 bytes.
1656    #[serde(default)]
1657    pub compression_threshold_bytes: Option<usize>,
1658    /// HTTP Basic Authentication credentials (username, password). For consumers: validates incoming requests. For publishers: adds Authorization header.
1659    /// (Consumer only) Maximum number of concurrent requests to handle. Defaults to 100.
1660    pub concurrency_limit: Option<usize>,
1661    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1662    #[serde(
1663        default,
1664        skip_serializing_if = "Option::is_none",
1665        deserialize_with = "deserialize_basic_auth"
1666    )]
1667    pub basic_auth: Option<(String, String)>,
1668    /// Custom headers as key-value pairs (e.g., {"X-API-Key": "token123"}). Added to outgoing HTTP headers for both consumers and publishers.
1669    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1670    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
1671    pub custom_headers: HashMap<String, String>,
1672}
1673
1674/// WebSocket connection configuration.
1675#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1676#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1677#[serde(deny_unknown_fields)]
1678pub struct WebSocketConfig {
1679    /// For consumers, the listen address (e.g. "0.0.0.0:9000"). For publishers, the target URL.
1680    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1681    pub url: String,
1682    /// (Consumer only) Optional request path filter. If set, only upgrade requests whose URI path matches exactly are delivered to this consumer.
1683    pub path: Option<String>,
1684    /// (Consumer only) Header key to extract the message ID from the WebSocket handshake. Defaults to "message-id".
1685    pub message_id_header: Option<String>,
1686    /// (Consumer only) Internal buffer size for the channel. Defaults to 100.
1687    pub internal_buffer_size: Option<usize>,
1688}
1689
1690fn deserialize_basic_auth<'de, D>(deserializer: D) -> Result<Option<(String, String)>, D::Error>
1691where
1692    D: Deserializer<'de>,
1693{
1694    let val = serde_json::Value::deserialize(deserializer)?;
1695    match val {
1696        serde_json::Value::Null => Ok(None),
1697        serde_json::Value::Array(arr) => {
1698            if arr.len() != 2 {
1699                return Err(serde::de::Error::custom("basic_auth must have 2 elements"));
1700            }
1701            let u = arr[0]
1702                .as_str()
1703                .ok_or_else(|| serde::de::Error::custom("basic_auth[0] must be string"))?
1704                .to_string();
1705            let p = arr[1]
1706                .as_str()
1707                .ok_or_else(|| serde::de::Error::custom("basic_auth[1] must be string"))?
1708                .to_string();
1709            Ok(Some((u, p)))
1710        }
1711        serde_json::Value::Object(map) => {
1712            let u = map
1713                .get("0")
1714                .and_then(|v| v.as_str())
1715                .ok_or_else(|| serde::de::Error::custom("basic_auth map missing '0'"))?
1716                .to_string();
1717            let p = map
1718                .get("1")
1719                .and_then(|v| v.as_str())
1720                .ok_or_else(|| serde::de::Error::custom("basic_auth map missing '1'"))?
1721                .to_string();
1722            Ok(Some((u, p)))
1723        }
1724        _ => Err(serde::de::Error::custom("invalid type for basic_auth")),
1725    }
1726}
1727
1728impl HttpConfig {
1729    /// Creates a new HTTP configuration with the specified URL.
1730    pub fn new(url: impl Into<String>) -> Self {
1731        Self {
1732            url: url.into(),
1733            ..Default::default()
1734        }
1735    }
1736
1737    pub fn with_workers(mut self, workers: usize) -> Self {
1738        self.workers = Some(workers);
1739        self
1740    }
1741
1742    pub fn with_method(mut self, method: impl Into<String>) -> Self {
1743        self.method = Some(method.into());
1744        self
1745    }
1746
1747    pub fn with_path(mut self, path: impl Into<String>) -> Self {
1748        self.path = Some(path.into());
1749        self
1750    }
1751}
1752
1753impl WebSocketConfig {
1754    /// Creates a new WebSocket configuration with the specified URL.
1755    pub fn new(url: impl Into<String>) -> Self {
1756        Self {
1757            url: url.into(),
1758            ..Default::default()
1759        }
1760    }
1761
1762    pub fn with_path(mut self, path: impl Into<String>) -> Self {
1763        self.path = Some(path.into());
1764        self
1765    }
1766}
1767
1768// --- IBM MQ Specific Configuration ---
1769
1770/// Connection settings for the IBM MQ Queue Manager.
1771#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1772#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1773#[serde(deny_unknown_fields)]
1774pub struct IbmMqConfig {
1775    /// Required. Connection URL in `host(port)` format. Supports comma-separated list for failover (e.g., `host1(1414),host2(1414)`). If it contains userinfo, it will be treated as a secret.
1776    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1777    pub url: String,
1778    /// Target Queue name for point-to-point messaging. Optional if `topic` is set; defaults to route name if omitted.
1779    pub queue: Option<String>,
1780    /// Target Topic string for Publish/Subscribe. If set, enables **Subscriber mode** (Consumer) or publishes to a topic (Publisher). Optional if `queue` is set.
1781    pub topic: Option<String>,
1782    /// Required. Name of the Queue Manager to connect to (e.g., `QM1`).
1783    pub queue_manager: String,
1784    /// Required. Server Connection (SVRCONN) Channel name defined on the QM.
1785    pub channel: String,
1786    /// Username for authentication. Optional; required if the channel enforces authentication
1787    pub username: Option<String>,
1788    /// Password for authentication. Optional; required if the channel enforces authentication.
1789    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1790    pub password: Option<String>,
1791    /// TLS CipherSpec (e.g., `ANY_TLS12`). Optional; required for encrypted connections.
1792    pub cipher_spec: Option<String>,
1793    /// TLS configuration settings (e.g., keystore paths). Optional.
1794    #[serde(default)]
1795    pub tls: TlsConfig,
1796    /// Maximum message size in bytes (default: 4MB). Optional.
1797    #[serde(default = "default_max_message_size")]
1798    pub max_message_size: usize,
1799    /// (Consumer only) Polling timeout in milliseconds (default: 1000ms). Optional.
1800    #[serde(default = "default_wait_timeout_ms")]
1801    pub wait_timeout_ms: i32,
1802    /// Internal buffer size for the channel. Defaults to 100.
1803    #[serde(default)]
1804    pub internal_buffer_size: Option<usize>,
1805    /// If false, attempt to open the queue with INQUIRE permissions to fetch queue depth for status checks. Defaults to false.
1806    #[serde(default)]
1807    pub disable_status_inq: bool,
1808}
1809
1810impl IbmMqConfig {
1811    /// Creates a new IBM MQ configuration with the specified connection URL, queue manager, and channel.
1812    pub fn new(
1813        url: impl Into<String>,
1814        queue_manager: impl Into<String>,
1815        channel: impl Into<String>,
1816    ) -> Self {
1817        Self {
1818            url: url.into(),
1819            queue_manager: queue_manager.into(),
1820            channel: channel.into(),
1821            disable_status_inq: false,
1822            ..Default::default()
1823        }
1824    }
1825
1826    pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
1827        self.queue = Some(queue.into());
1828        self
1829    }
1830
1831    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1832        self.topic = Some(topic.into());
1833        self
1834    }
1835
1836    pub fn with_credentials(
1837        mut self,
1838        username: impl Into<String>,
1839        password: impl Into<String>,
1840    ) -> Self {
1841        self.username = Some(username.into());
1842        self.password = Some(password.into());
1843        self
1844    }
1845}
1846
1847fn default_max_message_size() -> usize {
1848    4 * 1024 * 1024 // 4MB default
1849}
1850
1851fn default_wait_timeout_ms() -> i32 {
1852    1000 // 1 second default
1853}
1854
1855// --- Switch/Router Configuration ---
1856
1857#[derive(Debug, Deserialize, Serialize, Clone)]
1858#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1859#[serde(deny_unknown_fields)]
1860pub struct SwitchConfig {
1861    /// The metadata key to inspect for routing decisions.
1862    pub metadata_key: String,
1863    /// A map of values to endpoints.
1864    pub cases: HashMap<String, Endpoint>,
1865    /// The default endpoint if no case matches.
1866    pub default: Option<Box<Endpoint>>,
1867}
1868
1869// --- Response Endpoint Configuration ---
1870#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1871#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1872#[serde(deny_unknown_fields)]
1873pub struct ResponseConfig {
1874    // This struct is a marker and currently has no fields.
1875}
1876
1877// --- SQLx Specific Configuration ---
1878
1879/// General SQLx connection configuration.
1880#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1881#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1882#[serde(deny_unknown_fields)]
1883pub struct SqlxConfig {
1884    /// Database connection URL. If it contains userinfo, it will be treated as a secret.
1885    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1886    pub url: String,
1887    /// Optional username. Takes precedence over any credentials embedded in the `url`.
1888    #[serde(default)]
1889    pub username: Option<String>,
1890    /// Optional password. Takes precedence over any credentials embedded in the `url`.
1891    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1892    #[serde(default)]
1893    pub password: Option<String>,
1894    /// The table to interact with.
1895    pub table: String,
1896    /// (Publisher only) Optional. A custom SQL INSERT query. Use `?` as a placeholder for the payload.
1897    /// If not provided, a default `INSERT INTO {table} (payload) VALUES (?)` is used.
1898    pub insert_query: Option<String>,
1899    /// (Consumer only) Optional. A custom SQL SELECT query to fetch messages. This is only supported for PostgreSQL and Microsoft SQL Server.
1900    /// The query must include a placeholder for the batch size (`$1` for PostgreSQL, `@p1` for SQL Server).
1901    /// The bridge will bind the route's `batch_size` to this placeholder.
1902    pub select_query: Option<String>,
1903    /// (Consumer only) If true, delete messages after processing.
1904    #[serde(default)]
1905    pub delete_after_read: bool,
1906    /// (Publisher only) If true, automatically create the table and indexes if they don't exist. Defaults to false.
1907    #[serde(default)]
1908    pub auto_create_table: bool,
1909    /// (Consumer only) Polling interval in milliseconds. Defaults to 100ms.
1910    pub polling_interval_ms: Option<u64>,
1911    /// TLS configuration for the database connection.
1912    #[serde(default)]
1913    pub tls: TlsConfig,
1914    /// Maximum number of connections in the pool. Defaults to 10.
1915    pub max_connections: Option<u32>,
1916    /// Minimum number of connections to keep in the pool. Defaults to 0.
1917    pub min_connections: Option<u32>,
1918    /// Timeout for acquiring a connection from the pool in milliseconds. Defaults to 30000ms.
1919    pub acquire_timeout_ms: Option<u64>,
1920    /// Maximum idle time for a connection in milliseconds. Defaults to 600000ms (10 minutes).
1921    pub idle_timeout_ms: Option<u64>,
1922    /// Maximum lifetime of a connection in milliseconds. Defaults to 1800000ms (30 minutes).
1923    pub max_lifetime_ms: Option<u64>,
1924}
1925
1926// --- Common Configuration ---
1927
1928/// TLS configuration for secure connections.
1929///
1930/// Configures Transport Layer Security (TLS/SSL) for encrypted communication.
1931/// Supports both client certificate (mutual TLS) and server certificate validation.
1932///
1933/// # Examples
1934///
1935/// ```
1936/// use mq_bridge::models::TlsConfig;
1937///
1938/// let tls = TlsConfig {
1939///     required: true,
1940///     ca_file: Some("/path/to/ca.pem".to_string()),
1941///     cert_file: Some("/path/to/cert.pem".to_string()),
1942///     key_file: Some("/path/to/key.pem".to_string()),
1943///     ..Default::default()
1944/// };
1945/// ```
1946#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq, Hash)]
1947#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1948#[serde(deny_unknown_fields)]
1949pub struct TlsConfig {
1950    /// If true, enable TLS/SSL.
1951    #[serde(default, deserialize_with = "deserialize_null_as_false")]
1952    pub required: bool,
1953    /// Path to the CA certificate file.
1954    pub ca_file: Option<String>,
1955    /// Path to the client certificate file (PEM).
1956    pub cert_file: Option<String>,
1957    /// Path to the client private key file (PEM).
1958    pub key_file: Option<String>,
1959    /// Password for the private key (if encrypted).
1960    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
1961    pub cert_password: Option<String>,
1962    /// If true, disable server certificate verification (insecure).
1963    #[serde(default)]
1964    pub accept_invalid_certs: bool,
1965}
1966
1967impl TlsConfig {
1968    /// Creates a new TLS configuration with default settings (TLS not required).
1969    pub fn new() -> Self {
1970        Self::default()
1971    }
1972
1973    pub fn with_ca_file(mut self, ca_file: impl Into<String>) -> Self {
1974        self.ca_file = Some(ca_file.into());
1975        self.required = true;
1976        self
1977    }
1978
1979    pub fn with_client_cert(
1980        mut self,
1981        cert_file: impl Into<String>,
1982        key_file: impl Into<String>,
1983    ) -> Self {
1984        self.cert_file = Some(cert_file.into());
1985        self.key_file = Some(key_file.into());
1986        self.required = true;
1987        self
1988    }
1989
1990    pub fn with_insecure(mut self, accept_invalid_certs: bool) -> Self {
1991        self.accept_invalid_certs = accept_invalid_certs;
1992        self
1993    }
1994
1995    /// Checks if mutual TLS (mTLS) client authentication is configured.
1996    pub fn is_mtls_client_configured(&self) -> bool {
1997        self.required && self.cert_file.is_some() && self.key_file.is_some()
1998    }
1999
2000    /// Checks if TLS server certificate authentication is configured.
2001    pub fn is_tls_server_configured(&self) -> bool {
2002        self.required && self.cert_file.is_some() && self.key_file.is_some()
2003    }
2004
2005    /// Checks if the TLS configuration is sufficient to make a TLS client connection.
2006    pub fn is_tls_client_configured(&self) -> bool {
2007        self.required
2008            || self.ca_file.is_some()
2009            || (self.cert_file.is_some() && self.key_file.is_some())
2010    }
2011
2012    /// Helper to normalize a URL by adding the appropriate scheme prefix (http:// or https://) if missing.
2013    pub fn normalize_url(&self, url: &str) -> String {
2014        if url
2015            .get(..7)
2016            .is_some_and(|prefix| prefix.eq_ignore_ascii_case("http://"))
2017            || url
2018                .get(..8)
2019                .is_some_and(|prefix| prefix.eq_ignore_ascii_case("https://"))
2020        {
2021            url.to_string()
2022        } else {
2023            let is_tls = self.required;
2024            let scheme = if is_tls { "https" } else { "http" };
2025            format!("{}://{}", scheme, url)
2026        }
2027    }
2028}
2029
2030/// Trait for extracting secrets from configuration structures.
2031pub trait SecretExtractor {
2032    /// Extracts secrets into the provided map using the given prefix, and clears them from self.
2033    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>);
2034}
2035
2036fn extract_sensitive_string_map_entries(
2037    values: &mut HashMap<String, String>,
2038    prefix: &str,
2039    field_name: &str,
2040    secrets: &mut HashMap<String, String>,
2041) {
2042    let secret_keys = values
2043        .keys()
2044        .filter(|key| {
2045            let key = key.to_ascii_lowercase();
2046            key.contains("key") || key.contains("token") || key.contains("auth")
2047        })
2048        .cloned()
2049        .collect::<Vec<_>>();
2050
2051    for key in secret_keys {
2052        if let Some(value) = values.remove(&key) {
2053            secrets.insert(
2054                sanitize_secret_key(&format!("{}__{}__{}", prefix, field_name, key)),
2055                value,
2056            );
2057        }
2058    }
2059}
2060
2061fn url_has_userinfo(url: &str) -> bool {
2062    let Some(authority_start) = url.find("://").map(|idx| idx + 3) else {
2063        return false;
2064    };
2065    let authority_end = url[authority_start..]
2066        .find(['/', '?', '#'])
2067        .map(|idx| authority_start + idx)
2068        .unwrap_or(url.len());
2069    url[authority_start..authority_end].contains('@')
2070}
2071
2072fn sanitize_secret_key(key: &str) -> String {
2073    key.chars()
2074        .map(|ch| {
2075            let ch = ch.to_ascii_uppercase();
2076            if ch.is_ascii_alphanumeric() || ch == '_' {
2077                ch
2078            } else {
2079                '_'
2080            }
2081        })
2082        .collect()
2083}
2084
2085fn extract_sensitive_url(
2086    url: &mut String,
2087    prefix: &str,
2088    field_name: &str,
2089    secrets: &mut HashMap<String, String>,
2090) {
2091    if !url.is_empty() && url_has_userinfo(url) {
2092        secrets.insert(
2093            sanitize_secret_key(&format!("{}__{}", prefix, field_name)),
2094            std::mem::take(url),
2095        );
2096    }
2097}
2098
2099fn extract_sensitive_optional_url(
2100    url: &mut Option<String>,
2101    prefix: &str,
2102    field_name: &str,
2103    secrets: &mut HashMap<String, String>,
2104) {
2105    if url.as_ref().is_some_and(|url| url_has_userinfo(url)) {
2106        if let Some(url) = url.take() {
2107            secrets.insert(
2108                sanitize_secret_key(&format!("{}__{}", prefix, field_name)),
2109                url,
2110            );
2111        }
2112    }
2113}
2114
2115impl SecretExtractor for Route {
2116    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2117        self.input
2118            .extract_secrets(&format!("{}__{}", prefix, "INPUT"), secrets);
2119        self.output
2120            .extract_secrets(&format!("{}__{}", prefix, "OUTPUT"), secrets);
2121    }
2122}
2123
2124impl SecretExtractor for Endpoint {
2125    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2126        for (i, middleware) in self.middlewares.iter_mut().enumerate() {
2127            middleware.extract_secrets(&format!("{}__{}__{}", prefix, "MIDDLEWARES", i), secrets);
2128        }
2129        self.endpoint_type.extract_secrets(prefix, secrets);
2130    }
2131}
2132
2133impl SecretExtractor for EndpointType {
2134    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2135        match self {
2136            EndpointType::Aws(cfg) => {
2137                cfg.extract_secrets(&format!("{}__{}", prefix, "AWS"), secrets)
2138            }
2139            EndpointType::Kafka(cfg) => {
2140                cfg.extract_secrets(&format!("{}__{}", prefix, "KAFKA"), secrets)
2141            }
2142            EndpointType::Nats(cfg) => {
2143                cfg.extract_secrets(&format!("{}__{}", prefix, "NATS"), secrets)
2144            }
2145            EndpointType::Amqp(cfg) => {
2146                cfg.extract_secrets(&format!("{}__{}", prefix, "AMQP"), secrets)
2147            }
2148            EndpointType::MongoDb(cfg) => {
2149                cfg.extract_secrets(&format!("{}__{}", prefix, "MONGODB"), secrets)
2150            }
2151            EndpointType::Mqtt(cfg) => {
2152                cfg.extract_secrets(&format!("{}__{}", prefix, "MQTT"), secrets)
2153            }
2154            EndpointType::Http(cfg) => {
2155                cfg.extract_secrets(&format!("{}__{}", prefix, "HTTP"), secrets)
2156            }
2157            EndpointType::WebSocket(cfg) => {
2158                cfg.extract_secrets(&format!("{}__{}", prefix, "WEBSOCKET"), secrets)
2159            }
2160            EndpointType::IbmMq(cfg) => {
2161                cfg.extract_secrets(&format!("{}__{}", prefix, "IBMMQ"), secrets)
2162            }
2163            EndpointType::ZeroMq(cfg) => {
2164                cfg.extract_secrets(&format!("{}__{}", prefix, "ZEROMQ"), secrets)
2165            }
2166            EndpointType::Sqlx(cfg) => {
2167                cfg.extract_secrets(&format!("{}__{}", prefix, "SQLX"), secrets)
2168            }
2169            EndpointType::Grpc(cfg) => {
2170                cfg.extract_secrets(&format!("{}__{}", prefix, "GRPC"), secrets)
2171            }
2172            EndpointType::Fanout(endpoints) => {
2173                for (i, ep) in endpoints.iter_mut().enumerate() {
2174                    ep.extract_secrets(&format!("{}__{}__{}", prefix, "FANOUT", i), secrets);
2175                }
2176            }
2177            EndpointType::Switch(cfg) => {
2178                for (key, ep) in cfg.cases.iter_mut() {
2179                    ep.extract_secrets(
2180                        &format!(
2181                            "{}__{}__{}",
2182                            prefix,
2183                            "SWITCH__CASES",
2184                            sanitize_secret_key(key)
2185                        ),
2186                        secrets,
2187                    );
2188                }
2189                if let Some(default) = &mut cfg.default {
2190                    default.extract_secrets(&format!("{}__{}", prefix, "SWITCH__DEFAULT"), secrets);
2191                }
2192            }
2193            EndpointType::Reader(ep) => {
2194                ep.extract_secrets(&format!("{}__{}", prefix, "READER"), secrets)
2195            }
2196            _ => {}
2197        }
2198    }
2199}
2200
2201impl SecretExtractor for Middleware {
2202    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2203        if let Middleware::Dlq(cfg) = self {
2204            cfg.endpoint
2205                .extract_secrets(&format!("{}__{}__{}", prefix, "DLQ", "ENDPOINT"), secrets);
2206        }
2207    }
2208}
2209
2210impl SecretExtractor for AwsConfig {
2211    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2212        if let Some(val) = self.access_key.take() {
2213            secrets.insert(format!("{}__{}", prefix, "ACCESS_KEY"), val);
2214        }
2215        if let Some(val) = self.secret_key.take() {
2216            secrets.insert(format!("{}__{}", prefix, "SECRET_KEY"), val);
2217        }
2218        if let Some(val) = self.session_token.take() {
2219            secrets.insert(format!("{}__{}", prefix, "SESSION_TOKEN"), val);
2220        }
2221        extract_sensitive_optional_url(&mut self.queue_url, prefix, "QUEUE_URL", secrets);
2222        extract_sensitive_optional_url(&mut self.endpoint_url, prefix, "ENDPOINT_URL", secrets);
2223    }
2224}
2225
2226impl SecretExtractor for KafkaConfig {
2227    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2228        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2229        if let Some(val) = self.username.take() {
2230            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2231        }
2232        if let Some(val) = self.password.take() {
2233            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2234        }
2235        self.tls
2236            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2237    }
2238}
2239
2240impl SecretExtractor for NatsConfig {
2241    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2242        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2243        if let Some(val) = self.username.take() {
2244            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2245        }
2246        if let Some(val) = self.password.take() {
2247            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2248        }
2249        if let Some(val) = self.token.take() {
2250            secrets.insert(format!("{}__{}", prefix, "TOKEN"), val);
2251        }
2252        self.tls
2253            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2254    }
2255}
2256
2257impl SecretExtractor for AmqpConfig {
2258    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2259        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2260        if let Some(val) = self.username.take() {
2261            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2262        }
2263        if let Some(val) = self.password.take() {
2264            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2265        }
2266        self.tls
2267            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2268    }
2269}
2270
2271impl SecretExtractor for MongoDbConfig {
2272    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2273        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2274        if let Some(val) = self.username.take() {
2275            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2276        }
2277        if let Some(val) = self.password.take() {
2278            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2279        }
2280        self.tls
2281            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2282    }
2283}
2284
2285impl SecretExtractor for MqttConfig {
2286    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2287        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2288        if let Some(val) = self.username.take() {
2289            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2290        }
2291        if let Some(val) = self.password.take() {
2292            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2293        }
2294        self.tls
2295            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2296    }
2297}
2298
2299impl SecretExtractor for HttpConfig {
2300    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2301        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2302        if let Some((u, p)) = self.basic_auth.take() {
2303            secrets.insert(format!("{}__{}__{}", prefix, "BASIC_AUTH", 0), u);
2304            secrets.insert(format!("{}__{}__{}", prefix, "BASIC_AUTH", 1), p);
2305        }
2306        extract_sensitive_string_map_entries(
2307            &mut self.custom_headers,
2308            prefix,
2309            "CUSTOM_HEADERS",
2310            secrets,
2311        );
2312        self.tls
2313            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2314    }
2315}
2316
2317impl SecretExtractor for WebSocketConfig {
2318    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2319        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2320    }
2321}
2322
2323impl SecretExtractor for IbmMqConfig {
2324    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2325        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2326        if let Some(val) = self.username.take() {
2327            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2328        }
2329        if let Some(val) = self.password.take() {
2330            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2331        }
2332        self.tls
2333            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2334    }
2335}
2336
2337impl SecretExtractor for ZeroMqConfig {
2338    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2339        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2340    }
2341}
2342
2343impl SecretExtractor for SqlxConfig {
2344    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2345        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2346        if let Some(val) = self.username.take() {
2347            secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2348        }
2349        if let Some(val) = self.password.take() {
2350            secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2351        }
2352        self.tls
2353            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2354    }
2355}
2356
2357impl SecretExtractor for GrpcConfig {
2358    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2359        extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2360        self.tls
2361            .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2362    }
2363}
2364
2365impl SecretExtractor for TlsConfig {
2366    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2367        if let Some(val) = self.cert_password.take() {
2368            secrets.insert(format!("{}__{}", prefix, "CERT_PASSWORD"), val);
2369        }
2370    }
2371}
2372
2373/// Extracts sensitive values (passwords, keys, tokens) from the configuration
2374/// and returns them as a map of environment variables (key-value pairs).
2375/// The extracted fields in the configuration are set to `None`.
2376///
2377/// The keys in the returned map follow the `MQB__{ROUTE}__{ENDPOINT}__{FIELD}` pattern
2378/// compatible with the `config` crate's environment variable override mechanism.
2379pub fn extract_config_secrets(config: &mut Config) -> HashMap<String, String> {
2380    let mut secrets = HashMap::new();
2381    for (route_name, route) in config.iter_mut() {
2382        let prefix = sanitize_secret_key(&format!("MQB__{}", route_name));
2383        route.extract_secrets(&prefix, &mut secrets);
2384    }
2385    secrets
2386}
2387
2388#[cfg(test)]
2389mod tests {
2390    use super::*;
2391    use config::{Config as ConfigBuilder, Environment};
2392
2393    const TEST_YAML: &str = r#"
2394kafka_to_nats:
2395  concurrency: 10
2396  input:
2397    middlewares:
2398      - deduplication:
2399          sled_path: "/tmp/mq-bridge/dedup_db"
2400          ttl_seconds: 3600
2401      - metrics: {}
2402      - retry:
2403          max_attempts: 5
2404          initial_interval_ms: 200
2405      - random_panic:
2406          mode: nack
2407      - dlq:
2408          endpoint:
2409            nats:
2410              subject: "dlq-subject"
2411              url: "nats://localhost:4222"
2412    kafka:
2413      topic: "input-topic"
2414      url: "localhost:9092"
2415      group_id: "my-consumer-group"
2416      tls:
2417        required: true
2418        ca_file: "/path_to_ca"
2419        cert_file: "/path_to_cert"
2420        key_file: "/path_to_key"
2421        cert_password: "password"
2422        accept_invalid_certs: true
2423  output:
2424    middlewares:
2425      - metrics: {}
2426      - dlq:
2427          endpoint:
2428            file:
2429              path: "error.out"
2430    nats:
2431      subject: "output-subject"
2432      url: "nats://localhost:4222"
2433"#;
2434
2435    fn assert_config_values(config: &Config) {
2436        assert_eq!(config.len(), 1);
2437        let route = config.get("kafka_to_nats").expect("Route should exist");
2438
2439        assert_eq!(route.options.concurrency, 10);
2440
2441        // --- Assert Input ---
2442        let input = &route.input;
2443        assert_eq!(input.middlewares.len(), 5);
2444
2445        let mut has_dedup = false;
2446        let mut has_metrics = false;
2447        let mut has_dlq = false;
2448        let mut has_retry = false;
2449        let mut has_random_panic = false;
2450        for middleware in &input.middlewares {
2451            match middleware {
2452                Middleware::Deduplication(dedup) => {
2453                    assert_eq!(dedup.sled_path, "/tmp/mq-bridge/dedup_db");
2454                    assert_eq!(dedup.ttl_seconds, 3600);
2455                    has_dedup = true;
2456                }
2457                Middleware::Metrics(_) => {
2458                    has_metrics = true;
2459                }
2460                Middleware::Custom { .. } => {}
2461                Middleware::Dlq(dlq) => {
2462                    assert!(dlq.endpoint.middlewares.is_empty());
2463                    if let EndpointType::Nats(nats_cfg) = &dlq.endpoint.endpoint_type {
2464                        assert_eq!(nats_cfg.subject, Some("dlq-subject".to_string()));
2465                        assert_eq!(nats_cfg.url, "nats://localhost:4222");
2466                    }
2467                    has_dlq = true;
2468                }
2469                Middleware::Retry(retry) => {
2470                    assert_eq!(retry.max_attempts, 5);
2471                    assert_eq!(retry.initial_interval_ms, 200);
2472                    has_retry = true;
2473                }
2474                Middleware::RandomPanic(rp) => {
2475                    assert!(rp.mode == FaultMode::Nack);
2476                    has_random_panic = true;
2477                }
2478                Middleware::Delay(_) => {}
2479                Middleware::WeakJoin(_) => {}
2480                Middleware::Limiter(_) => {}
2481                Middleware::Buffer(_) => {}
2482                Middleware::CookieJar(_) => {}
2483            }
2484        }
2485
2486        if let EndpointType::Kafka(kafka) = &input.endpoint_type {
2487            assert_eq!(kafka.topic, Some("input-topic".to_string()));
2488            assert_eq!(kafka.url, "localhost:9092");
2489            assert_eq!(kafka.group_id, Some("my-consumer-group".to_string()));
2490            let tls = &kafka.tls;
2491            assert!(tls.required);
2492            assert_eq!(tls.ca_file.as_deref(), Some("/path_to_ca"));
2493            assert!(tls.accept_invalid_certs);
2494        } else {
2495            panic!("Input endpoint should be Kafka");
2496        }
2497        assert!(has_dedup);
2498        assert!(has_metrics);
2499        assert!(has_dlq);
2500        assert!(has_retry);
2501        assert!(has_random_panic);
2502
2503        // --- Assert Output ---
2504        let output = &route.output;
2505        assert_eq!(output.middlewares.len(), 2);
2506        assert!(matches!(output.middlewares[0], Middleware::Metrics(_)));
2507
2508        if let EndpointType::Nats(nats) = &output.endpoint_type {
2509            assert_eq!(nats.subject, Some("output-subject".to_string()));
2510            assert_eq!(nats.url, "nats://localhost:4222");
2511        } else {
2512            panic!("Output endpoint should be NATS");
2513        }
2514    }
2515
2516    #[test]
2517    fn test_deserialize_from_yaml() {
2518        // We use serde_yaml directly here because the `config` crate's processing
2519        // can interfere with complex deserialization logic.
2520        let result: Result<Config, _> = serde_yaml_ng::from_str(TEST_YAML);
2521        println!("Deserialized from YAML: {:#?}", result);
2522        let config = result.expect("Failed to deserialize TEST_YAML");
2523        assert_config_values(&config);
2524    }
2525
2526    #[test]
2527    fn test_deserialize_from_env() {
2528        // Set environment variables based on README
2529        unsafe {
2530            std::env::set_var("MQB__KAFKA_TO_NATS__CONCURRENCY", "10");
2531            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "input-topic");
2532            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__URL", "localhost:9092");
2533            std::env::set_var(
2534                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID",
2535                "my-consumer-group",
2536            );
2537            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__REQUIRED", "true");
2538            std::env::set_var(
2539                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__CA_FILE",
2540                "/path_to_ca",
2541            );
2542            std::env::set_var(
2543                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__ACCEPT_INVALID_CERTS",
2544                "true",
2545            );
2546            std::env::set_var(
2547                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT",
2548                "output-subject",
2549            );
2550            std::env::set_var(
2551                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__URL",
2552                "nats://localhost:4222",
2553            );
2554            std::env::set_var(
2555                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__SUBJECT",
2556                "dlq-subject",
2557            );
2558            std::env::set_var(
2559                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__URL",
2560                "nats://localhost:4222",
2561            );
2562        }
2563
2564        let builder = ConfigBuilder::builder()
2565            // Enable automatic type parsing for values from environment variables.
2566            .add_source(
2567                Environment::with_prefix("MQB")
2568                    .separator("__")
2569                    .try_parsing(true),
2570            );
2571
2572        let config: Config = builder
2573            .build()
2574            .expect("Failed to build config")
2575            .try_deserialize()
2576            .expect("Failed to deserialize config");
2577
2578        // We can't test all values from env, but we can check the ones we set.
2579        assert_eq!(config.get("kafka_to_nats").unwrap().options.concurrency, 10);
2580        if let EndpointType::Kafka(k) = &config.get("kafka_to_nats").unwrap().input.endpoint_type {
2581            assert_eq!(k.topic, Some("input-topic".to_string()));
2582            assert!(k.tls.required);
2583        } else {
2584            panic!("Expected Kafka endpoint");
2585        }
2586
2587        let input = &config.get("kafka_to_nats").unwrap().input;
2588        assert_eq!(input.middlewares.len(), 1);
2589        if let Middleware::Dlq(_) = &input.middlewares[0] {
2590            // Correctly parsed
2591        } else {
2592            panic!("Expected DLQ middleware");
2593        }
2594    }
2595
2596    #[test]
2597    fn test_extract_secrets() {
2598        let mut config = Config::new();
2599        let mut route = Route::default();
2600
2601        // Setup Kafka with secrets
2602        let mut kafka_config = KafkaConfig::new("kafka://user:pass@localhost:9092");
2603        kafka_config.username = Some("user".to_string());
2604        kafka_config.password = Some("pass".to_string());
2605        kafka_config.tls.cert_password = Some("certpass".to_string());
2606
2607        route.input = Endpoint {
2608            endpoint_type: EndpointType::Kafka(kafka_config),
2609            middlewares: vec![],
2610            handler: None,
2611        };
2612
2613        // Setup HTTP with basic auth
2614        let mut http_config = HttpConfig::new("http://httpuser:httppass@localhost");
2615        http_config.basic_auth = Some(("httpuser".to_string(), "httppass".to_string()));
2616        http_config
2617            .custom_headers
2618            .insert("X-API-Key".to_string(), "http-api-key".to_string());
2619        http_config.custom_headers.insert(
2620            "X-Access-Token".to_string(),
2621            "http-access-token".to_string(),
2622        );
2623        http_config.custom_headers.insert(
2624            "X-Authentication".to_string(),
2625            "http-authentication".to_string(),
2626        );
2627        http_config.custom_headers.insert(
2628            "Authorization".to_string(),
2629            "Bearer secret-token".to_string(),
2630        );
2631        http_config
2632            .custom_headers
2633            .insert("X-Trace-Id".to_string(), "trace-value".to_string());
2634
2635        route.output = Endpoint {
2636            endpoint_type: EndpointType::Http(http_config),
2637            middlewares: vec![],
2638            handler: None,
2639        };
2640
2641        config.insert("test_route".to_string(), route);
2642
2643        let secrets = extract_config_secrets(&mut config);
2644
2645        // Verify secrets extracted
2646        assert_eq!(
2647            secrets
2648                .get("MQB__TEST_ROUTE__INPUT__KAFKA__URL")
2649                .map(|s| s.as_str()),
2650            Some("kafka://user:pass@localhost:9092")
2651        );
2652        assert_eq!(
2653            secrets
2654                .get("MQB__TEST_ROUTE__INPUT__KAFKA__USERNAME")
2655                .map(|s| s.as_str()),
2656            Some("user")
2657        );
2658        assert_eq!(
2659            secrets
2660                .get("MQB__TEST_ROUTE__INPUT__KAFKA__PASSWORD")
2661                .map(|s| s.as_str()),
2662            Some("pass")
2663        );
2664        assert_eq!(
2665            secrets
2666                .get("MQB__TEST_ROUTE__INPUT__KAFKA__TLS__CERT_PASSWORD")
2667                .map(|s| s.as_str()),
2668            Some("certpass")
2669        );
2670        assert_eq!(
2671            secrets
2672                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__URL")
2673                .map(|s| s.as_str()),
2674            Some("http://httpuser:httppass@localhost")
2675        );
2676        assert_eq!(
2677            secrets
2678                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__BASIC_AUTH__0")
2679                .map(|s| s.as_str()),
2680            Some("httpuser")
2681        );
2682        assert_eq!(
2683            secrets
2684                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__BASIC_AUTH__1")
2685                .map(|s| s.as_str()),
2686            Some("httppass")
2687        );
2688        assert_eq!(
2689            secrets
2690                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_API_KEY")
2691                .map(|s| s.as_str()),
2692            Some("http-api-key")
2693        );
2694        assert_eq!(
2695            secrets
2696                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_ACCESS_TOKEN")
2697                .map(|s| s.as_str()),
2698            Some("http-access-token")
2699        );
2700        assert_eq!(
2701            secrets
2702                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_AUTHENTICATION")
2703                .map(|s| s.as_str()),
2704            Some("http-authentication")
2705        );
2706        assert_eq!(
2707            secrets
2708                .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__AUTHORIZATION")
2709                .map(|s| s.as_str()),
2710            Some("Bearer secret-token")
2711        );
2712
2713        // Verify config cleared
2714        let route = config.get("test_route").unwrap();
2715        if let EndpointType::Kafka(k) = &route.input.endpoint_type {
2716            assert!(k.url.is_empty());
2717            assert!(k.username.is_none());
2718            assert!(k.password.is_none());
2719            assert!(k.tls.cert_password.is_none());
2720        }
2721        if let EndpointType::Http(h) = &route.output.endpoint_type {
2722            assert!(h.url.is_empty());
2723            assert!(h.basic_auth.is_none());
2724            assert!(!h.custom_headers.contains_key("X-API-Key"));
2725            assert!(!h.custom_headers.contains_key("X-Access-Token"));
2726            assert!(!h.custom_headers.contains_key("X-Authentication"));
2727            assert!(!h.custom_headers.contains_key("Authorization"));
2728            assert_eq!(
2729                h.custom_headers.get("X-Trace-Id").map(|s| s.as_str()),
2730                Some("trace-value")
2731            );
2732        }
2733    }
2734
2735    #[test]
2736    fn test_extract_sensitive_url_only_strips_authority_credentials() {
2737        let mut config = Config::new();
2738        let path_at_route = Route {
2739            output: Endpoint {
2740                endpoint_type: EndpointType::Http(HttpConfig::new(
2741                    "https://example.com/path/user@example.com?email=a@b.test",
2742                )),
2743                middlewares: vec![],
2744                handler: None,
2745            },
2746            ..Default::default()
2747        };
2748        config.insert("path_at_route".to_string(), path_at_route);
2749
2750        let credential_route = Route {
2751            output: Endpoint {
2752                endpoint_type: EndpointType::Http(HttpConfig::new(
2753                    "https://user:pass@example.com/path",
2754                )),
2755                middlewares: vec![],
2756                handler: None,
2757            },
2758            ..Default::default()
2759        };
2760        config.insert("credential_route".to_string(), credential_route);
2761
2762        let query_at_route = Route {
2763            output: Endpoint {
2764                endpoint_type: EndpointType::Http(HttpConfig::new(
2765                    "https://example.com?next=a@b.test",
2766                )),
2767                middlewares: vec![],
2768                handler: None,
2769            },
2770            ..Default::default()
2771        };
2772        config.insert("query_at_route".to_string(), query_at_route);
2773
2774        let fragment_at_route = Route {
2775            output: Endpoint {
2776                endpoint_type: EndpointType::Http(HttpConfig::new(
2777                    "https://example.com#user@example.com",
2778                )),
2779                middlewares: vec![],
2780                handler: None,
2781            },
2782            ..Default::default()
2783        };
2784        config.insert("fragment_at_route".to_string(), fragment_at_route);
2785
2786        let secrets = extract_config_secrets(&mut config);
2787
2788        if let EndpointType::Http(http) = &config.get("path_at_route").unwrap().output.endpoint_type
2789        {
2790            assert_eq!(
2791                http.url,
2792                "https://example.com/path/user@example.com?email=a@b.test"
2793            );
2794        }
2795        if let EndpointType::Http(http) =
2796            &config.get("query_at_route").unwrap().output.endpoint_type
2797        {
2798            assert_eq!(http.url, "https://example.com?next=a@b.test");
2799        }
2800        if let EndpointType::Http(http) = &config
2801            .get("fragment_at_route")
2802            .unwrap()
2803            .output
2804            .endpoint_type
2805        {
2806            assert_eq!(http.url, "https://example.com#user@example.com");
2807        }
2808        if let EndpointType::Http(http) =
2809            &config.get("credential_route").unwrap().output.endpoint_type
2810        {
2811            assert!(http.url.is_empty());
2812        }
2813        assert_eq!(
2814            secrets
2815                .get("MQB__CREDENTIAL_ROUTE__OUTPUT__HTTP__URL")
2816                .map(String::as_str),
2817            Some("https://user:pass@example.com/path")
2818        );
2819        assert!(!secrets.contains_key("MQB__PATH_AT_ROUTE__OUTPUT__HTTP__URL"));
2820        assert!(!secrets.contains_key("MQB__QUERY_AT_ROUTE__OUTPUT__HTTP__URL"));
2821        assert!(!secrets.contains_key("MQB__FRAGMENT_AT_ROUTE__OUTPUT__HTTP__URL"));
2822    }
2823
2824    #[test]
2825    fn test_file_config_inference() {
2826        let yaml = r#"
2827mode: group_subscribe
2828path: "/tmp/test"
2829group_id: "my_group"
2830"#;
2831        let config: FileConfig = serde_yaml_ng::from_str(yaml).unwrap();
2832        match config.mode {
2833            Some(FileConsumerMode::GroupSubscribe { group_id, .. }) => {
2834                assert_eq!(group_id, "my_group")
2835            }
2836            _ => panic!("Expected GroupSubscribe"),
2837        }
2838
2839        let yaml_queue = r#"
2840mode: consume
2841path: "/tmp/test"
2842"#;
2843        let config_queue: FileConfig = serde_yaml_ng::from_str(yaml_queue).unwrap();
2844        match config_queue.mode {
2845            Some(FileConsumerMode::Consume { delete }) => assert!(!delete),
2846            _ => panic!("Expected Consume"),
2847        }
2848    }
2849}
2850
2851#[cfg(all(test, feature = "schema"))]
2852mod schema_tests {
2853    use super::*;
2854
2855    #[test]
2856    fn generate_json_schema() {
2857        let schema = schemars::schema_for!(Config);
2858        let schema_json = serde_json::to_string_pretty(&schema).unwrap();
2859
2860        let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
2861        path.push("mq-bridge.schema.json");
2862        std::fs::write(path, schema_json).expect("Failed to write schema file");
2863    }
2864}