1use serde::{
7 de::{MapAccess, Visitor},
8 Deserialize, Deserializer, Serialize,
9};
10use std::{collections::HashMap, sync::Arc};
11
12use crate::traits::Handler;
13use tracing::trace;
14
15pub type Config = HashMap<String, Route>;
54
55pub type PublisherConfig = HashMap<String, Endpoint>;
58
59#[derive(Debug, Deserialize, Serialize, Clone)]
61#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
62#[serde(deny_unknown_fields)]
63pub struct Route {
64 pub input: Endpoint,
66 #[serde(default = "default_output_endpoint")]
68 pub output: Endpoint,
69 #[serde(flatten, default)]
71 pub options: RouteOptions,
72}
73
74impl Default for Route {
75 fn default() -> Self {
76 Self {
77 input: Endpoint::null(),
78 output: Endpoint::null(),
79 options: RouteOptions::default(),
80 }
81 }
82}
83
84#[derive(Debug, Deserialize, Serialize, Clone)]
86#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
87#[serde(deny_unknown_fields)]
88pub struct RouteOptions {
89 #[serde(default = "default_concurrency")]
91 #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
92 pub concurrency: usize,
93 #[serde(default = "default_batch_size")]
95 #[cfg_attr(feature = "schema", schemars(range(min = 1)))]
96 pub batch_size: usize,
97 #[serde(default = "default_commit_concurrency_limit")]
99 pub commit_concurrency_limit: usize,
100}
101
102impl Default for RouteOptions {
103 fn default() -> Self {
104 Self {
105 concurrency: default_concurrency(),
106 batch_size: default_batch_size(),
107 commit_concurrency_limit: default_commit_concurrency_limit(),
108 }
109 }
110}
111
112pub(crate) fn default_concurrency() -> usize {
113 1
114}
115
116pub(crate) fn default_batch_size() -> usize {
117 1
118}
119
120pub(crate) fn default_commit_concurrency_limit() -> usize {
121 4096
122}
123
124fn default_output_endpoint() -> Endpoint {
125 Endpoint::new(EndpointType::Null)
126}
127
128fn default_retry_attempts() -> usize {
129 3
130}
131fn default_initial_interval_ms() -> u64 {
132 100
133}
134fn default_max_interval_ms() -> u64 {
135 5000
136}
137fn default_multiplier() -> f64 {
138 2.0
139}
140fn default_clean_session() -> bool {
141 false
142}
143
144fn is_known_endpoint_name(name: &str) -> bool {
145 matches!(
146 name,
147 "aws"
148 | "kafka"
149 | "nats"
150 | "file"
151 | "static"
152 | "memory"
153 | "amqp"
154 | "mongodb"
155 | "mqtt"
156 | "http"
157 | "ibm-mq"
158 | "ibmmq"
159 | "zeromq"
160 | "fanout"
161 | "switch"
162 | "response"
163 )
164}
165
166#[derive(Serialize, Clone, Default)]
168#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
169#[serde(deny_unknown_fields)]
170pub struct Endpoint {
171 #[serde(default)]
173 pub middlewares: Vec<Middleware>,
174
175 #[serde(flatten)]
177 pub endpoint_type: EndpointType,
178
179 #[serde(skip_serializing)]
180 #[cfg_attr(feature = "schema", schemars(skip))]
181 pub handler: Option<Arc<dyn Handler>>,
183}
184
185impl std::fmt::Debug for Endpoint {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 f.debug_struct("Endpoint")
188 .field("middlewares", &self.middlewares)
189 .field("endpoint_type", &self.endpoint_type)
190 .field(
191 "handler",
192 &if self.handler.is_some() {
193 "Some(<Handler>)"
194 } else {
195 "None"
196 },
197 )
198 .finish()
199 }
200}
201
202impl<'de> Deserialize<'de> for Endpoint {
203 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
204 where
205 D: Deserializer<'de>,
206 {
207 struct EndpointVisitor;
208
209 impl<'de> Visitor<'de> for EndpointVisitor {
210 type Value = Endpoint;
211
212 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
213 formatter.write_str("a map representing an endpoint or null")
214 }
215
216 fn visit_unit<E>(self) -> Result<Self::Value, E>
217 where
218 E: serde::de::Error,
219 {
220 Ok(Endpoint::new(EndpointType::Null))
221 }
222
223 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
224 where
225 A: MapAccess<'de>,
226 {
227 let mut temp_map = serde_json::Map::new();
230 let mut middlewares_val = None;
231
232 while let Some((key, value)) = map.next_entry::<String, serde_json::Value>()? {
233 if key == "middlewares" {
234 middlewares_val = Some(value);
235 } else {
236 temp_map.insert(key, value);
237 }
238 }
239
240 let temp_val = serde_json::Value::Object(temp_map);
242 let endpoint_type: EndpointType = match serde_json::from_value(temp_val.clone()) {
243 Ok(et) => et,
244 Err(original_err) => {
245 if let serde_json::Value::Object(map) = &temp_val {
246 if map.len() == 1 {
247 let (name, config) = map.iter().next().unwrap();
248 if is_known_endpoint_name(name) {
249 return Err(serde::de::Error::custom(original_err));
250 }
251 trace!("Falling back to Custom endpoint for key: {}", name);
252 EndpointType::Custom {
253 name: name.clone(),
254 config: config.clone(),
255 }
256 } else if map.is_empty() {
257 EndpointType::Null
258 } else {
259 return Err(serde::de::Error::custom(
260 "Invalid endpoint configuration: multiple keys found or unknown endpoint type",
261 ));
262 }
263 } else {
264 return Err(serde::de::Error::custom("Invalid endpoint configuration"));
265 }
266 }
267 };
268
269 let middlewares = match middlewares_val {
271 Some(val) => {
272 deserialize_middlewares_from_value(val).map_err(serde::de::Error::custom)?
273 }
274 None => Vec::new(),
275 };
276
277 Ok(Endpoint {
278 middlewares,
279 endpoint_type,
280 handler: None,
281 })
282 }
283 }
284
285 deserializer.deserialize_any(EndpointVisitor)
286 }
287}
288
289fn is_known_middleware_name(name: &str) -> bool {
290 matches!(
291 name,
292 "deduplication"
293 | "metrics"
294 | "dlq"
295 | "retry"
296 | "random_panic"
297 | "delay"
298 | "weak_join"
299 | "custom"
300 )
301}
302
303fn deserialize_middlewares_from_value(value: serde_json::Value) -> anyhow::Result<Vec<Middleware>> {
307 let arr = match value {
308 serde_json::Value::Array(arr) => arr,
309 serde_json::Value::Object(map) => {
310 let mut middlewares: Vec<_> = map
311 .into_iter()
312 .filter_map(|(key, value)| key.parse::<usize>().ok().map(|index| (index, value)))
315 .collect();
316 middlewares.sort_by_key(|(index, _)| *index);
317
318 middlewares.into_iter().map(|(_, value)| value).collect()
319 }
320 _ => return Err(anyhow::anyhow!("Expected an array or object")),
321 };
322
323 let mut middlewares = Vec::new();
324 for item in arr {
325 let known_name = if let serde_json::Value::Object(map) = &item {
327 if map.len() == 1 {
328 let (name, _) = map.iter().next().unwrap();
329 if is_known_middleware_name(name) {
330 Some(name.clone())
331 } else {
332 None
333 }
334 } else {
335 None
336 }
337 } else {
338 None
339 };
340
341 if let Some(name) = known_name {
342 match serde_json::from_value::<Middleware>(item.clone()) {
343 Ok(m) => middlewares.push(m),
344 Err(e) => {
345 return Err(anyhow::anyhow!(
346 "Failed to deserialize known middleware '{}': {}",
347 name,
348 e
349 ))
350 }
351 }
352 } else if let Ok(m) = serde_json::from_value::<Middleware>(item.clone()) {
353 middlewares.push(m);
354 } else if let serde_json::Value::Object(map) = &item {
355 if map.len() == 1 {
356 let (name, config) = map.iter().next().unwrap();
357 middlewares.push(Middleware::Custom {
358 name: name.clone(),
359 config: config.clone(),
360 });
361 } else {
362 return Err(anyhow::anyhow!(
363 "Invalid middleware configuration: {:?}",
364 item
365 ));
366 }
367 } else {
368 return Err(anyhow::anyhow!(
369 "Invalid middleware configuration: {:?}",
370 item
371 ));
372 }
373 }
374 Ok(middlewares)
375}
376
377#[derive(Debug, Deserialize, Serialize, Clone, Default)]
399#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
400#[serde(rename_all = "lowercase")]
401pub enum EndpointType {
402 Aws(AwsConfig),
403 Kafka(KafkaConfig),
404 Nats(NatsConfig),
405 File(FileConfig),
406 Static(String),
407 Memory(MemoryConfig),
408 Amqp(AmqpConfig),
409 MongoDb(MongoDbConfig),
410 Mqtt(MqttConfig),
411 Http(HttpConfig),
412 IbmMq(IbmMqConfig),
413 ZeroMq(ZeroMqConfig),
414 Fanout(Vec<Endpoint>),
415 Switch(SwitchConfig),
416 Response(ResponseConfig),
417 Custom {
418 name: String,
419 config: serde_json::Value,
420 },
421 #[default]
422 Null,
423}
424
425impl EndpointType {
426 pub fn name(&self) -> &'static str {
427 match self {
428 EndpointType::Aws(_) => "aws",
429 EndpointType::Kafka(_) => "kafka",
430 EndpointType::Nats(_) => "nats",
431 EndpointType::File(_) => "file",
432 EndpointType::Static(_) => "static",
433 EndpointType::Memory(_) => "memory",
434 EndpointType::Amqp(_) => "amqp",
435 EndpointType::MongoDb(_) => "mongodb",
436 EndpointType::Mqtt(_) => "mqtt",
437 EndpointType::Http(_) => "http",
438 EndpointType::IbmMq(_) => "ibmmq",
439 EndpointType::ZeroMq(_) => "zeromq",
440 EndpointType::Fanout(_) => "fanout",
441 EndpointType::Switch(_) => "switch",
442 EndpointType::Response(_) => "response",
443 EndpointType::Custom { .. } => "custom",
444 EndpointType::Null => "null",
445 }
446 }
447
448 pub fn is_core(&self) -> bool {
449 matches!(
450 self,
451 EndpointType::File(_)
452 | EndpointType::Static(_)
453 | EndpointType::Memory(_)
454 | EndpointType::Fanout(_)
455 | EndpointType::Switch(_)
456 | EndpointType::Response(_)
457 | EndpointType::Custom { .. }
458 | EndpointType::Null
459 )
460 }
461}
462
463#[derive(Debug, Deserialize, Serialize, Clone)]
465#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
466#[serde(rename_all = "snake_case")]
467pub enum Middleware {
468 Deduplication(DeduplicationMiddleware),
469 Metrics(MetricsMiddleware),
470 Dlq(Box<DeadLetterQueueMiddleware>),
471 Retry(RetryMiddleware),
472 RandomPanic(RandomPanicMiddleware),
473 Delay(DelayMiddleware),
474 WeakJoin(WeakJoinMiddleware),
475 Custom {
476 name: String,
477 config: serde_json::Value,
478 },
479}
480
481#[derive(Debug, Deserialize, Serialize, Clone)]
483#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
484#[serde(deny_unknown_fields)]
485pub struct DeduplicationMiddleware {
486 pub sled_path: String,
488 pub ttl_seconds: u64,
490}
491
492#[derive(Debug, Deserialize, Serialize, Clone)]
495#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
496#[serde(deny_unknown_fields)]
497pub struct MetricsMiddleware {}
498
499#[derive(Debug, Deserialize, Serialize, Clone, Default)]
502#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
503#[serde(deny_unknown_fields)]
504pub struct DeadLetterQueueMiddleware {
505 pub endpoint: Endpoint,
507}
508
509#[derive(Debug, Deserialize, Serialize, Clone, Default)]
510#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
511#[serde(deny_unknown_fields)]
512pub struct RetryMiddleware {
513 #[serde(default = "default_retry_attempts")]
515 pub max_attempts: usize,
516 #[serde(default = "default_initial_interval_ms")]
518 pub initial_interval_ms: u64,
519 #[serde(default = "default_max_interval_ms")]
521 pub max_interval_ms: u64,
522 #[serde(default = "default_multiplier")]
524 pub multiplier: f64,
525}
526
527#[derive(Debug, Deserialize, Serialize, Clone)]
528#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
529#[serde(deny_unknown_fields)]
530pub struct DelayMiddleware {
531 pub delay_ms: u64,
533}
534
535#[derive(Debug, Deserialize, Serialize, Clone)]
536#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
537#[serde(deny_unknown_fields)]
538pub struct WeakJoinMiddleware {
539 pub group_by: String,
541 pub expected_count: usize,
543 pub timeout_ms: u64,
545}
546
547#[derive(Debug, Deserialize, Serialize, Clone)]
548#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
549#[serde(deny_unknown_fields)]
550pub struct RandomPanicMiddleware {
551 #[serde(deserialize_with = "deserialize_probability")]
553 pub probability: f64,
554}
555
556fn deserialize_probability<'de, D>(deserializer: D) -> Result<f64, D::Error>
557where
558 D: Deserializer<'de>,
559{
560 let value = f64::deserialize(deserializer)?;
561 if !(0.0..=1.0).contains(&value) {
562 return Err(serde::de::Error::custom(
563 "probability must be between 0.0 and 1.0",
564 ));
565 }
566 Ok(value)
567}
568
569fn deserialize_null_as_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
570where
571 D: Deserializer<'de>,
572{
573 let opt = Option::<bool>::deserialize(deserializer)?;
574 Ok(opt.unwrap_or(false))
575}
576
577#[derive(Debug, Deserialize, Serialize, Clone, Default)]
579#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
580#[serde(deny_unknown_fields)]
581pub struct AwsConfig {
582 pub queue_url: Option<String>,
584 pub topic_arn: Option<String>,
586 pub region: Option<String>,
588 pub endpoint_url: Option<String>,
590 pub access_key: Option<String>,
592 pub secret_key: Option<String>,
594 pub session_token: Option<String>,
596 #[cfg_attr(feature = "schema", schemars(range(min = 1, max = 10)))]
598 pub max_messages: Option<i32>,
599 #[cfg_attr(feature = "schema", schemars(range(min = 0, max = 20)))]
601 pub wait_time_seconds: Option<i32>,
602}
603
604#[derive(Debug, Deserialize, Serialize, Clone, Default)]
608#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
609#[serde(deny_unknown_fields)]
610pub struct KafkaConfig {
611 #[serde(alias = "brokers")]
613 pub url: String,
614 pub topic: Option<String>,
616 pub username: Option<String>,
618 pub password: Option<String>,
620 #[serde(default)]
622 pub tls: TlsConfig,
623 pub group_id: Option<String>,
626 #[serde(default)]
628 pub delayed_ack: bool,
629 #[serde(default)]
631 pub producer_options: Option<Vec<(String, String)>>,
632 #[serde(default)]
634 pub consumer_options: Option<Vec<(String, String)>>,
635}
636
637#[derive(Debug, Serialize, Clone)]
640#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
641pub struct FileConfig {
642 pub path: String,
644 #[serde(default)]
647 pub subscribe_mode: bool,
648 pub delete: Option<bool>,
651}
652
653impl<'de> Deserialize<'de> for FileConfig {
654 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
655 where
656 D: Deserializer<'de>,
657 {
658 struct FileConfigVisitor;
659 impl<'de> Visitor<'de> for FileConfigVisitor {
660 type Value = FileConfig;
661 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
662 formatter.write_str("string or map")
663 }
664 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
665 where
666 E: serde::de::Error,
667 {
668 Ok(FileConfig {
669 path: value.to_string(),
670 subscribe_mode: false,
671 delete: None,
672 })
673 }
674 fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
675 where
676 M: MapAccess<'de>,
677 {
678 let mut path = None;
679 let mut consume = true;
680 let mut subscribe_mode = None;
681 let mut delete = None;
682 while let Some(key) = map.next_key::<String>()? {
683 match key.as_str() {
684 "path" => {
685 if path.is_some() {
686 return Err(serde::de::Error::duplicate_field("path"));
687 }
688 path = Some(map.next_value()?);
689 }
690 "consume" => {
691 consume = map.next_value()?;
692 }
693 "subscribe_mode" => {
694 if subscribe_mode.is_some() {
695 return Err(serde::de::Error::duplicate_field("subscribe_mode"));
696 }
697 subscribe_mode = Some(map.next_value()?);
698 }
699 "delete" => {
700 if delete.is_some() {
701 return Err(serde::de::Error::duplicate_field("delete"));
702 }
703 delete = map.next_value()?;
704 }
705 _ => {
706 let _ = map.next_value::<serde::de::IgnoredAny>()?;
707 }
708 }
709 }
710 let path = path.ok_or_else(|| serde::de::Error::missing_field("path"))?;
711 Ok(FileConfig {
712 path,
713 subscribe_mode: subscribe_mode.unwrap_or(!consume),
714 delete,
715 })
716 }
717 }
718 deserializer.deserialize_any(FileConfigVisitor)
719 }
720}
721
722#[derive(Debug, Deserialize, Serialize, Clone, Default)]
726#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
727#[serde(deny_unknown_fields)]
728pub struct NatsConfig {
729 pub url: String,
731 pub subject: Option<String>,
733 pub stream: Option<String>,
735 pub username: Option<String>,
737 pub password: Option<String>,
739 #[serde(default)]
741 pub tls: TlsConfig,
742 pub token: Option<String>,
744 #[serde(default)]
748 pub request_reply: bool,
749 pub request_timeout_ms: Option<u64>,
751 #[serde(default)]
753 pub delayed_ack: bool,
754 #[serde(default)]
756 pub no_jetstream: bool,
757 #[serde(default)]
759 pub subscriber_mode: bool,
760 pub stream_max_messages: Option<i64>,
762 pub stream_max_bytes: Option<i64>,
764 pub prefetch_count: Option<usize>,
766}
767
768#[derive(Debug, Serialize, Deserialize, Clone, Default)]
769#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
770#[serde(deny_unknown_fields)]
771pub struct MemoryConfig {
772 pub topic: String,
774 pub capacity: Option<usize>,
776 #[serde(default)]
778 pub request_reply: bool,
779 pub request_timeout_ms: Option<u64>,
781 #[serde(default)]
783 pub subscribe_mode: bool,
784 #[serde(default)]
786 pub enable_nack: bool,
787}
788
789impl MemoryConfig {
790 pub fn new(topic: impl Into<String>, capacity: Option<usize>) -> Self {
791 Self {
792 topic: topic.into(),
793 capacity,
794 ..Default::default()
795 }
796 }
797 pub fn with_subscribe(self, subscribe_mode: bool) -> Self {
798 Self {
799 subscribe_mode,
800 ..self
801 }
802 }
803}
804
805#[derive(Debug, Deserialize, Serialize, Clone, Default)]
809#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
810#[serde(deny_unknown_fields)]
811pub struct AmqpConfig {
812 pub url: String,
816 pub queue: Option<String>,
818 #[serde(default)]
820 pub subscribe_mode: bool,
821 pub username: Option<String>,
823 pub password: Option<String>,
825 #[serde(default)]
827 pub tls: TlsConfig,
828 pub exchange: Option<String>,
830 pub prefetch_count: Option<u16>,
832 #[serde(default)]
834 pub no_persistence: bool,
835 #[serde(default)]
837 pub delayed_ack: bool,
838}
839
840#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq, Eq)]
841#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
842#[serde(rename_all = "lowercase")]
843pub enum MongoDbFormat {
844 #[default]
845 Normal,
846 Json,
847 Raw,
848}
849
850#[derive(Debug, Deserialize, Serialize, Clone, Default)]
854#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
855#[serde(deny_unknown_fields)]
856pub struct MongoDbConfig {
857 pub url: String,
860 pub collection: Option<String>,
862 pub username: Option<String>,
865 pub password: Option<String>,
868 #[serde(default)]
870 pub tls: TlsConfig,
871 pub database: String,
873 pub polling_interval_ms: Option<u64>,
875 pub reply_polling_ms: Option<u64>,
877 #[serde(default)]
879 pub request_reply: bool,
880 #[serde(default)]
882 pub change_stream: bool,
883 pub request_timeout_ms: Option<u64>,
885 pub ttl_seconds: Option<u64>,
887 pub capped_size_bytes: Option<i64>,
889 #[serde(default)]
891 pub format: MongoDbFormat,
892 pub cursor_id: Option<String>,
894}
895
896#[derive(Debug, Deserialize, Serialize, Clone, Default)]
900#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
901#[serde(deny_unknown_fields)]
902pub struct MqttConfig {
903 pub url: String,
905 pub topic: Option<String>,
907 pub username: Option<String>,
909 pub password: Option<String>,
911 #[serde(default)]
913 pub tls: TlsConfig,
914 pub client_id: Option<String>,
916 pub queue_capacity: Option<usize>,
918 pub max_inflight: Option<u16>,
920 pub qos: Option<u8>,
922 #[serde(default = "default_clean_session")]
924 pub clean_session: bool,
925 pub keep_alive_seconds: Option<u64>,
927 #[serde(default)]
929 pub protocol: MqttProtocol,
930 pub session_expiry_interval: Option<u32>,
932 #[serde(default)]
935 pub delayed_ack: bool,
936}
937
938#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
939#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
940#[serde(rename_all = "lowercase")]
941pub enum MqttProtocol {
942 #[default]
943 V5,
944 V3,
945}
946
947#[derive(Debug, Deserialize, Serialize, Clone, Default)]
950#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
951#[serde(deny_unknown_fields)]
952pub struct ZeroMqConfig {
953 pub url: String,
955 #[serde(default)]
957 pub socket_type: Option<ZeroMqSocketType>,
958 pub topic: Option<String>,
960 #[serde(default)]
962 pub bind: bool,
963 #[serde(default)]
965 pub internal_buffer_size: Option<usize>,
966}
967
968#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
969#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
970#[serde(rename_all = "lowercase")]
971pub enum ZeroMqSocketType {
972 Push,
973 Pull,
974 Pub,
975 Sub,
976 Req,
977 Rep,
978}
979
980#[derive(Debug, Deserialize, Serialize, Clone, Default)]
984#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
985#[serde(deny_unknown_fields)]
986pub struct HttpConfig {
987 pub url: String,
989 #[serde(default)]
991 pub tls: TlsConfig,
992 pub workers: Option<usize>,
994 pub message_id_header: Option<String>,
996 pub request_timeout_ms: Option<u64>,
998 pub internal_buffer_size: Option<usize>,
1000 #[serde(default)]
1002 pub fire_and_forget: bool,
1003}
1004
1005#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1009#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1010#[serde(deny_unknown_fields)]
1011pub struct IbmMqConfig {
1012 pub url: String,
1014 pub queue: Option<String>,
1016 pub topic: Option<String>,
1018 pub queue_manager: String,
1020 pub channel: String,
1022 pub username: Option<String>,
1024 pub password: Option<String>,
1026 pub cipher_spec: Option<String>,
1028 #[serde(default)]
1030 pub tls: TlsConfig,
1031 #[serde(default = "default_max_message_size")]
1033 pub max_message_size: usize,
1034 #[serde(default = "default_wait_timeout_ms")]
1036 pub wait_timeout_ms: i32,
1037 #[serde(default)]
1039 pub internal_buffer_size: Option<usize>,
1040}
1041
1042fn default_max_message_size() -> usize {
1043 4 * 1024 * 1024 }
1045
1046fn default_wait_timeout_ms() -> i32 {
1047 1000 }
1049
1050#[derive(Debug, Deserialize, Serialize, Clone)]
1053#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1054#[serde(deny_unknown_fields)]
1055pub struct SwitchConfig {
1056 pub metadata_key: String,
1058 pub cases: HashMap<String, Endpoint>,
1060 pub default: Option<Box<Endpoint>>,
1062}
1063
1064#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1066#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1067#[serde(deny_unknown_fields)]
1068pub struct ResponseConfig {
1069 }
1071
1072#[derive(Debug, Deserialize, Serialize, Clone, Default)]
1076#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
1077#[serde(deny_unknown_fields)]
1078pub struct TlsConfig {
1079 #[serde(default, deserialize_with = "deserialize_null_as_false")]
1081 pub required: bool,
1082 pub ca_file: Option<String>,
1084 pub cert_file: Option<String>,
1086 pub key_file: Option<String>,
1088 pub cert_password: Option<String>,
1090 #[serde(default)]
1092 pub accept_invalid_certs: bool,
1093}
1094
1095impl TlsConfig {
1096 pub fn is_mtls_client_configured(&self) -> bool {
1097 self.required && self.cert_file.is_some() && self.key_file.is_some()
1098 }
1099 pub fn is_tls_server_configured(&self) -> bool {
1100 self.required && self.cert_file.is_some() && self.key_file.is_some()
1101 }
1102}
1103
1104#[cfg(test)]
1105mod tests {
1106 use super::*;
1107 use config::{Config as ConfigBuilder, Environment};
1108
1109 const TEST_YAML: &str = r#"
1110kafka_to_nats:
1111 concurrency: 10
1112 input:
1113 middlewares:
1114 - deduplication:
1115 sled_path: "/tmp/mq-bridge/dedup_db"
1116 ttl_seconds: 3600
1117 - metrics: {}
1118 - retry:
1119 max_attempts: 5
1120 initial_interval_ms: 200
1121 - random_panic:
1122 probability: 0.1
1123 - dlq:
1124 endpoint:
1125 nats:
1126 subject: "dlq-subject"
1127 url: "nats://localhost:4222"
1128 kafka:
1129 topic: "input-topic"
1130 url: "localhost:9092"
1131 group_id: "my-consumer-group"
1132 tls:
1133 required: true
1134 ca_file: "/path_to_ca"
1135 cert_file: "/path_to_cert"
1136 key_file: "/path_to_key"
1137 cert_password: "password"
1138 accept_invalid_certs: true
1139 output:
1140 middlewares:
1141 - metrics: {}
1142 nats:
1143 subject: "output-subject"
1144 url: "nats://localhost:4222"
1145"#;
1146
1147 fn assert_config_values(config: &Config) {
1148 assert_eq!(config.len(), 1);
1149 let route = config.get("kafka_to_nats").expect("Route should exist");
1150
1151 assert_eq!(route.options.concurrency, 10);
1152
1153 let input = &route.input;
1155 assert_eq!(input.middlewares.len(), 5);
1156
1157 let mut has_dedup = false;
1158 let mut has_metrics = false;
1159 let mut has_dlq = false;
1160 let mut has_retry = false;
1161 let mut has_random_panic = false;
1162 for middleware in &input.middlewares {
1163 match middleware {
1164 Middleware::Deduplication(dedup) => {
1165 assert_eq!(dedup.sled_path, "/tmp/mq-bridge/dedup_db");
1166 assert_eq!(dedup.ttl_seconds, 3600);
1167 has_dedup = true;
1168 }
1169 Middleware::Metrics(_) => {
1170 has_metrics = true;
1171 }
1172 Middleware::Custom { .. } => {}
1173 Middleware::Dlq(dlq) => {
1174 assert!(dlq.endpoint.middlewares.is_empty());
1175 if let EndpointType::Nats(nats_cfg) = &dlq.endpoint.endpoint_type {
1176 assert_eq!(nats_cfg.subject, Some("dlq-subject".to_string()));
1177 assert_eq!(nats_cfg.url, "nats://localhost:4222");
1178 }
1179 has_dlq = true;
1180 }
1181 Middleware::Retry(retry) => {
1182 assert_eq!(retry.max_attempts, 5);
1183 assert_eq!(retry.initial_interval_ms, 200);
1184 has_retry = true;
1185 }
1186 Middleware::RandomPanic(rp) => {
1187 assert!((rp.probability - 0.1).abs() < f64::EPSILON);
1188 has_random_panic = true;
1189 }
1190 Middleware::Delay(_) => {}
1191 Middleware::WeakJoin(_) => {}
1192 }
1193 }
1194
1195 if let EndpointType::Kafka(kafka) = &input.endpoint_type {
1196 assert_eq!(kafka.topic, Some("input-topic".to_string()));
1197 assert_eq!(kafka.url, "localhost:9092");
1198 assert_eq!(kafka.group_id, Some("my-consumer-group".to_string()));
1199 let tls = &kafka.tls;
1200 assert!(tls.required);
1201 assert_eq!(tls.ca_file.as_deref(), Some("/path_to_ca"));
1202 assert!(tls.accept_invalid_certs);
1203 } else {
1204 panic!("Input endpoint should be Kafka");
1205 }
1206 assert!(has_dedup);
1207 assert!(has_metrics);
1208 assert!(has_dlq);
1209 assert!(has_retry);
1210 assert!(has_random_panic);
1211
1212 let output = &route.output;
1214 assert_eq!(output.middlewares.len(), 1);
1215 assert!(matches!(output.middlewares[0], Middleware::Metrics(_)));
1216
1217 if let EndpointType::Nats(nats) = &output.endpoint_type {
1218 assert_eq!(nats.subject, Some("output-subject".to_string()));
1219 assert_eq!(nats.url, "nats://localhost:4222");
1220 } else {
1221 panic!("Output endpoint should be NATS");
1222 }
1223 }
1224
1225 #[test]
1226 fn test_deserialize_from_yaml() {
1227 let result: Result<Config, _> = serde_yaml_ng::from_str(TEST_YAML);
1230 println!("Deserialized from YAML: {:#?}", result);
1231 let config = result.expect("Failed to deserialize TEST_YAML");
1232 assert_config_values(&config);
1233 }
1234
1235 #[test]
1236 fn test_deserialize_from_env() {
1237 unsafe {
1239 std::env::set_var("MQB__KAFKA_TO_NATS__CONCURRENCY", "10");
1240 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC", "input-topic");
1241 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__URL", "localhost:9092");
1242 std::env::set_var(
1243 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID",
1244 "my-consumer-group",
1245 );
1246 std::env::set_var("MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__REQUIRED", "true");
1247 std::env::set_var(
1248 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__CA_FILE",
1249 "/path_to_ca",
1250 );
1251 std::env::set_var(
1252 "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TLS__ACCEPT_INVALID_CERTS",
1253 "true",
1254 );
1255 std::env::set_var(
1256 "MQB__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT",
1257 "output-subject",
1258 );
1259 std::env::set_var(
1260 "MQB__KAFKA_TO_NATS__OUTPUT__NATS__URL",
1261 "nats://localhost:4222",
1262 );
1263 std::env::set_var(
1264 "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__SUBJECT",
1265 "dlq-subject",
1266 );
1267 std::env::set_var(
1268 "MQB__KAFKA_TO_NATS__INPUT__MIDDLEWARES__0__DLQ__ENDPOINT__NATS__URL",
1269 "nats://localhost:4222",
1270 );
1271 }
1272
1273 let builder = ConfigBuilder::builder()
1274 .add_source(
1276 Environment::with_prefix("MQB")
1277 .separator("__")
1278 .try_parsing(true),
1279 );
1280
1281 let config: Config = builder
1282 .build()
1283 .expect("Failed to build config")
1284 .try_deserialize()
1285 .expect("Failed to deserialize config");
1286
1287 assert_eq!(config.get("kafka_to_nats").unwrap().options.concurrency, 10);
1289 if let EndpointType::Kafka(k) = &config.get("kafka_to_nats").unwrap().input.endpoint_type {
1290 assert_eq!(k.topic, Some("input-topic".to_string()));
1291 assert!(k.tls.required);
1292 } else {
1293 panic!("Expected Kafka endpoint");
1294 }
1295
1296 let input = &config.get("kafka_to_nats").unwrap().input;
1297 assert_eq!(input.middlewares.len(), 1);
1298 if let Middleware::Dlq(_) = &input.middlewares[0] {
1299 } else {
1301 panic!("Expected DLQ middleware");
1302 }
1303 }
1304}
1305
1306#[cfg(all(test, feature = "schema"))]
1307mod schema_tests {
1308 use super::*;
1309
1310 #[test]
1311 fn generate_json_schema() {
1312 let schema = schemars::schema_for!(Config);
1313 let schema_json = serde_json::to_string_pretty(&schema).unwrap();
1314
1315 let mut path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1316 path.push("mq-bridge.schema.json");
1317 std::fs::write(path, schema_json).expect("Failed to write schema file");
1318 }
1319}