1use serde::{
7 de::{MapAccess, Visitor},
8 Deserialize, Deserializer, Serialize,
9};
10use std::{
11 collections::HashMap,
12 sync::{atomic::AtomicUsize, Arc},
13};
14
15use crate::traits::Handler;
16use tracing::trace;
17
/// Map of [`Route`] definitions keyed by a `String` (presumably the route name — confirm
/// against callers).
pub type Config = HashMap<String, Route>;
80
/// Map of [`Endpoint`] definitions keyed by a `String` (presumably the publisher name —
/// confirm against callers).
pub type PublisherConfig = HashMap<String, Endpoint>;
84
/// A route: an input [`Endpoint`], an output [`Endpoint`] and its [`RouteOptions`].
///
/// The container is marked `deny_unknown_fields`.
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
// `flatten` (used on `options` below) — confirm this combination behaves as intended.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct Route {
    /// Input endpoint; required when deserializing.
    pub input: Endpoint,
    /// Output endpoint; `default_output_endpoint()` is used when the key is omitted.
    #[serde(default = "default_output_endpoint")]
    pub output: Endpoint,
    /// Options read from / written to the route's own map (`flatten`) instead of a nested
    /// `options` key.
    #[serde(flatten, default)]
    pub options: RouteOptions,
}
99
100impl Default for Route {
101 fn default() -> Self {
102 Self {
103 input: Endpoint::null(),
104 output: Endpoint::null(),
105 options: RouteOptions::default(),
106 }
107 }
108}
109
/// Tuning options of a [`Route`]; `Route` flattens these into its own map.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct RouteOptions {
    /// Free-form description. Defaults to the empty string and is left out of serialized
    /// output while empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub description: String,
    /// Defaults to `default_concurrency()`. The generated schema advertises a minimum of 1;
    /// this declaration does not itself validate that on deserialization.
    #[serde(default = "default_concurrency")]
    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
    pub concurrency: usize,
    /// Defaults to `default_batch_size()`. The generated schema advertises a minimum of 1;
    /// this declaration does not itself validate that on deserialization.
    #[serde(default = "default_batch_size")]
    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
    pub batch_size: usize,
    /// Defaults to `default_commit_concurrency_limit()`.
    #[serde(default = "default_commit_concurrency_limit")]
    pub commit_concurrency_limit: usize,
}
147
148impl Default for RouteOptions {
149 fn default() -> Self {
150 Self {
151 description: String::new(),
152 concurrency: default_concurrency(),
153 batch_size: default_batch_size(),
154 commit_concurrency_limit: default_commit_concurrency_limit(),
155 }
156 }
157}
158
/// Fallback for `RouteOptions::concurrency` when the field is omitted: `1`.
pub(crate) fn default_concurrency() -> usize {
    const DEFAULT_CONCURRENCY: usize = 1;
    DEFAULT_CONCURRENCY
}
162
/// Fallback for `RouteOptions::batch_size` when the field is omitted: `1`.
pub(crate) fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 1;
    DEFAULT_BATCH_SIZE
}
166
/// Fallback for `RouteOptions::commit_concurrency_limit` when the field is omitted: `4096`.
pub(crate) fn default_commit_concurrency_limit() -> usize {
    const DEFAULT_COMMIT_CONCURRENCY_LIMIT: usize = 4096;
    DEFAULT_COMMIT_CONCURRENCY_LIMIT
}
170
171fn default_output_endpoint() -> Endpoint {
172 Endpoint::new(EndpointType::Null)
173}
174
/// Fallback for `RetryMiddleware::max_attempts` when the field is omitted: `3`.
fn default_retry_attempts() -> usize {
    const DEFAULT_RETRY_ATTEMPTS: usize = 3;
    DEFAULT_RETRY_ATTEMPTS
}
/// Fallback for `RetryMiddleware::initial_interval_ms` when the field is omitted: `100`.
fn default_initial_interval_ms() -> u64 {
    const DEFAULT_INITIAL_INTERVAL_MS: u64 = 100;
    DEFAULT_INITIAL_INTERVAL_MS
}
/// Fallback for `RetryMiddleware::max_interval_ms` when the field is omitted: `5000`.
fn default_max_interval_ms() -> u64 {
    const DEFAULT_MAX_INTERVAL_MS: u64 = 5000;
    DEFAULT_MAX_INTERVAL_MS
}
/// Fallback for `RetryMiddleware::multiplier` when the field is omitted: `2.0`.
fn default_multiplier() -> f64 {
    const DEFAULT_MULTIPLIER: f64 = 2.0;
    DEFAULT_MULTIPLIER
}
/// Fallback for `MqttConfig::clean_session` when the field is omitted: `false`.
fn default_clean_session() -> bool {
    const DEFAULT_CLEAN_SESSION: bool = false;
    DEFAULT_CLEAN_SESSION
}
190
/// Returns `true` when `name` is one of the built-in endpoint keys listed below.
///
/// The comparison is exact and case-sensitive. The `Endpoint` deserializer uses this to
/// decide whether a failed parse of a single-key map is a hard error (known key) or
/// should fall back to a custom endpoint (unknown key).
fn is_known_endpoint_name(name: &str) -> bool {
    const KNOWN_ENDPOINT_NAMES: &[&str] = &[
        "aws", "kafka", "nats", "file", "static", "memory", "sled", "amqp", "mongodb", "mqtt",
        "http", "ibmmq", "zeromq", "grpc", "fanout", "ref", "switch", "response", "sqlx",
    ];
    KNOWN_ENDPOINT_NAMES.contains(&name)
}
215
/// An endpoint: its [`EndpointType`], the [`Middleware`] list attached to it and an
/// optional in-process handler.
///
/// Only `Serialize` is derived; `Deserialize` is implemented by hand for this type.
#[derive(Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct Endpoint {
    /// Middlewares attached to this endpoint.
    #[serde(default)]
    pub middlewares: Vec<Middleware>,

    /// The endpoint kind, serialized inline (`flatten`) next to `middlewares`.
    #[serde(flatten)]
    pub endpoint_type: EndpointType,

    /// Runtime-only handler: never serialized and excluded from the generated schema.
    #[serde(skip_serializing)]
    #[cfg_attr(feature = "schema", schemars(skip))]
    pub handler: Option<Arc<dyn Handler>>,
}
234
235impl std::fmt::Debug for Endpoint {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 f.debug_struct("Endpoint")
238 .field("middlewares", &self.middlewares)
239 .field("endpoint_type", &self.endpoint_type)
240 .field(
241 "handler",
242 &if self.handler.is_some() {
243 "Some(<Handler>)"
244 } else {
245 "None"
246 },
247 )
248 .finish()
249 }
250}
251
/// Hand-written deserialization for [`Endpoint`].
///
/// Accepts either a unit/null value (yielding an `EndpointType::Null` endpoint) or a map.
/// In a map, the `middlewares` key is handled separately and every other key is used to
/// determine the [`EndpointType`]. The `handler` is always `None` after deserialization.
impl<'de> Deserialize<'de> for Endpoint {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct EndpointVisitor;

        impl<'de> Visitor<'de> for EndpointVisitor {
            type Value = Endpoint;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("a map representing an endpoint or null")
            }

            // A unit value is treated as the null endpoint.
            fn visit_unit<E>(self) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                Ok(Endpoint::new(EndpointType::Null))
            }

            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
            where
                A: MapAccess<'de>,
            {
                // Buffer every entry as JSON values so the endpoint type and the
                // middlewares can be parsed independently afterwards.
                let mut temp_map = serde_json::Map::new();
                let mut middlewares_val = None;

                while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
                    if key == "middlewares" {
                        middlewares_val = Some(value);
                    } else {
                        temp_map.insert(key, value);
                    }
                }

                let temp_val = serde_json::Value::Object(temp_map);
                // First try the derived `EndpointType` deserializer; the clone keeps the
                // buffered map available for the fallback logic below.
                let endpoint_type: EndpointType = match serde_json::from_value(temp_val.clone()) {
                    Ok(et) => et,
                    Err(original_err) => {
                        if let serde_json::Value::Object(map) = &temp_val {
                            if map.len() == 1 {
                                let (name, config) = map.iter().next().unwrap();
                                // A built-in key that failed to parse is a real
                                // configuration error: surface the original message.
                                if is_known_endpoint_name(name) {
                                    return Err(serde::de::Error::custom(original_err));
                                }
                                // Any other single key is kept verbatim as a custom endpoint.
                                trace!("Falling back to Custom endpoint for key: {}", name);
                                EndpointType::Custom {
                                    name: name.clone(),
                                    config: config.clone(),
                                }
                            } else if map.is_empty() {
                                // No key besides (optionally) `middlewares`: null endpoint.
                                EndpointType::Null
                            } else {
                                return Err(serde::de::Error::custom(
                                    "Invalid endpoint configuration: multiple keys found or unknown endpoint type",
                                ));
                            }
                        } else {
                            return Err(serde::de::Error::custom("Invalid endpoint configuration"));
                        }
                    }
                };

                // Middlewares are parsed only after the endpoint type, so endpoint-type
                // errors are reported first.
                let middlewares = match middlewares_val {
                    Some(val) => {
                        deserialize_middlewares_from_value(val).map_err(serde::de::Error::custom)?
                    }
                    None => Vec::new(),
                };

                Ok(Endpoint {
                    middlewares,
                    endpoint_type,
                    handler: None,
                })
            }
        }

        deserializer.deserialize_any(EndpointVisitor)
    }
}
338
/// Returns `true` when `name` is one of the built-in middleware keys listed below.
///
/// The comparison is exact and case-sensitive.
fn is_known_middleware_name(name: &str) -> bool {
    const KNOWN_MIDDLEWARE_NAMES: &[&str] = &[
        "deduplication",
        "metrics",
        "dlq",
        "retry",
        "random_panic",
        "delay",
        "weak_join",
        "custom",
    ];
    KNOWN_MIDDLEWARE_NAMES.contains(&name)
}
352
353fn deserialize_middlewares_from_value(value: serde_json::Value) -> anyhow::Result<Vec<Middleware>> {
357 let arr = match value {
358 serde_json::Value::Array(arr) => arr,
359 serde_json::Value::Object(map) => {
360 let mut middlewares: Vec<_> = map
361 .into_iter()
362 .filter_map(|(key, value)| key.parse::<usize>().ok().map(|index| (index, value)))
365 .collect();
366 middlewares.sort_by_key(|(index, _)| *index);
367
368 middlewares.into_iter().map(|(_, value)| value).collect()
369 }
370 _ => return Err(anyhow::anyhow!("Expected an array or object")),
371 };
372
373 let mut middlewares = Vec::new();
374 for item in arr {
375 let known_name = if let serde_json::Value::Object(map) = &item {
377 if map.len() == 1 {
378 let (name, _) = map.iter().next().unwrap();
379 if is_known_middleware_name(name) {
380 Some(name.clone())
381 } else {
382 None
383 }
384 } else {
385 None
386 }
387 } else {
388 None
389 };
390
391 if let Some(name) = known_name {
392 match serde_json::from_value::<Middleware>(item.clone()) {
393 Ok(m) => middlewares.push(m),
394 Err(e) => {
395 return Err(anyhow::anyhow!(
396 "Failed to deserialize known middleware '{}': {}",
397 name,
398 e
399 ))
400 }
401 }
402 } else if let Ok(m) = serde_json::from_value::<Middleware>(item.clone()) {
403 middlewares.push(m);
404 } else if let serde_json::Value::Object(map) = &item {
405 if map.len() == 1 {
406 let (name, config) = map.iter().next().unwrap();
407 middlewares.push(Middleware::Custom {
408 name: name.clone(),
409 config: config.clone(),
410 });
411 } else {
412 return Err(anyhow::anyhow!(
413 "Invalid middleware configuration: {:?}",
414 item
415 ));
416 }
417 } else {
418 return Err(anyhow::anyhow!(
419 "Invalid middleware configuration: {:?}",
420 item
421 ));
422 }
423 }
424 Ok(middlewares)
425}
426
/// The kind of an [`Endpoint`] together with its kind-specific configuration.
///
/// Variant names are lower-cased for serde (`rename_all = "lowercase"`), e.g. `MongoDb`
/// is keyed as `mongodb`. `Null` is the `Default` variant.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum EndpointType {
    Aws(AwsConfig),
    Kafka(KafkaConfig),
    Nats(NatsConfig),
    File(FileConfig),
    Static(String),
    Ref(String),
    Memory(MemoryConfig),
    Sled(SledConfig),
    Amqp(AmqpConfig),
    MongoDb(MongoDbConfig),
    Mqtt(MqttConfig),
    Http(HttpConfig),
    IbmMq(IbmMqConfig),
    ZeroMq(ZeroMqConfig),
    Grpc(GrpcConfig),
    Sqlx(SqlxConfig),
    /// A list of nested endpoints.
    Fanout(Vec<Endpoint>),
    Switch(SwitchConfig),
    Response(ResponseConfig),
    /// Wraps a single nested endpoint.
    // NOTE(review): `reader` is not listed in `is_known_endpoint_name` — confirm that is
    // intentional.
    Reader(Box<Endpoint>),
    /// An endpoint identified by a free-form name with an opaque JSON configuration.
    Custom {
        name: String,
        config: serde_json::Value,
    },
    /// No endpoint; the default.
    #[default]
    Null,
}
479
480impl EndpointType {
481 pub fn name(&self) -> &'static str {
482 match self {
483 EndpointType::Aws(_) => "aws",
484 EndpointType::Kafka(_) => "kafka",
485 EndpointType::Nats(_) => "nats",
486 EndpointType::File(_) => "file",
487 EndpointType::Static(_) => "static",
488 EndpointType::Ref(_) => "ref",
489 EndpointType::Memory(_) => "memory",
490 EndpointType::Sled(_) => "sled",
491 EndpointType::Amqp(_) => "amqp",
492 EndpointType::MongoDb(_) => "mongodb",
493 EndpointType::Mqtt(_) => "mqtt",
494 EndpointType::Http(_) => "http",
495 EndpointType::IbmMq(_) => "ibmmq",
496 EndpointType::ZeroMq(_) => "zeromq",
497 EndpointType::Grpc(_) => "grpc",
498 EndpointType::Sqlx(_) => "sqlx",
499 EndpointType::Fanout(_) => "fanout",
500 EndpointType::Switch(_) => "switch",
501 EndpointType::Response(_) => "response",
502 EndpointType::Reader(_) => "reader",
503 EndpointType::Custom { .. } => "custom",
504 EndpointType::Null => "null",
505 }
506 }
507
508 pub fn is_core(&self) -> bool {
509 matches!(
510 self,
511 EndpointType::File(_)
512 | EndpointType::Static(_)
513 | EndpointType::Ref(_)
514 | EndpointType::Memory(_)
515 | EndpointType::Fanout(_)
516 | EndpointType::Switch(_)
517 | EndpointType::Response(_)
518 | EndpointType::Reader(_)
519 | EndpointType::Custom { .. }
520 | EndpointType::Null
521 )
522 }
523}
524
/// A middleware attached to an [`Endpoint`].
///
/// Variant names are snake-cased for serde (`rename_all = "snake_case"`), e.g.
/// `RandomPanic` is keyed as `random_panic`.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum Middleware {
    Deduplication(DeduplicationMiddleware),
    Metrics(MetricsMiddleware),
    /// Boxed dead-letter-queue configuration (it embeds a full [`Endpoint`]).
    Dlq(Box<DeadLetterQueueMiddleware>),
    Retry(RetryMiddleware),
    RandomPanic(RandomPanicMiddleware),
    Delay(DelayMiddleware),
    WeakJoin(WeakJoinMiddleware),
    /// A middleware identified by a free-form name with an opaque JSON configuration.
    Custom {
        name: String,
        config: serde_json::Value,
    },
}
542
/// Configuration of the `deduplication` middleware. Both fields are required and unknown
/// keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DeduplicationMiddleware {
    /// Presumably the path of the sled database used to remember seen messages — confirm
    /// against the middleware implementation.
    pub sled_path: String,
    /// Presumably how long, in seconds, an entry is remembered — confirm.
    pub ttl_seconds: u64,
}
556
/// Configuration of the `metrics` middleware. It has no fields, and unknown keys are
/// rejected, so only an empty map is accepted.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MetricsMiddleware {}
568
/// Configuration of the `dlq` middleware: a single required [`Endpoint`]. Unknown keys
/// are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DeadLetterQueueMiddleware {
    /// The endpoint used by this middleware (presumably where dead letters are sent —
    /// confirm against the middleware implementation).
    pub endpoint: Endpoint,
}
582
583#[derive(Debug, Deserialize, Serialize, Clone, Default)]
588#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
589#[serde(deny_unknown_fields)]
590pub struct RetryMiddleware {
591 #[serde(default = "default_retry_attempts")]
593 pub max_attempts: usize,
594 #[serde(default = "default_initial_interval_ms")]
596 pub initial_interval_ms: u64,
597 #[serde(default = "default_max_interval_ms")]
599 pub max_interval_ms: u64,
600 #[serde(default = "default_multiplier")]
602 pub multiplier: f64,
603}
604
/// Configuration of the `delay` middleware. Unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DelayMiddleware {
    /// Required delay value; the name indicates milliseconds.
    pub delay_ms: u64,
}
616
/// Configuration of the `weak_join` middleware. All fields are required and unknown keys
/// are rejected.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct WeakJoinMiddleware {
    /// Presumably the key messages are grouped by — confirm against the implementation.
    pub group_by: String,
    /// Presumably the number of messages expected per group — confirm.
    pub expected_count: usize,
    /// Timeout value; the name indicates milliseconds.
    pub timeout_ms: u64,
}
633
/// Fault kind selected by [`RandomPanicMiddleware::mode`].
///
/// Serialized in snake case (`panic`, `disconnect`, `timeout`, `json_format_error`,
/// `nack`). `Panic` is the default.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum FaultMode {
    #[default]
    Panic,
    Disconnect,
    Timeout,
    JsonFormatError,
    Nack,
}
651
652impl std::fmt::Display for FaultMode {
653 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654 match self {
655 FaultMode::Panic => write!(f, "panic"),
656 FaultMode::Disconnect => write!(f, "disconnect"),
657 FaultMode::Timeout => write!(f, "timeout"),
658 FaultMode::JsonFormatError => write!(f, "json_format_error"),
659 FaultMode::Nack => write!(f, "nack"),
660 }
661 }
662}
663
/// Configuration of the `random_panic` middleware. Unknown keys are rejected.
// NOTE(review): the derived `Default` yields `enabled == false`, whereas deserializing an
// empty map yields `enabled == true` (via `default_true`) — confirm the mismatch is
// intended.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct RandomPanicMiddleware {
    /// Fault to inject; defaults to `FaultMode::Panic`.
    #[serde(default)]
    pub mode: FaultMode,
    /// Optional message number; defaults to `None`. The generated schema advertises a
    /// minimum of 1. Presumably the message on which the fault fires — confirm.
    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
    #[serde(default)]
    pub trigger_on_message: Option<usize>,
    /// Defaults to `true` when omitted from the serialized form.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Runtime counter shared between clones (it is an `Arc`). Never serialized or
    /// deserialized; a fresh zeroed counter is created on deserialization.
    #[serde(skip, default = "default_atomic_usize_arc")]
    #[cfg_attr(feature = "schema", schemars(skip))]
    pub message_count: Arc<AtomicUsize>,
}
694
/// Serde default helper for boolean fields that should be on unless stated otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
698
/// Serde default helper: a freshly allocated counter starting at zero.
fn default_atomic_usize_arc() -> Arc<AtomicUsize> {
    let zeroed = AtomicUsize::default();
    Arc::new(zeroed)
}
702
703fn deserialize_null_as_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
704where
705 D: Deserializer<'de>,
706{
707 let opt = Option::<bool>::deserialize(deserializer)?;
708 Ok(opt.unwrap_or(false))
709}
710
/// Configuration of the `aws` endpoint ([`EndpointType::Aws`]).
///
/// Every field is optional (`None` when omitted) and unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct AwsConfig {
    pub queue_url: Option<String>,
    pub topic_arn: Option<String>,
    pub region: Option<String>,
    pub endpoint_url: Option<String>,
    pub access_key: Option<String>,
    pub secret_key: Option<String>,
    pub session_token: Option<String>,
    /// The generated schema advertises the range 1..=10; this declaration does not itself
    /// validate it on deserialization.
    #[cfg_attr(feature = "schema", schemars(range(min = 1, max = 10)))]
    pub max_messages: Option<i32>,
    /// The generated schema advertises the range 0..=20; this declaration does not itself
    /// validate it on deserialization.
    #[cfg_attr(feature = "schema", schemars(range(min = 0, max = 20)))]
    pub wait_time_seconds: Option<i32>,
}
737
738impl AwsConfig {
739 pub fn new() -> Self {
741 Self::default()
742 }
743
744 pub fn with_queue_url(mut self, queue_url: impl Into<String>) -> Self {
745 self.queue_url = Some(queue_url.into());
746 self
747 }
748
749 pub fn with_topic_arn(mut self, topic_arn: impl Into<String>) -> Self {
750 self.topic_arn = Some(topic_arn.into());
751 self
752 }
753
754 pub fn with_region(mut self, region: impl Into<String>) -> Self {
755 self.region = Some(region.into());
756 self
757 }
758
759 pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
760 self.endpoint_url = Some(endpoint_url.into());
761 self
762 }
763
764 pub fn with_credentials(
765 mut self,
766 access_key: impl Into<String>,
767 secret_key: impl Into<String>,
768 ) -> Self {
769 self.access_key = Some(access_key.into());
770 self.secret_key = Some(secret_key.into());
771 self
772 }
773}
774
/// Configuration of the `kafka` endpoint ([`EndpointType::Kafka`]). Unknown keys are
/// rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct KafkaConfig {
    /// Required; may also be supplied under the key `brokers` when deserializing.
    #[serde(alias = "brokers")]
    pub url: String,
    pub topic: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    pub group_id: Option<String>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub delayed_ack: bool,
    /// Optional list of `(key, value)` pairs; `None` when omitted.
    #[serde(default)]
    pub producer_options: Option<Vec<(String, String)>>,
    /// Optional list of `(key, value)` pairs; `None` when omitted.
    #[serde(default)]
    pub consumer_options: Option<Vec<(String, String)>>,
}
807
808impl KafkaConfig {
809 pub fn new(url: impl Into<String>) -> Self {
811 Self {
812 url: url.into(),
813 ..Default::default()
814 }
815 }
816
817 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
818 self.topic = Some(topic.into());
819 self
820 }
821
822 pub fn with_group_id(mut self, group_id: impl Into<String>) -> Self {
823 self.group_id = Some(group_id.into());
824 self
825 }
826
827 pub fn with_credentials(
828 mut self,
829 username: impl Into<String>,
830 password: impl Into<String>,
831 ) -> Self {
832 self.username = Some(username.into());
833 self.password = Some(password.into());
834 self
835 }
836
837 pub fn with_producer_option(
838 mut self,
839 key: impl Into<String>,
840 value: impl Into<String>,
841 ) -> Self {
842 let options = self.producer_options.get_or_insert_with(Vec::new);
843 options.push((key.into(), value.into()));
844 self
845 }
846
847 pub fn with_consumer_option(
848 mut self,
849 key: impl Into<String>,
850 value: impl Into<String>,
851 ) -> Self {
852 let options = self.consumer_options.get_or_insert_with(Vec::new);
853 options.push((key.into(), value.into()));
854 self
855 }
856}
857
/// Configuration of the `sled` endpoint ([`EndpointType::Sled`]). Unknown keys are
/// rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct SledConfig {
    /// Required path value.
    pub path: String,
    /// Optional tree name; `None` when omitted.
    pub tree: Option<String>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub read_from_start: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub delete_after_read: bool,
}
876
877impl SledConfig {
878 pub fn new(path: impl Into<String>) -> Self {
880 Self {
881 path: path.into(),
882 ..Default::default()
883 }
884 }
885
886 pub fn with_tree(mut self, tree: impl Into<String>) -> Self {
887 self.tree = Some(tree.into());
888 self
889 }
890
891 pub fn with_read_from_start(mut self, read_from_start: bool) -> Self {
892 self.read_from_start = read_from_start;
893 self
894 }
895}
896
/// Format selector used by [`FileConfig::format`].
///
/// Serialized in snake case (`normal`, `json`, `text`, `raw`). `Normal` is the default.
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum FileFormat {
    #[default]
    Normal,
    Json,
    Text,
    Raw,
}
912
/// Configuration of the `file` endpoint ([`EndpointType::File`]).
///
/// Unlike most sibling configs this struct is not marked `deny_unknown_fields`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct FileConfig {
    /// Required path value.
    pub path: String,
    /// Optional delimiter; `None` when omitted.
    pub delimiter: Option<String>,
    /// Optional consumer mode. It is flattened, so the `mode` tag and the variant's own
    /// fields sit directly in this map. See `effective_mode` for the fallback when `None`.
    #[serde(flatten, default)]
    pub mode: Option<FileConsumerMode>,
    /// Defaults to `FileFormat::Normal` when omitted.
    #[serde(default)]
    pub format: FileFormat,
}
932
/// Consumer mode of a [`FileConfig`].
///
/// Internally tagged: the variant is selected by a `mode` key whose value is the
/// snake-cased variant name (`consume`, `subscribe`, `group_subscribe`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub enum FileConsumerMode {
    Consume {
        /// Defaults to `false` when omitted.
        #[serde(default)]
        delete: bool,
    },
    Subscribe {
        /// Defaults to `false` when omitted.
        #[serde(default)]
        delete: bool,
    },
    GroupSubscribe {
        /// Required group identifier.
        group_id: String,
        /// Defaults to `false` when omitted.
        #[serde(default)]
        read_from_tail: bool,
    },
}
964
965impl Default for FileConsumerMode {
966 fn default() -> Self {
967 Self::Consume { delete: false }
968 }
969}
970
971impl FileConfig {
972 pub fn new(path: impl Into<String>) -> Self {
974 Self {
975 path: path.into(),
976 mode: Some(FileConsumerMode::default()),
977 delimiter: None,
978 format: FileFormat::default(),
979 }
980 }
981
982 pub fn with_mode(mut self, mode: FileConsumerMode) -> Self {
983 self.mode = Some(mode);
984 self
985 }
986
987 pub fn effective_mode(&self) -> FileConsumerMode {
989 self.mode.clone().unwrap_or_default()
990 }
991}
992
/// Configuration of the `nats` endpoint ([`EndpointType::Nats`]). Unknown keys are
/// rejected.
///
/// `Option` fields are `None` when omitted; `bool` fields marked `#[serde(default)]` are
/// `false` when omitted.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct NatsConfig {
    /// Required.
    pub url: String,
    pub subject: Option<String>,
    pub stream: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    pub token: Option<String>,
    #[serde(default)]
    pub request_reply: bool,
    pub request_timeout_ms: Option<u64>,
    #[serde(default)]
    pub delayed_ack: bool,
    #[serde(default)]
    pub no_jetstream: bool,
    #[serde(default)]
    pub subscriber_mode: bool,
    pub stream_max_messages: Option<i64>,
    pub deliver_policy: Option<NatsDeliverPolicy>,
    pub stream_max_bytes: Option<i64>,
    pub prefetch_count: Option<usize>,
}
1040
/// Delivery policy selector used by [`NatsConfig::deliver_policy`].
///
/// Serialized in snake case (`all`, `last`, `new`, `last_per_subject`). `All` is the
/// default.
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum NatsDeliverPolicy {
    #[default]
    All,
    Last,
    New,
    LastPerSubject,
}
1051
1052impl NatsConfig {
1053 pub fn new(url: impl Into<String>) -> Self {
1055 Self {
1056 url: url.into(),
1057 ..Default::default()
1058 }
1059 }
1060
1061 pub fn with_subject(mut self, subject: impl Into<String>) -> Self {
1062 self.subject = Some(subject.into());
1063 self
1064 }
1065
1066 pub fn with_stream(mut self, stream: impl Into<String>) -> Self {
1067 self.stream = Some(stream.into());
1068 self
1069 }
1070
1071 pub fn with_deliver_policy(mut self, policy: NatsDeliverPolicy) -> Self {
1072 self.deliver_policy = Some(policy);
1073 self
1074 }
1075
1076 pub fn with_credentials(
1077 mut self,
1078 username: impl Into<String>,
1079 password: impl Into<String>,
1080 ) -> Self {
1081 self.username = Some(username.into());
1082 self.password = Some(password.into());
1083 self
1084 }
1085}
1086
/// Configuration of the `memory` endpoint ([`EndpointType::Memory`]). Unknown keys are
/// rejected.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MemoryConfig {
    /// Required.
    pub topic: String,
    /// Optional capacity; `None` when omitted.
    pub capacity: Option<usize>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub request_reply: bool,
    /// Optional timeout; the name indicates milliseconds.
    pub request_timeout_ms: Option<u64>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub subscribe_mode: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub enable_nack: bool,
}
1107
1108impl MemoryConfig {
1109 pub fn new(topic: impl Into<String>, capacity: Option<usize>) -> Self {
1110 Self {
1111 topic: topic.into(),
1112 capacity,
1113 ..Default::default()
1114 }
1115 }
1116 pub fn with_subscribe(self, subscribe_mode: bool) -> Self {
1117 Self {
1118 subscribe_mode,
1119 ..self
1120 }
1121 }
1122
1123 pub fn with_request_reply(mut self, request_reply: bool) -> Self {
1124 self.request_reply = request_reply;
1125 self
1126 }
1127}
1128
/// Configuration of the `amqp` endpoint ([`EndpointType::Amqp`]). Unknown keys are
/// rejected.
///
/// `Option` fields are `None` when omitted; `bool` fields marked `#[serde(default)]` are
/// `false` when omitted.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct AmqpConfig {
    /// Required.
    pub url: String,
    pub queue: Option<String>,
    #[serde(default)]
    pub subscribe_mode: bool,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    pub exchange: Option<String>,
    pub prefetch_count: Option<u16>,
    #[serde(default)]
    pub no_persistence: bool,
    #[serde(default)]
    pub no_declare_queue: bool,
    #[serde(default)]
    pub delayed_ack: bool,
}
1166
1167impl AmqpConfig {
1168 pub fn new(url: impl Into<String>) -> Self {
1170 Self {
1171 url: url.into(),
1172 ..Default::default()
1173 }
1174 }
1175
1176 pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
1177 self.queue = Some(queue.into());
1178 self
1179 }
1180
1181 pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
1182 self.exchange = Some(exchange.into());
1183 self
1184 }
1185
1186 pub fn with_credentials(
1187 mut self,
1188 username: impl Into<String>,
1189 password: impl Into<String>,
1190 ) -> Self {
1191 self.username = Some(username.into());
1192 self.password = Some(password.into());
1193 self
1194 }
1195}
1196
/// Format selector used by [`MongoDbConfig::format`].
///
/// Serialized in lowercase (`normal`, `json`, `text`, `raw`). `Normal` is the default.
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum MongoDbFormat {
    #[default]
    Normal,
    Json,
    Text,
    Raw,
}
1210
/// Configuration of the `mongodb` endpoint ([`EndpointType::MongoDb`]). Unknown keys are
/// rejected.
///
/// `Option` fields are `None` when omitted; `bool` fields marked `#[serde(default)]` are
/// `false` when omitted.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MongoDbConfig {
    /// Required.
    pub url: String,
    pub collection: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Required.
    pub database: String,
    pub polling_interval_ms: Option<u64>,
    pub reply_polling_ms: Option<u64>,
    #[serde(default)]
    pub request_reply: bool,
    #[serde(default)]
    pub change_stream: bool,
    pub request_timeout_ms: Option<u64>,
    pub ttl_seconds: Option<u64>,
    pub capped_size_bytes: Option<i64>,
    /// Defaults to `MongoDbFormat::Normal` when omitted.
    #[serde(default)]
    pub format: MongoDbFormat,
    pub cursor_id: Option<String>,
}
1256
1257impl MongoDbConfig {
1258 pub fn new(url: impl Into<String>, database: impl Into<String>) -> Self {
1260 Self {
1261 url: url.into(),
1262 database: database.into(),
1263 ..Default::default()
1264 }
1265 }
1266
1267 pub fn with_collection(mut self, collection: impl Into<String>) -> Self {
1268 self.collection = Some(collection.into());
1269 self
1270 }
1271
1272 pub fn with_credentials(
1273 mut self,
1274 username: impl Into<String>,
1275 password: impl Into<String>,
1276 ) -> Self {
1277 self.username = Some(username.into());
1278 self.password = Some(password.into());
1279 self
1280 }
1281
1282 pub fn with_change_stream(mut self, change_stream: bool) -> Self {
1283 self.change_stream = change_stream;
1284 self
1285 }
1286}
1287
/// Configuration of the `mqtt` endpoint ([`EndpointType::Mqtt`]). Unknown keys are
/// rejected.
///
/// `Option` fields are `None` when omitted.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MqttConfig {
    /// Required.
    pub url: String,
    pub topic: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    pub client_id: Option<String>,
    pub queue_capacity: Option<usize>,
    pub max_inflight: Option<u16>,
    pub qos: Option<u8>,
    /// Defaults to `default_clean_session()` (`false`) when omitted.
    #[serde(default = "default_clean_session")]
    pub clean_session: bool,
    pub keep_alive_seconds: Option<u64>,
    /// Defaults to `MqttProtocol::V5` when omitted.
    #[serde(default)]
    pub protocol: MqttProtocol,
    pub session_expiry_interval: Option<u32>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub delayed_ack: bool,
}
1330
1331impl MqttConfig {
1332 pub fn new(url: impl Into<String>) -> Self {
1334 Self {
1335 url: url.into(),
1336 ..Default::default()
1337 }
1338 }
1339
1340 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1341 self.topic = Some(topic.into());
1342 self
1343 }
1344
1345 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
1346 self.client_id = Some(client_id.into());
1347 self
1348 }
1349
1350 pub fn with_credentials(
1351 mut self,
1352 username: impl Into<String>,
1353 password: impl Into<String>,
1354 ) -> Self {
1355 self.username = Some(username.into());
1356 self.password = Some(password.into());
1357 self
1358 }
1359}
1360
/// Protocol selector used by [`MqttConfig::protocol`].
///
/// Serialized in lowercase (`v5`, `v3`). `V5` is the default.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum MqttProtocol {
    #[default]
    V5,
    V3,
}
1372
/// Configuration of the `zeromq` endpoint ([`EndpointType::ZeroMq`]). Unknown keys are
/// rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct ZeroMqConfig {
    /// Required.
    pub url: String,
    /// Optional socket type; `None` when omitted.
    #[serde(default)]
    pub socket_type: Option<ZeroMqSocketType>,
    /// Optional topic; `None` when omitted.
    pub topic: Option<String>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub bind: bool,
    /// Optional buffer size; `None` when omitted.
    #[serde(default)]
    pub internal_buffer_size: Option<usize>,
}
1393
1394impl ZeroMqConfig {
1395 pub fn new(url: impl Into<String>) -> Self {
1397 Self {
1398 url: url.into(),
1399 ..Default::default()
1400 }
1401 }
1402
1403 pub fn with_socket_type(mut self, socket_type: ZeroMqSocketType) -> Self {
1404 self.socket_type = Some(socket_type);
1405 self
1406 }
1407
1408 pub fn with_bind(mut self, bind: bool) -> Self {
1409 self.bind = bind;
1410 self
1411 }
1412}
1413
/// Socket type selector used by [`ZeroMqConfig::socket_type`].
///
/// Serialized in lowercase (`push`, `pull`, `pub`, `sub`, `req`, `rep`). There is no
/// default variant.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum ZeroMqSocketType {
    Push,
    Pull,
    Pub,
    Sub,
    Req,
    Rep,
}
1429
/// Configuration of the `grpc` endpoint ([`EndpointType::Grpc`]). Unknown keys are
/// rejected.
///
/// `Option` fields are `None` when omitted.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct GrpcConfig {
    /// Required.
    pub url: String,
    pub topic: Option<String>,
    pub timeout_ms: Option<u64>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub server_mode: bool,
    #[serde(default)]
    pub initial_stream_window_size: Option<u32>,
    #[serde(default)]
    pub initial_connection_window_size: Option<u32>,
    #[serde(default)]
    pub concurrency_limit_per_connection: Option<usize>,
    #[serde(default)]
    pub http2_keepalive_interval_ms: Option<u64>,
    #[serde(default)]
    pub http2_keepalive_timeout_ms: Option<u64>,
    #[serde(default)]
    pub max_decoding_message_size: Option<usize>,
}
1470
1471impl GrpcConfig {
1472 pub fn new(url: impl Into<String>) -> Self {
1474 Self {
1475 url: url.into(),
1476 ..Default::default()
1477 }
1478 }
1479
1480 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1481 self.topic = Some(topic.into());
1482 self
1483 }
1484
1485 pub fn with_server_mode(mut self, server_mode: bool) -> Self {
1487 self.server_mode = server_mode;
1488 self
1489 }
1490}
1491
/// Configuration of the `http` endpoint ([`EndpointType::Http`]). Unknown keys are
/// rejected.
///
/// `Option` fields are `None` when omitted; `bool` fields marked `#[serde(default)]` are
/// `false` when omitted.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct HttpConfig {
    /// Required.
    pub url: String,
    pub path: Option<String>,
    pub method: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    pub workers: Option<usize>,
    pub message_id_header: Option<String>,
    pub request_timeout_ms: Option<u64>,
    pub internal_buffer_size: Option<usize>,
    #[serde(default)]
    pub fire_and_forget: bool,
    /// Left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_concurrency: Option<usize>,
    /// Left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tcp_keepalive_ms: Option<u64>,
    /// Left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pool_idle_timeout_ms: Option<u64>,
    #[serde(default)]
    pub compression_enabled: bool,
    #[serde(default)]
    pub compression_threshold_bytes: Option<usize>,
    pub concurrency_limit: Option<usize>,
    /// Optional `(first, second)` string pair (presumably user name and password —
    /// confirm). Parsed by `deserialize_basic_auth`; left out of serialized output when
    /// `None`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        deserialize_with = "deserialize_basic_auth"
    )]
    pub basic_auth: Option<(String, String)>,
    /// Extra headers; defaults to an empty map and is left out of serialized output while
    /// empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub custom_headers: HashMap<String, String>,
}
1547
1548fn deserialize_basic_auth<'de, D>(deserializer: D) -> Result<Option<(String, String)>, D::Error>
1549where
1550 D: Deserializer<'de>,
1551{
1552 let val = serde_json::Value::deserialize(deserializer)?;
1553 match val {
1554 serde_json::Value::Null => Ok(None),
1555 serde_json::Value::Array(arr) => {
1556 if arr.len() != 2 {
1557 return Err(serde::de::Error::custom("basic_auth must have 2 elements"));
1558 }
1559 let u = arr[0]
1560 .as_str()
1561 .ok_or_else(|| serde::de::Error::custom("basic_auth[0] must be string"))?
1562 .to_string();
1563 let p = arr[1]
1564 .as_str()
1565 .ok_or_else(|| serde::de::Error::custom("basic_auth[1] must be string"))?
1566 .to_string();
1567 Ok(Some((u, p)))
1568 }
1569 serde_json::Value::Object(map) => {
1570 let u = map
1571 .get("0")
1572 .and_then(|v| v.as_str())
1573 .ok_or_else(|| serde::de::Error::custom("basic_auth map missing '0'"))?
1574 .to_string();
1575 let p = map
1576 .get("1")
1577 .and_then(|v| v.as_str())
1578 .ok_or_else(|| serde::de::Error::custom("basic_auth map missing '1'"))?
1579 .to_string();
1580 Ok(Some((u, p)))
1581 }
1582 _ => Err(serde::de::Error::custom("invalid type for basic_auth")),
1583 }
1584}
1585
1586impl HttpConfig {
1587 pub fn new(url: impl Into<String>) -> Self {
1589 Self {
1590 url: url.into(),
1591 ..Default::default()
1592 }
1593 }
1594
1595 pub fn with_workers(mut self, workers: usize) -> Self {
1596 self.workers = Some(workers);
1597 self
1598 }
1599
1600 pub fn with_method(mut self, method: impl Into<String>) -> Self {
1601 self.method = Some(method.into());
1602 self
1603 }
1604
1605 pub fn with_path(mut self, path: impl Into<String>) -> Self {
1606 self.path = Some(path.into());
1607 self
1608 }
1609}
1610
/// Configuration for an IBM MQ endpoint.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
///
/// NOTE(review): `Default` is derived, so `IbmMqConfig::default()` yields `0` for
/// `max_message_size` and `wait_timeout_ms`. The `default_*` functions named in the serde
/// attributes below only apply when those keys are absent during deserialization.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct IbmMqConfig {
    /// Connection URL (no serde default, so the key is required when deserializing).
    /// NOTE(review): the expected URL format is not visible here — confirm.
    pub url: String,
    /// Optional queue name.
    /// NOTE(review): how this interacts with `topic` is not visible here — confirm.
    pub queue: Option<String>,
    /// Optional topic name.
    pub topic: Option<String>,
    /// Name of the queue manager (required when deserializing).
    pub queue_manager: String,
    /// Name of the channel (required when deserializing).
    pub channel: String,
    /// Optional username; moved out by `SecretExtractor::extract_secrets`.
    pub username: Option<String>,
    /// Optional password; moved out by `SecretExtractor::extract_secrets`.
    pub password: Option<String>,
    /// Optional cipher specification; presumably the TLS cipher spec name — TODO confirm.
    pub cipher_spec: Option<String>,
    /// TLS settings; falls back to `TlsConfig::default()` when the key is absent.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Maximum message size; deserialization falls back to `default_max_message_size()`.
    /// Presumably measured in bytes — TODO confirm.
    #[serde(default = "default_max_message_size")]
    pub max_message_size: usize,
    /// Wait timeout in milliseconds; deserialization falls back to
    /// `default_wait_timeout_ms()`.
    #[serde(default = "default_wait_timeout_ms")]
    pub wait_timeout_ms: i32,
    /// Optional internal buffer size; `None` when the key is absent.
    /// NOTE(review): unit and consumer of this value are not visible here.
    #[serde(default)]
    pub internal_buffer_size: Option<usize>,
    /// When `true`, presumably disables the status inquiry — TODO confirm against the
    /// endpoint implementation. Defaults to `false`.
    #[serde(default)]
    pub disable_status_inq: bool,
}
1650
1651impl IbmMqConfig {
1652 pub fn new(
1654 url: impl Into<String>,
1655 queue_manager: impl Into<String>,
1656 channel: impl Into<String>,
1657 ) -> Self {
1658 Self {
1659 url: url.into(),
1660 queue_manager: queue_manager.into(),
1661 channel: channel.into(),
1662 disable_status_inq: false,
1663 ..Default::default()
1664 }
1665 }
1666
1667 pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
1668 self.queue = Some(queue.into());
1669 self
1670 }
1671
1672 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1673 self.topic = Some(topic.into());
1674 self
1675 }
1676
1677 pub fn with_credentials(
1678 mut self,
1679 username: impl Into<String>,
1680 password: impl Into<String>,
1681 ) -> Self {
1682 self.username = Some(username.into());
1683 self.password = Some(password.into());
1684 self
1685 }
1686}
1687
/// Serde default for `IbmMqConfig::max_message_size`: 4 * 1024 * 1024.
fn default_max_message_size() -> usize {
    const MIB: usize = 1 << 20;
    4 * MIB
}
1691
/// Serde default for `IbmMqConfig::wait_timeout_ms`: 1000 milliseconds.
fn default_wait_timeout_ms() -> i32 {
    const ONE_SECOND_MS: i32 = 1_000;
    ONE_SECOND_MS
}
1695
/// Configuration of a switch endpoint: a set of named case endpoints plus an optional
/// default endpoint.
///
/// NOTE(review): the selection logic lives outside this block; presumably the message
/// metadata value stored under `metadata_key` picks the entry in `cases` — confirm.
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct SwitchConfig {
    /// Name of the metadata key this switch is configured with.
    pub metadata_key: String,
    /// Endpoints keyed by case name.
    pub cases: HashMap<String, Endpoint>,
    /// Optional default endpoint, boxed.
    /// NOTE(review): presumably used when no case matches — confirm.
    pub default: Option<Box<Endpoint>>,
}
1709
/// Configuration of a response endpoint.
///
/// The struct currently declares no fields; because of `deny_unknown_fields`, any key
/// supplied for it is rejected during deserialization.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct ResponseConfig {
    // Intentionally empty: there are no options to configure at this point.
}
1717
/// Configuration for an SQL endpoint.
///
/// NOTE(review): judging by the name this is backed by the `sqlx` crate — confirm.
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct SqlxConfig {
    /// Database connection URL (required when deserializing).
    pub url: String,
    /// Optional username; moved out by `SecretExtractor::extract_secrets`.
    #[serde(default)]
    pub username: Option<String>,
    /// Optional password; moved out by `SecretExtractor::extract_secrets`.
    #[serde(default)]
    pub password: Option<String>,
    /// Table name (required when deserializing).
    pub table: String,
    /// Optional custom insert query.
    /// NOTE(review): placeholder syntax and fallback behaviour are not visible here.
    pub insert_query: Option<String>,
    /// Optional custom select query.
    /// NOTE(review): placeholder syntax and fallback behaviour are not visible here.
    pub select_query: Option<String>,
    /// Presumably deletes rows once they have been read — TODO confirm. Defaults to `false`.
    #[serde(default)]
    pub delete_after_read: bool,
    /// Presumably creates `table` automatically when missing — TODO confirm. Defaults to
    /// `false`.
    #[serde(default)]
    pub auto_create_table: bool,
    /// Optional polling interval in milliseconds.
    pub polling_interval_ms: Option<u64>,
    /// TLS settings; falls back to `TlsConfig::default()` when the key is absent.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Optional maximum number of connections (presumably for the connection pool).
    pub max_connections: Option<u32>,
    /// Optional minimum number of connections (presumably for the connection pool).
    pub min_connections: Option<u32>,
    /// Optional acquire timeout in milliseconds.
    pub acquire_timeout_ms: Option<u64>,
    /// Optional idle timeout in milliseconds.
    pub idle_timeout_ms: Option<u64>,
    /// Optional maximum lifetime in milliseconds.
    pub max_lifetime_ms: Option<u64>,
}
1764
/// TLS settings shared by the endpoint configurations.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`). The derived
/// `Default` has `required == false`, no file paths, no certificate password and
/// `accept_invalid_certs == false`.
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct TlsConfig {
    /// Whether TLS is required. Deserialized through `deserialize_null_as_false`
    /// (defined elsewhere; presumably maps an explicit `null` to `false` — TODO confirm).
    #[serde(default, deserialize_with = "deserialize_null_as_false")]
    pub required: bool,
    /// Optional path to a CA certificate file.
    pub ca_file: Option<String>,
    /// Optional path to a certificate file.
    pub cert_file: Option<String>,
    /// Optional path to a private-key file.
    pub key_file: Option<String>,
    /// Optional certificate password; moved out by `SecretExtractor::extract_secrets`.
    pub cert_password: Option<String>,
    /// When `true`, presumably skips certificate validation — TODO confirm against the
    /// endpoint implementations. Defaults to `false`.
    #[serde(default)]
    pub accept_invalid_certs: bool,
}
1804
1805impl TlsConfig {
1806 pub fn new() -> Self {
1808 Self::default()
1809 }
1810
1811 pub fn with_ca_file(mut self, ca_file: impl Into<String>) -> Self {
1812 self.ca_file = Some(ca_file.into());
1813 self.required = true;
1814 self
1815 }
1816
1817 pub fn with_client_cert(
1818 mut self,
1819 cert_file: impl Into<String>,
1820 key_file: impl Into<String>,
1821 ) -> Self {
1822 self.cert_file = Some(cert_file.into());
1823 self.key_file = Some(key_file.into());
1824 self.required = true;
1825 self
1826 }
1827
1828 pub fn with_insecure(mut self, accept_invalid_certs: bool) -> Self {
1829 self.accept_invalid_certs = accept_invalid_certs;
1830 self
1831 }
1832
1833 pub fn is_mtls_client_configured(&self) -> bool {
1835 self.required && self.cert_file.is_some() && self.key_file.is_some()
1836 }
1837
1838 pub fn is_tls_server_configured(&self) -> bool {
1840 self.required && self.cert_file.is_some() && self.key_file.is_some()
1841 }
1842
1843 pub fn is_tls_client_configured(&self) -> bool {
1845 self.required
1846 || self.ca_file.is_some()
1847 || (self.cert_file.is_some() && self.key_file.is_some())
1848 }
1849
1850 pub fn normalize_url(&self, url: &str) -> String {
1852 if url
1853 .get(..7)
1854 .is_some_and(|prefix| prefix.eq_ignore_ascii_case("http://"))
1855 || url
1856 .get(..8)
1857 .is_some_and(|prefix| prefix.eq_ignore_ascii_case("https://"))
1858 {
1859 url.to_string()
1860 } else {
1861 let is_tls = self.required;
1862 let scheme = if is_tls { "https" } else { "http" };
1863 format!("{}://{}", scheme, url)
1864 }
1865 }
1866}
1867
/// Implemented by configuration types that can move their sensitive values out of the
/// structure and into a flat key/value map.
pub trait SecretExtractor {
    /// Moves sensitive values out of `self` into `secrets`.
    ///
    /// `prefix` is the key namespace for this value. The implementations in this file
    /// build each key as `prefix`, followed by `__`, followed by an upper-case field or
    /// section name, and leave the extracted field empty (`None`, an empty string, or a
    /// removed map entry).
    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>);
}
1873
1874fn extract_sensitive_string_map_entries(
1875 values: &mut HashMap<String, String>,
1876 prefix: &str,
1877 field_name: &str,
1878 secrets: &mut HashMap<String, String>,
1879) {
1880 let secret_keys = values
1881 .keys()
1882 .filter(|key| {
1883 let key = key.to_ascii_lowercase();
1884 key.contains("key") || key.contains("token") || key.contains("auth")
1885 })
1886 .cloned()
1887 .collect::<Vec<_>>();
1888
1889 for key in secret_keys {
1890 if let Some(value) = values.remove(&key) {
1891 secrets.insert(
1892 sanitize_secret_key(&format!("{}__{}__{}", prefix, field_name, key)),
1893 value,
1894 );
1895 }
1896 }
1897}
1898
/// Reports whether the authority component of `url` contains an `@`.
///
/// The authority is the text between the first `://` and the next `/`, `?` or `#` (or the
/// end of the string). A `url` without `://` never matches.
fn url_has_userinfo(url: &str) -> bool {
    let Some((_scheme, after_scheme)) = url.split_once("://") else {
        return false;
    };
    // `split` always yields at least one item, so the fallback is never taken.
    let authority = after_scheme
        .split(['/', '?', '#'])
        .next()
        .unwrap_or(after_scheme);
    authority.contains('@')
}
1909
/// Normalises `key` for use as a secret name: ASCII letters are upper-cased, ASCII digits
/// and `_` are kept, and every other character is replaced by a single `_`.
fn sanitize_secret_key(key: &str) -> String {
    // One output character per input character, so the byte length is an upper bound.
    let mut sanitized = String::with_capacity(key.len());
    for ch in key.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_';
        sanitized.push(if keep { ch.to_ascii_uppercase() } else { '_' });
    }
    sanitized
}
1922
1923fn extract_sensitive_url(
1924 url: &mut String,
1925 prefix: &str,
1926 field_name: &str,
1927 secrets: &mut HashMap<String, String>,
1928) {
1929 if !url.is_empty() && url_has_userinfo(url) {
1930 secrets.insert(
1931 sanitize_secret_key(&format!("{}__{}", prefix, field_name)),
1932 std::mem::take(url),
1933 );
1934 }
1935}
1936
1937fn extract_sensitive_optional_url(
1938 url: &mut Option<String>,
1939 prefix: &str,
1940 field_name: &str,
1941 secrets: &mut HashMap<String, String>,
1942) {
1943 if url.as_ref().is_some_and(|url| url_has_userinfo(url)) {
1944 if let Some(url) = url.take() {
1945 secrets.insert(
1946 sanitize_secret_key(&format!("{}__{}", prefix, field_name)),
1947 url,
1948 );
1949 }
1950 }
1951}
1952
1953impl SecretExtractor for Route {
1954 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
1955 self.input
1956 .extract_secrets(&format!("{}__{}", prefix, "INPUT"), secrets);
1957 self.output
1958 .extract_secrets(&format!("{}__{}", prefix, "OUTPUT"), secrets);
1959 }
1960}
1961
1962impl SecretExtractor for Endpoint {
1963 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
1964 for (i, middleware) in self.middlewares.iter_mut().enumerate() {
1965 middleware.extract_secrets(&format!("{}__{}__{}", prefix, "MIDDLEWARES", i), secrets);
1966 }
1967 self.endpoint_type.extract_secrets(prefix, secrets);
1968 }
1969}
1970
1971impl SecretExtractor for EndpointType {
1972 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
1973 match self {
1974 EndpointType::Aws(cfg) => {
1975 cfg.extract_secrets(&format!("{}__{}", prefix, "AWS"), secrets)
1976 }
1977 EndpointType::Kafka(cfg) => {
1978 cfg.extract_secrets(&format!("{}__{}", prefix, "KAFKA"), secrets)
1979 }
1980 EndpointType::Nats(cfg) => {
1981 cfg.extract_secrets(&format!("{}__{}", prefix, "NATS"), secrets)
1982 }
1983 EndpointType::Amqp(cfg) => {
1984 cfg.extract_secrets(&format!("{}__{}", prefix, "AMQP"), secrets)
1985 }
1986 EndpointType::MongoDb(cfg) => {
1987 cfg.extract_secrets(&format!("{}__{}", prefix, "MONGODB"), secrets)
1988 }
1989 EndpointType::Mqtt(cfg) => {
1990 cfg.extract_secrets(&format!("{}__{}", prefix, "MQTT"), secrets)
1991 }
1992 EndpointType::Http(cfg) => {
1993 cfg.extract_secrets(&format!("{}__{}", prefix, "HTTP"), secrets)
1994 }
1995 EndpointType::IbmMq(cfg) => {
1996 cfg.extract_secrets(&format!("{}__{}", prefix, "IBMMQ"), secrets)
1997 }
1998 EndpointType::ZeroMq(cfg) => {
1999 cfg.extract_secrets(&format!("{}__{}", prefix, "ZEROMQ"), secrets)
2000 }
2001 EndpointType::Sqlx(cfg) => {
2002 cfg.extract_secrets(&format!("{}__{}", prefix, "SQLX"), secrets)
2003 }
2004 EndpointType::Grpc(cfg) => {
2005 cfg.extract_secrets(&format!("{}__{}", prefix, "GRPC"), secrets)
2006 }
2007 EndpointType::Fanout(endpoints) => {
2008 for (i, ep) in endpoints.iter_mut().enumerate() {
2009 ep.extract_secrets(&format!("{}__{}__{}", prefix, "FANOUT", i), secrets);
2010 }
2011 }
2012 EndpointType::Switch(cfg) => {
2013 for (key, ep) in cfg.cases.iter_mut() {
2014 ep.extract_secrets(
2015 &format!(
2016 "{}__{}__{}",
2017 prefix,
2018 "SWITCH__CASES",
2019 sanitize_secret_key(key)
2020 ),
2021 secrets,
2022 );
2023 }
2024 if let Some(default) = &mut cfg.default {
2025 default.extract_secrets(&format!("{}__{}", prefix, "SWITCH__DEFAULT"), secrets);
2026 }
2027 }
2028 EndpointType::Reader(ep) => {
2029 ep.extract_secrets(&format!("{}__{}", prefix, "READER"), secrets)
2030 }
2031 _ => {}
2032 }
2033 }
2034}
2035
2036impl SecretExtractor for Middleware {
2037 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2038 if let Middleware::Dlq(cfg) = self {
2039 cfg.endpoint
2040 .extract_secrets(&format!("{}__{}__{}", prefix, "DLQ", "ENDPOINT"), secrets);
2041 }
2042 }
2043}
2044
2045impl SecretExtractor for AwsConfig {
2046 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2047 if let Some(val) = self.access_key.take() {
2048 secrets.insert(format!("{}__{}", prefix, "ACCESS_KEY"), val);
2049 }
2050 if let Some(val) = self.secret_key.take() {
2051 secrets.insert(format!("{}__{}", prefix, "SECRET_KEY"), val);
2052 }
2053 if let Some(val) = self.session_token.take() {
2054 secrets.insert(format!("{}__{}", prefix, "SESSION_TOKEN"), val);
2055 }
2056 extract_sensitive_optional_url(&mut self.queue_url, prefix, "QUEUE_URL", secrets);
2057 extract_sensitive_optional_url(&mut self.endpoint_url, prefix, "ENDPOINT_URL", secrets);
2058 }
2059}
2060
2061impl SecretExtractor for KafkaConfig {
2062 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2063 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2064 if let Some(val) = self.username.take() {
2065 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2066 }
2067 if let Some(val) = self.password.take() {
2068 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2069 }
2070 self.tls
2071 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2072 }
2073}
2074
2075impl SecretExtractor for NatsConfig {
2076 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2077 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2078 if let Some(val) = self.username.take() {
2079 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2080 }
2081 if let Some(val) = self.password.take() {
2082 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2083 }
2084 if let Some(val) = self.token.take() {
2085 secrets.insert(format!("{}__{}", prefix, "TOKEN"), val);
2086 }
2087 self.tls
2088 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2089 }
2090}
2091
2092impl SecretExtractor for AmqpConfig {
2093 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2094 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2095 if let Some(val) = self.username.take() {
2096 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2097 }
2098 if let Some(val) = self.password.take() {
2099 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2100 }
2101 self.tls
2102 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2103 }
2104}
2105
2106impl SecretExtractor for MongoDbConfig {
2107 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2108 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2109 if let Some(val) = self.username.take() {
2110 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2111 }
2112 if let Some(val) = self.password.take() {
2113 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2114 }
2115 self.tls
2116 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2117 }
2118}
2119
2120impl SecretExtractor for MqttConfig {
2121 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2122 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2123 if let Some(val) = self.username.take() {
2124 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2125 }
2126 if let Some(val) = self.password.take() {
2127 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2128 }
2129 self.tls
2130 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2131 }
2132}
2133
2134impl SecretExtractor for HttpConfig {
2135 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2136 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2137 if let Some((u, p)) = self.basic_auth.take() {
2138 secrets.insert(format!("{}__{}__{}", prefix, "BASIC_AUTH", 0), u);
2139 secrets.insert(format!("{}__{}__{}", prefix, "BASIC_AUTH", 1), p);
2140 }
2141 extract_sensitive_string_map_entries(
2142 &mut self.custom_headers,
2143 prefix,
2144 "CUSTOM_HEADERS",
2145 secrets,
2146 );
2147 self.tls
2148 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2149 }
2150}
2151
2152impl SecretExtractor for IbmMqConfig {
2153 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2154 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2155 if let Some(val) = self.username.take() {
2156 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2157 }
2158 if let Some(val) = self.password.take() {
2159 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2160 }
2161 self.tls
2162 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2163 }
2164}
2165
2166impl SecretExtractor for ZeroMqConfig {
2167 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2168 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2169 }
2170}
2171
2172impl SecretExtractor for SqlxConfig {
2173 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2174 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2175 if let Some(val) = self.username.take() {
2176 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2177 }
2178 if let Some(val) = self.password.take() {
2179 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2180 }
2181 self.tls
2182 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2183 }
2184}
2185
2186impl SecretExtractor for GrpcConfig {
2187 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2188 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2189 self.tls
2190 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2191 }
2192}
2193
2194impl SecretExtractor for TlsConfig {
2195 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2196 if let Some(val) = self.cert_password.take() {
2197 secrets.insert(format!("{}__{}", prefix, "CERT_PASSWORD"), val);
2198 }
2199 }
2200}
2201
2202pub fn extract_config_secrets(config: &mut Config) -> HashMap<String, String> {
2209 let mut secrets = HashMap::new();
2210 for (route_name, route) in config.iter_mut() {
2211 let prefix = sanitize_secret_key(&format!("MQB__{}", route_name));
2212 route.extract_secrets(&prefix, &mut secrets);
2213 }
2214 secrets
2215}
2216
2217#[cfg(test)]
2218mod tests {
2219 use super::*;
2220 use config::{Config as ConfigBuilder, Environment};
2221
2222 const TEST_YAML: &str = r#"
2223kafka_to_nats:
2224 concurrency: 10
2225 input:
2226 middlewares:
2227 - deduplication:
2228 sled_path: "/tmp/mq-bridge/dedup_db"
2229 ttl_seconds: 3600
2230 - metrics: {}
2231 - retry:
2232 max_attempts: 5
2233 initial_interval_ms: 200
2234 - random_panic:
2235 mode: nack
2236 - dlq:
2237 endpoint:
2238 nats:
2239 subject: "dlq-subject"
2240 url: "nats://localhost:4222"
2241 kafka:
2242 topic: "input-topic"
2243 url: "localhost:9092"
2244 group_id: "my-consumer-group"
2245 tls:
2246 required: true
2247 ca_file: "/path_to_ca"
2248 cert_file: "/path_to_cert"
2249 key_file: "/path_to_key"
2250 cert_password: "password"
2251 accept_invalid_certs: true
2252 output:
2253 middlewares:
2254 - metrics: {}
2255 - dlq:
2256 endpoint:
2257 file:
2258 path: "error.out"
2259 nats:
2260 subject: "output-subject"
2261 url: "nats://localhost:4222"
2262"#;
2263
2264 fn assert_config_values(config: &Config) {
2265 assert_eq!(config.len(), 1);
2266 let route = config.get("kafka_to_nats").expect("Route should exist");
2267
2268 assert_eq!(route.options.concurrency, 10);
2269
2270 let input = &route.input;
2272 assert_eq!(input.middlewares.len(), 5);
2273
2274 let mut has_dedup = false;
2275 let mut has_metrics = false;
2276 let mut has_dlq = false;
2277 let mut has_retry = false;
2278 let mut has_random_panic = false;
2279 for middleware in &input.middlewares {
2280 match middleware {
2281 Middleware::Deduplication(dedup) => {
2282 assert_eq!(dedup.sled_path, "/tmp/mq-bridge/dedup_db");
2283 assert_eq!(dedup.ttl_seconds, 3600);
2284 has_dedup = true;
2285 }
2286 Middleware::Metrics(_) => {
2287 has_metrics = true;
2288 }
2289 Middleware::Custom { .. } => {}
2290 Middleware::Dlq(dlq) => {
2291 assert!(dlq.endpoint.middlewares.is_empty());
2292 if let EndpointType::Nats(nats_cfg) = &dlq.endpoint.endpoint_type {
2293 assert_eq!(nats_cfg.subject, Some("dlq-subject".to_string()));
2294 assert_eq!(nats_cfg.url, "nats://localhost:4222");
2295 }
2296 has_dlq = true;
2297 }
2298 Middleware::Retry(retry) => {
2299 assert_eq!(retry.max_attempts, 5);
2300 assert_eq!(retry.initial_interval_ms, 200);
2301 has_retry = true;
2302 }
2303 Middleware::RandomPanic(rp) => {
2304 assert!(rp.mode == FaultMode::Nack);
2305 has_random_panic = true;
2306 }
2307 Middleware::Delay(_) => {}
2308 Middleware::WeakJoin(_) => {}
2309 }
2310 }
2311
2312 if let EndpointType::Kafka(kafka) = &input.endpoint_type {
2313 assert_eq!(kafka.topic, Some("input-topic".to_string()));
2314 assert_eq!(kafka.url, "localhost:9092");
2315 assert_eq!(kafka.group_id, Some("my-consumer-group".to_string()));
2316 let tls = &kafka.tls;
2317 assert!(tls.required);
2318 assert_eq!(tls.ca_file.as_deref(), Some("/path_to_ca"));
2319 assert!(tls.accept_invalid_certs);
2320 } else {
2321 panic!("Input endpoint should be Kafka");
2322 }
2323 assert!(has_dedup);
2324 assert!(has_metrics);
2325 assert!(has_dlq);
2326 assert!(has_retry);
2327 assert!(has_random_panic);
2328
2329 let output = &route.output;
2331 assert_eq!(output.middlewares.len(), 2);
2332 assert!(matches!(output.middlewares[0], Middleware::Metrics(_)));
2333
2334 if let EndpointType::Nats(nats) = &output.endpoint_type {
2335 assert_eq!(nats.subject, Some("output-subject".to_string()));
2336 assert_eq!(nats.url, "nats://localhost:4222");
2337 } else {
2338 panic!("Output endpoint should be NATS");
2339 }
2340 }
2341
2342 #[test]
2343 fn test_deserialize_from_yaml() {
2344 let result: Result<Config, _> = serde_yaml_ng::from_str(TEST_YAML);
2347 println!("Deserialized from YAML: {:#?}", result);
2348 let config = result.expect("Failed to deserialize TEST_YAML");
2349 assert_config_values(&config);
2350 }
2351
2352 #[test]
2353 fn test_deserialize_from_env() {
2354 unsafe {
2356 std::env::set_var("MQB__KAFKA_TO_NATS__CONCURRENCY", "10");
2357 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "input-topic");
2358 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__URL", "localhost:9092");
2359 std::env::set_var(
2360 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID",
2361 "my-consumer-group",
2362 );
2363 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__REQUIRED", "true");
2364 std::env::set_var(
2365 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__CA_FILE",
2366 "/path_to_ca",
2367 );
2368 std::env::set_var(
2369 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__ACCEPT_INVALID_CERTS",
2370 "true",
2371 );
2372 std::env::set_var(
2373 "MQB__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT",
2374 "output-subject",
2375 );
2376 std::env::set_var(
2377 "MQB__KAFKA_TO_NATS__OUTPUT__NATS__URL",
2378 "nats://localhost:4222",
2379 );
2380 std::env::set_var(
2381 "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__SUBJECT",
2382 "dlq-subject",
2383 );
2384 std::env::set_var(
2385 "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__URL",
2386 "nats://localhost:4222",
2387 );
2388 }
2389
2390 let builder = ConfigBuilder::builder()
2391 .add_source(
2393 Environment::with_prefix("MQB")
2394 .separator("__")
2395 .try_parsing(true),
2396 );
2397
2398 let config: Config = builder
2399 .build()
2400 .expect("Failed to build config")
2401 .try_deserialize()
2402 .expect("Failed to deserialize config");
2403
2404 assert_eq!(config.get("kafka_to_nats").unwrap().options.concurrency, 10);
2406 if let EndpointType::Kafka(k) = &config.get("kafka_to_nats").unwrap().input.endpoint_type {
2407 assert_eq!(k.topic, Some("input-topic".to_string()));
2408 assert!(k.tls.required);
2409 } else {
2410 panic!("Expected Kafka endpoint");
2411 }
2412
2413 let input = &config.get("kafka_to_nats").unwrap().input;
2414 assert_eq!(input.middlewares.len(), 1);
2415 if let Middleware::Dlq(_) = &input.middlewares[0] {
2416 } else {
2418 panic!("Expected DLQ middleware");
2419 }
2420 }
2421
2422 #[test]
2423 fn test_extract_secrets() {
2424 let mut config = Config::new();
2425 let mut route = Route::default();
2426
2427 let mut kafka_config = KafkaConfig::new("kafka://user:pass@localhost:9092");
2429 kafka_config.username = Some("user".to_string());
2430 kafka_config.password = Some("pass".to_string());
2431 kafka_config.tls.cert_password = Some("certpass".to_string());
2432
2433 route.input = Endpoint {
2434 endpoint_type: EndpointType::Kafka(kafka_config),
2435 middlewares: vec![],
2436 handler: None,
2437 };
2438
2439 let mut http_config = HttpConfig::new("http://httpuser:httppass@localhost");
2441 http_config.basic_auth = Some(("httpuser".to_string(), "httppass".to_string()));
2442 http_config
2443 .custom_headers
2444 .insert("X-API-Key".to_string(), "http-api-key".to_string());
2445 http_config.custom_headers.insert(
2446 "X-Access-Token".to_string(),
2447 "http-access-token".to_string(),
2448 );
2449 http_config.custom_headers.insert(
2450 "X-Authentication".to_string(),
2451 "http-authentication".to_string(),
2452 );
2453 http_config.custom_headers.insert(
2454 "Authorization".to_string(),
2455 "Bearer secret-token".to_string(),
2456 );
2457 http_config
2458 .custom_headers
2459 .insert("X-Trace-Id".to_string(), "trace-value".to_string());
2460
2461 route.output = Endpoint {
2462 endpoint_type: EndpointType::Http(http_config),
2463 middlewares: vec![],
2464 handler: None,
2465 };
2466
2467 config.insert("test_route".to_string(), route);
2468
2469 let secrets = extract_config_secrets(&mut config);
2470
2471 assert_eq!(
2473 secrets
2474 .get("MQB__TEST_ROUTE__INPUT__KAFKA__URL")
2475 .map(|s| s.as_str()),
2476 Some("kafka://user:pass@localhost:9092")
2477 );
2478 assert_eq!(
2479 secrets
2480 .get("MQB__TEST_ROUTE__INPUT__KAFKA__USERNAME")
2481 .map(|s| s.as_str()),
2482 Some("user")
2483 );
2484 assert_eq!(
2485 secrets
2486 .get("MQB__TEST_ROUTE__INPUT__KAFKA__PASSWORD")
2487 .map(|s| s.as_str()),
2488 Some("pass")
2489 );
2490 assert_eq!(
2491 secrets
2492 .get("MQB__TEST_ROUTE__INPUT__KAFKA__TLS__CERT_PASSWORD")
2493 .map(|s| s.as_str()),
2494 Some("certpass")
2495 );
2496 assert_eq!(
2497 secrets
2498 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__URL")
2499 .map(|s| s.as_str()),
2500 Some("http://httpuser:httppass@localhost")
2501 );
2502 assert_eq!(
2503 secrets
2504 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__BASIC_AUTH__0")
2505 .map(|s| s.as_str()),
2506 Some("httpuser")
2507 );
2508 assert_eq!(
2509 secrets
2510 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__BASIC_AUTH__1")
2511 .map(|s| s.as_str()),
2512 Some("httppass")
2513 );
2514 assert_eq!(
2515 secrets
2516 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_API_KEY")
2517 .map(|s| s.as_str()),
2518 Some("http-api-key")
2519 );
2520 assert_eq!(
2521 secrets
2522 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_ACCESS_TOKEN")
2523 .map(|s| s.as_str()),
2524 Some("http-access-token")
2525 );
2526 assert_eq!(
2527 secrets
2528 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_AUTHENTICATION")
2529 .map(|s| s.as_str()),
2530 Some("http-authentication")
2531 );
2532 assert_eq!(
2533 secrets
2534 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__AUTHORIZATION")
2535 .map(|s| s.as_str()),
2536 Some("Bearer secret-token")
2537 );
2538
2539 let route = config.get("test_route").unwrap();
2541 if let EndpointType::Kafka(k) = &route.input.endpoint_type {
2542 assert!(k.url.is_empty());
2543 assert!(k.username.is_none());
2544 assert!(k.password.is_none());
2545 assert!(k.tls.cert_password.is_none());
2546 }
2547 if let EndpointType::Http(h) = &route.output.endpoint_type {
2548 assert!(h.url.is_empty());
2549 assert!(h.basic_auth.is_none());
2550 assert!(!h.custom_headers.contains_key("X-API-Key"));
2551 assert!(!h.custom_headers.contains_key("X-Access-Token"));
2552 assert!(!h.custom_headers.contains_key("X-Authentication"));
2553 assert!(!h.custom_headers.contains_key("Authorization"));
2554 assert_eq!(
2555 h.custom_headers.get("X-Trace-Id").map(|s| s.as_str()),
2556 Some("trace-value")
2557 );
2558 }
2559 }
2560
2561 #[test]
2562 fn test_extract_sensitive_url_only_strips_authority_credentials() {
2563 let mut config = Config::new();
2564 let path_at_route = Route {
2565 output: Endpoint {
2566 endpoint_type: EndpointType::Http(HttpConfig::new(
2567 "https://example.com/path/user@example.com?email=a@b.test",
2568 )),
2569 middlewares: vec![],
2570 handler: None,
2571 },
2572 ..Default::default()
2573 };
2574 config.insert("path_at_route".to_string(), path_at_route);
2575
2576 let credential_route = Route {
2577 output: Endpoint {
2578 endpoint_type: EndpointType::Http(HttpConfig::new(
2579 "https://user:pass@example.com/path",
2580 )),
2581 middlewares: vec![],
2582 handler: None,
2583 },
2584 ..Default::default()
2585 };
2586 config.insert("credential_route".to_string(), credential_route);
2587
2588 let query_at_route = Route {
2589 output: Endpoint {
2590 endpoint_type: EndpointType::Http(HttpConfig::new(
2591 "https://example.com?next=a@b.test",
2592 )),
2593 middlewares: vec![],
2594 handler: None,
2595 },
2596 ..Default::default()
2597 };
2598 config.insert("query_at_route".to_string(), query_at_route);
2599
2600 let fragment_at_route = Route {
2601 output: Endpoint {
2602 endpoint_type: EndpointType::Http(HttpConfig::new(
2603 "https://example.com#user@example.com",
2604 )),
2605 middlewares: vec![],
2606 handler: None,
2607 },
2608 ..Default::default()
2609 };
2610 config.insert("fragment_at_route".to_string(), fragment_at_route);
2611
2612 let secrets = extract_config_secrets(&mut config);
2613
2614 if let EndpointType::Http(http) = &config.get("path_at_route").unwrap().output.endpoint_type
2615 {
2616 assert_eq!(
2617 http.url,
2618 "https://example.com/path/user@example.com?email=a@b.test"
2619 );
2620 }
2621 if let EndpointType::Http(http) =
2622 &config.get("query_at_route").unwrap().output.endpoint_type
2623 {
2624 assert_eq!(http.url, "https://example.com?next=a@b.test");
2625 }
2626 if let EndpointType::Http(http) = &config
2627 .get("fragment_at_route")
2628 .unwrap()
2629 .output
2630 .endpoint_type
2631 {
2632 assert_eq!(http.url, "https://example.com#user@example.com");
2633 }
2634 if let EndpointType::Http(http) =
2635 &config.get("credential_route").unwrap().output.endpoint_type
2636 {
2637 assert!(http.url.is_empty());
2638 }
2639 assert_eq!(
2640 secrets
2641 .get("MQB__CREDENTIAL_ROUTE__OUTPUT__HTTP__URL")
2642 .map(String::as_str),
2643 Some("https://user:pass@example.com/path")
2644 );
2645 assert!(!secrets.contains_key("MQB__PATH_AT_ROUTE__OUTPUT__HTTP__URL"));
2646 assert!(!secrets.contains_key("MQB__QUERY_AT_ROUTE__OUTPUT__HTTP__URL"));
2647 assert!(!secrets.contains_key("MQB__FRAGMENT_AT_ROUTE__OUTPUT__HTTP__URL"));
2648 }
2649
2650 #[test]
2651 fn test_file_config_inference() {
2652 let yaml = r#"
2653mode: group_subscribe
2654path: "/tmp/test"
2655group_id: "my_group"
2656"#;
2657 let config: FileConfig = serde_yaml_ng::from_str(yaml).unwrap();
2658 match config.mode {
2659 Some(FileConsumerMode::GroupSubscribe { group_id, .. }) => {
2660 assert_eq!(group_id, "my_group")
2661 }
2662 _ => panic!("Expected GroupSubscribe"),
2663 }
2664
2665 let yaml_queue = r#"
2666mode: consume
2667path: "/tmp/test"
2668"#;
2669 let config_queue: FileConfig = serde_yaml_ng::from_str(yaml_queue).unwrap();
2670 match config_queue.mode {
2671 Some(FileConsumerMode::Consume { delete }) => assert!(!delete),
2672 _ => panic!("Expected Consume"),
2673 }
2674 }
2675}
2676
#[cfg(all(test, feature = "schema"))]
mod schema_tests {
    use super::*;

    /// Writes the JSON schema of `Config` to `mq-bridge.schema.json` in the crate's
    /// manifest directory.
    #[test]
    fn generate_json_schema() {
        let target =
            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("mq-bridge.schema.json");

        let schema = schemars::schema_for!(Config);
        let rendered = serde_json::to_string_pretty(&schema).unwrap();

        std::fs::write(target, rendered).expect("Failed to write schema file");
    }
}