1use serde::{
7 de::{MapAccess, Visitor},
8 Deserialize, Deserializer, Serialize,
9};
10use std::{
11 collections::HashMap,
12 sync::{atomic::AtomicUsize, Arc},
13};
14
15use crate::traits::Handler;
16use tracing::trace;
17
/// Map from a string key to a [`Route`].
///
/// NOTE(review): the key is presumably the route name — confirm against callers.
pub type Config = HashMap<String, Route>;
80
/// Map from a string key to an [`Endpoint`].
///
/// NOTE(review): the key is presumably the publisher name — confirm against callers.
pub type PublisherConfig = HashMap<String, Endpoint>;
84
/// A route made of an `input` endpoint, an `output` endpoint and [`RouteOptions`].
///
/// NOTE(review): this struct combines `deny_unknown_fields` with a `flatten`ed field; serde
/// documents that combination as unsupported — confirm unknown keys are rejected as intended.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct Route {
    /// Input endpoint. Has no serde default, so it must be present when deserializing.
    pub input: Endpoint,
    /// Output endpoint. When omitted, `default_output_endpoint()` supplies a `Null` endpoint.
    #[serde(default = "default_output_endpoint")]
    pub output: Endpoint,
    /// Route options; their keys are read from / written to the route's own map (`flatten`).
    #[serde(flatten, default)]
    pub options: RouteOptions,
}
99
100impl Default for Route {
101 fn default() -> Self {
102 Self {
103 input: Endpoint::null(),
104 output: Endpoint::null(),
105 options: RouteOptions::default(),
106 }
107 }
108}
109
/// Per-route tuning options.
///
/// Every field has a serde default, so an empty map deserializes successfully.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct RouteOptions {
    /// Free-form description. Defaults to the empty string and is omitted from serialized
    /// output while empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub description: String,
    /// Defaults to `default_concurrency()` (1). The generated schema requires a minimum of 1.
    #[serde(default = "default_concurrency")]
    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
    pub concurrency: usize,
    /// Defaults to `default_batch_size()` (1). The generated schema requires a minimum of 1.
    #[serde(default = "default_batch_size")]
    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
    pub batch_size: usize,
    /// Defaults to `default_commit_concurrency_limit()` (4096).
    #[serde(default = "default_commit_concurrency_limit")]
    pub commit_concurrency_limit: usize,
}
151
152impl Default for RouteOptions {
153 fn default() -> Self {
154 Self {
155 description: String::new(),
156 concurrency: default_concurrency(),
157 batch_size: default_batch_size(),
158 commit_concurrency_limit: default_commit_concurrency_limit(),
159 }
160 }
161}
162
/// Serde default for `RouteOptions::concurrency`.
pub(crate) fn default_concurrency() -> usize {
    1_usize
}
166
/// Serde default for `RouteOptions::batch_size`.
pub(crate) fn default_batch_size() -> usize {
    1_usize
}
170
/// Serde default for `RouteOptions::commit_concurrency_limit`.
pub(crate) fn default_commit_concurrency_limit() -> usize {
    4_096
}
174
175fn default_output_endpoint() -> Endpoint {
176 Endpoint::new(EndpointType::Null)
177}
178
/// Serde default for `RetryMiddleware::max_attempts`.
fn default_retry_attempts() -> usize {
    3_usize
}
/// Serde default for `RetryMiddleware::initial_interval_ms`.
fn default_initial_interval_ms() -> u64 {
    100_u64
}
/// Serde default for `RetryMiddleware::max_interval_ms`.
fn default_max_interval_ms() -> u64 {
    5_000
}
/// Serde default for `RetryMiddleware::multiplier`.
fn default_multiplier() -> f64 {
    2.0_f64
}
/// Serde default for `MqttConfig::clean_session`.
fn default_clean_session() -> bool {
    false
}
/// Serde default for `CookieJarMiddleware::cookie_metadata_key`.
fn default_cookie_metadata_key() -> String {
    String::from("cookie")
}
/// Serde default for `CookieJarMiddleware::set_cookie_metadata_key`.
fn default_set_cookie_metadata_key() -> String {
    String::from("set-cookie")
}
200
/// Returns `true` when `name` is one of the built-in endpoint keys.
///
/// The comparison is exact and case-sensitive. `"custom"` is deliberately absent from the
/// table, exactly as in the original list.
fn is_known_endpoint_name(name: &str) -> bool {
    const KNOWN_ENDPOINT_NAMES: &[&str] = &[
        "aws",
        "kafka",
        "nats",
        "file",
        "static",
        "memory",
        "sled",
        "amqp",
        "mongodb",
        "mqtt",
        "http",
        "websocket",
        "ibmmq",
        "zeromq",
        "grpc",
        "fanout",
        "ref",
        "switch",
        "response",
        "reader",
        "null",
        "sqlx",
    ];
    KNOWN_ENDPOINT_NAMES.contains(&name)
}
228
/// An endpoint: its type-specific configuration, a list of middlewares and an optional handler.
///
/// Only `Serialize` is derived here; deserialization is implemented by hand elsewhere in this
/// file.
#[derive(Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct Endpoint {
    /// Middlewares attached to this endpoint.
    #[serde(default)]
    pub middlewares: Vec<Middleware>,

    /// The endpoint variant; serialized flattened into the endpoint's own map.
    #[serde(flatten)]
    pub endpoint_type: EndpointType,

    /// Optional handler. Never serialized and excluded from the generated schema.
    #[serde(skip_serializing)]
    #[cfg_attr(feature = "schema", schemars(skip))]
    pub handler: Option<Arc<dyn Handler>>,
}
247
248impl std::fmt::Debug for Endpoint {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 f.debug_struct("Endpoint")
251 .field("middlewares", &self.middlewares)
252 .field("endpoint_type", &self.endpoint_type)
253 .field(
254 "handler",
255 &if self.handler.is_some() {
256 "Some(<Handler>)"
257 } else {
258 "None"
259 },
260 )
261 .finish()
262 }
263}
264
265impl<'de> Deserialize<'de> for Endpoint {
266 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
267 where
268 D: Deserializer<'de>,
269 {
270 struct EndpointVisitor;
271
272 impl<'de> Visitor<'de> for EndpointVisitor {
273 type Value = Endpoint;
274
275 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
276 formatter.write_str("a map representing an endpoint or null")
277 }
278
279 fn visit_unit<E>(self) -> Result<Self::Value, E>
280 where
281 E: serde::de::Error,
282 {
283 Ok(Endpoint::new(EndpointType::Null))
284 }
285
286 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
287 where
288 A: MapAccess<'de>,
289 {
290 let mut temp_map = serde_json::Map::new();
293 let mut middlewares_val = None;
294
295 while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
296 if key == "middlewares" {
297 middlewares_val = Some(value);
298 } else {
299 temp_map.insert(key, value);
300 }
301 }
302
303 let temp_val = serde_json::Value::Object(temp_map);
305 let endpoint_type: EndpointType = match serde_json::from_value(temp_val.clone()) {
306 Ok(et) => et,
307 Err(original_err) => {
308 if let serde_json::Value::Object(map) = &temp_val {
309 if map.len() == 1 {
310 let (name, config) = map.iter().next().unwrap();
311 if is_known_endpoint_name(name) {
312 return Err(serde::de::Error::custom(original_err));
313 }
314 trace!("Falling back to Custom endpoint for key: {}", name);
315 EndpointType::Custom {
316 name: name.clone(),
317 config: config.clone(),
318 }
319 } else if map.is_empty() {
320 EndpointType::Null
321 } else {
322 return Err(serde::de::Error::custom(
323 "Invalid endpoint configuration: multiple keys found or unknown endpoint type",
324 ));
325 }
326 } else {
327 return Err(serde::de::Error::custom("Invalid endpoint configuration"));
328 }
329 }
330 };
331
332 let middlewares = match middlewares_val {
334 Some(val) => {
335 deserialize_middlewares_from_value(val).map_err(serde::de::Error::custom)?
336 }
337 None => Vec::new(),
338 };
339
340 Ok(Endpoint {
341 middlewares,
342 endpoint_type,
343 handler: None,
344 })
345 }
346 }
347
348 deserializer.deserialize_any(EndpointVisitor)
349 }
350}
351
/// Returns `true` when `name` is one of the built-in middleware keys (exact, case-sensitive
/// match). Unlike the endpoint table, `"custom"` is part of this list.
fn is_known_middleware_name(name: &str) -> bool {
    const KNOWN_MIDDLEWARE_NAMES: &[&str] = &[
        "deduplication",
        "metrics",
        "dlq",
        "retry",
        "random_panic",
        "delay",
        "weak_join",
        "limiter",
        "buffer",
        "cookie_jar",
        "custom",
    ];
    KNOWN_MIDDLEWARE_NAMES.contains(&name)
}
368
369fn deserialize_middlewares_from_value(value: serde_json::Value) -> anyhow::Result<Vec<Middleware>> {
373 let arr = match value {
374 serde_json::Value::Array(arr) => arr,
375 serde_json::Value::Object(map) => {
376 let mut middlewares: Vec<_> = map
377 .into_iter()
378 .filter_map(|(key, value)| key.parse::<usize>().ok().map(|index| (index, value)))
381 .collect();
382 middlewares.sort_by_key(|(index, _)| *index);
383
384 middlewares.into_iter().map(|(_, value)| value).collect()
385 }
386 _ => return Err(anyhow::anyhow!("Expected an array or object")),
387 };
388
389 let mut middlewares = Vec::new();
390 for item in arr {
391 let known_name = if let serde_json::Value::Object(map) = &item {
393 if map.len() == 1 {
394 let (name, _) = map.iter().next().unwrap();
395 if is_known_middleware_name(name) {
396 Some(name.clone())
397 } else {
398 None
399 }
400 } else {
401 None
402 }
403 } else {
404 None
405 };
406
407 if let Some(name) = known_name {
408 match serde_json::from_value::<Middleware>(item.clone()) {
409 Ok(m) => middlewares.push(m),
410 Err(e) => {
411 return Err(anyhow::anyhow!(
412 "Failed to deserialize known middleware '{}': {}",
413 name,
414 e
415 ))
416 }
417 }
418 } else if let Ok(m) = serde_json::from_value::<Middleware>(item.clone()) {
419 middlewares.push(m);
420 } else if let serde_json::Value::Object(map) = &item {
421 if map.len() == 1 {
422 let (name, config) = map.iter().next().unwrap();
423 middlewares.push(Middleware::Custom {
424 name: name.clone(),
425 config: config.clone(),
426 });
427 } else {
428 return Err(anyhow::anyhow!(
429 "Invalid middleware configuration: {:?}",
430 item
431 ));
432 }
433 } else {
434 return Err(anyhow::anyhow!(
435 "Invalid middleware configuration: {:?}",
436 item
437 ));
438 }
439 }
440 Ok(middlewares)
441}
442
/// The kind of an endpoint together with its type-specific configuration.
///
/// Variant names are (de)serialized in lowercase (`rename_all = "lowercase"`), e.g. `MongoDb`
/// as `mongodb` and `WebSocket` as `websocket`. `Null` is the `Default` variant.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum EndpointType {
    Aws(AwsConfig),
    Kafka(KafkaConfig),
    Nats(NatsConfig),
    File(FileConfig),
    Static(String),
    Ref(String),
    Memory(MemoryConfig),
    Sled(SledConfig),
    Amqp(AmqpConfig),
    MongoDb(MongoDbConfig),
    Mqtt(MqttConfig),
    Http(HttpConfig),
    WebSocket(WebSocketConfig),
    IbmMq(IbmMqConfig),
    ZeroMq(ZeroMqConfig),
    Grpc(GrpcConfig),
    Sqlx(SqlxConfig),
    /// A list of nested endpoints.
    Fanout(Vec<Endpoint>),
    Switch(SwitchConfig),
    Response(ResponseConfig),
    /// A single nested endpoint.
    Reader(Box<Endpoint>),
    /// An endpoint identified by `name`, carrying an arbitrary JSON `config`.
    Custom {
        name: String,
        config: serde_json::Value,
    },
    /// The default variant; carries no configuration.
    #[default]
    Null,
}
496
497impl EndpointType {
498 pub fn name(&self) -> &'static str {
499 match self {
500 EndpointType::Aws(_) => "aws",
501 EndpointType::Kafka(_) => "kafka",
502 EndpointType::Nats(_) => "nats",
503 EndpointType::File(_) => "file",
504 EndpointType::Static(_) => "static",
505 EndpointType::Ref(_) => "ref",
506 EndpointType::Memory(_) => "memory",
507 EndpointType::Sled(_) => "sled",
508 EndpointType::Amqp(_) => "amqp",
509 EndpointType::MongoDb(_) => "mongodb",
510 EndpointType::Mqtt(_) => "mqtt",
511 EndpointType::Http(_) => "http",
512 EndpointType::WebSocket(_) => "websocket",
513 EndpointType::IbmMq(_) => "ibmmq",
514 EndpointType::ZeroMq(_) => "zeromq",
515 EndpointType::Grpc(_) => "grpc",
516 EndpointType::Sqlx(_) => "sqlx",
517 EndpointType::Fanout(_) => "fanout",
518 EndpointType::Switch(_) => "switch",
519 EndpointType::Response(_) => "response",
520 EndpointType::Reader(_) => "reader",
521 EndpointType::Custom { .. } => "custom",
522 EndpointType::Null => "null",
523 }
524 }
525
526 pub fn is_core(&self) -> bool {
527 matches!(
528 self,
529 EndpointType::File(_)
530 | EndpointType::Static(_)
531 | EndpointType::Ref(_)
532 | EndpointType::Memory(_)
533 | EndpointType::Fanout(_)
534 | EndpointType::Switch(_)
535 | EndpointType::Response(_)
536 | EndpointType::Reader(_)
537 | EndpointType::Custom { .. }
538 | EndpointType::Null
539 )
540 }
541}
542
/// A middleware attached to an endpoint.
///
/// Variant names are (de)serialized in snake_case (`rename_all = "snake_case"`), e.g.
/// `RandomPanic` as `random_panic` and `CookieJar` as `cookie_jar`.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum Middleware {
    Deduplication(DeduplicationMiddleware),
    Metrics(MetricsMiddleware),
    /// Boxed dead-letter-queue configuration.
    Dlq(Box<DeadLetterQueueMiddleware>),
    Retry(RetryMiddleware),
    RandomPanic(RandomPanicMiddleware),
    Delay(DelayMiddleware),
    WeakJoin(WeakJoinMiddleware),
    Limiter(LimiterMiddleware),
    Buffer(BufferMiddleware),
    CookieJar(CookieJarMiddleware),
    /// A middleware identified by `name`, carrying an arbitrary JSON `config`.
    Custom {
        name: String,
        config: serde_json::Value,
    },
}
563
/// Configuration for the `deduplication` middleware. Both fields are required and unknown
/// fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DeduplicationMiddleware {
    /// Required. NOTE(review): presumably the path of a sled database — confirm.
    pub sled_path: String,
    /// Required. A duration in seconds, per the field name.
    pub ttl_seconds: u64,
}
577
/// Configuration for the `metrics` middleware. It has no fields; because unknown fields are
/// rejected, only an empty map deserializes.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MetricsMiddleware {}
589
/// Configuration for the `dlq` middleware.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DeadLetterQueueMiddleware {
    /// Required endpoint. NOTE(review): presumably where dead-lettered messages are sent —
    /// confirm against the middleware implementation.
    pub endpoint: Endpoint,
}
603
/// Configuration for the `retry` middleware. Every field has a serde default.
///
/// NOTE(review): the derived `Default` yields zeros (`0`, `0`, `0`, `0.0`), whereas
/// deserializing an empty map yields `3`, `100`, `5000` and `2.0` — confirm the mismatch is
/// intended.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct RetryMiddleware {
    /// Defaults to `default_retry_attempts()` (3) when omitted.
    #[serde(default = "default_retry_attempts")]
    pub max_attempts: usize,
    /// Defaults to `default_initial_interval_ms()` (100) when omitted.
    #[serde(default = "default_initial_interval_ms")]
    pub initial_interval_ms: u64,
    /// Defaults to `default_max_interval_ms()` (5000) when omitted.
    #[serde(default = "default_max_interval_ms")]
    pub max_interval_ms: u64,
    /// Defaults to `default_multiplier()` (2.0) when omitted.
    #[serde(default = "default_multiplier")]
    pub multiplier: f64,
}
625
/// Configuration for the `delay` middleware.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DelayMiddleware {
    /// Required. A duration in milliseconds, per the field name.
    pub delay_ms: u64,
}
637
/// Configuration for the `limiter` middleware.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct LimiterMiddleware {
    /// Required. A rate in messages per second, per the field name.
    pub messages_per_second: f64,
}
650
/// Configuration for the `buffer` middleware. Both fields are required.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct BufferMiddleware {
    /// Required. A message count, per the field name.
    pub max_messages: usize,
    /// Required. A duration in milliseconds, per the field name.
    pub max_delay_ms: u64,
}
664
/// Configuration for the `cookie_jar` middleware. Every field has a serde default, so an empty
/// map deserializes successfully.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct CookieJarMiddleware {
    /// Defaults to `None` when omitted.
    #[serde(default)]
    pub shared_scope: Option<String>,
    /// Defaults to `"cookie"` (`default_cookie_metadata_key()`) when omitted.
    #[serde(default = "default_cookie_metadata_key")]
    pub cookie_metadata_key: String,
    /// Defaults to `"set-cookie"` (`default_set_cookie_metadata_key()`) when omitted.
    #[serde(default = "default_set_cookie_metadata_key")]
    pub set_cookie_metadata_key: String,
    /// Defaults to an empty list when omitted.
    #[serde(default)]
    pub capture_metadata_keys: Vec<String>,
    /// Defaults to `None` when omitted.
    #[serde(default)]
    pub export_metadata_prefix: Option<String>,
    /// Defaults to an empty map when omitted.
    #[serde(default)]
    pub inject_metadata: HashMap<String, String>,
}
702
703impl Default for CookieJarMiddleware {
704 fn default() -> Self {
705 Self {
706 shared_scope: None,
707 cookie_metadata_key: default_cookie_metadata_key(),
708 set_cookie_metadata_key: default_set_cookie_metadata_key(),
709 capture_metadata_keys: Vec::new(),
710 export_metadata_prefix: None,
711 inject_metadata: HashMap::new(),
712 }
713 }
714}
715
/// Configuration for the `weak_join` middleware. All fields are required.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct WeakJoinMiddleware {
    /// Required. NOTE(review): presumably the key messages are grouped by — confirm.
    pub group_by: String,
    /// Required. A count, per the field name.
    pub expected_count: usize,
    /// Required. A duration in milliseconds, per the field name.
    pub timeout_ms: u64,
}
732
/// Fault kind selected by `RandomPanicMiddleware::mode`.
///
/// Variants are (de)serialized in snake_case: `panic`, `disconnect`, `timeout`,
/// `json_format_error`, `nack`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum FaultMode {
    /// The default variant.
    #[default]
    Panic,
    Disconnect,
    Timeout,
    JsonFormatError,
    Nack,
}
750
751impl std::fmt::Display for FaultMode {
752 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753 match self {
754 FaultMode::Panic => write!(f, "panic"),
755 FaultMode::Disconnect => write!(f, "disconnect"),
756 FaultMode::Timeout => write!(f, "timeout"),
757 FaultMode::JsonFormatError => write!(f, "json_format_error"),
758 FaultMode::Nack => write!(f, "nack"),
759 }
760 }
761}
762
/// Configuration for the `random_panic` middleware.
///
/// NOTE(review): the derived `Default` yields `enabled = false`, whereas deserializing an
/// empty map yields `enabled = true` (`default_true`) — confirm the mismatch is intended.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct RandomPanicMiddleware {
    /// Fault to produce. Defaults to `FaultMode::Panic` when omitted.
    #[serde(default)]
    pub mode: FaultMode,
    /// Defaults to `None` when omitted. The generated schema requires a minimum of 1.
    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
    #[serde(default)]
    pub trigger_on_message: Option<usize>,
    /// Defaults to `true` when omitted from the input.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Shared counter. Never (de)serialized and excluded from the schema; deserialization
    /// initializes it through `default_atomic_usize_arc()`.
    #[serde(skip, default = "default_atomic_usize_arc")]
    #[cfg_attr(feature = "schema", schemars(skip))]
    pub message_count: Arc<AtomicUsize>,
}
793
/// Serde default helper that yields `true` (used by `RandomPanicMiddleware::enabled`).
fn default_true() -> bool {
    true
}
797
/// Serde default helper: a freshly allocated shared counter starting at zero.
fn default_atomic_usize_arc() -> Arc<AtomicUsize> {
    let counter = AtomicUsize::default();
    Arc::new(counter)
}
801
802fn deserialize_null_as_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
803where
804 D: Deserializer<'de>,
805{
806 let opt = Option::<bool>::deserialize(deserializer)?;
807 Ok(opt.unwrap_or(false))
808}
809
/// Configuration for the `aws` endpoint. Every field is optional or has a default; unknown
/// fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct AwsConfig {
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub queue_url: Option<String>,
    /// Optional.
    pub topic_arn: Option<String>,
    /// Optional.
    pub region: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub endpoint_url: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub access_key: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub secret_key: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub session_token: Option<String>,
    /// Optional. The generated schema restricts it to 1..=10.
    #[cfg_attr(feature = "schema", schemars(range(min = 1, max = 10)))]
    pub max_messages: Option<i32>,
    /// Optional. The generated schema restricts it to 0..=20.
    #[cfg_attr(feature = "schema", schemars(range(min = 0, max = 20)))]
    pub wait_time_seconds: Option<i32>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub binary_payload_mode: bool,
}
844
845impl AwsConfig {
846 pub fn new() -> Self {
848 Self::default()
849 }
850
851 pub fn with_queue_url(mut self, queue_url: impl Into<String>) -> Self {
852 self.queue_url = Some(queue_url.into());
853 self
854 }
855
856 pub fn with_topic_arn(mut self, topic_arn: impl Into<String>) -> Self {
857 self.topic_arn = Some(topic_arn.into());
858 self
859 }
860
861 pub fn with_region(mut self, region: impl Into<String>) -> Self {
862 self.region = Some(region.into());
863 self
864 }
865
866 pub fn with_endpoint_url(mut self, endpoint_url: impl Into<String>) -> Self {
867 self.endpoint_url = Some(endpoint_url.into());
868 self
869 }
870
871 pub fn with_credentials(
872 mut self,
873 access_key: impl Into<String>,
874 secret_key: impl Into<String>,
875 ) -> Self {
876 self.access_key = Some(access_key.into());
877 self.secret_key = Some(secret_key.into());
878 self
879 }
880}
881
/// Configuration for the `kafka` endpoint. Unknown fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct KafkaConfig {
    /// Required. Also accepted under the key `brokers`. Tagged with schema format `password`.
    #[serde(alias = "brokers")]
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional.
    pub topic: Option<String>,
    /// Optional.
    pub username: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Optional.
    pub group_id: Option<String>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub delayed_ack: bool,
    /// Optional list of `(key, value)` string pairs; defaults to `None`.
    #[serde(default)]
    pub producer_options: Option<Vec<(String, String)>>,
    /// Optional list of `(key, value)` string pairs; defaults to `None`.
    #[serde(default)]
    pub consumer_options: Option<Vec<(String, String)>>,
}
916
917impl KafkaConfig {
918 pub fn new(url: impl Into<String>) -> Self {
920 Self {
921 url: url.into(),
922 ..Default::default()
923 }
924 }
925
926 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
927 self.topic = Some(topic.into());
928 self
929 }
930
931 pub fn with_group_id(mut self, group_id: impl Into<String>) -> Self {
932 self.group_id = Some(group_id.into());
933 self
934 }
935
936 pub fn with_credentials(
937 mut self,
938 username: impl Into<String>,
939 password: impl Into<String>,
940 ) -> Self {
941 self.username = Some(username.into());
942 self.password = Some(password.into());
943 self
944 }
945
946 pub fn with_producer_option(
947 mut self,
948 key: impl Into<String>,
949 value: impl Into<String>,
950 ) -> Self {
951 let options = self.producer_options.get_or_insert_with(Vec::new);
952 options.push((key.into(), value.into()));
953 self
954 }
955
956 pub fn with_consumer_option(
957 mut self,
958 key: impl Into<String>,
959 value: impl Into<String>,
960 ) -> Self {
961 let options = self.consumer_options.get_or_insert_with(Vec::new);
962 options.push((key.into(), value.into()));
963 self
964 }
965}
966
/// Configuration for the `sled` endpoint. Unknown fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct SledConfig {
    /// Required.
    pub path: String,
    /// Optional.
    pub tree: Option<String>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub read_from_start: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub delete_after_read: bool,
}
985
986impl SledConfig {
987 pub fn new(path: impl Into<String>) -> Self {
989 Self {
990 path: path.into(),
991 ..Default::default()
992 }
993 }
994
995 pub fn with_tree(mut self, tree: impl Into<String>) -> Self {
996 self.tree = Some(tree.into());
997 self
998 }
999
1000 pub fn with_read_from_start(mut self, read_from_start: bool) -> Self {
1001 self.read_from_start = read_from_start;
1002 self
1003 }
1004}
1005
/// Format selector used by `FileConfig::format`.
///
/// Variants are (de)serialized in snake_case: `normal`, `json`, `text`, `raw`.
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum FileFormat {
    /// The default variant.
    #[default]
    Normal,
    Json,
    Text,
    Raw,
}
1021
/// Configuration for the `file` endpoint.
///
/// Unlike most configuration structs in this file, this one does not declare
/// `deny_unknown_fields`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct FileConfig {
    /// Required.
    pub path: String,
    /// Optional.
    pub delimiter: Option<String>,
    /// Optional consumer mode; its `mode` tag and fields are read from this struct's own map
    /// (`flatten`). See `effective_mode` for the fallback applied when it is `None`.
    #[serde(flatten, default)]
    pub mode: Option<FileConsumerMode>,
    /// Defaults to `FileFormat::Normal` when omitted.
    #[serde(default)]
    pub format: FileFormat,
}
1041
/// Consumer mode for a file endpoint.
///
/// Internally tagged by the `mode` key with snake_case variant names: `consume`, `subscribe`,
/// `group_subscribe`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub enum FileConsumerMode {
    Consume {
        /// Defaults to `false` when omitted.
        #[serde(default)]
        delete: bool,
    },
    Subscribe {
        /// Defaults to `false` when omitted.
        #[serde(default)]
        delete: bool,
    },
    GroupSubscribe {
        /// Required.
        group_id: String,
        /// Defaults to `false` when omitted.
        #[serde(default)]
        read_from_tail: bool,
    },
}
1073
1074impl Default for FileConsumerMode {
1075 fn default() -> Self {
1076 Self::Consume { delete: false }
1077 }
1078}
1079
1080impl FileConfig {
1081 pub fn new(path: impl Into<String>) -> Self {
1083 Self {
1084 path: path.into(),
1085 mode: Some(FileConsumerMode::default()),
1086 delimiter: None,
1087 format: FileFormat::default(),
1088 }
1089 }
1090
1091 pub fn with_mode(mut self, mode: FileConsumerMode) -> Self {
1092 self.mode = Some(mode);
1093 self
1094 }
1095
1096 pub fn effective_mode(&self) -> FileConsumerMode {
1098 self.mode.clone().unwrap_or_default()
1099 }
1100}
1101
/// Configuration for the `nats` endpoint. Unknown fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct NatsConfig {
    /// Required. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional.
    pub subject: Option<String>,
    /// Optional.
    pub stream: Option<String>,
    /// Optional.
    pub username: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub token: Option<String>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub request_reply: bool,
    /// Optional. A duration in milliseconds, per the field name.
    pub request_timeout_ms: Option<u64>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub delayed_ack: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub no_jetstream: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub subscriber_mode: bool,
    /// Optional.
    pub stream_max_messages: Option<i64>,
    /// Optional.
    pub deliver_policy: Option<NatsDeliverPolicy>,
    /// Optional.
    pub stream_max_bytes: Option<i64>,
    /// Optional.
    pub prefetch_count: Option<usize>,
}
1152
/// Deliver policy selector used by `NatsConfig::deliver_policy`.
///
/// Variants are (de)serialized in snake_case: `all`, `last`, `new`, `last_per_subject`.
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum NatsDeliverPolicy {
    /// The default variant.
    #[default]
    All,
    Last,
    New,
    LastPerSubject,
}
1163
1164impl NatsConfig {
1165 pub fn new(url: impl Into<String>) -> Self {
1167 Self {
1168 url: url.into(),
1169 ..Default::default()
1170 }
1171 }
1172
1173 pub fn with_subject(mut self, subject: impl Into<String>) -> Self {
1174 self.subject = Some(subject.into());
1175 self
1176 }
1177
1178 pub fn with_stream(mut self, stream: impl Into<String>) -> Self {
1179 self.stream = Some(stream.into());
1180 self
1181 }
1182
1183 pub fn with_deliver_policy(mut self, policy: NatsDeliverPolicy) -> Self {
1184 self.deliver_policy = Some(policy);
1185 self
1186 }
1187
1188 pub fn with_credentials(
1189 mut self,
1190 username: impl Into<String>,
1191 password: impl Into<String>,
1192 ) -> Self {
1193 self.username = Some(username.into());
1194 self.password = Some(password.into());
1195 self
1196 }
1197}
1198
/// Configuration for the `memory` endpoint. Unknown fields are rejected.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MemoryConfig {
    /// Required.
    pub topic: String,
    /// Optional.
    pub capacity: Option<usize>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub request_reply: bool,
    /// Optional. A duration in milliseconds, per the field name.
    pub request_timeout_ms: Option<u64>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub subscribe_mode: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub enable_nack: bool,
}
1219
1220impl MemoryConfig {
1221 pub fn new(topic: impl Into<String>, capacity: Option<usize>) -> Self {
1222 Self {
1223 topic: topic.into(),
1224 capacity,
1225 ..Default::default()
1226 }
1227 }
1228 pub fn with_subscribe(self, subscribe_mode: bool) -> Self {
1229 Self {
1230 subscribe_mode,
1231 ..self
1232 }
1233 }
1234
1235 pub fn with_request_reply(mut self, request_reply: bool) -> Self {
1236 self.request_reply = request_reply;
1237 self
1238 }
1239}
1240
/// Configuration for the `amqp` endpoint. Unknown fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct AmqpConfig {
    /// Required. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional.
    pub queue: Option<String>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub subscribe_mode: bool,
    /// Optional.
    pub username: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Optional.
    pub exchange: Option<String>,
    /// Optional.
    pub prefetch_count: Option<u16>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub no_persistence: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub no_declare_queue: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub delayed_ack: bool,
}
1280
1281impl AmqpConfig {
1282 pub fn new(url: impl Into<String>) -> Self {
1284 Self {
1285 url: url.into(),
1286 ..Default::default()
1287 }
1288 }
1289
1290 pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
1291 self.queue = Some(queue.into());
1292 self
1293 }
1294
1295 pub fn with_exchange(mut self, exchange: impl Into<String>) -> Self {
1296 self.exchange = Some(exchange.into());
1297 self
1298 }
1299
1300 pub fn with_credentials(
1301 mut self,
1302 username: impl Into<String>,
1303 password: impl Into<String>,
1304 ) -> Self {
1305 self.username = Some(username.into());
1306 self.password = Some(password.into());
1307 self
1308 }
1309}
1310
/// Format selector used by `MongoDbConfig::format`.
///
/// Variants are (de)serialized in lowercase: `normal`, `json`, `text`, `raw`.
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum MongoDbFormat {
    /// The default variant.
    #[default]
    Normal,
    Json,
    Text,
    Raw,
}
1324
/// Configuration for the `mongodb` endpoint. Unknown fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MongoDbConfig {
    /// Required. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional.
    pub collection: Option<String>,
    /// Optional.
    pub username: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Required.
    pub database: String,
    /// Optional. A duration in milliseconds, per the field name.
    pub polling_interval_ms: Option<u64>,
    /// Optional. A duration in milliseconds, per the field name.
    pub reply_polling_ms: Option<u64>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub request_reply: bool,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub change_stream: bool,
    /// Optional. A duration in milliseconds, per the field name.
    pub request_timeout_ms: Option<u64>,
    /// Optional. A duration in seconds, per the field name.
    pub ttl_seconds: Option<u64>,
    /// Optional. A size in bytes, per the field name.
    pub capped_size_bytes: Option<i64>,
    /// Defaults to `MongoDbFormat::Normal` when omitted.
    #[serde(default)]
    pub format: MongoDbFormat,
    /// Optional.
    pub cursor_id: Option<String>,
    /// Optional.
    pub receive_query: Option<String>,
    /// Optional.
    pub meta_collection: Option<String>,
}
1376
1377impl MongoDbConfig {
1378 pub fn new(url: impl Into<String>, database: impl Into<String>) -> Self {
1380 Self {
1381 url: url.into(),
1382 database: database.into(),
1383 ..Default::default()
1384 }
1385 }
1386
1387 pub fn with_collection(mut self, collection: impl Into<String>) -> Self {
1388 self.collection = Some(collection.into());
1389 self
1390 }
1391
1392 pub fn with_credentials(
1393 mut self,
1394 username: impl Into<String>,
1395 password: impl Into<String>,
1396 ) -> Self {
1397 self.username = Some(username.into());
1398 self.password = Some(password.into());
1399 self
1400 }
1401
1402 pub fn with_change_stream(mut self, change_stream: bool) -> Self {
1403 self.change_stream = change_stream;
1404 self
1405 }
1406}
1407
/// Configuration for the `mqtt` endpoint. Unknown fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MqttConfig {
    /// Required. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional.
    pub topic: Option<String>,
    /// Optional.
    pub username: Option<String>,
    /// Optional. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Optional.
    pub client_id: Option<String>,
    /// Optional.
    pub queue_capacity: Option<usize>,
    /// Optional.
    pub max_inflight: Option<u16>,
    /// Optional.
    pub qos: Option<u8>,
    /// Defaults to `false` (`default_clean_session()`) when omitted.
    #[serde(default = "default_clean_session")]
    pub clean_session: bool,
    /// Optional. A duration in seconds, per the field name.
    pub keep_alive_seconds: Option<u64>,
    /// Defaults to `MqttProtocol::V5` when omitted.
    #[serde(default)]
    pub protocol: MqttProtocol,
    /// Optional.
    pub session_expiry_interval: Option<u32>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub delayed_ack: bool,
}
1452
1453impl MqttConfig {
1454 pub fn new(url: impl Into<String>) -> Self {
1456 Self {
1457 url: url.into(),
1458 ..Default::default()
1459 }
1460 }
1461
1462 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1463 self.topic = Some(topic.into());
1464 self
1465 }
1466
1467 pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
1468 self.client_id = Some(client_id.into());
1469 self
1470 }
1471
1472 pub fn with_credentials(
1473 mut self,
1474 username: impl Into<String>,
1475 password: impl Into<String>,
1476 ) -> Self {
1477 self.username = Some(username.into());
1478 self.password = Some(password.into());
1479 self
1480 }
1481}
1482
/// Protocol selector used by `MqttConfig::protocol`.
///
/// Variants are (de)serialized in lowercase: `v5`, `v3`.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum MqttProtocol {
    /// The default variant.
    #[default]
    V5,
    V3,
}
1494
/// Configuration for the `zeromq` endpoint. Unknown fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct ZeroMqConfig {
    /// Required. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional; defaults to `None`.
    #[serde(default)]
    pub socket_type: Option<ZeroMqSocketType>,
    /// Optional.
    pub topic: Option<String>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub bind: bool,
    /// Optional; defaults to `None`.
    #[serde(default)]
    pub internal_buffer_size: Option<usize>,
}
1516
1517impl ZeroMqConfig {
1518 pub fn new(url: impl Into<String>) -> Self {
1520 Self {
1521 url: url.into(),
1522 ..Default::default()
1523 }
1524 }
1525
1526 pub fn with_socket_type(mut self, socket_type: ZeroMqSocketType) -> Self {
1527 self.socket_type = Some(socket_type);
1528 self
1529 }
1530
1531 pub fn with_bind(mut self, bind: bool) -> Self {
1532 self.bind = bind;
1533 self
1534 }
1535}
1536
/// Socket type selector used by `ZeroMqConfig::socket_type`.
///
/// Variants are (de)serialized in lowercase: `push`, `pull`, `pub`, `sub`, `req`, `rep`.
/// There is no `Default` variant.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum ZeroMqSocketType {
    Push,
    Pull,
    Pub,
    Sub,
    Req,
    Rep,
}
1552
/// Configuration for the `grpc` endpoint. Unknown fields are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct GrpcConfig {
    /// Required. Tagged with schema format `password`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional.
    pub topic: Option<String>,
    /// Optional. A duration in milliseconds, per the field name.
    pub timeout_ms: Option<u64>,
    /// Defaults to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    pub server_mode: bool,
    /// Optional; defaults to `None`.
    #[serde(default)]
    pub initial_stream_window_size: Option<u32>,
    /// Optional; defaults to `None`.
    #[serde(default)]
    pub initial_connection_window_size: Option<u32>,
    /// Optional; defaults to `None`.
    #[serde(default)]
    pub concurrency_limit_per_connection: Option<usize>,
    /// Optional; defaults to `None`. A duration in milliseconds, per the field name.
    #[serde(default)]
    pub http2_keepalive_interval_ms: Option<u64>,
    /// Optional; defaults to `None`. A duration in milliseconds, per the field name.
    #[serde(default)]
    pub http2_keepalive_timeout_ms: Option<u64>,
    /// Optional; defaults to `None`.
    #[serde(default)]
    pub max_decoding_message_size: Option<usize>,
}
1594
1595impl GrpcConfig {
1596 pub fn new(url: impl Into<String>) -> Self {
1598 Self {
1599 url: url.into(),
1600 ..Default::default()
1601 }
1602 }
1603
1604 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1605 self.topic = Some(topic.into());
1606 self
1607 }
1608
1609 pub fn with_server_mode(mut self, server_mode: bool) -> Self {
1611 self.server_mode = server_mode;
1612 self
1613 }
1614}
1615
/// Configuration for an HTTP endpoint.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct HttpConfig {
    /// Endpoint URL.
    pub url: String,
    /// Optional path.
    pub path: Option<String>,
    /// Optional HTTP method name.
    pub method: Option<String>,
    /// TLS settings; falls back to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Optional worker count. NOTE(review): what the workers do is not visible here.
    pub workers: Option<usize>,
    /// Optional name of the header that carries the message id.
    pub message_id_header: Option<String>,
    /// Optional request timeout in milliseconds.
    pub request_timeout_ms: Option<u64>,
    /// Optional size of an internal buffer.
    pub internal_buffer_size: Option<usize>,
    /// Defaults to `false`. NOTE(review): presumably skips waiting for the response — confirm.
    #[serde(default)]
    pub fire_and_forget: bool,
    /// Optional batch concurrency; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_concurrency: Option<usize>,
    /// Optional TCP keepalive in milliseconds; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tcp_keepalive_ms: Option<u64>,
    /// Optional connection-pool idle timeout in milliseconds; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pool_idle_timeout_ms: Option<u64>,
    /// Defaults to `false`.
    #[serde(default)]
    pub compression_enabled: bool,
    /// Optional compression threshold in bytes.
    #[serde(default)]
    pub compression_threshold_bytes: Option<usize>,
    /// Optional concurrency limit.
    pub concurrency_limit: Option<usize>,
    /// Optional pair of credentials, read through `deserialize_basic_auth`
    /// (accepts `null`, a two-element array, or a map keyed `"0"`/`"1"`).
    /// Omitted from serialized output when `None`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        deserialize_with = "deserialize_basic_auth"
    )]
    pub basic_auth: Option<(String, String)>,
    /// Extra headers as name/value pairs; omitted from serialized output when empty.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub custom_headers: HashMap<String, String>,
}
1673
/// Configuration for a WebSocket endpoint.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct WebSocketConfig {
    /// Endpoint URL. Marked as `password` format in the schema because it may embed credentials.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional path.
    pub path: Option<String>,
    /// Optional name of the header that carries the message id.
    pub message_id_header: Option<String>,
    /// Optional size of an internal buffer.
    pub internal_buffer_size: Option<usize>,
}
1689
1690fn deserialize_basic_auth<'de, D>(deserializer: D) -> Result<Option<(String, String)>, D::Error>
1691where
1692 D: Deserializer<'de>,
1693{
1694 let val = serde_json::Value::deserialize(deserializer)?;
1695 match val {
1696 serde_json::Value::Null => Ok(None),
1697 serde_json::Value::Array(arr) => {
1698 if arr.len() != 2 {
1699 return Err(serde::de::Error::custom("basic_auth must have 2 elements"));
1700 }
1701 let u = arr[0]
1702 .as_str()
1703 .ok_or_else(|| serde::de::Error::custom("basic_auth[0] must be string"))?
1704 .to_string();
1705 let p = arr[1]
1706 .as_str()
1707 .ok_or_else(|| serde::de::Error::custom("basic_auth[1] must be string"))?
1708 .to_string();
1709 Ok(Some((u, p)))
1710 }
1711 serde_json::Value::Object(map) => {
1712 let u = map
1713 .get("0")
1714 .and_then(|v| v.as_str())
1715 .ok_or_else(|| serde::de::Error::custom("basic_auth map missing '0'"))?
1716 .to_string();
1717 let p = map
1718 .get("1")
1719 .and_then(|v| v.as_str())
1720 .ok_or_else(|| serde::de::Error::custom("basic_auth map missing '1'"))?
1721 .to_string();
1722 Ok(Some((u, p)))
1723 }
1724 _ => Err(serde::de::Error::custom("invalid type for basic_auth")),
1725 }
1726}
1727
1728impl HttpConfig {
1729 pub fn new(url: impl Into<String>) -> Self {
1731 Self {
1732 url: url.into(),
1733 ..Default::default()
1734 }
1735 }
1736
1737 pub fn with_workers(mut self, workers: usize) -> Self {
1738 self.workers = Some(workers);
1739 self
1740 }
1741
1742 pub fn with_method(mut self, method: impl Into<String>) -> Self {
1743 self.method = Some(method.into());
1744 self
1745 }
1746
1747 pub fn with_path(mut self, path: impl Into<String>) -> Self {
1748 self.path = Some(path.into());
1749 self
1750 }
1751}
1752
1753impl WebSocketConfig {
1754 pub fn new(url: impl Into<String>) -> Self {
1756 Self {
1757 url: url.into(),
1758 ..Default::default()
1759 }
1760 }
1761
1762 pub fn with_path(mut self, path: impl Into<String>) -> Self {
1763 self.path = Some(path.into());
1764 self
1765 }
1766}
1767
/// Configuration for an IBM MQ endpoint.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
/// The derived `Default` sets `max_message_size` and `wait_timeout_ms` to 0;
/// the serde default functions named on those fields apply only when deserializing.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct IbmMqConfig {
    /// Connection URL. Marked as `password` format in the schema because it may embed credentials.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional queue name.
    pub queue: Option<String>,
    /// Optional topic name.
    pub topic: Option<String>,
    /// Queue manager name.
    pub queue_manager: String,
    /// Channel name.
    pub channel: String,
    /// Optional username.
    pub username: Option<String>,
    /// Optional password.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub password: Option<String>,
    /// Optional cipher spec. NOTE(review): presumably the TLS CipherSpec name — confirm.
    pub cipher_spec: Option<String>,
    /// TLS settings; falls back to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Maximum message size; `default_max_message_size()` when omitted in serialized input.
    #[serde(default = "default_max_message_size")]
    pub max_message_size: usize,
    /// Wait timeout in milliseconds; `default_wait_timeout_ms()` when omitted in serialized input.
    #[serde(default = "default_wait_timeout_ms")]
    pub wait_timeout_ms: i32,
    /// Optional size of an internal buffer.
    #[serde(default)]
    pub internal_buffer_size: Option<usize>,
    /// Defaults to `false`. NOTE(review): presumably disables a status inquiry call — confirm.
    #[serde(default)]
    pub disable_status_inq: bool,
}
1809
1810impl IbmMqConfig {
1811 pub fn new(
1813 url: impl Into<String>,
1814 queue_manager: impl Into<String>,
1815 channel: impl Into<String>,
1816 ) -> Self {
1817 Self {
1818 url: url.into(),
1819 queue_manager: queue_manager.into(),
1820 channel: channel.into(),
1821 disable_status_inq: false,
1822 ..Default::default()
1823 }
1824 }
1825
1826 pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
1827 self.queue = Some(queue.into());
1828 self
1829 }
1830
1831 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
1832 self.topic = Some(topic.into());
1833 self
1834 }
1835
1836 pub fn with_credentials(
1837 mut self,
1838 username: impl Into<String>,
1839 password: impl Into<String>,
1840 ) -> Self {
1841 self.username = Some(username.into());
1842 self.password = Some(password.into());
1843 self
1844 }
1845}
1846
/// Serde default for `IbmMqConfig::max_message_size`: 4 MiB expressed in bytes.
fn default_max_message_size() -> usize {
    const MIB: usize = 1 << 20;
    4 * MIB
}
1850
/// Serde default for `IbmMqConfig::wait_timeout_ms`: one second, in milliseconds.
fn default_wait_timeout_ms() -> i32 {
    const ONE_SECOND_MS: i32 = 1_000;
    ONE_SECOND_MS
}
1854
/// Configuration for a switch endpoint: a set of named endpoints plus an optional fallback.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct SwitchConfig {
    /// Metadata key. NOTE(review): presumably its value selects the entry in `cases` — confirm.
    pub metadata_key: String,
    /// Endpoints keyed by case name.
    pub cases: HashMap<String, Endpoint>,
    /// Optional fallback endpoint. NOTE(review): presumably used when no case matches — confirm.
    pub default: Option<Box<Endpoint>>,
}
1868
/// Configuration for a response endpoint.
///
/// It has no fields; because of `deny_unknown_fields`, any key is rejected when deserializing.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct ResponseConfig {
    // Intentionally empty.
}
1876
/// Configuration for an SQL endpoint.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct SqlxConfig {
    /// Database URL. Marked as `password` format in the schema because it may embed credentials.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub url: String,
    /// Optional username; `None` when omitted.
    #[serde(default)]
    pub username: Option<String>,
    /// Optional password; `None` when omitted.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    #[serde(default)]
    pub password: Option<String>,
    /// Table name.
    pub table: String,
    /// Optional custom insert query.
    pub insert_query: Option<String>,
    /// Optional custom select query.
    pub select_query: Option<String>,
    /// Defaults to `false`. NOTE(review): presumably deletes rows once they are read — confirm.
    #[serde(default)]
    pub delete_after_read: bool,
    /// Defaults to `false`. NOTE(review): presumably creates `table` when missing — confirm.
    #[serde(default)]
    pub auto_create_table: bool,
    /// Optional polling interval in milliseconds.
    pub polling_interval_ms: Option<u64>,
    /// TLS settings; falls back to `TlsConfig::default()` when omitted.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Optional maximum number of connections.
    pub max_connections: Option<u32>,
    /// Optional minimum number of connections.
    pub min_connections: Option<u32>,
    /// Optional acquire timeout in milliseconds.
    pub acquire_timeout_ms: Option<u64>,
    /// Optional idle timeout in milliseconds.
    pub idle_timeout_ms: Option<u64>,
    /// Optional maximum connection lifetime in milliseconds.
    pub max_lifetime_ms: Option<u64>,
}
1925
/// TLS settings shared by the endpoint configurations.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct TlsConfig {
    /// Whether TLS is required. Defaults to `false`; read through `deserialize_null_as_false`.
    #[serde(default, deserialize_with = "deserialize_null_as_false")]
    pub required: bool,
    /// Optional path to a CA file.
    pub ca_file: Option<String>,
    /// Optional path to a certificate file.
    pub cert_file: Option<String>,
    /// Optional path to a private key file.
    pub key_file: Option<String>,
    /// Optional certificate password; moved into the secrets map by `extract_secrets`.
    #[cfg_attr(feature = "schema", schemars(extend("format"="password")))]
    pub cert_password: Option<String>,
    /// Defaults to `false`. NOTE(review): presumably disables certificate validation — confirm.
    #[serde(default)]
    pub accept_invalid_certs: bool,
}
1966
1967impl TlsConfig {
1968 pub fn new() -> Self {
1970 Self::default()
1971 }
1972
1973 pub fn with_ca_file(mut self, ca_file: impl Into<String>) -> Self {
1974 self.ca_file = Some(ca_file.into());
1975 self.required = true;
1976 self
1977 }
1978
1979 pub fn with_client_cert(
1980 mut self,
1981 cert_file: impl Into<String>,
1982 key_file: impl Into<String>,
1983 ) -> Self {
1984 self.cert_file = Some(cert_file.into());
1985 self.key_file = Some(key_file.into());
1986 self.required = true;
1987 self
1988 }
1989
1990 pub fn with_insecure(mut self, accept_invalid_certs: bool) -> Self {
1991 self.accept_invalid_certs = accept_invalid_certs;
1992 self
1993 }
1994
1995 pub fn is_mtls_client_configured(&self) -> bool {
1997 self.required && self.cert_file.is_some() && self.key_file.is_some()
1998 }
1999
2000 pub fn is_tls_server_configured(&self) -> bool {
2002 self.required && self.cert_file.is_some() && self.key_file.is_some()
2003 }
2004
2005 pub fn is_tls_client_configured(&self) -> bool {
2007 self.required
2008 || self.ca_file.is_some()
2009 || (self.cert_file.is_some() && self.key_file.is_some())
2010 }
2011
2012 pub fn normalize_url(&self, url: &str) -> String {
2014 if url
2015 .get(..7)
2016 .is_some_and(|prefix| prefix.eq_ignore_ascii_case("http://"))
2017 || url
2018 .get(..8)
2019 .is_some_and(|prefix| prefix.eq_ignore_ascii_case("https://"))
2020 {
2021 url.to_string()
2022 } else {
2023 let is_tls = self.required;
2024 let scheme = if is_tls { "https" } else { "http" };
2025 format!("{}://{}", scheme, url)
2026 }
2027 }
2028}
2029
/// Implemented by configuration types that can move their sensitive values out into a flat map.
pub trait SecretExtractor {
    /// Removes sensitive values from `self` and inserts them into `secrets`.
    ///
    /// `prefix` is the key prefix for this value's position in the configuration;
    /// implementations in this file append `__`-separated segments to it.
    fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>);
}
2035
2036fn extract_sensitive_string_map_entries(
2037 values: &mut HashMap<String, String>,
2038 prefix: &str,
2039 field_name: &str,
2040 secrets: &mut HashMap<String, String>,
2041) {
2042 let secret_keys = values
2043 .keys()
2044 .filter(|key| {
2045 let key = key.to_ascii_lowercase();
2046 key.contains("key") || key.contains("token") || key.contains("auth")
2047 })
2048 .cloned()
2049 .collect::<Vec<_>>();
2050
2051 for key in secret_keys {
2052 if let Some(value) = values.remove(&key) {
2053 secrets.insert(
2054 sanitize_secret_key(&format!("{}__{}__{}", prefix, field_name, key)),
2055 value,
2056 );
2057 }
2058 }
2059}
2060
/// Reports whether the authority component of `url` contains an `@`.
///
/// The authority is the text after the first `://` up to the first `/`, `?` or `#`
/// (or the end of the string). A URL without `://` always yields `false`.
fn url_has_userinfo(url: &str) -> bool {
    url.split_once("://").is_some_and(|(_, after_scheme)| {
        after_scheme
            .split(['/', '?', '#'])
            .next()
            .is_some_and(|authority| authority.contains('@'))
    })
}
2071
/// Normalizes `key` for use as a secret name: ASCII letters are uppercased,
/// ASCII digits and `_` are kept, and every other character becomes `_`.
fn sanitize_secret_key(key: &str) -> String {
    let mut sanitized = String::with_capacity(key.len());
    for ch in key.chars() {
        sanitized.push(match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '_' => ch.to_ascii_uppercase(),
            _ => '_',
        });
    }
    sanitized
}
2084
2085fn extract_sensitive_url(
2086 url: &mut String,
2087 prefix: &str,
2088 field_name: &str,
2089 secrets: &mut HashMap<String, String>,
2090) {
2091 if !url.is_empty() && url_has_userinfo(url) {
2092 secrets.insert(
2093 sanitize_secret_key(&format!("{}__{}", prefix, field_name)),
2094 std::mem::take(url),
2095 );
2096 }
2097}
2098
2099fn extract_sensitive_optional_url(
2100 url: &mut Option<String>,
2101 prefix: &str,
2102 field_name: &str,
2103 secrets: &mut HashMap<String, String>,
2104) {
2105 if url.as_ref().is_some_and(|url| url_has_userinfo(url)) {
2106 if let Some(url) = url.take() {
2107 secrets.insert(
2108 sanitize_secret_key(&format!("{}__{}", prefix, field_name)),
2109 url,
2110 );
2111 }
2112 }
2113}
2114
2115impl SecretExtractor for Route {
2116 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2117 self.input
2118 .extract_secrets(&format!("{}__{}", prefix, "INPUT"), secrets);
2119 self.output
2120 .extract_secrets(&format!("{}__{}", prefix, "OUTPUT"), secrets);
2121 }
2122}
2123
2124impl SecretExtractor for Endpoint {
2125 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2126 for (i, middleware) in self.middlewares.iter_mut().enumerate() {
2127 middleware.extract_secrets(&format!("{}__{}__{}", prefix, "MIDDLEWARES", i), secrets);
2128 }
2129 self.endpoint_type.extract_secrets(prefix, secrets);
2130 }
2131}
2132
2133impl SecretExtractor for EndpointType {
2134 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2135 match self {
2136 EndpointType::Aws(cfg) => {
2137 cfg.extract_secrets(&format!("{}__{}", prefix, "AWS"), secrets)
2138 }
2139 EndpointType::Kafka(cfg) => {
2140 cfg.extract_secrets(&format!("{}__{}", prefix, "KAFKA"), secrets)
2141 }
2142 EndpointType::Nats(cfg) => {
2143 cfg.extract_secrets(&format!("{}__{}", prefix, "NATS"), secrets)
2144 }
2145 EndpointType::Amqp(cfg) => {
2146 cfg.extract_secrets(&format!("{}__{}", prefix, "AMQP"), secrets)
2147 }
2148 EndpointType::MongoDb(cfg) => {
2149 cfg.extract_secrets(&format!("{}__{}", prefix, "MONGODB"), secrets)
2150 }
2151 EndpointType::Mqtt(cfg) => {
2152 cfg.extract_secrets(&format!("{}__{}", prefix, "MQTT"), secrets)
2153 }
2154 EndpointType::Http(cfg) => {
2155 cfg.extract_secrets(&format!("{}__{}", prefix, "HTTP"), secrets)
2156 }
2157 EndpointType::WebSocket(cfg) => {
2158 cfg.extract_secrets(&format!("{}__{}", prefix, "WEBSOCKET"), secrets)
2159 }
2160 EndpointType::IbmMq(cfg) => {
2161 cfg.extract_secrets(&format!("{}__{}", prefix, "IBMMQ"), secrets)
2162 }
2163 EndpointType::ZeroMq(cfg) => {
2164 cfg.extract_secrets(&format!("{}__{}", prefix, "ZEROMQ"), secrets)
2165 }
2166 EndpointType::Sqlx(cfg) => {
2167 cfg.extract_secrets(&format!("{}__{}", prefix, "SQLX"), secrets)
2168 }
2169 EndpointType::Grpc(cfg) => {
2170 cfg.extract_secrets(&format!("{}__{}", prefix, "GRPC"), secrets)
2171 }
2172 EndpointType::Fanout(endpoints) => {
2173 for (i, ep) in endpoints.iter_mut().enumerate() {
2174 ep.extract_secrets(&format!("{}__{}__{}", prefix, "FANOUT", i), secrets);
2175 }
2176 }
2177 EndpointType::Switch(cfg) => {
2178 for (key, ep) in cfg.cases.iter_mut() {
2179 ep.extract_secrets(
2180 &format!(
2181 "{}__{}__{}",
2182 prefix,
2183 "SWITCH__CASES",
2184 sanitize_secret_key(key)
2185 ),
2186 secrets,
2187 );
2188 }
2189 if let Some(default) = &mut cfg.default {
2190 default.extract_secrets(&format!("{}__{}", prefix, "SWITCH__DEFAULT"), secrets);
2191 }
2192 }
2193 EndpointType::Reader(ep) => {
2194 ep.extract_secrets(&format!("{}__{}", prefix, "READER"), secrets)
2195 }
2196 _ => {}
2197 }
2198 }
2199}
2200
2201impl SecretExtractor for Middleware {
2202 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2203 if let Middleware::Dlq(cfg) = self {
2204 cfg.endpoint
2205 .extract_secrets(&format!("{}__{}__{}", prefix, "DLQ", "ENDPOINT"), secrets);
2206 }
2207 }
2208}
2209
2210impl SecretExtractor for AwsConfig {
2211 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2212 if let Some(val) = self.access_key.take() {
2213 secrets.insert(format!("{}__{}", prefix, "ACCESS_KEY"), val);
2214 }
2215 if let Some(val) = self.secret_key.take() {
2216 secrets.insert(format!("{}__{}", prefix, "SECRET_KEY"), val);
2217 }
2218 if let Some(val) = self.session_token.take() {
2219 secrets.insert(format!("{}__{}", prefix, "SESSION_TOKEN"), val);
2220 }
2221 extract_sensitive_optional_url(&mut self.queue_url, prefix, "QUEUE_URL", secrets);
2222 extract_sensitive_optional_url(&mut self.endpoint_url, prefix, "ENDPOINT_URL", secrets);
2223 }
2224}
2225
2226impl SecretExtractor for KafkaConfig {
2227 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2228 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2229 if let Some(val) = self.username.take() {
2230 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2231 }
2232 if let Some(val) = self.password.take() {
2233 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2234 }
2235 self.tls
2236 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2237 }
2238}
2239
2240impl SecretExtractor for NatsConfig {
2241 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2242 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2243 if let Some(val) = self.username.take() {
2244 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2245 }
2246 if let Some(val) = self.password.take() {
2247 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2248 }
2249 if let Some(val) = self.token.take() {
2250 secrets.insert(format!("{}__{}", prefix, "TOKEN"), val);
2251 }
2252 self.tls
2253 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2254 }
2255}
2256
2257impl SecretExtractor for AmqpConfig {
2258 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2259 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2260 if let Some(val) = self.username.take() {
2261 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2262 }
2263 if let Some(val) = self.password.take() {
2264 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2265 }
2266 self.tls
2267 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2268 }
2269}
2270
2271impl SecretExtractor for MongoDbConfig {
2272 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2273 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2274 if let Some(val) = self.username.take() {
2275 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2276 }
2277 if let Some(val) = self.password.take() {
2278 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2279 }
2280 self.tls
2281 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2282 }
2283}
2284
2285impl SecretExtractor for MqttConfig {
2286 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2287 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2288 if let Some(val) = self.username.take() {
2289 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2290 }
2291 if let Some(val) = self.password.take() {
2292 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2293 }
2294 self.tls
2295 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2296 }
2297}
2298
2299impl SecretExtractor for HttpConfig {
2300 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2301 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2302 if let Some((u, p)) = self.basic_auth.take() {
2303 secrets.insert(format!("{}__{}__{}", prefix, "BASIC_AUTH", 0), u);
2304 secrets.insert(format!("{}__{}__{}", prefix, "BASIC_AUTH", 1), p);
2305 }
2306 extract_sensitive_string_map_entries(
2307 &mut self.custom_headers,
2308 prefix,
2309 "CUSTOM_HEADERS",
2310 secrets,
2311 );
2312 self.tls
2313 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2314 }
2315}
2316
2317impl SecretExtractor for WebSocketConfig {
2318 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2319 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2320 }
2321}
2322
2323impl SecretExtractor for IbmMqConfig {
2324 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2325 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2326 if let Some(val) = self.username.take() {
2327 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2328 }
2329 if let Some(val) = self.password.take() {
2330 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2331 }
2332 self.tls
2333 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2334 }
2335}
2336
2337impl SecretExtractor for ZeroMqConfig {
2338 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2339 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2340 }
2341}
2342
2343impl SecretExtractor for SqlxConfig {
2344 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2345 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2346 if let Some(val) = self.username.take() {
2347 secrets.insert(format!("{}__{}", prefix, "USERNAME"), val);
2348 }
2349 if let Some(val) = self.password.take() {
2350 secrets.insert(format!("{}__{}", prefix, "PASSWORD"), val);
2351 }
2352 self.tls
2353 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2354 }
2355}
2356
2357impl SecretExtractor for GrpcConfig {
2358 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2359 extract_sensitive_url(&mut self.url, prefix, "URL", secrets);
2360 self.tls
2361 .extract_secrets(&format!("{}__{}", prefix, "TLS"), secrets);
2362 }
2363}
2364
2365impl SecretExtractor for TlsConfig {
2366 fn extract_secrets(&mut self, prefix: &str, secrets: &mut HashMap<String, String>) {
2367 if let Some(val) = self.cert_password.take() {
2368 secrets.insert(format!("{}__{}", prefix, "CERT_PASSWORD"), val);
2369 }
2370 }
2371}
2372
2373pub fn extract_config_secrets(config: &mut Config) -> HashMap<String, String> {
2380 let mut secrets = HashMap::new();
2381 for (route_name, route) in config.iter_mut() {
2382 let prefix = sanitize_secret_key(&format!("MQB__{}", route_name));
2383 route.extract_secrets(&prefix, &mut secrets);
2384 }
2385 secrets
2386}
2387
2388#[cfg(test)]
2389mod tests {
2390 use super::*;
2391 use config::{Config as ConfigBuilder, Environment};
2392
2393 const TEST_YAML: &str = r#"
2394kafka_to_nats:
2395 concurrency: 10
2396 input:
2397 middlewares:
2398 - deduplication:
2399 sled_path: "/tmp/mq-bridge/dedup_db"
2400 ttl_seconds: 3600
2401 - metrics: {}
2402 - retry:
2403 max_attempts: 5
2404 initial_interval_ms: 200
2405 - random_panic:
2406 mode: nack
2407 - dlq:
2408 endpoint:
2409 nats:
2410 subject: "dlq-subject"
2411 url: "nats://localhost:4222"
2412 kafka:
2413 topic: "input-topic"
2414 url: "localhost:9092"
2415 group_id: "my-consumer-group"
2416 tls:
2417 required: true
2418 ca_file: "/path_to_ca"
2419 cert_file: "/path_to_cert"
2420 key_file: "/path_to_key"
2421 cert_password: "password"
2422 accept_invalid_certs: true
2423 output:
2424 middlewares:
2425 - metrics: {}
2426 - dlq:
2427 endpoint:
2428 file:
2429 path: "error.out"
2430 nats:
2431 subject: "output-subject"
2432 url: "nats://localhost:4222"
2433"#;
2434
2435 fn assert_config_values(config: &Config) {
2436 assert_eq!(config.len(), 1);
2437 let route = config.get("kafka_to_nats").expect("Route should exist");
2438
2439 assert_eq!(route.options.concurrency, 10);
2440
2441 let input = &route.input;
2443 assert_eq!(input.middlewares.len(), 5);
2444
2445 let mut has_dedup = false;
2446 let mut has_metrics = false;
2447 let mut has_dlq = false;
2448 let mut has_retry = false;
2449 let mut has_random_panic = false;
2450 for middleware in &input.middlewares {
2451 match middleware {
2452 Middleware::Deduplication(dedup) => {
2453 assert_eq!(dedup.sled_path, "/tmp/mq-bridge/dedup_db");
2454 assert_eq!(dedup.ttl_seconds, 3600);
2455 has_dedup = true;
2456 }
2457 Middleware::Metrics(_) => {
2458 has_metrics = true;
2459 }
2460 Middleware::Custom { .. } => {}
2461 Middleware::Dlq(dlq) => {
2462 assert!(dlq.endpoint.middlewares.is_empty());
2463 if let EndpointType::Nats(nats_cfg) = &dlq.endpoint.endpoint_type {
2464 assert_eq!(nats_cfg.subject, Some("dlq-subject".to_string()));
2465 assert_eq!(nats_cfg.url, "nats://localhost:4222");
2466 }
2467 has_dlq = true;
2468 }
2469 Middleware::Retry(retry) => {
2470 assert_eq!(retry.max_attempts, 5);
2471 assert_eq!(retry.initial_interval_ms, 200);
2472 has_retry = true;
2473 }
2474 Middleware::RandomPanic(rp) => {
2475 assert!(rp.mode == FaultMode::Nack);
2476 has_random_panic = true;
2477 }
2478 Middleware::Delay(_) => {}
2479 Middleware::WeakJoin(_) => {}
2480 Middleware::Limiter(_) => {}
2481 Middleware::Buffer(_) => {}
2482 Middleware::CookieJar(_) => {}
2483 }
2484 }
2485
2486 if let EndpointType::Kafka(kafka) = &input.endpoint_type {
2487 assert_eq!(kafka.topic, Some("input-topic".to_string()));
2488 assert_eq!(kafka.url, "localhost:9092");
2489 assert_eq!(kafka.group_id, Some("my-consumer-group".to_string()));
2490 let tls = &kafka.tls;
2491 assert!(tls.required);
2492 assert_eq!(tls.ca_file.as_deref(), Some("/path_to_ca"));
2493 assert!(tls.accept_invalid_certs);
2494 } else {
2495 panic!("Input endpoint should be Kafka");
2496 }
2497 assert!(has_dedup);
2498 assert!(has_metrics);
2499 assert!(has_dlq);
2500 assert!(has_retry);
2501 assert!(has_random_panic);
2502
2503 let output = &route.output;
2505 assert_eq!(output.middlewares.len(), 2);
2506 assert!(matches!(output.middlewares[0], Middleware::Metrics(_)));
2507
2508 if let EndpointType::Nats(nats) = &output.endpoint_type {
2509 assert_eq!(nats.subject, Some("output-subject".to_string()));
2510 assert_eq!(nats.url, "nats://localhost:4222");
2511 } else {
2512 panic!("Output endpoint should be NATS");
2513 }
2514 }
2515
2516 #[test]
2517 fn test_deserialize_from_yaml() {
2518 let result: Result<Config, _> = serde_yaml_ng::from_str(TEST_YAML);
2521 println!("Deserialized from YAML: {:#?}", result);
2522 let config = result.expect("Failed to deserialize TEST_YAML");
2523 assert_config_values(&config);
2524 }
2525
2526 #[test]
2527 fn test_deserialize_from_env() {
2528 unsafe {
2530 std::env::set_var("MQB__KAFKA_TO_NATS__CONCURRENCY", "10");
2531 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "input-topic");
2532 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__URL", "localhost:9092");
2533 std::env::set_var(
2534 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID",
2535 "my-consumer-group",
2536 );
2537 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__REQUIRED", "true");
2538 std::env::set_var(
2539 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__CA_FILE",
2540 "/path_to_ca",
2541 );
2542 std::env::set_var(
2543 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__ACCEPT_INVALID_CERTS",
2544 "true",
2545 );
2546 std::env::set_var(
2547 "MQB__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT",
2548 "output-subject",
2549 );
2550 std::env::set_var(
2551 "MQB__KAFKA_TO_NATS__OUTPUT__NATS__URL",
2552 "nats://localhost:4222",
2553 );
2554 std::env::set_var(
2555 "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__SUBJECT",
2556 "dlq-subject",
2557 );
2558 std::env::set_var(
2559 "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__URL",
2560 "nats://localhost:4222",
2561 );
2562 }
2563
2564 let builder = ConfigBuilder::builder()
2565 .add_source(
2567 Environment::with_prefix("MQB")
2568 .separator("__")
2569 .try_parsing(true),
2570 );
2571
2572 let config: Config = builder
2573 .build()
2574 .expect("Failed to build config")
2575 .try_deserialize()
2576 .expect("Failed to deserialize config");
2577
2578 assert_eq!(config.get("kafka_to_nats").unwrap().options.concurrency, 10);
2580 if let EndpointType::Kafka(k) = &config.get("kafka_to_nats").unwrap().input.endpoint_type {
2581 assert_eq!(k.topic, Some("input-topic".to_string()));
2582 assert!(k.tls.required);
2583 } else {
2584 panic!("Expected Kafka endpoint");
2585 }
2586
2587 let input = &config.get("kafka_to_nats").unwrap().input;
2588 assert_eq!(input.middlewares.len(), 1);
2589 if let Middleware::Dlq(_) = &input.middlewares[0] {
2590 } else {
2592 panic!("Expected DLQ middleware");
2593 }
2594 }
2595
2596 #[test]
2597 fn test_extract_secrets() {
2598 let mut config = Config::new();
2599 let mut route = Route::default();
2600
2601 let mut kafka_config = KafkaConfig::new("kafka://user:pass@localhost:9092");
2603 kafka_config.username = Some("user".to_string());
2604 kafka_config.password = Some("pass".to_string());
2605 kafka_config.tls.cert_password = Some("certpass".to_string());
2606
2607 route.input = Endpoint {
2608 endpoint_type: EndpointType::Kafka(kafka_config),
2609 middlewares: vec![],
2610 handler: None,
2611 };
2612
2613 let mut http_config = HttpConfig::new("http://httpuser:httppass@localhost");
2615 http_config.basic_auth = Some(("httpuser".to_string(), "httppass".to_string()));
2616 http_config
2617 .custom_headers
2618 .insert("X-API-Key".to_string(), "http-api-key".to_string());
2619 http_config.custom_headers.insert(
2620 "X-Access-Token".to_string(),
2621 "http-access-token".to_string(),
2622 );
2623 http_config.custom_headers.insert(
2624 "X-Authentication".to_string(),
2625 "http-authentication".to_string(),
2626 );
2627 http_config.custom_headers.insert(
2628 "Authorization".to_string(),
2629 "Bearer secret-token".to_string(),
2630 );
2631 http_config
2632 .custom_headers
2633 .insert("X-Trace-Id".to_string(), "trace-value".to_string());
2634
2635 route.output = Endpoint {
2636 endpoint_type: EndpointType::Http(http_config),
2637 middlewares: vec![],
2638 handler: None,
2639 };
2640
2641 config.insert("test_route".to_string(), route);
2642
2643 let secrets = extract_config_secrets(&mut config);
2644
2645 assert_eq!(
2647 secrets
2648 .get("MQB__TEST_ROUTE__INPUT__KAFKA__URL")
2649 .map(|s| s.as_str()),
2650 Some("kafka://user:pass@localhost:9092")
2651 );
2652 assert_eq!(
2653 secrets
2654 .get("MQB__TEST_ROUTE__INPUT__KAFKA__USERNAME")
2655 .map(|s| s.as_str()),
2656 Some("user")
2657 );
2658 assert_eq!(
2659 secrets
2660 .get("MQB__TEST_ROUTE__INPUT__KAFKA__PASSWORD")
2661 .map(|s| s.as_str()),
2662 Some("pass")
2663 );
2664 assert_eq!(
2665 secrets
2666 .get("MQB__TEST_ROUTE__INPUT__KAFKA__TLS__CERT_PASSWORD")
2667 .map(|s| s.as_str()),
2668 Some("certpass")
2669 );
2670 assert_eq!(
2671 secrets
2672 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__URL")
2673 .map(|s| s.as_str()),
2674 Some("http://httpuser:httppass@localhost")
2675 );
2676 assert_eq!(
2677 secrets
2678 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__BASIC_AUTH__0")
2679 .map(|s| s.as_str()),
2680 Some("httpuser")
2681 );
2682 assert_eq!(
2683 secrets
2684 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__BASIC_AUTH__1")
2685 .map(|s| s.as_str()),
2686 Some("httppass")
2687 );
2688 assert_eq!(
2689 secrets
2690 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_API_KEY")
2691 .map(|s| s.as_str()),
2692 Some("http-api-key")
2693 );
2694 assert_eq!(
2695 secrets
2696 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_ACCESS_TOKEN")
2697 .map(|s| s.as_str()),
2698 Some("http-access-token")
2699 );
2700 assert_eq!(
2701 secrets
2702 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__X_AUTHENTICATION")
2703 .map(|s| s.as_str()),
2704 Some("http-authentication")
2705 );
2706 assert_eq!(
2707 secrets
2708 .get("MQB__TEST_ROUTE__OUTPUT__HTTP__CUSTOM_HEADERS__AUTHORIZATION")
2709 .map(|s| s.as_str()),
2710 Some("Bearer secret-token")
2711 );
2712
2713 let route = config.get("test_route").unwrap();
2715 if let EndpointType::Kafka(k) = &route.input.endpoint_type {
2716 assert!(k.url.is_empty());
2717 assert!(k.username.is_none());
2718 assert!(k.password.is_none());
2719 assert!(k.tls.cert_password.is_none());
2720 }
2721 if let EndpointType::Http(h) = &route.output.endpoint_type {
2722 assert!(h.url.is_empty());
2723 assert!(h.basic_auth.is_none());
2724 assert!(!h.custom_headers.contains_key("X-API-Key"));
2725 assert!(!h.custom_headers.contains_key("X-Access-Token"));
2726 assert!(!h.custom_headers.contains_key("X-Authentication"));
2727 assert!(!h.custom_headers.contains_key("Authorization"));
2728 assert_eq!(
2729 h.custom_headers.get("X-Trace-Id").map(|s| s.as_str()),
2730 Some("trace-value")
2731 );
2732 }
2733 }
2734
2735 #[test]
2736 fn test_extract_sensitive_url_only_strips_authority_credentials() {
2737 let mut config = Config::new();
2738 let path_at_route = Route {
2739 output: Endpoint {
2740 endpoint_type: EndpointType::Http(HttpConfig::new(
2741 "https://example.com/path/user@example.com?email=a@b.test",
2742 )),
2743 middlewares: vec![],
2744 handler: None,
2745 },
2746 ..Default::default()
2747 };
2748 config.insert("path_at_route".to_string(), path_at_route);
2749
2750 let credential_route = Route {
2751 output: Endpoint {
2752 endpoint_type: EndpointType::Http(HttpConfig::new(
2753 "https://user:pass@example.com/path",
2754 )),
2755 middlewares: vec![],
2756 handler: None,
2757 },
2758 ..Default::default()
2759 };
2760 config.insert("credential_route".to_string(), credential_route);
2761
2762 let query_at_route = Route {
2763 output: Endpoint {
2764 endpoint_type: EndpointType::Http(HttpConfig::new(
2765 "https://example.com?next=a@b.test",
2766 )),
2767 middlewares: vec![],
2768 handler: None,
2769 },
2770 ..Default::default()
2771 };
2772 config.insert("query_at_route".to_string(), query_at_route);
2773
2774 let fragment_at_route = Route {
2775 output: Endpoint {
2776 endpoint_type: EndpointType::Http(HttpConfig::new(
2777 "https://example.com#user@example.com",
2778 )),
2779 middlewares: vec![],
2780 handler: None,
2781 },
2782 ..Default::default()
2783 };
2784 config.insert("fragment_at_route".to_string(), fragment_at_route);
2785
2786 let secrets = extract_config_secrets(&mut config);
2787
2788 if let EndpointType::Http(http) = &config.get("path_at_route").unwrap().output.endpoint_type
2789 {
2790 assert_eq!(
2791 http.url,
2792 "https://example.com/path/user@example.com?email=a@b.test"
2793 );
2794 }
2795 if let EndpointType::Http(http) =
2796 &config.get("query_at_route").unwrap().output.endpoint_type
2797 {
2798 assert_eq!(http.url, "https://example.com?next=a@b.test");
2799 }
2800 if let EndpointType::Http(http) = &config
2801 .get("fragment_at_route")
2802 .unwrap()
2803 .output
2804 .endpoint_type
2805 {
2806 assert_eq!(http.url, "https://example.com#user@example.com");
2807 }
2808 if let EndpointType::Http(http) =
2809 &config.get("credential_route").unwrap().output.endpoint_type
2810 {
2811 assert!(http.url.is_empty());
2812 }
2813 assert_eq!(
2814 secrets
2815 .get("MQB__CREDENTIAL_ROUTE__OUTPUT__HTTP__URL")
2816 .map(String::as_str),
2817 Some("https://user:pass@example.com/path")
2818 );
2819 assert!(!secrets.contains_key("MQB__PATH_AT_ROUTE__OUTPUT__HTTP__URL"));
2820 assert!(!secrets.contains_key("MQB__QUERY_AT_ROUTE__OUTPUT__HTTP__URL"));
2821 assert!(!secrets.contains_key("MQB__FRAGMENT_AT_ROUTE__OUTPUT__HTTP__URL"));
2822 }
2823
2824 #[test]
2825 fn test_file_config_inference() {
2826 let yaml = r#"
2827mode: group_subscribe
2828path: "/tmp/test"
2829group_id: "my_group"
2830"#;
2831 let config: FileConfig = serde_yaml_ng::from_str(yaml).unwrap();
2832 match config.mode {
2833 Some(FileConsumerMode::GroupSubscribe { group_id, .. }) => {
2834 assert_eq!(group_id, "my_group")
2835 }
2836 _ => panic!("Expected GroupSubscribe"),
2837 }
2838
2839 let yaml_queue = r#"
2840mode: consume
2841path: "/tmp/test"
2842"#;
2843 let config_queue: FileConfig = serde_yaml_ng::from_str(yaml_queue).unwrap();
2844 match config_queue.mode {
2845 Some(FileConsumerMode::Consume { delete }) => assert!(!delete),
2846 _ => panic!("Expected Consume"),
2847 }
2848 }
2849}
2850
#[cfg(all(test, feature = "schema"))]
mod schema_tests {
    use super::*;

    /// Renders the JSON schema for `Config` and writes it to
    /// `mq-bridge.schema.json` in the crate's manifest directory.
    #[test]
    fn generate_json_schema() {
        // Pretty-print so the generated file is readable.
        let rendered =
            serde_json::to_string_pretty(&schemars::schema_for!(Config)).unwrap();

        // `CARGO_MANIFEST_DIR` is resolved at compile time by `env!`.
        let destination =
            std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("mq-bridge.schema.json");

        std::fs::write(destination, rendered).expect("Failed to write schema file");
    }
}