1use serde::{
7 de::{MapAccess, Visitor},
8 Deserialize, Deserializer, Serialize,
9};
10use std::{collections::HashMap, sync::Arc};
11
12use crate::traits::Handler;
13use tracing::trace;
14
/// Top-level configuration: a map from a route's name to its [`Route`] definition.
pub type Config = HashMap<String, Route>;
54
/// A map from a name to the [`Endpoint`] configured under that name.
// NOTE(review): presumably keyed by publisher name, going by the alias name — confirm against callers.
pub type PublisherConfig = HashMap<String, Endpoint>;
58
/// A single route: an input endpoint, an output endpoint, and processing options.
// NOTE(review): serde documents `flatten` as unsupported together with
// `deny_unknown_fields` (on either the outer or the flattened struct); verify that
// unknown keys are rejected the way this struct intends.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct Route {
    /// The route's input endpoint. Required when deserializing.
    pub input: Endpoint,
    /// The route's output endpoint. When omitted, `default_output_endpoint()` supplies it.
    #[serde(default = "default_output_endpoint")]
    pub output: Endpoint,
    /// Processing options, read from the same map level as `input`/`output` (flattened).
    #[serde(flatten, default)]
    pub options: RouteOptions,
}
73
74impl Default for Route {
75 fn default() -> Self {
76 Self {
77 input: Endpoint::null(),
78 output: Endpoint::null(),
79 options: RouteOptions::default(),
80 }
81 }
82}
83
/// Per-route processing options.
///
/// Every field falls back to its `default_*` function when absent from the input.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct RouteOptions {
    /// Defaults to `default_concurrency()`. The generated schema requires a value of at least 1.
    #[serde(default = "default_concurrency")]
    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
    pub concurrency: usize,
    /// Defaults to `default_batch_size()`. The generated schema requires a value of at least 1.
    #[serde(default = "default_batch_size")]
    #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
    pub batch_size: usize,
    /// Defaults to `default_commit_concurrency_limit()`.
    #[serde(default = "default_commit_concurrency_limit")]
    pub commit_concurrency_limit: usize,
}
101
102impl Default for RouteOptions {
103 fn default() -> Self {
104 Self {
105 concurrency: default_concurrency(),
106 batch_size: default_batch_size(),
107 commit_concurrency_limit: default_commit_concurrency_limit(),
108 }
109 }
110}
111
/// Default value for a route's `concurrency` option.
pub(crate) fn default_concurrency() -> usize {
    const DEFAULT_CONCURRENCY: usize = 1;
    DEFAULT_CONCURRENCY
}
115
/// Default value for a route's `batch_size` option.
pub(crate) fn default_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 1;
    DEFAULT_BATCH_SIZE
}
119
/// Default value for a route's `commit_concurrency_limit` option.
pub(crate) fn default_commit_concurrency_limit() -> usize {
    const DEFAULT_COMMIT_CONCURRENCY_LIMIT: usize = 4096;
    DEFAULT_COMMIT_CONCURRENCY_LIMIT
}
123
124fn default_output_endpoint() -> Endpoint {
125 Endpoint::new(EndpointType::Null)
126}
127
/// Serde default for `RetryMiddleware::max_attempts`.
fn default_retry_attempts() -> usize {
    const DEFAULT_RETRY_ATTEMPTS: usize = 3;
    DEFAULT_RETRY_ATTEMPTS
}
/// Serde default for `RetryMiddleware::initial_interval_ms`.
fn default_initial_interval_ms() -> u64 {
    const DEFAULT_INITIAL_INTERVAL_MS: u64 = 100;
    DEFAULT_INITIAL_INTERVAL_MS
}
/// Serde default for `RetryMiddleware::max_interval_ms`.
fn default_max_interval_ms() -> u64 {
    const DEFAULT_MAX_INTERVAL_MS: u64 = 5000;
    DEFAULT_MAX_INTERVAL_MS
}
/// Serde default for `RetryMiddleware::multiplier`.
fn default_multiplier() -> f64 {
    const DEFAULT_MULTIPLIER: f64 = 2.0;
    DEFAULT_MULTIPLIER
}
/// Serde default for `MqttConfig::clean_session`.
fn default_clean_session() -> bool {
    const DEFAULT_CLEAN_SESSION: bool = false;
    DEFAULT_CLEAN_SESSION
}
143
/// Reports whether `name` is one of the recognized endpoint keys.
///
/// The comparison is exact and case-sensitive. Both `"ibm-mq"` and `"ibmmq"` are
/// recognized; `"custom"` and `"null"` are not in the list.
fn is_known_endpoint_name(name: &str) -> bool {
    const KNOWN_ENDPOINT_NAMES: &[&str] = &[
        "aws", "kafka", "nats", "file", "static", "memory", "amqp", "mongodb", "mqtt", "http",
        "ibm-mq", "ibmmq", "zeromq", "fanout", "switch", "response",
    ];
    KNOWN_ENDPOINT_NAMES.contains(&name)
}
165
/// An endpoint definition: an endpoint kind plus the middlewares configured for it.
///
/// Only `Serialize` is derived here; `Deserialize` and `Debug` are implemented by hand
/// for this type.
#[derive(Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct Endpoint {
    /// Middlewares configured for this endpoint.
    #[serde(default)]
    pub middlewares: Vec<Middleware>,

    /// The endpoint kind and its settings, serialized at the same map level as
    /// `middlewares` (flattened).
    #[serde(flatten)]
    pub endpoint_type: EndpointType,

    /// Optional handler; it is never serialized and is skipped in the generated schema.
    #[serde(skip_serializing)]
    #[cfg_attr(feature = "schema", schemars(skip))]
    pub handler: Option<Arc<dyn Handler>>,
}
184
185impl std::fmt::Debug for Endpoint {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 f.debug_struct("Endpoint")
188 .field("middlewares", &self.middlewares)
189 .field("endpoint_type", &self.endpoint_type)
190 .field(
191 "handler",
192 &if self.handler.is_some() {
193 "Some(<Handler>)"
194 } else {
195 "None"
196 },
197 )
198 .finish()
199 }
200}
201
202impl<'de> Deserialize<'de> for Endpoint {
203 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
204 where
205 D: Deserializer<'de>,
206 {
207 struct EndpointVisitor;
208
209 impl<'de> Visitor<'de> for EndpointVisitor {
210 type Value = Endpoint;
211
212 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
213 formatter.write_str("a map representing an endpoint or null")
214 }
215
216 fn visit_unit<E>(self) -> Result<Self::Value, E>
217 where
218 E: serde::de::Error,
219 {
220 Ok(Endpoint::new(EndpointType::Null))
221 }
222
223 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
224 where
225 A: MapAccess<'de>,
226 {
227 let mut temp_map = serde_json::Map::new();
230 let mut middlewares_val = None;
231
232 while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
233 if key == "middlewares" {
234 middlewares_val = Some(value);
235 } else {
236 temp_map.insert(key, value);
237 }
238 }
239
240 let temp_val = serde_json::Value::Object(temp_map);
242 let endpoint_type: EndpointType = match serde_json::from_value(temp_val.clone()) {
243 Ok(et) => et,
244 Err(original_err) => {
245 if let serde_json::Value::Object(map) = &temp_val {
246 if map.len() == 1 {
247 let (name, config) = map.iter().next().unwrap();
248 if is_known_endpoint_name(name) {
249 return Err(serde::de::Error::custom(original_err));
250 }
251 trace!("Falling back to Custom endpoint for key: {}", name);
252 EndpointType::Custom {
253 name: name.clone(),
254 config: config.clone(),
255 }
256 } else if map.is_empty() {
257 EndpointType::Null
258 } else {
259 return Err(serde::de::Error::custom(
260 "Invalid endpoint configuration: multiple keys found or unknown endpoint type",
261 ));
262 }
263 } else {
264 return Err(serde::de::Error::custom("Invalid endpoint configuration"));
265 }
266 }
267 };
268
269 let middlewares = match middlewares_val {
271 Some(val) => {
272 deserialize_middlewares_from_value(val).map_err(serde::de::Error::custom)?
273 }
274 None => Vec::new(),
275 };
276
277 Ok(Endpoint {
278 middlewares,
279 endpoint_type,
280 handler: None,
281 })
282 }
283 }
284
285 deserializer.deserialize_any(EndpointVisitor)
286 }
287}
288
/// Reports whether `name` is one of the recognized middleware keys.
///
/// The comparison is exact and case-sensitive.
fn is_known_middleware_name(name: &str) -> bool {
    const KNOWN_MIDDLEWARE_NAMES: &[&str] = &[
        "deduplication",
        "metrics",
        "dlq",
        "retry",
        "random_panic",
        "delay",
        "weak_join",
        "custom",
    ];
    KNOWN_MIDDLEWARE_NAMES.contains(&name)
}
302
303fn deserialize_middlewares_from_value(value: serde_json::Value) -> anyhow::Result<Vec<Middleware>> {
307 let arr = match value {
308 serde_json::Value::Array(arr) => arr,
309 serde_json::Value::Object(map) => {
310 let mut middlewares: Vec<_> = map
311 .into_iter()
312 .filter_map(|(key, value)| key.parse::<usize>().ok().map(|index| (index, value)))
315 .collect();
316 middlewares.sort_by_key(|(index, _)| *index);
317
318 middlewares.into_iter().map(|(_, value)| value).collect()
319 }
320 _ => return Err(anyhow::anyhow!("Expected an array or object")),
321 };
322
323 let mut middlewares = Vec::new();
324 for item in arr {
325 let known_name = if let serde_json::Value::Object(map) = &item {
327 if map.len() == 1 {
328 let (name, _) = map.iter().next().unwrap();
329 if is_known_middleware_name(name) {
330 Some(name.clone())
331 } else {
332 None
333 }
334 } else {
335 None
336 }
337 } else {
338 None
339 };
340
341 if let Some(name) = known_name {
342 match serde_json::from_value::<Middleware>(item.clone()) {
343 Ok(m) => middlewares.push(m),
344 Err(e) => {
345 return Err(anyhow::anyhow!(
346 "Failed to deserialize known middleware '{}': {}",
347 name,
348 e
349 ))
350 }
351 }
352 } else if let Ok(m) = serde_json::from_value::<Middleware>(item.clone()) {
353 middlewares.push(m);
354 } else if let serde_json::Value::Object(map) = &item {
355 if map.len() == 1 {
356 let (name, config) = map.iter().next().unwrap();
357 middlewares.push(Middleware::Custom {
358 name: name.clone(),
359 config: config.clone(),
360 });
361 } else {
362 return Err(anyhow::anyhow!(
363 "Invalid middleware configuration: {:?}",
364 item
365 ));
366 }
367 } else {
368 return Err(anyhow::anyhow!(
369 "Invalid middleware configuration: {:?}",
370 item
371 ));
372 }
373 }
374 Ok(middlewares)
375}
376
/// The kind of an endpoint together with that kind's settings.
///
/// Serde names are the variant names in lowercase (`rename_all = "lowercase"`), so
/// for example `MongoDb` is written `mongodb` and `IbmMq` is written `ibmmq`.
/// `Null` is the `Default` variant.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum EndpointType {
    Aws(AwsConfig),
    Kafka(KafkaConfig),
    Nats(NatsConfig),
    File(FileConfig),
    /// Carries a plain string payload.
    Static(String),
    Memory(MemoryConfig),
    Amqp(AmqpConfig),
    MongoDb(MongoDbConfig),
    Mqtt(MqttConfig),
    Http(HttpConfig),
    IbmMq(IbmMqConfig),
    ZeroMq(ZeroMqConfig),
    /// Carries a list of nested endpoints.
    Fanout(Vec<Endpoint>),
    Switch(SwitchConfig),
    Response(ResponseConfig),
    /// An endpoint kind identified only by `name`, with its raw JSON `config`.
    Custom {
        name: String,
        config: serde_json::Value,
    },
    /// The endpoint kind used when none is configured.
    #[default]
    Null,
}
424
425impl EndpointType {
426 pub fn name(&self) -> &'static str {
427 match self {
428 EndpointType::Aws(_) => "aws",
429 EndpointType::Kafka(_) => "kafka",
430 EndpointType::Nats(_) => "nats",
431 EndpointType::File(_) => "file",
432 EndpointType::Static(_) => "static",
433 EndpointType::Memory(_) => "memory",
434 EndpointType::Amqp(_) => "amqp",
435 EndpointType::MongoDb(_) => "mongodb",
436 EndpointType::Mqtt(_) => "mqtt",
437 EndpointType::Http(_) => "http",
438 EndpointType::IbmMq(_) => "ibmmq",
439 EndpointType::ZeroMq(_) => "zeromq",
440 EndpointType::Fanout(_) => "fanout",
441 EndpointType::Switch(_) => "switch",
442 EndpointType::Response(_) => "response",
443 EndpointType::Custom { .. } => "custom",
444 EndpointType::Null => "null",
445 }
446 }
447
448 pub fn is_core(&self) -> bool {
449 matches!(
450 self,
451 EndpointType::File(_)
452 | EndpointType::Static(_)
453 | EndpointType::Memory(_)
454 | EndpointType::Fanout(_)
455 | EndpointType::Switch(_)
456 | EndpointType::Response(_)
457 | EndpointType::Custom { .. }
458 | EndpointType::Null
459 )
460 }
461}
462
/// A middleware attached to an endpoint, together with its settings.
///
/// Serde names are the variant names in snake case (`rename_all = "snake_case"`),
/// e.g. `RandomPanic` is written `random_panic` and `WeakJoin` is written `weak_join`.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum Middleware {
    Deduplication(DeduplicationMiddleware),
    Metrics(MetricsMiddleware),
    /// Boxed payload; the settings contain a nested `Endpoint`.
    Dlq(Box<DeadLetterQueueMiddleware>),
    Retry(RetryMiddleware),
    RandomPanic(RandomPanicMiddleware),
    Delay(DelayMiddleware),
    WeakJoin(WeakJoinMiddleware),
    /// A middleware identified only by `name`, with its raw JSON `config`.
    Custom {
        name: String,
        config: serde_json::Value,
    },
}
480
/// Settings carried by `Middleware::Deduplication`.
///
/// Both fields are required and unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DeduplicationMiddleware {
    // NOTE(review): presumably the on-disk location of a sled database — confirm.
    pub sled_path: String,
    // NOTE(review): presumably how long a seen message is remembered, in seconds — confirm.
    pub ttl_seconds: u64,
}
491
/// Settings carried by `Middleware::Metrics`.
///
/// The struct has no fields and rejects unknown keys, so it deserializes only from
/// an empty map (e.g. `metrics: {}`).
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MetricsMiddleware {}
498
/// Settings carried by `Middleware::Dlq`.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DeadLetterQueueMiddleware {
    /// The nested endpoint definition for the dead-letter queue. Required.
    pub endpoint: Endpoint,
}
508
/// Settings carried by `Middleware::Retry`.
///
/// Every field is optional in the input and falls back to its `default_*` function.
// NOTE(review): `Default` is derived, so `RetryMiddleware::default()` yields zeros
// rather than the serde defaults below — confirm that difference is intended.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct RetryMiddleware {
    /// Defaults to `default_retry_attempts()` when absent.
    #[serde(default = "default_retry_attempts")]
    pub max_attempts: usize,
    /// Defaults to `default_initial_interval_ms()` when absent.
    #[serde(default = "default_initial_interval_ms")]
    pub initial_interval_ms: u64,
    /// Defaults to `default_max_interval_ms()` when absent.
    #[serde(default = "default_max_interval_ms")]
    pub max_interval_ms: u64,
    /// Defaults to `default_multiplier()` when absent.
    #[serde(default = "default_multiplier")]
    pub multiplier: f64,
}
526
/// Settings carried by `Middleware::Delay`.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct DelayMiddleware {
    /// Required. Expressed in milliseconds, per the field name.
    pub delay_ms: u64,
}
534
/// Settings carried by `Middleware::WeakJoin`.
///
/// All three fields are required and unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct WeakJoinMiddleware {
    // NOTE(review): presumably the key used to group messages that belong together — confirm.
    pub group_by: String,
    // NOTE(review): presumably how many messages complete a group — confirm.
    pub expected_count: usize,
    /// Expressed in milliseconds, per the field name.
    pub timeout_ms: u64,
}
546
/// Settings carried by `Middleware::RandomPanic`.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct RandomPanicMiddleware {
    /// Required. Deserialized through `deserialize_probability`, which validates the value.
    #[serde(deserialize_with = "deserialize_probability")]
    pub probability: f64,
}
555
556fn deserialize_probability<'de, D>(deserializer: D) -> Result<f64, D::Error>
557where
558 D: Deserializer<'de>,
559{
560 let value = f64::deserialize(deserializer)?;
561 if !(0.0..=1.0).contains(&value) {
562 return Err(serde::de::Error::custom(
563 "probability must be between 0.0 and 1.0",
564 ));
565 }
566 Ok(value)
567}
568
/// Settings carried by `EndpointType::Aws`.
///
/// Every field is an `Option`, so all keys may be omitted; unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct AwsConfig {
    pub queue_url: Option<String>,
    pub topic_arn: Option<String>,
    pub region: Option<String>,
    pub endpoint_url: Option<String>,
    pub access_key: Option<String>,
    pub secret_key: Option<String>,
    pub session_token: Option<String>,
    /// The generated schema restricts this to 1..=10; serde itself does not enforce the range.
    #[cfg_attr(feature = "schema", schemars(range(min = 1, max = 10)))]
    pub max_messages: Option<i32>,
    /// The generated schema restricts this to 0..=20; serde itself does not enforce the range.
    #[cfg_attr(feature = "schema", schemars(range(min = 0, max = 20)))]
    pub wait_time_seconds: Option<i32>,
}
595
/// Settings carried by `EndpointType::Kafka`.
///
/// Only `url` is required; unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct KafkaConfig {
    /// Required. May also be given under the key `brokers`.
    #[serde(alias = "brokers")]
    pub url: String,
    pub topic: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when absent.
    #[serde(default)]
    pub tls: TlsConfig,
    pub group_id: Option<String>,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub delayed_ack: bool,
    /// Optional list of `(key, value)` string pairs.
    #[serde(default)]
    pub producer_options: Option<Vec<(String, String)>>,
    /// Optional list of `(key, value)` string pairs.
    #[serde(default)]
    pub consumer_options: Option<Vec<(String, String)>>,
}
628
/// Settings carried by `EndpointType::File`.
///
/// Only `Serialize` is derived; `Deserialize` is implemented by hand for this type.
#[derive(Debug, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct FileConfig {
    /// The file path.
    pub path: String,
    #[serde(default)]
    pub subscribe_mode: bool,
    // NOTE(review): presumably controls whether the file is removed after use — confirm.
    pub delete: Option<bool>,
}
644
645impl<'de> Deserialize<'de> for FileConfig {
646 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
647 where
648 D: Deserializer<'de>,
649 {
650 struct FileConfigVisitor;
651 impl<'de> Visitor<'de> for FileConfigVisitor {
652 type Value = FileConfig;
653 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
654 formatter.write_str("string or map")
655 }
656 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
657 where
658 E: serde::de::Error,
659 {
660 Ok(FileConfig {
661 path: value.to_string(),
662 subscribe_mode: false,
663 delete: None,
664 })
665 }
666 fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
667 where
668 M: MapAccess<'de>,
669 {
670 let mut path = None;
671 let mut consume = true;
672 let mut subscribe_mode = None;
673 let mut delete = None;
674 while let Some(key) = map.next_key::<String>()? {
675 match key.as_str() {
676 "path" => {
677 if path.is_some() {
678 return Err(serde::de::Error::duplicate_field("path"));
679 }
680 path = Some(map.next_value()?);
681 }
682 "consume" => {
683 consume = map.next_value()?;
684 }
685 "subscribe_mode" => {
686 if subscribe_mode.is_some() {
687 return Err(serde::de::Error::duplicate_field("subscribe_mode"));
688 }
689 subscribe_mode = Some(map.next_value()?);
690 }
691 "delete" => {
692 if delete.is_some() {
693 return Err(serde::de::Error::duplicate_field("delete"));
694 }
695 delete = Some(map.next_value()?);
696 }
697 _ => {
698 let _ = map.next_value::<serde::de::IgnoredAny>()?;
699 }
700 }
701 }
702 let path = path.ok_or_else(|| serde::de::Error::missing_field("path"))?;
703 Ok(FileConfig {
704 path,
705 subscribe_mode: subscribe_mode.unwrap_or(!consume),
706 delete,
707 })
708 }
709 }
710 deserializer.deserialize_any(FileConfigVisitor)
711 }
712}
713
/// Settings carried by `EndpointType::Nats`.
///
/// Only `url` is required; `Option` fields may be omitted, flags marked
/// `#[serde(default)]` default to `false`, and unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct NatsConfig {
    /// Required.
    pub url: String,
    pub subject: Option<String>,
    pub stream: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when absent.
    #[serde(default)]
    pub tls: TlsConfig,
    pub token: Option<String>,
    #[serde(default)]
    pub request_reply: bool,
    /// Expressed in milliseconds, per the field name.
    pub request_timeout_ms: Option<u64>,
    #[serde(default)]
    pub delayed_ack: bool,
    #[serde(default)]
    pub no_jetstream: bool,
    #[serde(default)]
    pub subscriber_mode: bool,
    pub stream_max_messages: Option<i64>,
    pub stream_max_bytes: Option<i64>,
    pub prefetch_count: Option<usize>,
}
759
/// Settings carried by `EndpointType::Memory`.
///
/// Only `topic` is required; flags marked `#[serde(default)]` default to `false`
/// and unknown keys are rejected.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MemoryConfig {
    /// Required.
    pub topic: String,
    pub capacity: Option<usize>,
    #[serde(default)]
    pub request_reply: bool,
    /// Expressed in milliseconds, per the field name.
    pub request_timeout_ms: Option<u64>,
    #[serde(default)]
    pub subscribe_mode: bool,
    #[serde(default)]
    pub enable_nack: bool,
}
780
781impl MemoryConfig {
782 pub fn new(topic: impl Into<String>, capacity: Option<usize>) -> Self {
783 Self {
784 topic: topic.into(),
785 capacity,
786 ..Default::default()
787 }
788 }
789 pub fn with_subscribe(self, subscribe_mode: bool) -> Self {
790 Self {
791 subscribe_mode,
792 ..self
793 }
794 }
795}
796
/// Settings carried by `EndpointType::Amqp`.
///
/// Only `url` is required; flags marked `#[serde(default)]` default to `false`
/// and unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct AmqpConfig {
    /// Required.
    pub url: String,
    pub queue: Option<String>,
    #[serde(default)]
    pub subscribe_mode: bool,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when absent.
    #[serde(default)]
    pub tls: TlsConfig,
    pub exchange: Option<String>,
    pub prefetch_count: Option<u16>,
    #[serde(default)]
    pub no_persistence: bool,
    #[serde(default)]
    pub delayed_ack: bool,
}
831
/// The value type of `MongoDbConfig::format`.
///
/// Serialized as the lowercase variant name (`normal`, `json`, `raw`); `Normal` is
/// the default.
#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum MongoDbFormat {
    #[default]
    Normal,
    Json,
    Raw,
}
841
/// Settings carried by `EndpointType::MongoDb`.
///
/// `url` and `database` are required; flags marked `#[serde(default)]` default to
/// `false` and unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MongoDbConfig {
    /// Required.
    pub url: String,
    pub collection: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when absent.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Required.
    pub database: String,
    /// Expressed in milliseconds, per the field name.
    pub polling_interval_ms: Option<u64>,
    /// Expressed in milliseconds, per the field name.
    pub reply_polling_ms: Option<u64>,
    #[serde(default)]
    pub request_reply: bool,
    #[serde(default)]
    pub change_stream: bool,
    /// Expressed in milliseconds, per the field name.
    pub request_timeout_ms: Option<u64>,
    pub ttl_seconds: Option<u64>,
    pub capped_size_bytes: Option<i64>,
    /// Defaults to `MongoDbFormat::Normal` when absent.
    #[serde(default)]
    pub format: MongoDbFormat,
    pub cursor_id: Option<String>,
}
887
/// Settings carried by `EndpointType::Mqtt`.
///
/// Only `url` is required; unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct MqttConfig {
    /// Required.
    pub url: String,
    pub topic: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    /// Defaults to `TlsConfig::default()` when absent.
    #[serde(default)]
    pub tls: TlsConfig,
    pub client_id: Option<String>,
    pub queue_capacity: Option<usize>,
    pub max_inflight: Option<u16>,
    pub qos: Option<u8>,
    /// Defaults to `default_clean_session()` when absent.
    #[serde(default = "default_clean_session")]
    pub clean_session: bool,
    pub keep_alive_seconds: Option<u64>,
    /// Defaults to `MqttProtocol::V5` when absent.
    #[serde(default)]
    pub protocol: MqttProtocol,
    pub session_expiry_interval: Option<u32>,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub delayed_ack: bool,
}
929
/// The value type of `MqttConfig::protocol`.
///
/// Serialized as the lowercase variant name (`v5`, `v3`); `V5` is the default.
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum MqttProtocol {
    #[default]
    V5,
    V3,
}
938
/// Settings carried by `EndpointType::ZeroMq`.
///
/// Only `url` is required; unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct ZeroMqConfig {
    /// Required.
    pub url: String,
    /// Optional socket type; `None` when absent.
    #[serde(default)]
    pub socket_type: Option<ZeroMqSocketType>,
    pub topic: Option<String>,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub bind: bool,
    #[serde(default)]
    pub internal_buffer_size: Option<usize>,
}
959
/// The value type of `ZeroMqConfig::socket_type`.
///
/// Serialized as the lowercase variant name (`push`, `pull`, `pub`, `sub`, `req`, `rep`).
/// There is no default variant.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum ZeroMqSocketType {
    Push,
    Pull,
    Pub,
    Sub,
    Req,
    Rep,
}
971
/// Settings carried by `EndpointType::Http`.
///
/// Only `url` is required; unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct HttpConfig {
    /// Required.
    pub url: String,
    /// Defaults to `TlsConfig::default()` when absent.
    #[serde(default)]
    pub tls: TlsConfig,
    pub workers: Option<usize>,
    pub message_id_header: Option<String>,
    /// Expressed in milliseconds, per the field name.
    pub request_timeout_ms: Option<u64>,
}
991
/// Settings carried by `EndpointType::IbmMq`.
///
/// `url`, `queue_manager`, and `channel` are required; unknown keys are rejected.
// NOTE(review): `Default` is derived, so `IbmMqConfig::default()` yields zero for
// `max_message_size` and `wait_timeout_ms` rather than the serde defaults — confirm
// that difference is intended.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct IbmMqConfig {
    /// Required.
    pub url: String,
    pub queue: Option<String>,
    pub topic: Option<String>,
    /// Required.
    pub queue_manager: String,
    /// Required.
    pub channel: String,
    pub username: Option<String>,
    pub password: Option<String>,
    pub cipher_spec: Option<String>,
    /// Defaults to `TlsConfig::default()` when absent.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Defaults to `default_max_message_size()` when absent.
    #[serde(default = "default_max_message_size")]
    pub max_message_size: usize,
    /// Defaults to `default_wait_timeout_ms()` when absent.
    #[serde(default = "default_wait_timeout_ms")]
    pub wait_timeout_ms: i32,
    #[serde(default)]
    pub internal_buffer_size: Option<usize>,
}
1028
/// Serde default for `IbmMqConfig::max_message_size`: 4 MiB expressed in bytes.
fn default_max_message_size() -> usize {
    const BYTES_PER_MIB: usize = 1024 * 1024;
    4 * BYTES_PER_MIB
}
1032
/// Serde default for `IbmMqConfig::wait_timeout_ms`.
fn default_wait_timeout_ms() -> i32 {
    const DEFAULT_WAIT_TIMEOUT_MS: i32 = 1000;
    DEFAULT_WAIT_TIMEOUT_MS
}
1036
/// Settings carried by `EndpointType::Switch`.
///
/// `metadata_key` and `cases` are required; unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct SwitchConfig {
    // NOTE(review): presumably the message-metadata key whose value selects a case — confirm.
    pub metadata_key: String,
    /// Endpoints keyed by case name.
    pub cases: HashMap<String, Endpoint>,
    /// Optional boxed endpoint; `None` when absent.
    // NOTE(review): presumably used when no case matches — confirm.
    pub default: Option<Box<Endpoint>>,
}
1050
/// Settings carried by `EndpointType::Response`.
///
/// The struct currently has no fields and rejects unknown keys, so it deserializes
/// only from an empty map.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct ResponseConfig {
}
1058
/// TLS settings used by the endpoint configurations that have a `tls` field.
///
/// Unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
pub struct TlsConfig {
    /// Has no serde default, so it must be present whenever a TLS map is deserialized.
    pub required: bool,
    pub ca_file: Option<String>,
    pub cert_file: Option<String>,
    pub key_file: Option<String>,
    pub cert_password: Option<String>,
    /// Defaults to `false` when absent.
    #[serde(default)]
    pub accept_invalid_certs: bool,
}
1080
1081impl TlsConfig {
1082 pub fn is_mtls_client_configured(&self) -> bool {
1083 self.required && self.cert_file.is_some() && self.key_file.is_some()
1084 }
1085 pub fn is_tls_server_configured(&self) -> bool {
1086 self.required && self.cert_file.is_some() && self.key_file.is_some()
1087 }
1088}
1089
#[cfg(test)]
mod tests {
    use super::*;
    use config::{Config as ConfigBuilder, Environment};

    /// A single route (`kafka_to_nats`) with five input middlewares, a Kafka input
    /// with TLS settings, and a NATS output with one middleware.
    const TEST_YAML: &str = r#"
kafka_to_nats:
  concurrency: 10
  input:
    middlewares:
      - deduplication:
          sled_path: "/tmp/mq-bridge/dedup_db"
          ttl_seconds: 3600
      - metrics: {}
      - retry:
          max_attempts: 5
          initial_interval_ms: 200
      - random_panic:
          probability: 0.1
      - dlq:
          endpoint:
            nats:
              subject: "dlq-subject"
              url: "nats://localhost:4222"
    kafka:
      topic: "input-topic"
      url: "localhost:9092"
      group_id: "my-consumer-group"
      tls:
        required: true
        ca_file: "/path_to_ca"
        cert_file: "/path_to_cert"
        key_file: "/path_to_key"
        cert_password: "password"
        accept_invalid_certs: true
  output:
    middlewares:
      - metrics: {}
    nats:
      subject: "output-subject"
      url: "nats://localhost:4222"
"#;

    /// Checks that `config` matches the contents of `TEST_YAML`.
    fn assert_config_values(config: &Config) {
        assert_eq!(config.len(), 1);
        let route = config.get("kafka_to_nats").expect("Route should exist");

        assert_eq!(route.options.concurrency, 10);

        let input = &route.input;
        assert_eq!(input.middlewares.len(), 5);

        let mut has_dedup = false;
        let mut has_metrics = false;
        let mut has_dlq = false;
        let mut has_retry = false;
        let mut has_random_panic = false;
        for middleware in &input.middlewares {
            match middleware {
                Middleware::Deduplication(dedup) => {
                    assert_eq!(dedup.sled_path, "/tmp/mq-bridge/dedup_db");
                    assert_eq!(dedup.ttl_seconds, 3600);
                    has_dedup = true;
                }
                Middleware::Metrics(_) => {
                    has_metrics = true;
                }
                Middleware::Custom { .. } => {}
                Middleware::Dlq(dlq) => {
                    assert!(dlq.endpoint.middlewares.is_empty());
                    // Fail loudly on a wrong endpoint kind instead of silently
                    // skipping the field checks.
                    match &dlq.endpoint.endpoint_type {
                        EndpointType::Nats(nats_cfg) => {
                            assert_eq!(nats_cfg.subject, Some("dlq-subject".to_string()));
                            assert_eq!(nats_cfg.url, "nats://localhost:4222");
                        }
                        other => panic!("DLQ endpoint should be NATS, got {:?}", other),
                    }
                    has_dlq = true;
                }
                Middleware::Retry(retry) => {
                    assert_eq!(retry.max_attempts, 5);
                    assert_eq!(retry.initial_interval_ms, 200);
                    has_retry = true;
                }
                Middleware::RandomPanic(rp) => {
                    assert!((rp.probability - 0.1).abs() < f64::EPSILON);
                    has_random_panic = true;
                }
                Middleware::Delay(_) => {}
                Middleware::WeakJoin(_) => {}
            }
        }

        if let EndpointType::Kafka(kafka) = &input.endpoint_type {
            assert_eq!(kafka.topic, Some("input-topic".to_string()));
            assert_eq!(kafka.url, "localhost:9092");
            assert_eq!(kafka.group_id, Some("my-consumer-group".to_string()));
            let tls = &kafka.tls;
            assert!(tls.required);
            assert_eq!(tls.ca_file.as_deref(), Some("/path_to_ca"));
            assert!(tls.accept_invalid_certs);
        } else {
            panic!("Input endpoint should be Kafka");
        }
        assert!(has_dedup);
        assert!(has_metrics);
        assert!(has_dlq);
        assert!(has_retry);
        assert!(has_random_panic);

        let output = &route.output;
        assert_eq!(output.middlewares.len(), 1);
        assert!(matches!(output.middlewares[0], Middleware::Metrics(_)));

        if let EndpointType::Nats(nats) = &output.endpoint_type {
            assert_eq!(nats.subject, Some("output-subject".to_string()));
            assert_eq!(nats.url, "nats://localhost:4222");
        } else {
            panic!("Output endpoint should be NATS");
        }
    }

    #[test]
    fn test_deserialize_from_yaml() {
        let result: Result<Config, _> = serde_yaml_ng::from_str(TEST_YAML);
        println!("Deserialized from YAML: {:#?}", result);
        let config = result.expect("Failed to deserialize TEST_YAML");
        assert_config_values(&config);
    }

    #[test]
    fn test_deserialize_from_env() {
        // NOTE(review): `set_var` is only sound while no other thread reads or writes
        // the process environment; this relies on no other test touching it
        // concurrently — confirm.
        unsafe {
            std::env::set_var("MQB__KAFKA_TO_NATS__CONCURRENCY", "10");
            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "input-topic");
            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__URL", "localhost:9092");
            std::env::set_var(
                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID",
                "my-consumer-group",
            );
            std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__REQUIRED", "true");
            std::env::set_var(
                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__CA_FILE",
                "/path_to_ca",
            );
            std::env::set_var(
                "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__ACCEPT_INVALID_CERTS",
                "true",
            );
            std::env::set_var(
                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT",
                "output-subject",
            );
            std::env::set_var(
                "MQB__KAFKA_TO_NATS__OUTPUT__NATS__URL",
                "nats://localhost:4222",
            );
            std::env::set_var(
                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__SUBJECT",
                "dlq-subject",
            );
            std::env::set_var(
                "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__URL",
                "nats://localhost:4222",
            );
        }

        let builder = ConfigBuilder::builder().add_source(
            Environment::with_prefix("MQB")
                .separator("__")
                .try_parsing(true),
        );

        let config: Config = builder
            .build()
            .expect("Failed to build config")
            .try_deserialize()
            .expect("Failed to deserialize config");

        let route = config.get("kafka_to_nats").expect("Route should exist");
        assert_eq!(route.options.concurrency, 10);

        if let EndpointType::Kafka(k) = &route.input.endpoint_type {
            assert_eq!(k.topic, Some("input-topic".to_string()));
            assert!(k.tls.required);
        } else {
            panic!("Expected Kafka endpoint");
        }

        // Middlewares arrive as an index-keyed map (`MIDDLEWARES__0__...`).
        let input = &route.input;
        assert_eq!(input.middlewares.len(), 1);
        assert!(
            matches!(input.middlewares[0], Middleware::Dlq(_)),
            "Expected DLQ middleware"
        );
    }
}
1291
#[cfg(all(test, feature = "schema"))]
mod schema_tests {
    use super::*;

    /// Generates the JSON schema for `Config` and writes it to
    /// `mq-bridge.schema.json` in the crate's manifest directory.
    #[test]
    fn generate_json_schema() {
        let rendered = serde_json::to_string_pretty(&schemars::schema_for!(Config)).unwrap();

        let target =
            std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("mq-bridge.schema.json");
        std::fs::write(target, rendered).expect("Failed to write schema file");
    }
}