Skip to main content

mq_bridge/endpoints/
mod.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6#[cfg(feature = "amqp")]
7pub mod amqp;
8#[cfg(feature = "aws")]
9pub mod aws;
10pub mod fanout;
11pub mod file;
12#[cfg(any(feature = "http-client", feature = "http-server"))]
13pub mod http;
14#[cfg(feature = "ibm-mq")]
15pub mod ibm_mq;
16#[cfg(feature = "kafka")]
17pub mod kafka;
18pub mod memory;
19#[cfg(feature = "mongodb")]
20pub mod mongodb;
21#[cfg(feature = "mqtt")]
22pub mod mqtt;
23#[cfg(feature = "nats")]
24pub mod nats;
25pub mod null;
26pub mod response;
27pub mod static_endpoint;
28pub mod switch;
29#[cfg(feature = "zeromq")]
30pub mod zeromq;
31use crate::endpoints::memory::{get_or_create_channel, MemoryChannel};
32use crate::middleware::apply_middlewares_to_consumer;
33use crate::models::{Endpoint, EndpointType, MemoryConfig, Middleware, ResponseConfig};
34use crate::route::get_endpoint_factory;
35use crate::traits::{BoxFuture, MessageConsumer, MessagePublisher};
36use anyhow::{anyhow, Result};
37use std::sync::Arc;
38
39impl Endpoint {
40    pub fn new(endpoint_type: EndpointType) -> Self {
41        Self {
42            middlewares: Vec::new(),
43            endpoint_type,
44            handler: None,
45        }
46    }
47    /// Creates a new in-memory endpoint with the specified topic and capacity.
48    ///
49    /// # Examples
50    ///
51    /// ```
52    /// use mq_bridge::models::Endpoint;
53    /// let endpoint = Endpoint::new_memory("my_topic", 100);
54    /// ```
55    pub fn new_memory(topic: &str, capacity: usize) -> Self {
56        Self::new(EndpointType::Memory(MemoryConfig::new(
57            topic,
58            Some(capacity),
59        )))
60    }
61    pub fn new_response() -> Self {
62        Self::new(EndpointType::Response(ResponseConfig::default()))
63    }
64    pub fn add_middleware(mut self, middleware: Middleware) -> Self {
65        self.middlewares.push(middleware);
66        self
67    }
68    pub fn add_middlewares(mut self, mut middlewares: Vec<Middleware>) -> Self {
69        self.middlewares.append(&mut middlewares);
70        self
71    }
72    ///
73    /// Returns a reference to the in-memory channel associated with this Endpoint.
74    /// This function will only succeed if the Endpoint is of type EndpointType::Memory.
75    /// If the Endpoint is not a memory endpoint, this function will return an error.
76    /// This function is primarily used for testing purposes where a Queue is needed.
77    pub fn channel(&self) -> anyhow::Result<MemoryChannel> {
78        match &self.endpoint_type {
79            EndpointType::Memory(cfg) => Ok(get_or_create_channel(cfg)),
80            _ => Err(anyhow::anyhow!("channel() called on non-memory Endpoint")),
81        }
82    }
83    pub fn null() -> Self {
84        Self::new(EndpointType::Null)
85    }
86
87    pub async fn create_consumer(
88        &self,
89        route_name: &str,
90    ) -> anyhow::Result<Box<dyn crate::traits::MessageConsumer>> {
91        crate::endpoints::create_consumer_from_route(route_name, self).await
92    }
93
94    pub async fn create_publisher(&self, _route_name: &str) -> anyhow::Result<crate::Publisher> {
95        crate::Publisher::new(self.clone()).await
96    }
97
98    pub fn check_consumer(
99        &self,
100        route_name: &str,
101        allowed_endpoints: Option<&[&str]>,
102    ) -> anyhow::Result<()> {
103        crate::endpoints::check_consumer(route_name, self, allowed_endpoints)
104    }
105
106    pub fn check_publisher(
107        &self,
108        route_name: &str,
109        allowed_endpoints: Option<&[&str]>,
110    ) -> anyhow::Result<()> {
111        crate::endpoints::check_publisher(route_name, self, allowed_endpoints)
112    }
113}
114
115/// Validates the consumer configuration for a route.
116pub fn check_consumer(
117    route_name: &str,
118    endpoint: &Endpoint,
119    allowed_types: Option<&[&str]>,
120) -> Result<()> {
121    if endpoint.handler.is_some() {
122        tracing::warn!(
123            route = route_name,
124            "Endpoint 'handler' is set on an input endpoint. Handlers are currently only supported on output endpoints (publishers) and will be ignored here."
125        );
126    }
127    if let Some(allowed) = allowed_types {
128        if !endpoint.endpoint_type.is_core() {
129            let name = endpoint.endpoint_type.name();
130            if !allowed.contains(&name) {
131                return Err(anyhow!(
132                    "[route:{}] Endpoint type '{}' is not allowed by policy",
133                    route_name,
134                    name
135                ));
136            }
137        }
138    }
139    match &endpoint.endpoint_type {
140        #[cfg(feature = "aws")]
141        EndpointType::Aws(_) => Ok(()),
142        #[cfg(feature = "kafka")]
143        EndpointType::Kafka(_) => Ok(()),
144        #[cfg(feature = "nats")]
145        EndpointType::Nats(cfg) => {
146            if cfg.stream.is_none() {
147                return Err(anyhow!(
148                    "[route:{}] NATS consumer must specify a 'stream'",
149                    route_name
150                ));
151            }
152            Ok(())
153        }
154        #[cfg(feature = "amqp")]
155        EndpointType::Amqp(_) => Ok(()),
156        #[cfg(feature = "mqtt")]
157        EndpointType::Mqtt(_) => Ok(()),
158        #[cfg(feature = "zeromq")]
159        EndpointType::ZeroMq(_) => Ok(()),
160        #[cfg(feature = "ibm-mq")]
161        EndpointType::IbmMq(_) => Ok(()),
162        #[cfg(feature = "mongodb")]
163        EndpointType::MongoDb(_) => Ok(()),
164        #[cfg(any(feature = "http-client", feature = "http-server"))]
165        EndpointType::Http(_) => {
166            #[cfg(not(feature = "http-server"))]
167            {
168                Err(anyhow!("HTTP consumer requires the 'http-server' feature"))
169            }
170            #[cfg(feature = "http-server")]
171            Ok(())
172        }
173        EndpointType::Static(_) => Ok(()),
174        EndpointType::Memory(_) => Ok(()),
175        EndpointType::File(_) => Ok(()),
176        EndpointType::Custom { .. } => Ok(()),
177        EndpointType::Switch(_) => Err(anyhow!(
178            "[route:{}] Switch endpoint is only supported as an output",
179            route_name
180        )),
181        #[allow(unreachable_patterns)]
182        _ => {
183            if let Some(allowed) = allowed_types {
184                let name = endpoint.endpoint_type.name();
185                if allowed.contains(&name) {
186                    return Ok(());
187                }
188            }
189            Err(anyhow!(
190                "[route:{}] Unsupported consumer endpoint type",
191                route_name
192            ))
193        }
194    }
195}
196
197/// Creates a `MessageConsumer` based on the route's "in" configuration.
198pub async fn create_consumer_from_route(
199    route_name: &str,
200    endpoint: &Endpoint,
201) -> Result<Box<dyn MessageConsumer>> {
202    check_consumer(route_name, endpoint, None)?;
203    let consumer = create_base_consumer(route_name, endpoint).await?;
204    apply_middlewares_to_consumer(consumer, endpoint, route_name).await
205}
206
207async fn create_base_consumer(
208    route_name: &str,
209    endpoint: &Endpoint,
210) -> Result<Box<dyn MessageConsumer>> {
211    // Helper to coerce concrete consumers to the trait object, fixing type inference issues in the match block.
212    fn boxed<T: MessageConsumer + 'static>(c: T) -> Box<dyn MessageConsumer> {
213        Box::new(c)
214    }
215
216    match &endpoint.endpoint_type {
217        #[cfg(feature = "aws")]
218        EndpointType::Aws(cfg) => Ok(boxed(aws::AwsConsumer::new(cfg).await?)),
219        #[cfg(feature = "kafka")]
220        EndpointType::Kafka(cfg) => {
221            let mut config = cfg.clone();
222            if config.topic.is_none() {
223                config.topic = Some(route_name.to_string());
224            }
225            Ok(boxed(kafka::KafkaConsumer::new(&config).await?))
226        }
227        #[cfg(feature = "nats")]
228        EndpointType::Nats(cfg) => {
229            let mut config = cfg.clone();
230            if config.subject.is_none() {
231                config.subject = Some(route_name.to_string());
232            }
233            Ok(boxed(nats::NatsConsumer::new(&config).await?))
234        }
235        #[cfg(feature = "amqp")]
236        EndpointType::Amqp(cfg) => {
237            let mut config = cfg.clone();
238            if config.queue.is_none() {
239                config.queue = Some(route_name.to_string());
240            }
241            Ok(boxed(amqp::AmqpConsumer::new(&config).await?))
242        }
243        #[cfg(feature = "mqtt")]
244        EndpointType::Mqtt(cfg) => {
245            let mut config = cfg.clone();
246            if config.topic.is_none() {
247                config.topic = Some(route_name.to_string());
248            }
249            if config.client_id.is_none() && !config.clean_session {
250                // For persistent sessions, default client_id to route_name if not provided
251                config.client_id = Some(format!("{}-{}", crate::APP_NAME, route_name));
252            }
253            Ok(boxed(mqtt::MqttConsumer::new(cfg).await?))
254        }
255        #[cfg(feature = "ibm-mq")]
256        EndpointType::IbmMq(cfg) => {
257            let mut config = cfg.clone();
258            if config.queue.is_none() && config.topic.is_none() {
259                config.queue = Some(route_name.to_string());
260            }
261            Ok(boxed(ibm_mq::IbmMqConsumer::new(&config).await?))
262        }
263        #[cfg(feature = "zeromq")]
264        EndpointType::ZeroMq(cfg) => Ok(boxed(zeromq::ZeroMqConsumer::new(cfg).await?)),
265        EndpointType::File(cfg) => Ok(boxed(file::FileConsumer::new(cfg).await?)),
266        #[cfg(any(feature = "http-client", feature = "http-server"))]
267        EndpointType::Http(cfg) => {
268            #[cfg(feature = "http-server")]
269            {
270                Ok(boxed(http::HttpConsumer::new(cfg).await?))
271            }
272            #[cfg(not(feature = "http-server"))]
273            {
274                Err(anyhow!("HTTP consumer requires the 'http-server' feature"))
275            }
276        }
277        EndpointType::Static(cfg) => Ok(boxed(static_endpoint::StaticRequestConsumer::new(cfg)?)),
278        EndpointType::Memory(cfg) => Ok(boxed(memory::MemoryConsumer::new(cfg)?)),
279        #[cfg(feature = "mongodb")]
280        EndpointType::MongoDb(cfg) => {
281            let mut config = cfg.clone();
282            if config.collection.is_none() {
283                config.collection = Some(route_name.to_string());
284            }
285            if config.change_stream {
286                if config.ttl_seconds.is_none() {
287                    config.ttl_seconds = Some(86400); // Remove events by default after 24 hours
288                }
289                Ok(boxed(mongodb::MongoDbSubscriber::new(&config).await?))
290            } else {
291                Ok(boxed(mongodb::MongoDbConsumer::new(&config).await?))
292            }
293        }
294        EndpointType::Custom { name, config } => {
295            let factory = get_endpoint_factory(name)
296                .ok_or_else(|| anyhow!("Custom endpoint factory '{}' not found", name))?;
297            factory.create_consumer(route_name, config).await
298        }
299        EndpointType::Switch(_) => Err(anyhow!(
300            "[route:{}] Switch endpoint is only supported as an output",
301            route_name
302        )),
303        #[allow(unreachable_patterns)]
304        _ => Err(anyhow!(
305            "[route:{}] Unsupported consumer endpoint type",
306            route_name
307        )),
308    }
309}
310
311/// Validates the publisher configuration for a route.
312pub fn check_publisher(
313    route_name: &str,
314    endpoint: &Endpoint,
315    allowed_types: Option<&[&str]>,
316) -> Result<()> {
317    check_publisher_recursive(route_name, endpoint, 0, allowed_types)
318}
319
320fn check_publisher_recursive(
321    route_name: &str,
322    endpoint: &Endpoint,
323    depth: usize,
324    allowed_types: Option<&[&str]>,
325) -> Result<()> {
326    if let Some(allowed) = allowed_types {
327        if !endpoint.endpoint_type.is_core() {
328            let name = endpoint.endpoint_type.name();
329            if !allowed.contains(&name) {
330                return Err(anyhow!(
331                    "[route:{}] Endpoint type '{}' is not allowed by policy",
332                    route_name,
333                    name
334                ));
335            }
336        }
337    }
338    const MAX_DEPTH: usize = 16;
339    if depth > MAX_DEPTH {
340        return Err(anyhow!(
341            "Fanout recursion depth exceeded limit of {}",
342            MAX_DEPTH
343        ));
344    }
345    match &endpoint.endpoint_type {
346        #[cfg(feature = "aws")]
347        EndpointType::Aws(_) => Ok(()),
348        #[cfg(feature = "kafka")]
349        EndpointType::Kafka(_) => Ok(()),
350        #[cfg(feature = "nats")]
351        EndpointType::Nats(_) => Ok(()),
352        #[cfg(feature = "amqp")]
353        EndpointType::Amqp(_) => Ok(()),
354        #[cfg(feature = "mqtt")]
355        EndpointType::Mqtt(_) => Ok(()),
356        #[cfg(feature = "zeromq")]
357        EndpointType::ZeroMq(_) => Ok(()),
358        #[cfg(any(feature = "http-client", feature = "http-server"))]
359        EndpointType::Http(_) => {
360            #[cfg(not(feature = "reqwest"))]
361            {
362                Err(anyhow!("HTTP publisher requires the 'reqwest' feature"))
363            }
364            #[cfg(feature = "reqwest")]
365            Ok(())
366        }
367        #[cfg(feature = "mongodb")]
368        EndpointType::MongoDb(_) => Ok(()),
369        EndpointType::File(_) => Ok(()),
370        EndpointType::Static(_) => Ok(()),
371        EndpointType::Memory(_) => Ok(()),
372        EndpointType::Null => Ok(()),
373        EndpointType::Fanout(endpoints) => {
374            for endpoint in endpoints {
375                check_publisher_recursive(route_name, endpoint, depth + 1, allowed_types)?;
376            }
377            Ok(())
378        }
379        EndpointType::Switch(cfg) => {
380            for endpoint in cfg.cases.values() {
381                check_publisher_recursive(route_name, endpoint, depth + 1, allowed_types)?;
382            }
383            if let Some(endpoint) = &cfg.default {
384                check_publisher_recursive(route_name, endpoint, depth + 1, allowed_types)?;
385            }
386            Ok(())
387        }
388        EndpointType::Response(_) => Ok(()),
389        EndpointType::Custom { .. } => Ok(()),
390        #[allow(unreachable_patterns)]
391        _ => {
392            if let Some(allowed) = allowed_types {
393                let name = endpoint.endpoint_type.name();
394                if allowed.contains(&name) {
395                    return Ok(());
396                }
397            }
398            Err(anyhow!(
399                "[route:{}] Unsupported publisher endpoint type",
400                route_name
401            ))
402        }
403    }
404}
405
406/// Creates a `MessagePublisher` based on the route's "out" configuration.
407pub async fn create_publisher_from_route(
408    route_name: &str,
409    endpoint: &Endpoint,
410) -> Result<Arc<dyn MessagePublisher>> {
411    check_publisher(route_name, endpoint, None)?;
412    create_publisher_with_depth(route_name, endpoint, 0).await
413}
414
415fn create_publisher_with_depth<'a>(
416    route_name: &'a str,
417    endpoint: &'a Endpoint,
418    depth: usize,
419) -> BoxFuture<'a, Result<Arc<dyn MessagePublisher>>> {
420    Box::pin(async move {
421        const MAX_DEPTH: usize = 16;
422        if depth > MAX_DEPTH {
423            return Err(anyhow!(
424                "Fanout recursion depth exceeded limit of {}",
425                MAX_DEPTH
426            ));
427        }
428        let mut publisher =
429            create_base_publisher(route_name, &endpoint.endpoint_type, depth).await?;
430        if let Some(handler) = &endpoint.handler {
431            publisher = Box::new(crate::command_handler::CommandPublisher::new(
432                publisher,
433                handler.clone(),
434            ));
435        }
436        crate::middleware::apply_middlewares_to_publisher(publisher, endpoint, route_name).await
437    })
438}
439
440async fn create_base_publisher(
441    route_name: &str,
442    endpoint_type: &EndpointType,
443    depth: usize,
444) -> Result<Box<dyn MessagePublisher>> {
445    let publisher = match endpoint_type {
446        #[cfg(feature = "aws")]
447        EndpointType::Aws(cfg) => {
448            Ok(Box::new(aws::AwsPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
449        }
450        #[cfg(feature = "kafka")]
451        EndpointType::Kafka(cfg) => {
452            let mut config = cfg.clone();
453            if config.topic.is_none() {
454                config.topic = Some(route_name.to_string());
455            }
456            Ok(Box::new(kafka::KafkaPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
457        }
458        #[cfg(feature = "nats")]
459        EndpointType::Nats(cfg) => {
460            let mut config = cfg.clone();
461            if config.subject.is_none() {
462                config.subject = Some(route_name.to_string());
463            }
464            Ok(Box::new(nats::NatsPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
465        }
466        #[cfg(feature = "amqp")]
467        EndpointType::Amqp(cfg) => {
468            let mut config = cfg.clone();
469            if config.queue.is_none() {
470                config.queue = Some(route_name.to_string());
471            }
472            Ok(Box::new(amqp::AmqpPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
473        }
474        #[cfg(feature = "mqtt")]
475        EndpointType::Mqtt(cfg) => {
476            let mut config = cfg.clone();
477            if config.topic.is_none() {
478                config.topic = Some(route_name.to_string());
479            }
480            if config.client_id.is_none() {
481                config.client_id = Some(format!("{}-{}", crate::APP_NAME, route_name));
482            }
483            Ok(Box::new(mqtt::MqttPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
484        }
485        #[cfg(feature = "zeromq")]
486        EndpointType::ZeroMq(cfg) => {
487            Ok(Box::new(zeromq::ZeroMqPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
488        }
489        #[cfg(any(feature = "http-client", feature = "http-server"))]
490        EndpointType::Http(cfg) => {
491            #[cfg(feature = "reqwest")]
492            {
493                let sink = http::HttpPublisher::new(cfg).await?;
494                Ok(Box::new(sink) as Box<dyn MessagePublisher>)
495            }
496            #[cfg(not(feature = "reqwest"))]
497            {
498                Err(anyhow!("HTTP publisher requires the 'reqwest' feature"))
499            }
500        }
501        #[cfg(feature = "mongodb")]
502        EndpointType::MongoDb(cfg) => {
503            let mut config = cfg.clone();
504            if config.collection.is_none() {
505                config.collection = Some(route_name.to_string());
506            }
507            Ok(Box::new(mongodb::MongoDbPublisher::new(&config).await?)
508                as Box<dyn MessagePublisher>)
509        }
510        EndpointType::File(cfg) => {
511            Ok(Box::new(file::FilePublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
512        }
513        EndpointType::Static(cfg) => Ok(Box::new(static_endpoint::StaticEndpointPublisher::new(
514            cfg,
515        )?) as Box<dyn MessagePublisher>),
516        EndpointType::Memory(cfg) => {
517            Ok(Box::new(memory::MemoryPublisher::new(cfg)?) as Box<dyn MessagePublisher>)
518        }
519        EndpointType::Null => Ok(Box::new(null::NullPublisher) as Box<dyn MessagePublisher>),
520        EndpointType::Fanout(endpoints) => {
521            let mut publishers = Vec::with_capacity(endpoints.len());
522            for endpoint in endpoints {
523                let p = create_publisher_with_depth(route_name, endpoint, depth + 1).await?;
524                publishers.push(p);
525            }
526            Ok(Box::new(fanout::FanoutPublisher::new(publishers)) as Box<dyn MessagePublisher>)
527        }
528        EndpointType::Switch(cfg) => {
529            let mut cases = std::collections::HashMap::new();
530            for (key, endpoint) in &cfg.cases {
531                let p = create_publisher_with_depth(route_name, endpoint, depth + 1).await?;
532                cases.insert(key.clone(), p);
533            }
534            let default = if let Some(endpoint) = &cfg.default {
535                Some(create_publisher_with_depth(route_name, endpoint, depth + 1).await?)
536            } else {
537                None
538            };
539            Ok(Box::new(switch::SwitchPublisher::new(
540                cfg.metadata_key.clone(),
541                cases,
542                default,
543            )) as Box<dyn MessagePublisher>)
544        }
545        EndpointType::Response(_) => {
546            Ok(Box::new(response::ResponsePublisher) as Box<dyn MessagePublisher>)
547        }
548        EndpointType::Custom { name, config } => {
549            let factory = get_endpoint_factory(name)
550                .ok_or_else(|| anyhow!("Custom endpoint factory '{}' not found", name))?;
551            factory.create_publisher(route_name, config).await
552        }
553        #[allow(unreachable_patterns)]
554        _ => Err(anyhow!(
555            "[route:{}] Unsupported publisher endpoint type",
556            route_name
557        )),
558    }?;
559    Ok(publisher)
560}
561
562#[cfg(test)]
563mod tests {
564    use super::*;
565    use crate::models::{Endpoint, EndpointType};
566    use crate::CanonicalMessage;
567
568    #[tokio::test]
569    async fn test_fanout_publisher_integration() {
570        let ep1 = Endpoint::new_memory("fanout_1", 10);
571        let ep2 = Endpoint::new_memory("fanout_2", 10);
572
573        let chan1 = ep1.channel().unwrap();
574        let chan2 = ep2.channel().unwrap();
575        let fanout_ep = Endpoint::new(EndpointType::Fanout(vec![ep1, ep2]));
576
577        let publisher = create_publisher_from_route("test_fanout", &fanout_ep)
578            .await
579            .expect("Failed to create fanout publisher");
580
581        let msg = CanonicalMessage::new(b"fanout_payload".to_vec(), None);
582        publisher.send(msg).await.expect("Failed to send message");
583
584        assert_eq!(chan1.len(), 1);
585        assert_eq!(chan2.len(), 1);
586
587        let msg1 = chan1.drain_messages().pop().unwrap();
588        let msg2 = chan2.drain_messages().pop().unwrap();
589
590        assert_eq!(msg1.payload, "fanout_payload".as_bytes());
591        assert_eq!(msg2.payload, "fanout_payload".as_bytes());
592    }
593
594    use crate::models::MemoryConfig;
595    #[tokio::test]
596    async fn test_factory_creates_memory_subscriber() {
597        let endpoint = Endpoint {
598            endpoint_type: EndpointType::Memory(
599                MemoryConfig::new("mem".to_string(), None).with_subscribe(true),
600            ),
601            middlewares: vec![],
602            handler: None,
603        };
604
605        let consumer = create_consumer_from_route("test", &endpoint).await.unwrap();
606        // Check if it is a MemoryConsumer (MemorySubscriber was merged)
607        let is_subscriber = consumer
608            .as_any()
609            .is::<crate::endpoints::memory::MemoryConsumer>();
610        assert!(is_subscriber, "Factory should create MemoryConsumer");
611    }
612}