Skip to main content

mq_bridge/endpoints/
mod.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6#[cfg(feature = "amqp")]
7pub mod amqp;
8#[cfg(feature = "aws")]
9pub mod aws;
10pub mod fanout;
11pub mod file;
12#[cfg(feature = "grpc")]
13pub mod grpc;
14#[cfg(feature = "http")]
15pub mod http;
16#[cfg(feature = "ibm-mq")]
17pub mod ibm_mq;
18#[cfg(feature = "kafka")]
19pub mod kafka;
20pub mod memory;
21#[cfg(feature = "mongodb")]
22pub mod mongodb;
23#[cfg(feature = "mqtt")]
24pub mod mqtt;
25#[cfg(feature = "nats")]
26pub mod nats;
27pub mod null;
28pub mod reader;
29pub mod response;
30#[cfg(feature = "sled")]
31pub mod sled;
32#[cfg(feature = "sqlx")]
33pub mod sqlx;
34pub mod static_endpoint;
35pub mod switch;
36#[cfg(feature = "zeromq")]
37pub mod zeromq;
38use crate::endpoints::memory::{get_or_create_channel, MemoryChannel};
39use crate::middleware::apply_middlewares_to_consumer;
40use crate::models::{Endpoint, EndpointType, MemoryConfig, Middleware, ResponseConfig};
41use crate::route::get_endpoint_factory;
42use crate::traits::{BoxFuture, MessageConsumer, MessagePublisher};
43use anyhow::{anyhow, Result};
44use std::sync::Arc;
45
46impl Endpoint {
47    pub fn new(endpoint_type: EndpointType) -> Self {
48        Self {
49            middlewares: Vec::new(),
50            endpoint_type,
51            handler: None,
52        }
53    }
54    /// Creates a new in-memory endpoint with the specified topic and capacity.
55    ///
56    /// # Examples
57    ///
58    /// ```
59    /// use mq_bridge::models::Endpoint;
60    /// let endpoint = Endpoint::new_memory("my_topic", 100);
61    /// ```
62    pub fn new_memory(topic: &str, capacity: usize) -> Self {
63        Self::new(EndpointType::Memory(MemoryConfig::new(
64            topic,
65            Some(capacity),
66        )))
67    }
68    pub fn new_response() -> Self {
69        Self::new(EndpointType::Response(ResponseConfig::default()))
70    }
71    pub fn has_retry_middleware(&self) -> bool {
72        self.middlewares
73            .iter()
74            .any(|m| matches!(m, Middleware::Retry(_)))
75    }
76    pub fn add_middleware(mut self, middleware: Middleware) -> Self {
77        self.middlewares.push(middleware);
78        self
79    }
80    pub fn add_middlewares(mut self, mut middlewares: Vec<Middleware>) -> Self {
81        self.middlewares.append(&mut middlewares);
82        self
83    }
84    ///
85    /// Returns a reference to the in-memory channel associated with this Endpoint.
86    /// This function will only succeed if the Endpoint is of type EndpointType::Memory.
87    /// If the Endpoint is not a memory endpoint, this function will return an error.
88    /// This function is primarily used for testing purposes where a Queue is needed.
89    pub fn channel(&self) -> anyhow::Result<MemoryChannel> {
90        match &self.endpoint_type {
91            EndpointType::Memory(cfg) => Ok(get_or_create_channel(cfg)),
92            _ => Err(anyhow::anyhow!("channel() called on non-memory Endpoint")),
93        }
94    }
95    pub fn null() -> Self {
96        Self::new(EndpointType::Null)
97    }
98
99    pub fn with_retry(mut self, retry: crate::models::RetryMiddleware) -> Self {
100        // Retry should be inner to DLQ and Metrics.
101        // We insert it before any existing DLQ or Metrics middleware.
102        let mut insert_idx = self.middlewares.len();
103        for (i, m) in self.middlewares.iter().enumerate() {
104            if matches!(m, Middleware::Dlq(_) | Middleware::Metrics(_)) {
105                insert_idx = i;
106                break;
107            }
108        }
109        self.middlewares
110            .insert(insert_idx, Middleware::Retry(retry));
111        self
112    }
113
114    pub fn with_dlq(mut self, dlq: crate::models::DeadLetterQueueMiddleware) -> Self {
115        // DLQ should be outer to Retry, but inner to Metrics.
116        let mut insert_idx = self.middlewares.len();
117        for (i, m) in self.middlewares.iter().enumerate() {
118            if matches!(m, Middleware::Metrics(_)) {
119                insert_idx = i;
120                break;
121            }
122        }
123        self.middlewares
124            .insert(insert_idx, Middleware::Dlq(Box::new(dlq)));
125        self
126    }
127
128    pub fn with_deduplication(mut self, dedup: crate::models::DeduplicationMiddleware) -> Self {
129        // Deduplication is consumer-only.
130        // We insert it at the beginning so it is applied last (innermost) for consumers,
131        // or second to last if metrics are at 0.
132        // List: [Dedup, ...] -> Consumer: ... ( Dedup ( base ) )
133        self.middlewares.insert(0, Middleware::Deduplication(dedup));
134        self
135    }
136
137    pub fn with_consumer_metrics(mut self) -> Self {
138        // For consumers, the first middleware in the list is the outermost (applied last).
139        // Inserting at 0 ensures it wraps everything else (Ingestion Metrics).
140        // List: [Metrics, Dedup] -> Consumer: Metrics ( Dedup ( base ) )
141        if !self
142            .middlewares
143            .iter()
144            .any(|m| matches!(m, Middleware::Metrics(_)))
145        {
146            self.middlewares
147                .insert(0, Middleware::Metrics(crate::models::MetricsMiddleware {}));
148        }
149        self
150    }
151
152    pub fn with_metrics(mut self) -> Self {
153        // Metrics should be outer to everything (last in the list for publishers).
154        if !self
155            .middlewares
156            .iter()
157            .any(|m| matches!(m, Middleware::Metrics(_)))
158        {
159            self.middlewares
160                .push(Middleware::Metrics(crate::models::MetricsMiddleware {}));
161        }
162        self
163    }
164
165    pub async fn create_consumer(
166        &self,
167        route_name: &str,
168    ) -> anyhow::Result<Box<dyn crate::traits::MessageConsumer>> {
169        crate::endpoints::create_consumer_from_route(route_name, self).await
170    }
171
172    pub async fn create_publisher(&self, _route_name: &str) -> anyhow::Result<crate::Publisher> {
173        crate::Publisher::new(self.clone()).await
174    }
175
176    pub fn check_consumer(
177        &self,
178        route_name: &str,
179        allowed_endpoints: Option<&[&str]>,
180    ) -> anyhow::Result<Vec<String>> {
181        crate::endpoints::check_consumer(route_name, self, allowed_endpoints)
182    }
183
184    pub fn check_publisher(
185        &self,
186        route_name: &str,
187        allowed_endpoints: Option<&[&str]>,
188    ) -> anyhow::Result<Vec<String>> {
189        crate::endpoints::check_publisher(route_name, self, allowed_endpoints)
190    }
191}
192
193/// Validates the consumer configuration for a route.
194pub fn check_consumer(
195    route_name: &str,
196    endpoint: &Endpoint,
197    allowed_types: Option<&[&str]>,
198) -> Result<Vec<String>> {
199    check_consumer_recursive(route_name, endpoint, 0, allowed_types)
200}
201
202fn check_consumer_recursive(
203    route_name: &str,
204    endpoint: &Endpoint,
205    depth: usize,
206    allowed_types: Option<&[&str]>,
207) -> Result<Vec<String>> {
208    const MAX_DEPTH: usize = 16;
209    if depth > MAX_DEPTH {
210        return Err(anyhow!(
211            "Ref recursion depth exceeded limit of {}",
212            MAX_DEPTH
213        ));
214    }
215    let mut warnings = Vec::new();
216    if endpoint.handler.is_some() {
217        warnings.push(
218            "Endpoint 'handler' is set on an input endpoint. Handlers are currently only supported on output endpoints (publishers) and will be ignored here."
219            .to_string()
220        );
221    }
222
223    if let Some(allowed) = allowed_types {
224        if !endpoint.endpoint_type.is_core() {
225            let name = endpoint.endpoint_type.name();
226            if !allowed.contains(&name) {
227                return Err(anyhow!(
228                    "[route:{}] Endpoint type '{}' is not allowed by policy",
229                    route_name,
230                    name
231                ));
232            }
233        }
234    }
235    match &endpoint.endpoint_type {
236        EndpointType::Ref(name) => {
237            let referenced = crate::route::get_endpoint(name).ok_or_else(|| {
238                anyhow!(
239                    "[route:{}] Referenced endpoint '{}' not found",
240                    route_name,
241                    name
242                )
243            })?;
244            // We need to check the referenced endpoint, but we don't need to merge middlewares
245            // for the check itself, as we just want to validate the core type.
246            // However, to be thorough, we recurse on the referenced endpoint.
247            // Note: This check ignores the middlewares on the 'ref' itself, which is acceptable for type checking.
248            warnings.extend(check_consumer_recursive(
249                route_name,
250                &referenced,
251                depth + 1,
252                allowed_types,
253            )?);
254            Ok(warnings)
255        }
256        #[cfg(feature = "aws")]
257        EndpointType::Aws(cfg) => {
258            if cfg.topic_arn.is_some() {
259                warnings.push(
260                    "Endpoint 'aws' is used as a consumer, but 'topic_arn' is a publisher-only option and will be ignored."
261                    .to_string()
262                );
263            }
264            Ok(warnings)
265        }
266        #[cfg(feature = "kafka")]
267        EndpointType::Kafka(cfg) => {
268            if cfg.delayed_ack {
269                warnings.push(
270                    "Endpoint 'kafka' is used as a consumer, but 'delayed_ack' is a publisher-only option and will be ignored."
271                    .to_string()
272                );
273            }
274            if cfg.producer_options.is_some() {
275                warnings.push(
276                    "Endpoint 'kafka' is used as a consumer, but 'producer_options' is a publisher-only option and will be ignored."
277                    .to_string()
278                );
279            }
280            Ok(warnings)
281        }
282        #[cfg(feature = "nats")]
283        EndpointType::Nats(cfg) => {
284            if cfg.stream.is_none() {
285                return Err(anyhow!(
286                    "[route:{}] NATS consumer must specify a 'stream'",
287                    route_name
288                ));
289            }
290            if cfg.request_reply {
291                warnings.push(
292                    "Endpoint 'nats' is used as a consumer, but 'request_reply' is a publisher-only option and will be ignored."
293                    .to_string()
294                );
295            }
296            if cfg.request_timeout_ms.is_some() {
297                warnings.push(
298                    "Endpoint 'nats' is used as a consumer, but 'request_timeout_ms' is a publisher-only option and will be ignored."
299                    .to_string()
300                );
301            }
302            if cfg.delayed_ack {
303                warnings.push(
304                    "Endpoint 'nats' is used as a consumer, but 'delayed_ack' is a publisher-only option and will be ignored."
305                    .to_string()
306                );
307            }
308            if cfg.stream_max_messages.is_some() {
309                warnings.push(
310                    "Endpoint 'nats' is used as a consumer, but 'stream_max_messages' is a publisher-only option and will be ignored."
311                    .to_string()
312                );
313            }
314            if cfg.stream_max_bytes.is_some() {
315                warnings.push(
316                    "Endpoint 'nats' is used as a consumer, but 'stream_max_bytes' is a publisher-only option and will be ignored."
317                    .to_string()
318                );
319            }
320            Ok(warnings)
321        }
322        #[cfg(feature = "amqp")]
323        EndpointType::Amqp(cfg) => {
324            if cfg.delayed_ack {
325                warnings.push(
326                    "Endpoint 'amqp' is used as a consumer, but 'delayed_ack' is a publisher-only option and will be ignored."
327                    .to_string()
328                );
329            }
330            Ok(warnings)
331        }
332        #[cfg(feature = "mqtt")]
333        EndpointType::Mqtt(cfg) => {
334            if cfg.delayed_ack {
335                warnings.push(
336                    "Endpoint 'mqtt' is used as a consumer, but 'delayed_ack' is a publisher-only option and will be ignored."
337                    .to_string()
338                );
339            }
340            Ok(warnings)
341        }
342        #[cfg(feature = "zeromq")]
343        EndpointType::ZeroMq(_) => Ok(warnings),
344        #[cfg(feature = "ibm-mq")]
345        EndpointType::IbmMq(_) => Ok(warnings),
346        #[cfg(feature = "mongodb")]
347        EndpointType::MongoDb(cfg) => {
348            if cfg.change_stream && matches!(cfg.format, crate::models::MongoDbFormat::Raw) {
349                return Err(anyhow!(
350                    "[route:{}] MongoDB raw format cannot be used with change_stream/subscriber mode because raw documents do not include the seq ordering field",
351                    route_name
352                ));
353            }
354            if cfg.reply_polling_ms.is_some() {
355                warnings.push(
356                    "Endpoint 'mongodb' is used as a consumer, but 'reply_polling_ms' is a publisher-only option and will be ignored."
357                    .to_string()
358                );
359            }
360            if cfg.request_reply {
361                warnings.push(
362                    "Endpoint 'mongodb' is used as a consumer, but 'request_reply' is a publisher-only option and will be ignored."
363                    .to_string()
364                );
365            }
366            if cfg.request_timeout_ms.is_some() {
367                warnings.push(
368                    "Endpoint 'mongodb' is used as a consumer, but 'request_timeout_ms' is a publisher-only option and will be ignored."
369                    .to_string()
370                );
371            }
372            if cfg.ttl_seconds.is_some() {
373                warnings.push(
374                    "Endpoint 'mongodb' is used as a consumer, but 'ttl_seconds' is a publisher-only option and will be ignored."
375                    .to_string()
376                );
377            }
378            if cfg.capped_size_bytes.is_some() {
379                warnings.push(
380                    "Endpoint 'mongodb' is used as a consumer, but 'capped_size_bytes' is a publisher-only option and will be ignored."
381                    .to_string()
382                );
383            }
384            Ok(warnings)
385        }
386        #[cfg(feature = "grpc")]
387        EndpointType::Grpc(_) => Ok(warnings),
388        #[cfg(feature = "http")]
389        EndpointType::Http(cfg) => {
390            if cfg.batch_concurrency.is_some() {
391                warnings.push("Endpoint 'http' is used as a consumer, but 'batch_concurrency' is a publisher-only option and will be ignored.".to_string());
392            }
393            if cfg.tcp_keepalive_ms.is_some() {
394                warnings.push("Endpoint 'http' is used as a consumer, but 'tcp_keepalive_ms' is a publisher-only option and will be ignored.".to_string());
395            }
396            if cfg.pool_idle_timeout_ms.is_some() {
397                warnings.push(
398                        "Endpoint 'http' is used as a consumer, but 'pool_idle_timeout_ms' is a publisher-only option and will be ignored."
399                        .to_string(),
400                    );
401            }
402            Ok(warnings)
403        }
404        #[cfg(feature = "sqlx")]
405        EndpointType::Sqlx(cfg) => {
406            if cfg.insert_query.is_some() {
407                warnings.push(
408                    "Endpoint 'sqlx' is used as a consumer, but 'insert_query' is a publisher-only option and will be ignored."
409                    .to_string()
410                );
411            }
412            Ok(warnings)
413        }
414        #[cfg(feature = "sled")]
415        EndpointType::Sled(_) => Ok(warnings),
416        EndpointType::Static(_) => Ok(warnings),
417        EndpointType::Memory(cfg) => {
418            if cfg.request_reply {
419                warnings.push(
420                    "Endpoint 'memory' is used as a consumer, but 'request_reply' is a publisher-only option and will be ignored."
421                    .to_string()
422                );
423            }
424            if cfg.request_timeout_ms.is_some() {
425                warnings.push(
426                    "Endpoint 'memory' is used as a consumer, but 'request_timeout_ms' is a publisher-only option and will be ignored."
427                    .to_string()
428                );
429            }
430            Ok(warnings)
431        }
432        EndpointType::File(_) => Ok(warnings),
433        EndpointType::Custom { .. } => Ok(warnings),
434        EndpointType::Switch(_) => Err(anyhow!(
435            "[route:{}] Switch endpoint is only supported as an output",
436            route_name
437        )),
438        EndpointType::Reader(_) => Err(anyhow!(
439            "[route:{}] Reader endpoint is only supported as an output",
440            route_name
441        )),
442        #[allow(unreachable_patterns)]
443        _ => {
444            if let Some(allowed) = allowed_types {
445                let name = endpoint.endpoint_type.name();
446                if allowed.contains(&name) {
447                    return Ok(warnings);
448                }
449            }
450            Err(anyhow!(
451                "[route:{}] Unsupported consumer endpoint type '{:?}'",
452                route_name,
453                endpoint.endpoint_type
454            ))
455        }
456    }
457}
458
459fn resolve_endpoint(endpoint: &Endpoint, route_name: &str) -> Result<Endpoint> {
460    let mut visited = std::collections::HashSet::new();
461    resolve_endpoint_recursive(endpoint, route_name, &mut visited)
462}
463
464fn resolve_endpoint_recursive(
465    endpoint: &Endpoint,
466    route_name: &str,
467    visited: &mut std::collections::HashSet<String>,
468) -> Result<Endpoint> {
469    const MAX_DEPTH: usize = 16;
470    if visited.len() > MAX_DEPTH {
471        return Err(anyhow!(
472            "Reference recursion depth exceeded limit of {}",
473            MAX_DEPTH
474        ));
475    }
476
477    if let EndpointType::Ref(name) = &endpoint.endpoint_type {
478        if !visited.insert(name.clone()) {
479            return Err(anyhow!(
480                "[route:{}] Circular reference detected for endpoint '{}'",
481                route_name,
482                name
483            ));
484        }
485
486        let referenced_endpoint = crate::route::get_endpoint(name).ok_or_else(|| {
487            anyhow!(
488                "[route:{}] Referenced endpoint '{}' not found",
489                route_name,
490                name
491            )
492        })?;
493
494        let mut resolved = resolve_endpoint_recursive(&referenced_endpoint, route_name, visited)?;
495        // Merge middlewares: The ref's middlewares should be outer (applied last in the rev() loop).
496        // Since apply_middlewares_to_consumer iterates in reverse, we prepend the ref's middlewares.
497        let mut new_middlewares = endpoint.middlewares.clone();
498        new_middlewares.extend(resolved.middlewares);
499        resolved.middlewares = new_middlewares;
500        Ok(resolved)
501    } else {
502        Ok(endpoint.clone())
503    }
504}
505
506/// Creates a `MessageConsumer` based on the route's "in" configuration.
507pub async fn create_consumer_from_route(
508    route_name: &str,
509    endpoint: &Endpoint,
510) -> Result<Box<dyn MessageConsumer>> {
511    let resolved_endpoint = resolve_endpoint(endpoint, route_name)?;
512    check_consumer(route_name, &resolved_endpoint, None)?;
513    let consumer = create_base_consumer(route_name, &resolved_endpoint).await?;
514    apply_middlewares_to_consumer(consumer, &resolved_endpoint, route_name).await
515}
516
517async fn create_base_consumer(
518    route_name: &str,
519    endpoint: &Endpoint,
520) -> Result<Box<dyn MessageConsumer>> {
521    // Helper to coerce concrete consumers to the trait object, fixing type inference issues in the match block.
522    fn boxed<T: MessageConsumer + 'static>(c: T) -> Box<dyn MessageConsumer> {
523        Box::new(c)
524    }
525
526    match &endpoint.endpoint_type {
527        #[cfg(feature = "aws")]
528        EndpointType::Aws(cfg) => Ok(boxed(aws::AwsConsumer::new(cfg).await?)),
529        #[cfg(feature = "kafka")]
530        EndpointType::Kafka(cfg) => {
531            let mut config = cfg.clone();
532            if config.topic.is_none() {
533                config.topic = Some(route_name.to_string());
534            }
535            Ok(boxed(kafka::KafkaConsumer::new(&config).await?))
536        }
537        #[cfg(feature = "nats")]
538        EndpointType::Nats(cfg) => {
539            let mut config = cfg.clone();
540            if config.subject.is_none() {
541                config.subject = Some(route_name.to_string());
542            }
543            Ok(boxed(nats::NatsConsumer::new(&config).await?))
544        }
545        #[cfg(feature = "amqp")]
546        EndpointType::Amqp(cfg) => {
547            let mut config = cfg.clone();
548            if config.queue.is_none() {
549                config.queue = Some(route_name.to_string());
550            }
551            Ok(boxed(amqp::AmqpConsumer::new(&config).await?))
552        }
553        #[cfg(feature = "mqtt")]
554        EndpointType::Mqtt(cfg) => {
555            let mut config = cfg.clone();
556            if config.topic.is_none() {
557                config.topic = Some(route_name.to_string());
558            }
559            if config.client_id.is_none() && !config.clean_session {
560                // For persistent sessions, default client_id to route_name if not provided
561                config.client_id = Some(format!("{}-{}", crate::APP_NAME, route_name));
562            }
563            Ok(boxed(mqtt::MqttConsumer::new(cfg).await?))
564        }
565        #[cfg(feature = "ibm-mq")]
566        EndpointType::IbmMq(cfg) => {
567            let mut config = cfg.clone();
568            if config.queue.is_none() && config.topic.is_none() {
569                config.queue = Some(route_name.to_string());
570            }
571            Ok(boxed(ibm_mq::IbmMqConsumer::new(&config).await?))
572        }
573        #[cfg(feature = "zeromq")]
574        EndpointType::ZeroMq(cfg) => Ok(boxed(zeromq::ZeroMqConsumer::new(cfg).await?)),
575        EndpointType::File(cfg) => Ok(boxed(file::FileConsumer::new(cfg).await?)),
576        #[cfg(feature = "grpc")]
577        EndpointType::Grpc(cfg) => {
578            let mut config = cfg.clone();
579            if config.topic.is_none() {
580                config.topic = Some(route_name.to_string());
581            }
582            Ok(boxed(grpc::GrpcConsumer::new(&config).await?))
583        }
584        #[cfg(feature = "sqlx")]
585        EndpointType::Sqlx(cfg) => Ok(boxed(sqlx::SqlxConsumer::new(cfg).await?)),
586        #[cfg(feature = "http")]
587        EndpointType::Http(cfg) => Ok(boxed(http::HttpConsumer::new(cfg).await?)),
588        EndpointType::Static(cfg) => Ok(boxed(static_endpoint::StaticRequestConsumer::new(cfg)?)),
589        EndpointType::Memory(cfg) => Ok(boxed(memory::MemoryConsumer::new(cfg)?)),
590        #[cfg(feature = "sled")]
591        EndpointType::Sled(cfg) => Ok(boxed(sled::SledConsumer::new(cfg)?)),
592        #[cfg(feature = "mongodb")]
593        EndpointType::MongoDb(cfg) => {
594            let mut config = cfg.clone();
595            if config.collection.is_none() {
596                config.collection = Some(route_name.to_string());
597            }
598            if config.change_stream {
599                if config.ttl_seconds.is_none() {
600                    config.ttl_seconds = Some(86400); // Remove events by default after 24 hours
601                }
602                Ok(boxed(mongodb::MongoDbSubscriber::new(&config).await?))
603            } else {
604                Ok(boxed(mongodb::MongoDbConsumer::new(&config).await?))
605            }
606        }
607        EndpointType::Custom { name, config } => {
608            let factory = get_endpoint_factory(name)
609                .ok_or_else(|| anyhow!("Custom endpoint factory '{}' not found", name))?;
610            factory.create_consumer(route_name, config).await
611        }
612        EndpointType::Switch(_) => Err(anyhow!(
613            "[route:{}] Switch endpoint is only supported as an output",
614            route_name
615        )),
616        #[allow(unreachable_patterns)]
617        _ => Err(anyhow!(
618            "[route:{}] Unsupported consumer endpoint type '{:?}'",
619            route_name,
620            endpoint.endpoint_type
621        )),
622    }
623}
624
625/// Validates the publisher configuration for a route.
626pub fn check_publisher(
627    route_name: &str,
628    endpoint: &Endpoint,
629    allowed_types: Option<&[&str]>,
630) -> Result<Vec<String>> {
631    check_publisher_recursive(route_name, endpoint, 0, allowed_types)
632}
633
634fn check_publisher_recursive(
635    route_name: &str,
636    endpoint: &Endpoint,
637    depth: usize,
638    allowed_types: Option<&[&str]>,
639) -> Result<Vec<String>> {
640    let mut warnings = Vec::new();
641    if let Some(allowed) = allowed_types {
642        if !endpoint.endpoint_type.is_core() {
643            let name = endpoint.endpoint_type.name();
644            if !allowed.contains(&name) {
645                return Err(anyhow!(
646                    "[route:{}] Endpoint type '{}' is not allowed by policy",
647                    route_name,
648                    name
649                ));
650            }
651        }
652    }
653    const MAX_DEPTH: usize = 16;
654    if depth > MAX_DEPTH {
655        return Err(anyhow!(
656            "Fanout recursion depth exceeded limit of {}",
657            MAX_DEPTH
658        ));
659    }
660    match &endpoint.endpoint_type {
661        EndpointType::Ref(name) => {
662            let referenced = crate::route::get_endpoint(name).ok_or_else(|| {
663                anyhow!(
664                    "[route:{}] Referenced endpoint '{}' not found in endpoint registry",
665                    route_name,
666                    name
667                )
668            });
669            if let Ok(referenced) = referenced {
670                warnings.extend(check_publisher_recursive(
671                    route_name,
672                    &referenced,
673                    depth + 1,
674                    allowed_types,
675                )?);
676                return Ok(warnings);
677            }
678            if crate::publisher::get_publisher(name).is_some() {
679                return Ok(warnings);
680            }
681            Err(anyhow!(
682                "[route:{}] Referenced endpoint '{}' not found in any registry",
683                route_name,
684                name
685            ))
686        }
687        #[cfg(feature = "aws")]
688        EndpointType::Aws(cfg) => {
689            if cfg.max_messages.is_some() {
690                warnings.push(
691                    "Endpoint 'aws' is used as a publisher, but 'max_messages' is a consumer-only option and will be ignored."
692                    .to_string()
693                );
694            }
695            if cfg.wait_time_seconds.is_some() {
696                warnings.push(
697                    "Endpoint 'aws' is used as a publisher, but 'wait_time_seconds' is a consumer-only option and will be ignored."
698                    .to_string()
699                );
700            }
701            Ok(warnings)
702        }
703        #[cfg(feature = "kafka")]
704        EndpointType::Kafka(cfg) => {
705            if cfg.group_id.is_some() {
706                warnings.push(
707                    "Endpoint 'kafka' is used as a publisher, but 'group_id' is a consumer-only option and will be ignored."
708                    .to_string()
709                );
710            }
711            if cfg.consumer_options.is_some() {
712                warnings.push(
713                    "Endpoint 'kafka' is used as a publisher, but 'consumer_options' is a consumer-only option and will be ignored."
714                    .to_string()
715                );
716            }
717            Ok(warnings)
718        }
719        #[cfg(feature = "nats")]
720        EndpointType::Nats(cfg) => {
721            if cfg.stream.is_some() {
722                warnings.push(
723                    "Endpoint 'nats' is used as a publisher, but 'stream' is a consumer-only option and will be ignored."
724                    .to_string()
725                );
726            }
727            if cfg.subscriber_mode {
728                warnings.push(
729                    "Endpoint 'nats' is used as a publisher, but 'subscriber_mode' is a consumer-only option and will be ignored."
730                    .to_string()
731                );
732            }
733            if cfg.prefetch_count.is_some() {
734                warnings.push(
735                    "Endpoint 'nats' is used as a publisher, but 'prefetch_count' is a consumer-only option and will be ignored."
736                    .to_string()
737                );
738            }
739            Ok(warnings)
740        }
741        #[cfg(feature = "amqp")]
742        EndpointType::Amqp(cfg) => {
743            if cfg.subscribe_mode {
744                warnings.push(
745                    "Endpoint 'amqp' is used as a publisher, but 'subscribe_mode' is a consumer-only option and will be ignored."
746                    .to_string()
747                );
748            }
749            if cfg.prefetch_count.is_some() {
750                warnings.push(
751                    "Endpoint 'amqp' is used as a publisher, but 'prefetch_count' is a consumer-only option and will be ignored."
752                    .to_string()
753                );
754            }
755            Ok(warnings)
756        }
757        #[cfg(feature = "mqtt")]
758        EndpointType::Mqtt(cfg) => {
759            if cfg.clean_session {
760                warnings.push(
761                    "Endpoint 'mqtt' is used as a publisher, but 'clean_session' is a consumer-only option and will be ignored."
762                    .to_string()
763                );
764            }
765            Ok(warnings)
766        }
767        #[cfg(feature = "zeromq")]
768        EndpointType::ZeroMq(cfg) => {
769            if cfg.topic.is_some() {
770                warnings.push(
771                    "Endpoint 'zeromq' is used as a publisher, but 'topic' is a consumer-only option and will be ignored."
772                    .to_string()
773                );
774            }
775            Ok(warnings)
776        }
777
778        #[cfg(feature = "http")]
779        EndpointType::Http(_cfg) => {
780            if _cfg.path.is_some() {
781                warnings.push(
782                    "Endpoint 'http' is used as a publisher, but 'path' is a consumer-only option and will be ignored."
783                    .to_string()
784                );
785            }
786            if _cfg.workers.is_some() {
787                warnings.push(
788                    "Endpoint 'http' is used as a publisher, but 'workers' is a consumer-only option and will be ignored."
789                    .to_string()
790                );
791            }
792            if _cfg.message_id_header.is_some() {
793                warnings.push(
794                    "Endpoint 'http' is used as a publisher, but 'message_id_header' is a consumer-only option and will be ignored."
795                    .to_string()
796                );
797            }
798            if _cfg.internal_buffer_size.is_some() {
799                warnings.push(
800                    "Endpoint 'http' is used as a publisher, but 'internal_buffer_size' is a consumer-only option and will be ignored."
801                    .to_string()
802                );
803            }
804            if _cfg.fire_and_forget {
805                warnings.push(
806                    "Endpoint 'http' is used as a publisher, but 'fire_and_forget' is a consumer-only option and will be ignored."
807                    .to_string()
808                );
809            }
810            Ok(warnings)
811        }
812        #[cfg(feature = "grpc")]
813        EndpointType::Grpc(_) => Ok(warnings),
814        #[cfg(feature = "sqlx")]
815        EndpointType::Sqlx(cfg) => {
816            if cfg.select_query.is_some() {
817                warnings.push(
818                    "Endpoint 'sqlx' is used as a publisher, but 'select_query' is a consumer-only option and will be ignored."
819                    .to_string()
820                );
821            }
822            if cfg.delete_after_read {
823                warnings.push(
824                    "Endpoint 'sqlx' is used as a publisher, but 'delete_after_read' is a consumer-only option and will be ignored."
825                    .to_string()
826                );
827            }
828            if cfg.polling_interval_ms.is_some() {
829                warnings.push(
830                    "Endpoint 'sqlx' is used as a publisher, but 'polling_interval_ms' is a consumer-only option and will be ignored."
831                    .to_string()
832                );
833            }
834            Ok(warnings)
835        }
836        #[cfg(feature = "ibm-mq")]
837        EndpointType::IbmMq(cfg) => {
838            if cfg.wait_timeout_ms != 1000 {
839                warnings.push(
840                    "Endpoint 'ibmmq' is used as a publisher, but 'wait_timeout_ms' is a consumer-only option and will be ignored."
841                    .to_string()
842                );
843            }
844            Ok(warnings)
845        }
846        #[cfg(feature = "mongodb")]
847        EndpointType::MongoDb(cfg) => {
848            if cfg.polling_interval_ms.is_some() {
849                warnings.push(
850                    "Endpoint 'mongodb' is used as a publisher, but 'polling_interval_ms' is a consumer-only option and will be ignored."
851                    .to_string()
852                );
853            }
854            if cfg.change_stream {
855                warnings.push(
856                    "Endpoint 'mongodb' is used as a publisher, but 'change_stream' is a consumer-only option and will be ignored."
857                    .to_string()
858                );
859            }
860            if cfg.cursor_id.is_some() {
861                warnings.push(
862                    "Endpoint 'mongodb' is used as a publisher, but 'cursor_id' is a consumer-only option and will be ignored."
863                    .to_string()
864                );
865            }
866            Ok(warnings)
867        }
868        EndpointType::File(_) => Ok(warnings),
869        EndpointType::Static(_) => Ok(warnings),
870        EndpointType::Memory(cfg) => {
871            if cfg.subscribe_mode {
872                warnings.push(
873                    "Endpoint 'memory' is used as a publisher, but 'subscribe_mode' is a consumer-only option and will be ignored."
874                    .to_string()
875                );
876            }
877            if cfg.enable_nack {
878                warnings.push(
879                    "Endpoint 'memory' is used as a publisher, but 'enable_nack' is a consumer-only option and will be ignored."
880                    .to_string()
881                );
882            }
883            Ok(warnings)
884        }
885        #[cfg(feature = "sled")]
886        EndpointType::Sled(cfg) => {
887            if cfg.read_from_start {
888                warnings.push(
889                    "Endpoint 'sled' is used as a publisher, but 'read_from_start' is a consumer-only option and will be ignored."
890                    .to_string()
891                );
892            }
893            if cfg.delete_after_read {
894                warnings.push(
895                    "Endpoint 'sled' is used as a publisher, but 'delete_after_read' is a consumer-only option and will be ignored."
896                    .to_string()
897                );
898            }
899            Ok(warnings)
900        }
901        EndpointType::Null => Ok(warnings),
902        EndpointType::Fanout(endpoints) => {
903            for endpoint in endpoints {
904                warnings.extend(check_publisher_recursive(
905                    route_name,
906                    endpoint,
907                    depth + 1,
908                    allowed_types,
909                )?);
910            }
911            Ok(warnings)
912        }
913        EndpointType::Switch(cfg) => {
914            for endpoint in cfg.cases.values() {
915                warnings.extend(check_publisher_recursive(
916                    route_name,
917                    endpoint,
918                    depth + 1,
919                    allowed_types,
920                )?);
921            }
922            if let Some(endpoint) = &cfg.default {
923                warnings.extend(check_publisher_recursive(
924                    route_name,
925                    endpoint,
926                    depth + 1,
927                    allowed_types,
928                )?);
929            }
930            Ok(warnings)
931        }
932        EndpointType::Response(_) => Ok(warnings),
933        EndpointType::Custom { .. } => Ok(warnings),
934        EndpointType::Reader(inner) => check_consumer(route_name, inner, allowed_types),
935        #[allow(unreachable_patterns)]
936        _ => {
937            if let Some(allowed) = allowed_types {
938                let name = endpoint.endpoint_type.name();
939                if allowed.contains(&name) {
940                    return Ok(warnings);
941                }
942            }
943            Err(anyhow!(
944                "[route:{}] Unsupported publisher endpoint type '{:?}'",
945                route_name,
946                endpoint.endpoint_type
947            ))
948        }
949    }
950}
951
952/// Creates a `MessagePublisher` based on the route's "out" configuration.
953pub async fn create_publisher_from_route(
954    route_name: &str,
955    endpoint: &Endpoint,
956) -> Result<Arc<dyn MessagePublisher>> {
957    check_publisher(route_name, endpoint, None)?;
958    create_publisher_with_depth(route_name.to_string(), endpoint.clone(), 0).await
959}
960
961fn create_publisher_with_depth(
962    route_name: String,
963    endpoint: Endpoint,
964    depth: usize,
965) -> BoxFuture<'static, Result<Arc<dyn MessagePublisher>>> {
966    Box::pin(async move {
967        const MAX_DEPTH: usize = 16;
968        if depth > MAX_DEPTH {
969            return Err(anyhow!(
970                "Fanout/Ref recursion depth exceeded limit of {}",
971                MAX_DEPTH
972            ));
973        }
974
975        if let EndpointType::Ref(name) = &endpoint.endpoint_type {
976            let referenced_opt = crate::route::get_endpoint(name);
977
978            if referenced_opt.is_none() {
979                if let Some(pub_instance) = crate::publisher::get_publisher(name) {
980                    let inner = pub_instance.inner();
981                    let mut publisher: Box<dyn MessagePublisher> = Box::new(inner);
982
983                    if let Some(handler) = &endpoint.handler {
984                        publisher = Box::new(crate::command_handler::CommandPublisher::new(
985                            publisher,
986                            handler.clone(),
987                        ));
988                    }
989                    return crate::middleware::apply_middlewares_to_publisher(
990                        publisher,
991                        &endpoint,
992                        &route_name,
993                    )
994                    .await;
995                }
996            }
997
998            let referenced = referenced_opt.ok_or_else(|| {
999                anyhow!(
1000                    "[route:{}] Referenced endpoint '{}' not found",
1001                    route_name,
1002                    name
1003                )
1004            })?;
1005
1006            let mut merged = referenced;
1007            // Merge middlewares: The ref's middlewares should be outer (applied last).
1008            // Since apply_middlewares_to_publisher iterates forward, we append the ref's middlewares to the referenced ones.
1009            merged.middlewares.extend(endpoint.middlewares);
1010
1011            if endpoint.handler.is_some() {
1012                if merged.handler.is_some() {
1013                    return Err(anyhow!("[route:{}] Both ref endpoint and referenced endpoint '{}' have handlers defined. This is ambiguous.", route_name, name));
1014                }
1015                merged.handler = endpoint.handler;
1016            }
1017
1018            return create_publisher_with_depth(route_name, merged, depth + 1).await;
1019        }
1020
1021        let mut publisher =
1022            create_base_publisher(&route_name, &endpoint.endpoint_type, depth).await?;
1023        if let Some(handler) = &endpoint.handler {
1024            publisher = Box::new(crate::command_handler::CommandPublisher::new(
1025                publisher,
1026                handler.clone(),
1027            ));
1028        }
1029        crate::middleware::apply_middlewares_to_publisher(publisher, &endpoint, &route_name).await
1030    })
1031}
1032
1033async fn create_base_publisher(
1034    route_name: &str,
1035    endpoint_type: &EndpointType,
1036    depth: usize,
1037) -> Result<Box<dyn MessagePublisher>> {
1038    let publisher = match endpoint_type {
1039        #[cfg(feature = "aws")]
1040        EndpointType::Aws(cfg) => {
1041            Ok(Box::new(aws::AwsPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1042        }
1043        #[cfg(feature = "kafka")]
1044        EndpointType::Kafka(cfg) => {
1045            let mut config = cfg.clone();
1046            if config.topic.is_none() {
1047                config.topic = Some(route_name.to_string());
1048            }
1049            Ok(Box::new(kafka::KafkaPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
1050        }
1051        #[cfg(feature = "nats")]
1052        EndpointType::Nats(cfg) => {
1053            let mut config = cfg.clone();
1054            if config.subject.is_none() {
1055                config.subject = Some(route_name.to_string());
1056            }
1057            Ok(Box::new(nats::NatsPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
1058        }
1059        #[cfg(feature = "amqp")]
1060        EndpointType::Amqp(cfg) => {
1061            let mut config = cfg.clone();
1062            if config.queue.is_none() {
1063                config.queue = Some(route_name.to_string());
1064            }
1065            Ok(Box::new(amqp::AmqpPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
1066        }
1067        #[cfg(feature = "mqtt")]
1068        EndpointType::Mqtt(cfg) => {
1069            let mut config = cfg.clone();
1070            if config.topic.is_none() {
1071                config.topic = Some(route_name.to_string());
1072            }
1073            if config.client_id.is_none() {
1074                config.client_id = Some(format!("{}-{}", crate::APP_NAME, route_name));
1075            }
1076            Ok(Box::new(mqtt::MqttPublisher::new(&config).await?) as Box<dyn MessagePublisher>)
1077        }
1078        #[cfg(feature = "zeromq")]
1079        EndpointType::ZeroMq(cfg) => {
1080            Ok(Box::new(zeromq::ZeroMqPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1081        }
1082        #[cfg(feature = "grpc")]
1083        EndpointType::Grpc(cfg) => {
1084            Ok(Box::new(grpc::GrpcPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1085        }
1086        #[cfg(feature = "sqlx")]
1087        EndpointType::Sqlx(cfg) => {
1088            Ok(Box::new(sqlx::SqlxPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1089        }
1090        #[cfg(feature = "http")]
1091        EndpointType::Http(cfg) => {
1092            let sink = http::HttpPublisher::new(cfg).await?;
1093            Ok(Box::new(sink) as Box<dyn MessagePublisher>)
1094        }
1095        #[cfg(feature = "mongodb")]
1096        EndpointType::MongoDb(cfg) => {
1097            let mut config = cfg.clone();
1098            if config.collection.is_none() {
1099                config.collection = Some(route_name.to_string());
1100            }
1101            Ok(Box::new(mongodb::MongoDbPublisher::new(&config).await?)
1102                as Box<dyn MessagePublisher>)
1103        }
1104        EndpointType::File(cfg) => {
1105            Ok(Box::new(file::FilePublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1106        }
1107        EndpointType::Static(cfg) => Ok(Box::new(static_endpoint::StaticEndpointPublisher::new(
1108            cfg,
1109        )?) as Box<dyn MessagePublisher>),
1110        EndpointType::Memory(cfg) => {
1111            Ok(Box::new(memory::MemoryPublisher::new(cfg)?) as Box<dyn MessagePublisher>)
1112        }
1113        #[cfg(feature = "sled")]
1114        EndpointType::Sled(cfg) => {
1115            Ok(Box::new(sled::SledPublisher::new(cfg)?) as Box<dyn MessagePublisher>)
1116        }
1117        #[cfg(feature = "ibm-mq")]
1118        EndpointType::IbmMq(cfg) => {
1119            Ok(Box::new(ibm_mq::IbmMqPublisher::new(cfg).await?) as Box<dyn MessagePublisher>)
1120        }
1121        EndpointType::Null => Ok(Box::new(null::NullPublisher) as Box<dyn MessagePublisher>),
1122        EndpointType::Fanout(endpoints) => {
1123            let mut publishers = Vec::with_capacity(endpoints.len());
1124            for endpoint in endpoints {
1125                let p = create_publisher_with_depth(
1126                    route_name.to_string(),
1127                    endpoint.clone(),
1128                    depth + 1,
1129                )
1130                .await?;
1131                publishers.push(p);
1132            }
1133            Ok(Box::new(fanout::FanoutPublisher::new(publishers)) as Box<dyn MessagePublisher>)
1134        }
1135        EndpointType::Switch(cfg) => {
1136            let mut cases = std::collections::HashMap::new();
1137            for (key, endpoint) in &cfg.cases {
1138                let p = create_publisher_with_depth(
1139                    route_name.to_string(),
1140                    endpoint.clone(),
1141                    depth + 1,
1142                )
1143                .await?;
1144                cases.insert(key.clone(), p);
1145            }
1146            let default = if let Some(endpoint) = &cfg.default {
1147                Some(
1148                    create_publisher_with_depth(
1149                        route_name.to_string(),
1150                        (**endpoint).clone(),
1151                        depth + 1,
1152                    )
1153                    .await?,
1154                )
1155            } else {
1156                None
1157            };
1158            Ok(Box::new(switch::SwitchPublisher::new(
1159                cfg.metadata_key.clone(),
1160                cases,
1161                default,
1162            )) as Box<dyn MessagePublisher>)
1163        }
1164        EndpointType::Response(_) => {
1165            Ok(Box::new(response::ResponsePublisher) as Box<dyn MessagePublisher>)
1166        }
1167        EndpointType::Reader(inner) => {
1168            let consumer = create_consumer_from_route(route_name, inner).await?;
1169            Ok(Box::new(reader::ReaderPublisher::new(consumer)) as Box<dyn MessagePublisher>)
1170        }
1171        EndpointType::Custom { name, config } => {
1172            let factory = get_endpoint_factory(name)
1173                .ok_or_else(|| anyhow!("Custom endpoint factory '{}' not found", name))?;
1174            factory.create_publisher(route_name, config).await
1175        }
1176        #[allow(unreachable_patterns)]
1177        _ => Err(anyhow!(
1178            "[route:{}] Unsupported publisher endpoint type '{:?}'",
1179            route_name,
1180            endpoint_type
1181        )),
1182    }?;
1183    Ok(publisher)
1184}
1185
/// Fetches the process-wide default rustls `CryptoProvider`.
///
/// Returns a clone of the `Arc` holding the installed provider, or an error explaining
/// how to install one when none has been set yet.
///
/// mq-bridge is a library and leaves the choice of provider to the application binary.
/// To resolve the error, either:
///
/// * enable the **`rustls-ring`** or **`rustls-aws-lc`** feature of `mq-bridge`, or
/// * call `rustls::crypto::CryptoProvider::install_default()` early in your `main()`.
#[cfg(feature = "rustls")]
#[allow(unused)]
pub(crate) fn get_crypto_provider() -> anyhow::Result<std::sync::Arc<rustls::crypto::CryptoProvider>>
{
    match rustls::crypto::CryptoProvider::get_default() {
        Some(provider) => Ok(std::sync::Arc::clone(provider)),
        None => Err(anyhow!("No rustls CryptoProvider is installed.\n\
            Fix: enable the `rustls-ring` or `rustls-aws-lc` feature of mq-bridge, or call `rustls::crypto::CryptoProvider::install_default()` in your application binary before creating any TLS endpoint.")),
    }
}
1206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Endpoint, EndpointType, MemoryConfig};
    use crate::CanonicalMessage;

    /// A fanout over two memory endpoints must deliver one copy of the message to each.
    #[tokio::test]
    async fn test_fanout_publisher_integration() {
        let first = Endpoint::new_memory("fanout_1", 10);
        let second = Endpoint::new_memory("fanout_2", 10);

        // Grab the channels before the endpoints are moved into the fanout.
        let first_channel = first.channel().unwrap();
        let second_channel = second.channel().unwrap();
        let fanout_ep = Endpoint::new(EndpointType::Fanout(vec![first, second]));

        let publisher = create_publisher_from_route("test_fanout", &fanout_ep)
            .await
            .expect("Failed to create fanout publisher");

        publisher
            .send(CanonicalMessage::new(b"fanout_payload".to_vec(), None))
            .await
            .expect("Failed to send message");

        assert_eq!(first_channel.len(), 1);
        assert_eq!(second_channel.len(), 1);

        let first_delivered = first_channel.drain_messages().pop().unwrap();
        let second_delivered = second_channel.drain_messages().pop().unwrap();

        assert_eq!(first_delivered.payload, "fanout_payload".as_bytes());
        assert_eq!(second_delivered.payload, "fanout_payload".as_bytes());
    }

    /// A memory endpoint in subscribe mode is still served by `MemoryConsumer`
    /// (MemorySubscriber was merged into it).
    #[tokio::test]
    async fn test_factory_creates_memory_subscriber() {
        let endpoint = Endpoint::new(EndpointType::Memory(
            MemoryConfig::new("mem".to_string(), None).with_subscribe(true),
        ));

        let consumer = create_consumer_from_route("test", &endpoint).await.unwrap();
        let as_memory_consumer = consumer
            .as_any()
            .downcast_ref::<crate::endpoints::memory::MemoryConsumer>();
        assert!(
            as_memory_consumer.is_some(),
            "Factory should create MemoryConsumer"
        );
    }

    /// The `with_*` helpers must yield the order Retry, Dlq, Metrics regardless of the
    /// order in which they were called.
    #[test]
    fn test_endpoint_middleware_ordering_helpers() {
        let endpoint = Endpoint::new_memory("test", 10)
            .with_metrics()
            .with_dlq(crate::models::DeadLetterQueueMiddleware::default())
            .with_retry(crate::models::RetryMiddleware::default());

        assert_eq!(endpoint.middlewares.len(), 3);
        assert!(matches!(
            endpoint.middlewares.as_slice(),
            [
                Middleware::Retry(_),
                Middleware::Dlq(_),
                Middleware::Metrics(_)
            ]
        ));
    }

    /// Consumer-side helpers must yield the list [Metrics, Deduplication].
    #[test]
    fn test_consumer_middleware_ordering() {
        let dedup = crate::models::DeduplicationMiddleware {
            sled_path: "".into(),
            ttl_seconds: 10,
        };
        let endpoint = Endpoint::new_memory("test", 10)
            .with_deduplication(dedup)
            .with_consumer_metrics();

        // Expected order in list: [Metrics, Dedup]
        // Consumer application (rev): Dedup -> Metrics.
        // Execution: Metrics( Dedup ( base ) ). Metrics is Outer.
        assert_eq!(endpoint.middlewares.len(), 2);
        assert!(matches!(
            endpoint.middlewares.as_slice(),
            [Middleware::Metrics(_), Middleware::Deduplication(_)]
        ));
    }

    /// Setting `request_reply` on a consumer endpoint must produce a
    /// "publisher-only" warning.
    #[test]
    fn test_check_consumer_invalid_config() {
        let endpoint = Endpoint::new(EndpointType::Memory(crate::models::MemoryConfig {
            topic: "test".to_string(),
            request_reply: true, // Invalid for consumer
            ..Default::default()
        }));

        let warnings = check_consumer("test_route", &endpoint, None).unwrap();
        let flagged = warnings
            .iter()
            .any(|warning| warning.contains("request_reply") && warning.contains("publisher-only"));
        assert!(flagged);
    }
}