1#[cfg(feature = "amqp")]
7pub mod amqp;
8#[cfg(feature = "aws")]
9pub mod aws;
10pub mod fanout;
11pub mod file;
12#[cfg(feature = "grpc")]
13pub mod grpc;
14#[cfg(feature = "http")]
15pub mod http;
16#[cfg(feature = "ibm-mq")]
17pub mod ibm_mq;
18#[cfg(feature = "kafka")]
19pub mod kafka;
20pub mod memory;
21#[cfg(feature = "mongodb")]
22pub mod mongodb;
23#[cfg(feature = "mqtt")]
24pub mod mqtt;
25#[cfg(feature = "nats")]
26pub mod nats;
27pub mod null;
28pub mod reader;
29pub mod response;
30#[cfg(feature = "sled")]
31pub mod sled;
32#[cfg(feature = "sqlx")]
33pub mod sqlx;
34pub mod static_endpoint;
35pub mod switch;
36#[cfg(feature = "zeromq")]
37pub mod zeromq;
38use crate::endpoints::memory::{get_or_create_channel, MemoryChannel};
39use crate::middleware::apply_middlewares_to_consumer;
40use crate::models::{Endpoint, EndpointType, MemoryConfig, Middleware, ResponseConfig};
41use crate::route::get_endpoint_factory;
42use crate::traits::{BoxFuture, MessageConsumer, MessagePublisher};
43use anyhow::{anyhow, Result};
44use std::sync::Arc;
45
46impl Endpoint {
47 pub fn new(endpoint_type: EndpointType) -> Self {
48 Self {
49 middlewares: Vec::new(),
50 endpoint_type,
51 handler: None,
52 }
53 }
54 pub fn new_memory(topic: &str, capacity: usize) -> Self {
63 Self::new(EndpointType::Memory(MemoryConfig::new(
64 topic,
65 Some(capacity),
66 )))
67 }
68 pub fn new_response() -> Self {
69 Self::new(EndpointType::Response(ResponseConfig::default()))
70 }
71 pub fn has_retry_middleware(&self) -> bool {
72 self.middlewares
73 .iter()
74 .any(|m| matches!(m, Middleware::Retry(_)))
75 }
76 pub fn add_middleware(mut self, middleware: Middleware) -> Self {
77 self.middlewares.push(middleware);
78 self
79 }
80 pub fn add_middlewares(mut self, mut middlewares: Vec<Middleware>) -> Self {
81 self.middlewares.append(&mut middlewares);
82 self
83 }
84 pub fn channel(&self) -> anyhow::Result<MemoryChannel> {
90 match &self.endpoint_type {
91 EndpointType::Memory(cfg) => Ok(get_or_create_channel(cfg)),
92 _ => Err(anyhow::anyhow!("channel() called on non-memory Endpoint")),
93 }
94 }
95 pub fn null() -> Self {
96 Self::new(EndpointType::Null)
97 }
98
99 pub fn with_retry(mut self, retry: crate::models::RetryMiddleware) -> Self {
100 let mut insert_idx = self.middlewares.len();
103 for (i, m) in self.middlewares.iter().enumerate() {
104 if matches!(m, Middleware::Dlq(_) | Middleware::Metrics(_)) {
105 insert_idx = i;
106 break;
107 }
108 }
109 self.middlewares
110 .insert(insert_idx, Middleware::Retry(retry));
111 self
112 }
113
114 pub fn with_dlq(mut self, dlq: crate::models::DeadLetterQueueMiddleware) -> Self {
115 let mut insert_idx = self.middlewares.len();
117 for (i, m) in self.middlewares.iter().enumerate() {
118 if matches!(m, Middleware::Metrics(_)) {
119 insert_idx = i;
120 break;
121 }
122 }
123 self.middlewares
124 .insert(insert_idx, Middleware::Dlq(Box::new(dlq)));
125 self
126 }
127
128 pub fn with_deduplication(mut self, dedup: crate::models::DeduplicationMiddleware) -> Self {
129 self.middlewares.insert(0, Middleware::Deduplication(dedup));
134 self
135 }
136
137 pub fn with_consumer_metrics(mut self) -> Self {
138 if !self
142 .middlewares
143 .iter()
144 .any(|m| matches!(m, Middleware::Metrics(_)))
145 {
146 self.middlewares
147 .insert(0, Middleware::Metrics(crate::models::MetricsMiddleware {}));
148 }
149 self
150 }
151
152 pub fn with_metrics(mut self) -> Self {
153 if !self
155 .middlewares
156 .iter()
157 .any(|m| matches!(m, Middleware::Metrics(_)))
158 {
159 self.middlewares
160 .push(Middleware::Metrics(crate::models::MetricsMiddleware {}));
161 }
162 self
163 }
164
165 pub async fn create_consumer(
166 &self,
167 route_name: &str,
168 ) -> anyhow::Result<Box<dyn crate::traits::MessageConsumer>> {
169 crate::endpoints::create_consumer_from_route(route_name, self).await
170 }
171
172 pub async fn create_publisher(&self, _route_name: &str) -> anyhow::Result<crate::Publisher> {
173 crate::Publisher::new(self.clone()).await
174 }
175
176 pub fn check_consumer(
177 &self,
178 route_name: &str,
179 allowed_endpoints: Option<&[&str]>,
180 ) -> anyhow::Result<Vec<String>> {
181 crate::endpoints::check_consumer(route_name, self, allowed_endpoints)
182 }
183
184 pub fn check_publisher(
185 &self,
186 route_name: &str,
187 allowed_endpoints: Option<&[&str]>,
188 ) -> anyhow::Result<Vec<String>> {
189 crate::endpoints::check_publisher(route_name, self, allowed_endpoints)
190 }
191}
192
193pub fn check_consumer(
195 route_name: &str,
196 endpoint: &Endpoint,
197 allowed_types: Option<&[&str]>,
198) -> Result<Vec<String>> {
199 check_consumer_recursive(route_name, endpoint, 0, allowed_types)
200}
201
202fn check_consumer_recursive(
203 route_name: &str,
204 endpoint: &Endpoint,
205 depth: usize,
206 allowed_types: Option<&[&str]>,
207) -> Result<Vec<String>> {
208 const MAX_DEPTH: usize = 16;
209 if depth > MAX_DEPTH {
210 return Err(anyhow!(
211 "Ref recursion depth exceeded limit of {}",
212 MAX_DEPTH
213 ));
214 }
215 let mut warnings = Vec::new();
216 if endpoint.handler.is_some() {
217 warnings.push(
218 "Endpoint 'handler' is set on an input endpoint. Handlers are currently only supported on output endpoints (publishers) and will be ignored here."
219 .to_string()
220 );
221 }
222
223 if let Some(allowed) = allowed_types {
224 if !endpoint.endpoint_type.is_core() {
225 let name = endpoint.endpoint_type.name();
226 if !allowed.contains(&name) {
227 return Err(anyhow!(
228 "[route:{}] Endpoint type '{}' is not allowed by policy",
229 route_name,
230 name
231 ));
232 }
233 }
234 }
235 match &endpoint.endpoint_type {
236 EndpointType::Ref(name) => {
237 let referenced = crate::route::get_endpoint(name).ok_or_else(|| {
238 anyhow!(
239 "[route:{}] Referenced endpoint '{}' not found",
240 route_name,
241 name
242 )
243 })?;
244 warnings.extend(check_consumer_recursive(
249 route_name,
250 &referenced,
251 depth + 1,
252 allowed_types,
253 )?);
254 Ok(warnings)
255 }
256 #[cfg(feature = "aws")]
257 EndpointType::Aws(cfg) => {
258 if cfg.topic_arn.is_some() {
259 warnings.push(
260 "Endpoint 'aws' is used as a consumer, but 'topic_arn' is a publisher-only option and will be ignored."
261 .to_string()
262 );
263 }
264 Ok(warnings)
265 }
266 #[cfg(feature = "kafka")]
267 EndpointType::Kafka(cfg) => {
268 if cfg.delayed_ack {
269 warnings.push(
270 "Endpoint 'kafka' is used as a consumer, but 'delayed_ack' is a publisher-only option and will be ignored."
271 .to_string()
272 );
273 }
274 if cfg.producer_options.is_some() {
275 warnings.push(
276 "Endpoint 'kafka' is used as a consumer, but 'producer_options' is a publisher-only option and will be ignored."
277 .to_string()
278 );
279 }
280 Ok(warnings)
281 }
282 #[cfg(feature = "nats")]
283 EndpointType::Nats(cfg) => {
284 if cfg.stream.is_none() {
285 return Err(anyhow!(
286 "[route:{}] NATS consumer must specify a 'stream'",
287 route_name
288 ));
289 }
290 if cfg.request_reply {
291 warnings.push(
292 "Endpoint 'nats' is used as a consumer, but 'request_reply' is a publisher-only option and will be ignored."
293 .to_string()
294 );
295 }
296 if cfg.request_timeout_ms.is_some() {
297 warnings.push(
298 "Endpoint 'nats' is used as a consumer, but 'request_timeout_ms' is a publisher-only option and will be ignored."
299 .to_string()
300 );
301 }
302 if cfg.delayed_ack {
303 warnings.push(
304 "Endpoint 'nats' is used as a consumer, but 'delayed_ack' is a publisher-only option and will be ignored."
305 .to_string()
306 );
307 }
308 if cfg.stream_max_messages.is_some() {
309 warnings.push(
310 "Endpoint 'nats' is used as a consumer, but 'stream_max_messages' is a publisher-only option and will be ignored."
311 .to_string()
312 );
313 }
314 if cfg.stream_max_bytes.is_some() {
315 warnings.push(
316 "Endpoint 'nats' is used as a consumer, but 'stream_max_bytes' is a publisher-only option and will be ignored."
317 .to_string()
318 );
319 }
320 Ok(warnings)
321 }
322 #[cfg(feature = "amqp")]
323 EndpointType::Amqp(cfg) => {
324 if cfg.delayed_ack {
325 warnings.push(
326 "Endpoint 'amqp' is used as a consumer, but 'delayed_ack' is a publisher-only option and will be ignored."
327 .to_string()
328 );
329 }
330 Ok(warnings)
331 }
332 #[cfg(feature = "mqtt")]
333 EndpointType::Mqtt(cfg) => {
334 if cfg.delayed_ack {
335 warnings.push(
336 "Endpoint 'mqtt' is used as a consumer, but 'delayed_ack' is a publisher-only option and will be ignored."
337 .to_string()
338 );
339 }
340 Ok(warnings)
341 }
342 #[cfg(feature = "zeromq")]
343 EndpointType::ZeroMq(_) => Ok(warnings),
344 #[cfg(feature = "ibm-mq")]
345 EndpointType::IbmMq(_) => Ok(warnings),
346 #[cfg(feature = "mongodb")]
347 EndpointType::MongoDb(cfg) => {
348 if cfg.change_stream && matches!(cfg.format, crate::models::MongoDbFormat::Raw) {
349 return Err(anyhow!(
350 "[route:{}] MongoDB raw format cannot be used with change_stream/subscriber mode because raw documents do not include the seq ordering field",
351 route_name
352 ));
353 }
354 if cfg.reply_polling_ms.is_some() {
355 warnings.push(
356 "Endpoint 'mongodb' is used as a consumer, but 'reply_polling_ms' is a publisher-only option and will be ignored."
357 .to_string()
358 );
359 }
360 if cfg.request_reply {
361 warnings.push(
362 "Endpoint 'mongodb' is used as a consumer, but 'request_reply' is a publisher-only option and will be ignored."
363 .to_string()
364 );
365 }
366 if cfg.request_timeout_ms.is_some() {
367 warnings.push(
368 "Endpoint 'mongodb' is used as a consumer, but 'request_timeout_ms' is a publisher-only option and will be ignored."
369 .to_string()
370 );
371 }
372 if cfg.ttl_seconds.is_some() {
373 warnings.push(
374 "Endpoint 'mongodb' is used as a consumer, but 'ttl_seconds' is a publisher-only option and will be ignored."
375 .to_string()
376 );
377 }
378 if cfg.capped_size_bytes.is_some() {
379 warnings.push(
380 "Endpoint 'mongodb' is used as a consumer, but 'capped_size_bytes' is a publisher-only option and will be ignored."
381 .to_string()
382 );
383 }
384 Ok(warnings)
385 }
386 #[cfg(feature = "grpc")]
387 EndpointType::Grpc(_) => Ok(warnings),
388 #[cfg(feature = "http")]
389 EndpointType::Http(cfg) => {
390 if cfg.batch_concurrency.is_some() {
391 warnings.push("Endpoint 'http' is used as a consumer, but 'batch_concurrency' is a publisher-only option and will be ignored.".to_string());
392 }
393 if cfg.tcp_keepalive_ms.is_some() {
394 warnings.push("Endpoint 'http' is used as a consumer, but 'tcp_keepalive_ms' is a publisher-only option and will be ignored.".to_string());
395 }
396 if cfg.pool_idle_timeout_ms.is_some() {
397 warnings.push(
398 "Endpoint 'http' is used as a consumer, but 'pool_idle_timeout_ms' is a publisher-only option and will be ignored."
399 .to_string(),
400 );
401 }
402 Ok(warnings)
403 }
404 #[cfg(feature = "sqlx")]
405 EndpointType::Sqlx(cfg) => {
406 if cfg.insert_query.is_some() {
407 warnings.push(
408 "Endpoint 'sqlx' is used as a consumer, but 'insert_query' is a publisher-only option and will be ignored."
409 .to_string()
410 );
411 }
412 Ok(warnings)
413 }
414 #[cfg(feature = "sled")]
415 EndpointType::Sled(_) => Ok(warnings),
416 EndpointType::Static(_) => Ok(warnings),
417 EndpointType::Memory(cfg) => {
418 if cfg.request_reply {
419 warnings.push(
420 "Endpoint 'memory' is used as a consumer, but 'request_reply' is a publisher-only option and will be ignored."
421 .to_string()
422 );
423 }
424 if cfg.request_timeout_ms.is_some() {
425 warnings.push(
426 "Endpoint 'memory' is used as a consumer, but 'request_timeout_ms' is a publisher-only option and will be ignored."
427 .to_string()
428 );
429 }
430 Ok(warnings)
431 }
432 EndpointType::File(_) => Ok(warnings),
433 EndpointType::Custom { .. } => Ok(warnings),
434 EndpointType::Switch(_) => Err(anyhow!(
435 "[route:{}] Switch endpoint is only supported as an output",
436 route_name
437 )),
438 EndpointType::Reader(_) => Err(anyhow!(
439 "[route:{}] Reader endpoint is only supported as an output",
440 route_name
441 )),
442 #[allow(unreachable_patterns)]
443 _ => {
444 if let Some(allowed) = allowed_types {
445 let name = endpoint.endpoint_type.name();
446 if allowed.contains(&name) {
447 return Ok(warnings);
448 }
449 }
450 Err(anyhow!(
451 "[route:{}] Unsupported consumer endpoint type '{:?}'",
452 route_name,
453 endpoint.endpoint_type
454 ))
455 }
456 }
457}
458
459fn resolve_endpoint(endpoint: &Endpoint, route_name: &str) -> Result<Endpoint> {
460 let mut visited = std::collections::HashSet::new();
461 resolve_endpoint_recursive(endpoint, route_name, &mut visited)
462}
463
464fn resolve_endpoint_recursive(
465 endpoint: &Endpoint,
466 route_name: &str,
467 visited: &mut std::collections::HashSet<String>,
468) -> Result<Endpoint> {
469 const MAX_DEPTH: usize = 16;
470 if visited.len() > MAX_DEPTH {
471 return Err(anyhow!(
472 "Reference recursion depth exceeded limit of {}",
473 MAX_DEPTH
474 ));
475 }
476
477 if let EndpointType::Ref(name) = &endpoint.endpoint_type {
478 if !visited.insert(name.clone()) {
479 return Err(anyhow!(
480 "[route:{}] Circular reference detected for endpoint '{}'",
481 route_name,
482 name
483 ));
484 }
485
486 let referenced_endpoint = crate::route::get_endpoint(name).ok_or_else(|| {
487 anyhow!(
488 "[route:{}] Referenced endpoint '{}' not found",
489 route_name,
490 name
491 )
492 })?;
493
494 let mut resolved = resolve_endpoint_recursive(&referenced_endpoint, route_name, visited)?;
495 let mut new_middlewares = endpoint.middlewares.clone();
498 new_middlewares.extend(resolved.middlewares);
499 resolved.middlewares = new_middlewares;
500 Ok(resolved)
501 } else {
502 Ok(endpoint.clone())
503 }
504}
505
506pub async fn create_consumer_from_route(
508 route_name: &str,
509 endpoint: &Endpoint,
510) -> Result<Box<dyn MessageConsumer>> {
511 let resolved_endpoint = resolve_endpoint(endpoint, route_name)?;
512 check_consumer(route_name, &resolved_endpoint, None)?;
513 let consumer = create_base_consumer(route_name, &resolved_endpoint).await?;
514 apply_middlewares_to_consumer(consumer, &resolved_endpoint, route_name).await
515}
516
517async fn create_base_consumer(
518 route_name: &str,
519 endpoint: &Endpoint,
520) -> Result<Box<dyn MessageConsumer>> {
521 fn boxed<T: MessageConsumer + 'static>(c: T) -> Box<dyn MessageConsumer> {
523 Box::new(c)
524 }
525
526 match &endpoint.endpoint_type {
527 #[cfg(feature = "aws")]
528 EndpointType::Aws(cfg) => Ok(boxed(aws::AwsConsumer::new(cfg).await?)),
529 #[cfg(feature = "kafka")]
530 EndpointType::Kafka(cfg) => {
531 let mut config = cfg.clone();
532 if config.topic.is_none() {
533 config.topic = Some(route_name.to_string());
534 }
535 Ok(boxed(kafka::KafkaConsumer::new(&config).await?))
536 }
537 #[cfg(feature = "nats")]
538 EndpointType::Nats(cfg) => {
539 let mut config = cfg.clone();
540 if config.subject.is_none() {
541 config.subject = Some(route_name.to_string());
542 }
543 Ok(boxed(nats::NatsConsumer::new(&config).await?))
544 }
545 #[cfg(feature = "amqp")]
546 EndpointType::Amqp(cfg) => {
547 let mut config = cfg.clone();
548 if config.queue.is_none() {
549 config.queue = Some(route_name.to_string());
550 }
551 Ok(boxed(amqp::AmqpConsumer::new(&config).await?))
552 }
553 #[cfg(feature = "mqtt")]
554 EndpointType::Mqtt(cfg) => {
555 let mut config = cfg.clone();
556 if config.topic.is_none() {
557 config.topic = Some(route_name.to_string());
558 }
559 if config.client_id.is_none() && !config.clean_session {
560 config.client_id = Some(format!("{}-{}", crate::APP_NAME, route_name));
562 }
563 Ok(boxed(mqtt::MqttConsumer::new(cfg).await?))
564 }
565 #[cfg(feature = "ibm-mq")]
566 EndpointType::IbmMq(cfg) => {
567 let mut config = cfg.clone();
568 if config.queue.is_none() && config.topic.is_none() {
569 config.queue = Some(route_name.to_string());
570 }
571 Ok(boxed(ibm_mq::IbmMqConsumer::new(&config).await?))
572 }
573 #[cfg(feature = "zeromq")]
574 EndpointType::ZeroMq(cfg) => Ok(boxed(zeromq::ZeroMqConsumer::new(cfg).await?)),
575 EndpointType::File(cfg) => Ok(boxed(file::FileConsumer::new(cfg).await?)),
576 #[cfg(feature = "grpc")]
577 EndpointType::Grpc(cfg) => {
578 let mut config = cfg.clone();
579 if config.topic.is_none() {
580 config.topic = Some(route_name.to_string());
581 }
582 Ok(boxed(grpc::GrpcConsumer::new(&config).await?))
583 }
584 #[cfg(feature = "sqlx")]
585 EndpointType::Sqlx(cfg) => Ok(boxed(sqlx::SqlxConsumer::new(cfg).await?)),
586 #[cfg(feature = "http")]
587 EndpointType::Http(cfg) => Ok(boxed(http::HttpConsumer::new(cfg).await?)),
588 EndpointType::Static(cfg) => Ok(boxed(static_endpoint::StaticRequestConsumer::new(cfg)?)),
589 EndpointType::Memory(cfg) => Ok(boxed(memory::MemoryConsumer::new(cfg)?)),
590 #[cfg(feature = "sled")]
591 EndpointType::Sled(cfg) => Ok(boxed(sled::SledConsumer::new(cfg)?)),
592 #[cfg(feature = "mongodb")]
593 EndpointType::MongoDb(cfg) => {
594 let mut config = cfg.clone();
595 if config.collection.is_none() {
596 config.collection = Some(route_name.to_string());
597 }
598 if config.change_stream {
599 if config.ttl_seconds.is_none() {
600 config.ttl_seconds = Some(86400); }
602 Ok(boxed(mongodb::MongoDbSubscriber::new(&config).await?))
603 } else {
604 Ok(boxed(mongodb::MongoDbConsumer::new(&config).await?))
605 }
606 }
607 EndpointType::Custom { name, config } => {
608 let factory = get_endpoint_factory(name)
609 .ok_or_else(|| anyhow!("Custom endpoint factory '{}' not found", name))?;
610 factory.create_consumer(route_name, config).await
611 }
612 EndpointType::Switch(_) => Err(anyhow!(
613 "[route:{}] Switch endpoint is only supported as an output",
614 route_name
615 )),
616 #[allow(unreachable_patterns)]
617 _ => Err(anyhow!(
618 "[route:{}] Unsupported consumer endpoint type '{:?}'",
619 route_name,
620 endpoint.endpoint_type
621 )),
622 }
623}
624
625pub fn check_publisher(
627 route_name: &str,
628 endpoint: &Endpoint,
629 allowed_types: Option<&[&str]>,
630) -> Result<Vec<String>> {
631 check_publisher_recursive(route_name, endpoint, 0, allowed_types)
632}
633
634fn check_publisher_recursive(
635 route_name: &str,
636 endpoint: &Endpoint,
637 depth: usize,
638 allowed_types: Option<&[&str]>,
639) -> Result<Vec<String>> {
640 let mut warnings = Vec::new();
641 if let Some(allowed) = allowed_types {
642 if !endpoint.endpoint_type.is_core() {
643 let name = endpoint.endpoint_type.name();
644 if !allowed.contains(&name) {
645 return Err(anyhow!(
646 "[route:{}] Endpoint type '{}' is not allowed by policy",
647 route_name,
648 name
649 ));
650 }
651 }
652 }
653 const MAX_DEPTH: usize = 16;
654 if depth > MAX_DEPTH {
655 return Err(anyhow!(
656 "Fanout recursion depth exceeded limit of {}",
657 MAX_DEPTH
658 ));
659 }
660 match &endpoint.endpoint_type {
661 EndpointType::Ref(name) => {
662 let referenced = crate::route::get_endpoint(name).ok_or_else(|| {
663 anyhow!(
664 "[route:{}] Referenced endpoint '{}' not found in endpoint registry",
665 route_name,
666 name
667 )
668 });
669 if let Ok(referenced) = referenced {
670 warnings.extend(check_publisher_recursive(
671 route_name,
672 &referenced,
673 depth + 1,
674 allowed_types,
675 )?);
676 return Ok(warnings);
677 }
678 if crate::publisher::get_publisher(name).is_some() {
679 return Ok(warnings);
680 }
681 Err(anyhow!(
682 "[route:{}] Referenced endpoint '{}' not found in any registry",
683 route_name,
684 name
685 ))
686 }
687 #[cfg(feature = "aws")]
688 EndpointType::Aws(cfg) => {
689 if cfg.max_messages.is_some() {
690 warnings.push(
691 "Endpoint 'aws' is used as a publisher, but 'max_messages' is a consumer-only option and will be ignored."
692 .to_string()
693 );
694 }
695 if cfg.wait_time_seconds.is_some() {
696 warnings.push(
697 "Endpoint 'aws' is used as a publisher, but 'wait_time_seconds' is a consumer-only option and will be ignored."
698 .to_string()
699 );
700 }
701 Ok(warnings)
702 }
703 #[cfg(feature = "kafka")]
704 EndpointType::Kafka(cfg) => {
705 if cfg.group_id.is_some() {
706 warnings.push(
707 "Endpoint 'kafka' is used as a publisher, but 'group_id' is a consumer-only option and will be ignored."
708 .to_string()
709 );
710 }
711 if cfg.consumer_options.is_some() {
712 warnings.push(
713 "Endpoint 'kafka' is used as a publisher, but 'consumer_options' is a consumer-only option and will be ignored."
714 .to_string()
715 );
716 }
717 Ok(warnings)
718 }
719 #[cfg(feature = "nats")]
720 EndpointType::Nats(cfg) => {
721 if cfg.stream.is_some() {
722 warnings.push(
723 "Endpoint 'nats' is used as a publisher, but 'stream' is a consumer-only option and will be ignored."
724 .to_string()
725 );
726 }
727 if cfg.subscriber_mode {
728 warnings.push(
729 "Endpoint 'nats' is used as a publisher, but 'subscriber_mode' is a consumer-only option and will be ignored."
730 .to_string()
731 );
732 }
733 if cfg.prefetch_count.is_some() {
734 warnings.push(
735 "Endpoint 'nats' is used as a publisher, but 'prefetch_count' is a consumer-only option and will be ignored."
736 .to_string()
737 );
738 }
739 Ok(warnings)
740 }
741 #[cfg(feature = "amqp")]
742 EndpointType::Amqp(cfg) => {
743 if cfg.subscribe_mode {
744 warnings.push(
745 "Endpoint 'amqp' is used as a publisher, but 'subscribe_mode' is a consumer-only option and will be ignored."
746 .to_string()
747 );
748 }
749 if cfg.prefetch_count.is_some() {
750 warnings.push(
751 "Endpoint 'amqp' is used as a publisher, but 'prefetch_count' is a consumer-only option and will be ignored."
752 .to_string()
753 );
754 }
755 Ok(warnings)
756 }
757 #[cfg(feature = "mqtt")]
758 EndpointType::Mqtt(cfg) => {
759 if cfg.clean_session {
760 warnings.push(
761 "Endpoint 'mqtt' is used as a publisher, but 'clean_session' is a consumer-only option and will be ignored."
762 .to_string()
763 );
764 }
765 Ok(warnings)
766 }
767 #[cfg(feature = "zeromq")]
768 EndpointType::ZeroMq(cfg) => {
769 if cfg.topic.is_some() {
770 warnings.push(
771 "Endpoint 'zeromq' is used as a publisher, but 'topic' is a consumer-only option and will be ignored."
772 .to_string()
773 );
774 }
775 Ok(warnings)
776 }
777
778 #[cfg(feature = "http")]
779 EndpointType::Http(_cfg) => {
780 if _cfg.path.is_some() {
781 warnings.push(
782 "Endpoint 'http' is used as a publisher, but 'path' is a consumer-only option and will be ignored."
783 .to_string()
784 );
785 }
786 if _cfg.workers.is_some() {
787 warnings.push(
788 "Endpoint 'http' is used as a publisher, but 'workers' is a consumer-only option and will be ignored."
789 .to_string()
790 );
791 }
792 if _cfg.message_id_header.is_some() {
793 warnings.push(
794 "Endpoint 'http' is used as a publisher, but 'message_id_header' is a consumer-only option and will be ignored."
795 .to_string()
796 );
797 }
798 if _cfg.internal_buffer_size.is_some() {
799 warnings.push(
800 "Endpoint 'http' is used as a publisher, but 'internal_buffer_size' is a consumer-only option and will be ignored."
801 .to_string()
802 );
803 }
804 if _cfg.fire_and_forget {
805 warnings.push(
806 "Endpoint 'http' is used as a publisher, but 'fire_and_forget' is a consumer-only option and will be ignored."
807 .to_string()
808 );
809 }
810 Ok(warnings)
811 }
812 #[cfg(feature = "grpc")]
813 EndpointType::Grpc(_) => Ok(warnings),
814 #[cfg(feature = "sqlx")]
815 EndpointType::Sqlx(cfg) => {
816 if cfg.select_query.is_some() {
817 warnings.push(
818 "Endpoint 'sqlx' is used as a publisher, but 'select_query' is a consumer-only option and will be ignored."
819 .to_string()
820 );
821 }
822 if cfg.delete_after_read {
823 warnings.push(
824 "Endpoint 'sqlx' is used as a publisher, but 'delete_after_read' is a consumer-only option and will be ignored."
825 .to_string()
826 );
827 }
828 if cfg.polling_interval_ms.is_some() {
829 warnings.push(
830 "Endpoint 'sqlx' is used as a publisher, but 'polling_interval_ms' is a consumer-only option and will be ignored."
831 .to_string()
832 );
833 }
834 Ok(warnings)
835 }
836 #[cfg(feature = "ibm-mq")]
837 EndpointType::IbmMq(cfg) => {
838 if cfg.wait_timeout_ms != 1000 {
839 warnings.push(
840 "Endpoint 'ibmmq' is used as a publisher, but 'wait_timeout_ms' is a consumer-only option and will be ignored."
841 .to_string()
842 );
843 }
844 Ok(warnings)
845 }
846 #[cfg(feature = "mongodb")]
847 EndpointType::MongoDb(cfg) => {
848 if cfg.polling_interval_ms.is_some() {
849 warnings.push(
850 "Endpoint 'mongodb' is used as a publisher, but 'polling_interval_ms' is a consumer-only option and will be ignored."
851 .to_string()
852 );
853 }
854 if cfg.change_stream {
855 warnings.push(
856 "Endpoint 'mongodb' is used as a publisher, but 'change_stream' is a consumer-only option and will be ignored."
857 .to_string()
858 );
859 }
860 if cfg.cursor_id.is_some() {
861 warnings.push(
862 "Endpoint 'mongodb' is used as a publisher, but 'cursor_id' is a consumer-only option and will be ignored."
863 .to_string()
864 );
865 }
866 Ok(warnings)
867 }
868 EndpointType::File(_) => Ok(warnings),
869 EndpointType::Static(_) => Ok(warnings),
870 EndpointType::Memory(cfg) => {
871 if cfg.subscribe_mode {
872 warnings.push(
873 "Endpoint 'memory' is used as a publisher, but 'subscribe_mode' is a consumer-only option and will be ignored."
874 .to_string()
875 );
876 }
877 if cfg.enable_nack {
878 warnings.push(
879 "Endpoint 'memory' is used as a publisher, but 'enable_nack' is a consumer-only option and will be ignored."
880 .to_string()
881 );
882 }
883 Ok(warnings)
884 }
885 #[cfg(feature = "sled")]
886 EndpointType::Sled(cfg) => {
887 if cfg.read_from_start {
888 warnings.push(
889 "Endpoint 'sled' is used as a publisher, but 'read_from_start' is a consumer-only option and will be ignored."
890 .to_string()
891 );
892 }
893 if cfg.delete_after_read {
894 warnings.push(
895 "Endpoint 'sled' is used as a publisher, but 'delete_after_read' is a consumer-only option and will be ignored."
896 .to_string()
897 );
898 }
899 Ok(warnings)
900 }
901 EndpointType::Null => Ok(warnings),
902 EndpointType::Fanout(endpoints) => {
903 for endpoint in endpoints {
904 warnings.extend(check_publisher_recursive(
905 route_name,
906 endpoint,
907 depth + 1,
908 allowed_types,
909 )?);
910 }
911 Ok(warnings)
912 }
913 EndpointType::Switch(cfg) => {
914 for endpoint in cfg.cases.values() {
915 warnings.extend(check_publisher_recursive(
916 route_name,
917 endpoint,
918 depth + 1,
919 allowed_types,
920 )?);
921 }
922 if let Some(endpoint) = &cfg.default {
923 warnings.extend(check_publisher_recursive(
924 route_name,
925 endpoint,
926 depth + 1,
927 allowed_types,
928 )?);
929 }
930 Ok(warnings)
931 }
932 EndpointType::Response(_) => Ok(warnings),
933 EndpointType::Custom { .. } => Ok(warnings),
934 EndpointType::Reader(inner) => check_consumer(route_name, inner, allowed_types),
935 #[allow(unreachable_patterns)]
936 _ => {
937 if let Some(allowed) = allowed_types {
938 let name = endpoint.endpoint_type.name();
939 if allowed.contains(&name) {
940 return Ok(warnings);
941 }
942 }
943 Err(anyhow!(
944 "[route:{}] Unsupported publisher endpoint type '{:?}'",
945 route_name,
946 endpoint.endpoint_type
947 ))
948 }
949 }
950}
951
952pub async fn create_publisher_from_route(
954 route_name: &str,
955 endpoint: &Endpoint,
956) -> Result<Arc<dyn MessagePublisher>> {
957 check_publisher(route_name, endpoint, None)?;
958 create_publisher_with_depth(route_name.to_string(), endpoint.clone(), 0).await
959}
960
961fn create_publisher_with_depth(
962 route_name: String,
963 endpoint: Endpoint,
964 depth: usize,
965) -> BoxFuture<'static, Result<Arc<dyn MessagePublisher>>> {
966 Box::pin(async move {
967 const MAX_DEPTH: usize = 16;
968 if depth > MAX_DEPTH {
969 return Err(anyhow!(
970 "Fanout/Ref recursion depth exceeded limit of {}",
971 MAX_DEPTH
972 ));
973 }
974
975 if let EndpointType::Ref(name) = &endpoint.endpoint_type {
976 let referenced_opt = crate::route::get_endpoint(name);
977
978 if referenced_opt.is_none() {
979 if let Some(pub_instance) = crate::publisher::get_publisher(name) {
980 let inner = pub_instance.inner();
981 let mut publisher: Box<dyn MessagePublisher> = Box::new(inner);
982
983 if let Some(handler) = &endpoint.handler {
984 publisher = Box::new(crate::command_handler::CommandPublisher::new(
985 publisher,
986 handler.clone(),
987 ));
988 }
989 return crate::middleware::apply_middlewares_to_publisher(
990 publisher,
991 &endpoint,
992 &route_name,
993 )
994 .await;
995 }
996 }
997
998 let referenced = referenced_opt.ok_or_else(|| {
999 anyhow!(
1000 "[route:{}] Referenced endpoint '{}' not found",
1001 route_name,
1002 name
1003 )
1004 })?;
1005
1006 let mut merged = referenced;
1007 merged.middlewares.extend(endpoint.middlewares);
1010
1011 if endpoint.handler.is_some() {
1012 if merged.handler.is_some() {
1013 return Err(anyhow!("[route:{}] Both ref endpoint and referenced endpoint '{}' have handlers defined. This is ambiguous.", route_name, name));
1014 }
1015 merged.handler = endpoint.handler;
1016 }
1017
1018 return create_publisher_with_depth(route_name, merged, depth + 1).await;
1019 }
1020
1021 let mut publisher =
1022 create_base_publisher(&route_name, &endpoint.endpoint_type, depth).await?;
1023 if let Some(handler) = &endpoint.handler {
1024 publisher = Box::new(crate::command_handler::CommandPublisher::new(
1025 publisher,
1026 handler.clone(),
1027 ));
1028 }
1029 crate::middleware::apply_middlewares_to_publisher(publisher, &endpoint, &route_name).await
1030 })
1031}
1032
1033async fn create_base_publisher(
1034 route_name: &str,
1035 endpoint_type: &EndpointType,
1036 depth: usize,
1037) -> Result<Box<dyn MessagePublisher>> {
1038 let publisher = match endpoint_type {
1039 #[cfg(feature = "aws")]
1040 EndpointType::Aws(cfg) => {
1041 Ok(Box::new(aws::AwsPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1042 }
1043 #[cfg(feature = "kafka")]
1044 EndpointType::Kafka(cfg) => {
1045 let mut config = cfg.clone();
1046 if config.topic.is_none() {
1047 config.topic = Some(route_name.to_string());
1048 }
1049 Ok(Box::new(kafka::KafkaPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
1050 }
1051 #[cfg(feature = "nats")]
1052 EndpointType::Nats(cfg) => {
1053 let mut config = cfg.clone();
1054 if config.subject.is_none() {
1055 config.subject = Some(route_name.to_string());
1056 }
1057 Ok(Box::new(nats::NatsPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
1058 }
1059 #[cfg(feature = "amqp")]
1060 EndpointType::Amqp(cfg) => {
1061 let mut config = cfg.clone();
1062 if config.queue.is_none() {
1063 config.queue = Some(route_name.to_string());
1064 }
1065 Ok(Box::new(amqp::AmqpPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
1066 }
1067 #[cfg(feature = "mqtt")]
1068 EndpointType::Mqtt(cfg) => {
1069 let mut config = cfg.clone();
1070 if config.topic.is_none() {
1071 config.topic = Some(route_name.to_string());
1072 }
1073 if config.client_id.is_none() {
1074 config.client_id = Some(format!("{}-{}", crate::APP_NAME, route_name));
1075 }
1076 Ok(Box::new(mqtt::MqttPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
1077 }
1078 #[cfg(feature = "zeromq")]
1079 EndpointType::ZeroMq(cfg) => {
1080 Ok(Box::new(zeromq::ZeroMqPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1081 }
1082 #[cfg(feature = "grpc")]
1083 EndpointType::Grpc(cfg) => {
1084 Ok(Box::new(grpc::GrpcPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1085 }
1086 #[cfg(feature = "sqlx")]
1087 EndpointType::Sqlx(cfg) => {
1088 Ok(Box::new(sqlx::SqlxPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1089 }
1090 #[cfg(feature = "http")]
1091 EndpointType::Http(cfg) => {
1092 let sink = http::HttpPublisher::new(cfg).await?;
1093 Ok(Box::new(sink) as Box<dyn MessagePublisher>)
1094 }
1095 #[cfg(feature = "mongodb")]
1096 EndpointType::MongoDb(cfg) => {
1097 let mut config = cfg.clone();
1098 if config.collection.is_none() {
1099 config.collection = Some(route_name.to_string());
1100 }
1101 Ok(Box::new(mongodb::MongoDbPublisher::new(&config).await?)
1102 as Box<dyn MessagePublisher>)
1103 }
1104 EndpointType::File(cfg) => {
1105 Ok(Box::new(file::FilePublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1106 }
1107 EndpointType::Static(cfg) => Ok(Box::new(static_endpoint::StaticEndpointPublisher::new(
1108 cfg,
1109 )?) as Box<dyn MessagePublisher>),
1110 EndpointType::Memory(cfg) => {
1111 Ok(Box::new(memory::MemoryPublisher::new(cfg)?) as Box<dyn MessagePublisher>)
1112 }
1113 #[cfg(feature = "sled")]
1114 EndpointType::Sled(cfg) => {
1115 Ok(Box::new(sled::SledPublisher::new(cfg)?) as Box<dyn MessagePublisher>)
1116 }
1117 #[cfg(feature = "ibm-mq")]
1118 EndpointType::IbmMq(cfg) => {
1119 Ok(Box::new(ibm_mq::IbmMqPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1120 }
1121 EndpointType::Null => Ok(Box::new(null::NullPublisher) as Box<dyn MessagePublisher>),
1122 EndpointType::Fanout(endpoints) => {
1123 let mut publishers = Vec::with_capacity(endpoints.len());
1124 for endpoint in endpoints {
1125 let p = create_publisher_with_depth(
1126 route_name.to_string(),
1127 endpoint.clone(),
1128 depth + 1,
1129 )
1130 .await?;
1131 publishers.push(p);
1132 }
1133 Ok(Box::new(fanout::FanoutPublisher::new(publishers)) as Box<dyn MessagePublisher>)
1134 }
1135 EndpointType::Switch(cfg) => {
1136 let mut cases = std::collections::HashMap::new();
1137 for (key, endpoint) in &cfg.cases {
1138 let p = create_publisher_with_depth(
1139 route_name.to_string(),
1140 endpoint.clone(),
1141 depth + 1,
1142 )
1143 .await?;
1144 cases.insert(key.clone(), p);
1145 }
1146 let default = if let Some(endpoint) = &cfg.default {
1147 Some(
1148 create_publisher_with_depth(
1149 route_name.to_string(),
1150 (**endpoint).clone(),
1151 depth + 1,
1152 )
1153 .await?,
1154 )
1155 } else {
1156 None
1157 };
1158 Ok(Box::new(switch::SwitchPublisher::new(
1159 cfg.metadata_key.clone(),
1160 cases,
1161 default,
1162 )) as Box<dyn MessagePublisher>)
1163 }
1164 EndpointType::Response(_) => {
1165 Ok(Box::new(response::ResponsePublisher) as Box<dyn MessagePublisher>)
1166 }
1167 EndpointType::Reader(inner) => {
1168 let consumer = create_consumer_from_route(route_name, inner).await?;
1169 Ok(Box::new(reader::ReaderPublisher::new(consumer)) as Box<dyn MessagePublisher>)
1170 }
1171 EndpointType::Custom { name, config } => {
1172 let factory = get_endpoint_factory(name)
1173 .ok_or_else(|| anyhow!("Custom endpoint factory '{}' not found", name))?;
1174 factory.create_publisher(route_name, config).await
1175 }
1176 #[allow(unreachable_patterns)]
1177 _ => Err(anyhow!(
1178 "[route:{}] Unsupported publisher endpoint type '{:?}'",
1179 route_name,
1180 endpoint_type
1181 )),
1182 }?;
1183 Ok(publisher)
1184}
1185
/// Fetches the process-wide default rustls `CryptoProvider`.
///
/// # Errors
///
/// Returns an error describing how to install a provider when
/// `CryptoProvider::get_default()` reports that none has been set.
#[cfg(feature = "rustls")]
// NOTE(review): presumably some feature combinations never call this helper,
// hence the lint allowance — confirm against the TLS-enabled endpoints.
#[allow(unused)]
pub(crate) fn get_crypto_provider() -> anyhow::Result<std::sync::Arc<rustls::crypto::CryptoProvider>>
{
    match rustls::crypto::CryptoProvider::get_default() {
        // Return an owned `Arc` handle rather than the borrowed one.
        Some(installed) => Ok(std::sync::Arc::clone(installed)),
        None => Err(anyhow!("No rustls CryptoProvider is installed.\n\
Fix: enable the `rustls-ring` or `rustls-aws-lc` feature of mq-bridge, or call `rustls::crypto::CryptoProvider::install_default()` in your application binary before creating any TLS endpoint.")),
    }
}
1206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        DeadLetterQueueMiddleware, DeduplicationMiddleware, Endpoint, EndpointType, MemoryConfig,
        RetryMiddleware,
    };
    use crate::CanonicalMessage;

    /// Publishing through a fanout of two memory endpoints must leave exactly one
    /// copy of the message, with the original payload, in each memory channel.
    #[tokio::test]
    async fn test_fanout_publisher_integration() {
        const PAYLOAD: &[u8] = b"fanout_payload";

        let first = Endpoint::new_memory("fanout_1", 10);
        let second = Endpoint::new_memory("fanout_2", 10);

        // Take the channel handles before the endpoints are moved into the fanout.
        let channels = [first.channel().unwrap(), second.channel().unwrap()];
        let fanout_endpoint = Endpoint::new(EndpointType::Fanout(vec![first, second]));

        let publisher = create_publisher_from_route("test_fanout", &fanout_endpoint)
            .await
            .expect("Failed to create fanout publisher");

        publisher
            .send(CanonicalMessage::new(PAYLOAD.to_vec(), None))
            .await
            .expect("Failed to send message");

        // Every branch holds exactly one message...
        for channel in &channels {
            assert_eq!(channel.len(), 1);
        }

        // ...and that message carries the payload that was published.
        for channel in channels {
            let delivered = channel.drain_messages().pop().unwrap();
            assert_eq!(delivered.payload, PAYLOAD);
        }
    }

    /// A memory endpoint configured with `with_subscribe(true)` must be turned into
    /// a `MemoryConsumer` by the consumer factory.
    #[tokio::test]
    async fn test_factory_creates_memory_subscriber() {
        let memory_config = MemoryConfig::new("mem".to_string(), None).with_subscribe(true);
        let endpoint = Endpoint::new(EndpointType::Memory(memory_config));

        let consumer = create_consumer_from_route("test", &endpoint).await.unwrap();

        assert!(
            consumer
                .as_any()
                .is::<crate::endpoints::memory::MemoryConsumer>(),
            "Factory should create MemoryConsumer"
        );
    }

    /// Calling `with_metrics`, `with_dlq` and `with_retry` in that order must leave
    /// the middlewares stored as Retry, Dlq, Metrics.
    #[test]
    fn test_endpoint_middleware_ordering_helpers() {
        let endpoint = Endpoint::new_memory("test", 10)
            .with_metrics()
            .with_dlq(DeadLetterQueueMiddleware::default())
            .with_retry(RetryMiddleware::default());

        assert_eq!(endpoint.middlewares.len(), 3);
        assert!(matches!(
            endpoint.middlewares.as_slice(),
            [
                Middleware::Retry(_),
                Middleware::Dlq(_),
                Middleware::Metrics(_)
            ]
        ));
    }

    /// Calling `with_deduplication` followed by `with_consumer_metrics` must leave
    /// the middlewares stored as Metrics, Deduplication.
    #[test]
    fn test_consumer_middleware_ordering() {
        let deduplication = DeduplicationMiddleware {
            sled_path: "".into(),
            ttl_seconds: 10,
        };
        let endpoint = Endpoint::new_memory("test", 10)
            .with_deduplication(deduplication)
            .with_consumer_metrics();

        assert_eq!(endpoint.middlewares.len(), 2);
        assert!(matches!(
            endpoint.middlewares.as_slice(),
            [Middleware::Metrics(_), Middleware::Deduplication(_)]
        ));
    }

    /// `check_consumer` on a memory endpoint with `request_reply` enabled must
    /// produce a warning mentioning both "request_reply" and "publisher-only".
    #[test]
    fn test_check_consumer_invalid_config() {
        let endpoint = Endpoint::new(EndpointType::Memory(MemoryConfig {
            topic: "test".to_string(),
            request_reply: true,
            ..Default::default()
        }));

        let warnings = check_consumer("test_route", &endpoint, None).unwrap();

        // A single warning has to contain both markers.
        let flagged = warnings
            .iter()
            .filter(|warning| warning.contains("request_reply"))
            .any(|warning| warning.contains("publisher-only"));
        assert!(flagged);
    }
}